.oO SearXNG Developer Documentation Oo.
Loading...
Searching...
No Matches
calculator.py
Go to the documentation of this file.
1# SPDX-License-Identifier: AGPL-3.0-or-later
2"""Calculate mathematical expressions using :py:obj:`ast.parse` (mode="eval")."""
3
4from __future__ import annotations
5import typing
6
7import ast
8import math
9import re
10import operator
11import multiprocessing
12
13import babel
14import babel.numbers
15from flask_babel import gettext
16
17from searx.result_types import EngineResults
18from searx.plugins import Plugin, PluginInfo
19
20if typing.TYPE_CHECKING:
21 from searx.search import SearchWithPlugins
22 from searx.extended_types import SXNG_Request
23 from searx.plugins import PluginCfg
24
25
27 """Plugin converts strings to different hash digests. The results are
28 displayed in area for the "answers".
29 """
30
31 id = "calculator"
32
33 def __init__(self, plg_cfg: "PluginCfg") -> None:
34 super().__init__(plg_cfg)
35
37 id=self.id,
38 name=gettext("Basic Calculator"),
39 description=gettext("Calculate mathematical expressions via the search bar"),
40 preference_section="general",
41 )
42
43 def timeout_func(self, timeout, func, *args, **kwargs):
44 que = mp_fork.Queue()
45 p = mp_fork.Process(target=handler, args=(que, func, args), kwargs=kwargs)
46 p.start()
47 p.join(timeout=timeout)
48 ret_val = None
49 # pylint: disable=used-before-assignment,undefined-variable
50 if not p.is_alive():
51 ret_val = que.get()
52 else:
53 self.log.debug("terminate function (%s: %s // %s) after timeout is exceeded", func.__name__, args, kwargs)
54 p.terminate()
55 p.join()
56 p.close()
57 return ret_val
58
59 def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> EngineResults:
60 results = EngineResults()
61
62 # only show the result of the expression on the first page
63 if search.search_query.pageno > 1:
64 return results
65
66 query = search.search_query.query
67 # in order to avoid DoS attacks with long expressions, ignore long expressions
68 if len(query) > 100:
69 return results
70
71 # replace commonly used math operators with their proper Python operator
72 query = query.replace("x", "*").replace(":", "/")
73
74 # Is this a term that can be calculated?
75 word, constants = "", set()
76 for x in query:
77 # Alphabetic characters are defined as "Letters" in the Unicode
78 # character database and are the constants in an equation.
79 if x.isalpha():
80 word += x.strip()
81 elif word:
82 constants.add(word)
83 word = ""
84
85 # In the term of an arithmetic operation there should be no other
86 # alphabetic characters besides the constants
87 if constants - set(math_constants):
88 return results
89
90 # use UI language
91 ui_locale = babel.Locale.parse(request.preferences.get_value("locale"), sep="-")
92
93 # parse the number system in a localized way
94 def _decimal(match: re.Match) -> str:
95 val = match.string[match.start() : match.end()]
96 val = babel.numbers.parse_decimal(val, ui_locale, numbering_system="latn")
97 return str(val)
98
99 decimal = ui_locale.number_symbols["latn"]["decimal"]
100 group = ui_locale.number_symbols["latn"]["group"]
101 query = re.sub(f"[0-9]+[{decimal}|{group}][0-9]+[{decimal}|{group}]?[0-9]?", _decimal, query)
102
103 # in python, powers are calculated via **
104 query_py_formatted = query.replace("^", "**")
105
106 # Prevent the runtime from being longer than 50 ms
107 res = self.timeout_func(0.05, _eval_expr, query_py_formatted)
108 if res is None or res[0] == "":
109 return results
110
111 res, is_boolean = res
112 if is_boolean:
113 res = "True" if res != 0 else "False"
114 else:
115 res = babel.numbers.format_decimal(res, locale=ui_locale)
116 results.add(results.types.Answer(answer=f"{search.search_query.query} = {res}"))
117
118 return results
119
120
121def _compare(ops: list[ast.cmpop], values: list[int | float]) -> int:
122 """
123 2 < 3 becomes ops=[ast.Lt] and values=[2,3]
124 2 < 3 <= 4 becomes ops=[ast.Lt, ast.LtE] and values=[2,3, 4]
125 """
126 for op, a, b in zip(ops, values, values[1:]): # pylint: disable=invalid-name
127 if isinstance(op, ast.Eq) and a == b:
128 continue
129 if isinstance(op, ast.NotEq) and a != b:
130 continue
131 if isinstance(op, ast.Lt) and a < b:
132 continue
133 if isinstance(op, ast.LtE) and a <= b:
134 continue
135 if isinstance(op, ast.Gt) and a > b:
136 continue
137 if isinstance(op, ast.GtE) and a >= b:
138 continue
139
140 # Ignore impossible ops:
141 # * ast.Is
142 # * ast.IsNot
143 # * ast.In
144 # * ast.NotIn
145
146 # the result is False for a and b and operation op
147 return 0
148 # the results for all the ops are True
149 return 1
150
151
152operators: dict[type, typing.Callable] = {
153 ast.Add: operator.add,
154 ast.Sub: operator.sub,
155 ast.Mult: operator.mul,
156 ast.Div: operator.truediv,
157 ast.Pow: operator.pow,
158 ast.BitXor: operator.xor,
159 ast.BitOr: operator.or_,
160 ast.BitAnd: operator.and_,
161 ast.USub: operator.neg,
162 ast.RShift: operator.rshift,
163 ast.LShift: operator.lshift,
164 ast.Mod: operator.mod,
165 ast.Compare: _compare,
166}
167
168
169math_constants = {
170 'e': math.e,
171 'pi': math.pi,
172}
173
174
175# with multiprocessing.get_context("fork") we are ready for Py3.14 (by emulating
176# the old behavior "fork") but it will not solve the core problem of fork, nor
177# will it remove the deprecation warnings in py3.12 & py3.13. Issue is
178# ddiscussed here: https://github.com/searxng/searxng/issues/4159
179mp_fork = multiprocessing.get_context("fork")
180
181
182def _eval_expr(expr):
183 """
184 Evaluates the given textual expression.
185
186 Returns a tuple of (numericResult, isBooleanResult).
187
188 >>> _eval_expr('2^6')
189 64, False
190 >>> _eval_expr('2**6')
191 64, False
192 >>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)')
193 -5.0, False
194 >>> _eval_expr('1 < 3')
195 1, True
196 >>> _eval_expr('5 < 3')
197 0, True
198 >>> _eval_expr('17 == 11+1+5 == 7+5+5')
199 1, True
200 """
201 try:
202 root_expr = ast.parse(expr, mode='eval').body
203 return _eval(root_expr), isinstance(root_expr, ast.Compare)
204
205 except (SyntaxError, TypeError, ZeroDivisionError):
206 # Expression that can't be evaluated (i.e. not a math expression)
207 return "", False
208
209
210def _eval(node):
211 if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
212 return node.value
213
214 if isinstance(node, ast.BinOp):
215 return operators[type(node.op)](_eval(node.left), _eval(node.right))
216
217 if isinstance(node, ast.UnaryOp):
218 return operators[type(node.op)](_eval(node.operand))
219
220 if isinstance(node, ast.Compare):
221 return _compare(node.ops, [_eval(node.left)] + [_eval(c) for c in node.comparators])
222
223 if isinstance(node, ast.Name) and node.id in math_constants:
224 return math_constants[node.id]
225
226 raise TypeError(node)
227
228
229def handler(q: multiprocessing.Queue, func, args, **kwargs): # pylint:disable=invalid-name
230 try:
231 q.put(func(*args, **kwargs))
232 except:
233 q.put(None)
234 raise
None __init__(self, "PluginCfg" plg_cfg)
Definition calculator.py:33
EngineResults post_search(self, "SXNG_Request" request, "SearchWithPlugins" search)
Definition calculator.py:59
timeout_func(self, timeout, func, *args, **kwargs)
Definition calculator.py:43
int _compare(list[ast.cmpop] ops, list[int|float] values)
handler(multiprocessing.Queue q, func, args, **kwargs)