.oO SearXNG Developer Documentation Oo.
Loading...
Searching...
No Matches
calculator.py
Go to the documentation of this file.
1# SPDX-License-Identifier: AGPL-3.0-or-later
2"""Calculate mathematical expressions using :py:obj:`ast.parse` (mode="eval").
3"""
4
5from __future__ import annotations
6import typing
7
8import ast
9import re
10import operator
11import multiprocessing
12
13import babel
14import babel.numbers
15from flask_babel import gettext
16
17from searx.result_types import EngineResults
18from searx.plugins import Plugin, PluginInfo
19
20if typing.TYPE_CHECKING:
21 from searx.search import SearchWithPlugins
22 from searx.extended_types import SXNG_Request
23 from searx.plugins import PluginCfg
24
25
27 """Plugin that calculates mathematical expressions entered as the search
28 query. The results are displayed in the area for the "answers".
29 """
30
31 id = "calculator"
32
33 def __init__(self, plg_cfg: "PluginCfg") -> None:
34 super().__init__(plg_cfg)
35
37 id=self.id,
38 name=gettext("Basic Calculator"),
39 description=gettext("Calculate mathematical expressions via the search bar"),
40 preference_section="general",
41 )
42
def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> EngineResults:
    """Try to evaluate the search query as a mathematical expression.

    The query is only evaluated on the first result page and only when it is
    at most 100 characters long.  ``x`` and ``:`` are accepted as
    multiplication / division signs and ``^`` as the power operator.  Numbers
    written with the decimal / group symbols of the UI locale are converted
    to plain Python literals before the expression is evaluated.

    :param request: the current request; its ``locale`` preference selects
        the locale used to parse and to format numbers.
    :param search: the running search; ``search.search_query`` provides the
        query string and the page number.
    :return: an :py:obj:`EngineResults` holding one answer of the form
        ``<query> = <result>``, or an empty one when the query is not a
        supported expression.
    """
    results = EngineResults()

    # only show the result of the expression on the first page
    if search.search_query.pageno > 1:
        return results

    query = search.search_query.query
    # in order to avoid DoS attacks with long expressions, ignore long expressions
    if len(query) > 100:
        return results

    # replace commonly used math operators with their proper Python operator
    query = query.replace("x", "*").replace(":", "/")

    # use UI language
    ui_locale = babel.Locale.parse(request.preferences.get_value("locale"), sep="-")

    # parse the number system in a localized way
    def _decimal(match: re.Match) -> str:
        val = babel.numbers.parse_decimal(match.group(0), ui_locale, numbering_system="latn")
        return str(val)

    decimal = ui_locale.number_symbols["latn"]["decimal"]
    group = ui_locale.number_symbols["latn"]["group"]
    # Build the character class from the escaped locale symbols only: a "|"
    # inside [...] is a literal pipe (not an alternation) and an unescaped
    # symbol could be misread as regex syntax.
    separators = re.escape(decimal) + re.escape(group)
    try:
        query = re.sub(f"[0-9]+[{separators}][0-9]+[{separators}]?[0-9]?", _decimal, query)
    except babel.numbers.NumberFormatError:
        # something that looks like a number but is not one in this locale
        # (e.g. "1.5.3" or an IP address) --> not a calculation
        return results

    # only numbers and math operators are accepted
    if any(str.isalpha(c) for c in query):
        return results

    # in python, powers are calculated via **
    query_py_formatted = query.replace("^", "**")

    # Prevent the runtime from being longer than 50 ms
    res = timeout_func(0.05, _eval_expr, query_py_formatted)
    if res is None or res == "":
        return results

    # a negative base with a fractional exponent yields a complex number,
    # which can't be formatted as a decimal
    if isinstance(res, complex):
        return results

    res = babel.numbers.format_decimal(res, locale=ui_locale)
    results.add(results.types.Answer(answer=f"{search.search_query.query} = {res}"))

    return results
87
88
# Maps the AST operator node types accepted by the calculator to the function
# that implements them.  An operator that is not listed here makes the
# evaluation fail (KeyError on lookup).  Both unary signs are supported, so
# that expressions like ``+5`` or ``3 * +2`` can be evaluated as well.
operators: dict[type, typing.Callable] = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Pow: operator.pow,
    ast.BitXor: operator.xor,
    ast.UAdd: operator.pos,
    ast.USub: operator.neg,
}
98
# with multiprocessing.get_context("fork") we are ready for Py3.14 (by emulating
# the old behavior "fork") but it will not solve the core problem of fork, nor
# will it remove the deprecation warnings in py3.12 & py3.13. Issue is
# discussed here: https://github.com/searxng/searxng/issues/4159
mp_fork = multiprocessing.get_context("fork")
104
105
def _eval_expr(expr):
    """Parse ``expr`` as a single Python expression and evaluate it.

    The expression is parsed with :py:obj:`ast.parse` (mode="eval") and the
    resulting tree is handed to ``_eval``.  Note that ``^`` is Python's
    bitwise XOR at this level; callers that want ``^`` to mean "power" have to
    replace it by ``**`` before calling this function.

    Returns the numeric result, or an empty string when the expression
    divides by zero.  Other errors (syntax errors, unsupported nodes or
    operators) are not caught here.

    >>> _eval_expr('2^6')
    4
    >>> _eval_expr('2**6')
    64
    >>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)')
    -5.0
    """
    try:
        return _eval(ast.parse(expr, mode='eval').body)
    except ZeroDivisionError:
        # This is undefined
        return ""
120
121
122def _eval(node):
123 if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
124 return node.value
125
126 if isinstance(node, ast.BinOp):
127 return operators[type(node.op)](_eval(node.left), _eval(node.right))
128
129 if isinstance(node, ast.UnaryOp):
130 return operators[type(node.op)](_eval(node.operand))
131
132 raise TypeError(node)
133
134
def handler(q: multiprocessing.Queue, func, args, **kwargs):  # pylint:disable=invalid-name
    """Call ``func(*args, **kwargs)`` and put its return value on the queue ``q``.

    If anything is raised on the way, ``None`` is put on the queue instead and
    the exception is re-raised.
    """
    try:
        outcome = func(*args, **kwargs)
        q.put(outcome)
    except BaseException:
        # leave a ``None`` on the queue before the exception propagates
        q.put(None)
        raise
141
142
def timeout_func(timeout, func, *args, **kwargs):
    """Run ``func(*args, **kwargs)`` in a forked process for at most ``timeout`` seconds.

    The call is executed by ``handler`` in a child process which hands the
    result back through a queue.

    :param timeout: number of seconds to wait for the child process.
    :param func: the callable to run in the child process.
    :return: the value the child put on the queue (``None`` if ``func``
        raised), or ``None`` when the child had to be terminated because the
        timeout was exceeded or when it exited without delivering a result.
    """
    # local import: only the ``queue.Empty`` exception is needed here
    import queue  # pylint: disable=import-outside-toplevel

    que = mp_fork.Queue()
    p = mp_fork.Process(target=handler, args=(que, func, args), kwargs=kwargs)
    p.start()
    p.join(timeout=timeout)
    ret_val = None
    # pylint: disable=used-before-assignment,undefined-variable
    if not p.is_alive():
        try:
            # The child is gone: either the result is already waiting in the
            # queue or it will never arrive (e.g. the child was killed by a
            # signal).  Bound the wait so that the request can't hang forever.
            ret_val = que.get(timeout=timeout)
        except queue.Empty:
            ret_val = None
    else:
        logger.debug("terminate function after timeout is exceeded")  # type: ignore
        p.terminate()
    p.join()
    p.close()
    return ret_val
None __init__(self, "PluginCfg" plg_cfg)
Definition calculator.py:33
EngineResults post_search(self, "SXNG_Request" request, "SearchWithPlugins" search)
Definition calculator.py:43
timeout_func(timeout, func, *args, **kwargs)
handler(multiprocessing.Queue q, func, args, **kwargs)