.oO SearXNG Developer Documentation Oo.
Loading...
Searching...
No Matches
calculator.py
Go to the documentation of this file.
1# SPDX-License-Identifier: AGPL-3.0-or-later
2"""Calculate mathematical expressions using :py:obj:`ast.parse` (mode="eval")."""
3
4import typing
5
6import ast
7import math
8import re
9import operator
10import multiprocessing
11
12import babel
13import babel.numbers
14from flask_babel import gettext
15
16from searx.result_types import EngineResults
17from searx.plugins import Plugin, PluginInfo
18
19if typing.TYPE_CHECKING:
20 from searx.search import SearchWithPlugins
21 from searx.extended_types import SXNG_Request
22 from searx.plugins import PluginCfg
23
24
26 """Plugin calculates mathematical expressions entered in the search bar.
27 The results are displayed in area for the "answers".
28 """
29
30 id = "calculator"
31
32 def __init__(self, plg_cfg: "PluginCfg") -> None:
33 super().__init__(plg_cfg)
34
36 id=self.id,
37 name=gettext("Basic Calculator"),
38 description=gettext("Calculate mathematical expressions via the search bar"),
39 preference_section="general",
40 )
41
42 def timeout_func(self, timeout, func, *args, **kwargs):
43 que = mp_fork.Queue()
44 p = mp_fork.Process(target=handler, args=(que, func, args), kwargs=kwargs)
45 p.start()
46 p.join(timeout=timeout)
47 ret_val = None
48 # pylint: disable=used-before-assignment,undefined-variable
49 if not p.is_alive():
50 ret_val = que.get()
51 else:
52 self.log.debug("terminate function (%s: %s // %s) after timeout is exceeded", func.__name__, args, kwargs)
53 p.terminate()
54 p.join()
55 p.close()
56 return ret_val
57
58 def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> EngineResults:
59 results = EngineResults()
60
61 # only show the result of the expression on the first page
62 if search.search_query.pageno > 1:
63 return results
64
65 query = search.search_query.query
66 # in order to avoid DoS attacks with long expressions, ignore long expressions
67 if len(query) > 100:
68 return results
69
70 # replace commonly used math operators with their proper Python operator
71 query = query.replace("x", "*").replace(":", "/")
72
73 # Is this a term that can be calculated?
74 word, constants = "", set()
75 for x in query:
76 # Alphabetic characters are defined as "Letters" in the Unicode
77 # character database and are the constants in an equation.
78 if x.isalpha():
79 word += x.strip()
80 elif word:
81 constants.add(word)
82 word = ""
83
84 # In the term of an arithmetic operation there should be no other
85 # alphabetic characters besides the constants
86 if constants - set(math_constants):
87 return results
88
89 # use UI language
90 ui_locale = babel.Locale.parse(request.preferences.get_value("locale"), sep="-")
91
92 # parse the number system in a localized way
93 def _decimal(match: re.Match) -> str:
94 val = match.string[match.start() : match.end()]
95 val = babel.numbers.parse_decimal(val, ui_locale, numbering_system="latn")
96 return str(val)
97
98 decimal = ui_locale.number_symbols["latn"]["decimal"]
99 group = ui_locale.number_symbols["latn"]["group"]
100 query = re.sub(f"[0-9]+[{decimal}|{group}][0-9]+[{decimal}|{group}]?[0-9]?", _decimal, query)
101
102 # in python, powers are calculated via **
103 query_py_formatted = query.replace("^", "**")
104
105 # Prevent the runtime from being longer than 50 ms
106 res = self.timeout_func(0.05, _eval_expr, query_py_formatted)
107 if res is None or res[0] == "":
108 return results
109
110 res, is_boolean = res
111 if is_boolean:
112 res = "True" if res != 0 else "False"
113 else:
114 res = babel.numbers.format_decimal(res, locale=ui_locale)
115 results.add(results.types.Answer(answer=f"{search.search_query.query} = {res}"))
116
117 return results
118
119
120def _compare(ops: list[ast.cmpop], values: list[int | float]) -> int:
121 """
122 2 < 3 becomes ops=[ast.Lt] and values=[2,3]
123 2 < 3 <= 4 becomes ops=[ast.Lt, ast.LtE] and values=[2,3, 4]
124 """
125 for op, a, b in zip(ops, values, values[1:]): # pylint: disable=invalid-name
126 if isinstance(op, ast.Eq) and a == b:
127 continue
128 if isinstance(op, ast.NotEq) and a != b:
129 continue
130 if isinstance(op, ast.Lt) and a < b:
131 continue
132 if isinstance(op, ast.LtE) and a <= b:
133 continue
134 if isinstance(op, ast.Gt) and a > b:
135 continue
136 if isinstance(op, ast.GtE) and a >= b:
137 continue
138
139 # Ignore impossible ops:
140 # * ast.Is
141 # * ast.IsNot
142 # * ast.In
143 # * ast.NotIn
144
145 # the result is False for a and b and operation op
146 return 0
147 # the results for all the ops are True
148 return 1
149
150
# Maps the AST node type of an operator to the function implementing it.
operators: dict[type, typing.Callable] = {
    # arithmetic
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Mod: operator.mod,
    ast.Pow: operator.pow,
    ast.USub: operator.neg,
    # bitwise
    ast.BitAnd: operator.and_,
    ast.BitOr: operator.or_,
    ast.BitXor: operator.xor,
    ast.LShift: operator.lshift,
    ast.RShift: operator.rshift,
    # comparison
    ast.Compare: _compare,
}
166
167
# Names that may be used in an expression, mapped to their numeric value.
math_constants = {"e": math.e, "pi": math.pi}
172
173
# With multiprocessing.get_context("fork") we are ready for Py3.14 (by emulating
# the old behavior "fork"), but it will not solve the core problem of fork, nor
# will it remove the deprecation warnings in py3.12 & py3.13.  The issue is
# discussed here: https://github.com/searxng/searxng/issues/4159
mp_fork = multiprocessing.get_context("fork")
179
180
def _eval_expr(expr):
    """
    Evaluates the given textual expression.

    Returns a tuple of (numericResult, isBooleanResult).  If the expression
    can't be evaluated to a real number, the tuple ``("", False)`` is returned.

    The expression is evaluated with Python semantics, ``^`` is the bitwise
    XOR (a power has to be written as ``**``).

    >>> _eval_expr('2^6')
    (4, False)
    >>> _eval_expr('2**6')
    (64, False)
    >>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)')
    (-5.0, False)
    >>> _eval_expr('1 < 3')
    (1, True)
    >>> _eval_expr('5 < 3')
    (0, True)
    >>> _eval_expr('17 == 11+1+5 == 7+5+5')
    (1, True)
    >>> _eval_expr('1 / 0')
    ('', False)
    """
    try:
        root_expr = ast.parse(expr, mode='eval').body
        value = _eval(root_expr)

    except (SyntaxError, TypeError, ZeroDivisionError, KeyError, ValueError, OverflowError):
        # Expression that can't be evaluated (i.e. not a math expression):
        # KeyError is an operator without implementation, ValueError is
        # raised e.g. by a negative shift count and OverflowError by a float
        # result that is out of range.
        return "", False

    if isinstance(value, complex):
        # e.g. (-8)**0.5 / a complex number can't be formatted as a decimal
        return "", False

    return value, isinstance(root_expr, ast.Compare)
207
208
def _eval(node):
    """Recursively evaluate the AST ``node`` of an arithmetic expression.

    Supported are numeric literals, the names from ``math_constants``,
    comparisons and the unary / binary operators from ``operators``.

    :raises TypeError: for any other node and for operators that are not
        implemented (e.g. ``//`` or ``~``)
    """
    if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
        return node.value

    if isinstance(node, (ast.BinOp, ast.UnaryOp)):
        # look up the implementation before the operands are evaluated, an
        # unknown operator is reported like any other unsupported node
        func = operators.get(type(node.op))
        if func is None:
            raise TypeError(node)
        if isinstance(node, ast.BinOp):
            return func(_eval(node.left), _eval(node.right))
        return func(_eval(node.operand))

    if isinstance(node, ast.Compare):
        return _compare(node.ops, [_eval(node.left)] + [_eval(c) for c in node.comparators])

    if isinstance(node, ast.Name) and node.id in math_constants:
        return math_constants[node.id]

    raise TypeError(node)
226
227
def handler(q: multiprocessing.Queue, func, args, **kwargs):  # pylint:disable=invalid-name
    """Call ``func(*args, **kwargs)`` and put its return value on queue ``q``.

    If anything is raised, ``None`` is put on the queue and the exception is
    re-raised.
    """
    try:
        outcome = func(*args, **kwargs)
        q.put(outcome)
    except BaseException:
        # the reader of the queue gets an item even if the call failed
        q.put(None)
        raise
None __init__(self, "PluginCfg" plg_cfg)
Definition calculator.py:32
EngineResults post_search(self, "SXNG_Request" request, "SearchWithPlugins" search)
Definition calculator.py:58
timeout_func(self, timeout, func, *args, **kwargs)
Definition calculator.py:42
int _compare(list[ast.cmpop] ops, list[int|float] values)
handler(multiprocessing.Queue q, func, args, **kwargs)