.oO SearXNG Developer Documentation Oo.
Loading...
Searching...
No Matches
calculator.py
Go to the documentation of this file.
1# SPDX-License-Identifier: AGPL-3.0-or-later
2"""Calculate mathematical expressions using :py:obj:`ast.parse` (mode="eval")."""
3
4from __future__ import annotations
5import typing
6
7import ast
8import math
9import re
10import operator
11import multiprocessing
12
13import babel
14import babel.numbers
15from flask_babel import gettext
16
17from searx.result_types import EngineResults
18from searx.plugins import Plugin, PluginInfo
19
20if typing.TYPE_CHECKING:
21 from searx.search import SearchWithPlugins
22 from searx.extended_types import SXNG_Request
23 from searx.plugins import PluginCfg
24
25
27 """Plugin that calculates mathematical expressions entered in the search
28 bar. The results are displayed in the area for the "answers".
29 """
30
31 id = "calculator"
32
33 def __init__(self, plg_cfg: "PluginCfg") -> None:
34 super().__init__(plg_cfg)
35
37 id=self.id,
38 name=gettext("Basic Calculator"),
39 description=gettext("Calculate mathematical expressions via the search bar"),
40 preference_section="general",
41 )
42
def timeout_func(self, timeout, func, *args, **kwargs):
    """Run ``func(*args, **kwargs)`` in a forked child process with a time limit.

    The call is executed through ``handler``, which puts the return value of
    ``func`` (or ``None`` when ``func`` raised) on a queue shared with this
    process.

    :param timeout: seconds to wait for the child process to finish; the same
        limit is applied when fetching the result from the queue.
    :param func: callable executed in the child process.
    :param args: positional arguments passed to ``func``.
    :param kwargs: keyword arguments passed to ``func``.
    :return: the value delivered by the child process, or ``None`` when the
        timeout is exceeded or the child exited without delivering a result.
    """
    # function-scope import: only the ``Empty`` exception is needed here
    from queue import Empty  # pylint: disable=import-outside-toplevel

    # mp_fork and handler are defined further down in the module
    # pylint: disable=used-before-assignment,undefined-variable
    que = mp_fork.Queue()
    p = mp_fork.Process(target=handler, args=(que, func, args), kwargs=kwargs)
    p.start()
    p.join(timeout=timeout)
    ret_val = None
    try:
        if p.is_alive():
            self.log.debug("terminate function (%s: %s // %s) after timeout is exceeded", func.__name__, args, kwargs)
            p.terminate()
            p.join()
        else:
            try:
                # A child that died without putting anything on the queue
                # (e.g. it was killed) must not block this process forever.
                ret_val = que.get(timeout=timeout)
            except Empty:
                self.log.debug("function (%s: %s // %s) exited without a result", func.__name__, args, kwargs)
    finally:
        # release the process object even if fetching the result failed
        p.close()
    return ret_val
58
def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> EngineResults:
    """Evaluate the search query as a math expression and report it as an answer.

    The query is only evaluated on the first result page and only when it is
    at most 100 characters long.  Numbers written with the decimal / group
    symbols of the UI locale are converted to Python notation before the
    expression is evaluated in a separate process with a 50 ms limit.

    :param request: request whose ``locale`` preference selects the UI locale.
    :param search: search whose query is evaluated.
    :return: results holding one answer ``"<query> = <result>"``, or empty
        results when the query is not an expression that can be evaluated.
    """
    results = EngineResults()

    # only show the result of the expression on the first page
    if search.search_query.pageno > 1:
        return results

    query = search.search_query.query
    # in order to avoid DoS attacks with long expressions, ignore long expressions
    if len(query) > 100:
        return results

    # replace commonly used math operators with their proper Python operator
    query = query.replace("x", "*").replace(":", "/")

    # use UI language
    ui_locale = babel.Locale.parse(request.preferences.get_value("locale"), sep="-")

    # parse the number system in a localized way
    def _decimal(match: re.Match) -> str:
        val = match.group(0)
        try:
            return str(babel.numbers.parse_decimal(val, ui_locale, numbering_system="latn"))
        except babel.numbers.NumberFormatError:
            # Not a valid number in this locale: keep the text as typed and
            # let the expression parser decide what to do with it.
            return val

    decimal = ui_locale.number_symbols["latn"]["decimal"]
    group = ui_locale.number_symbols["latn"]["group"]
    # The symbols are escaped because they are placed inside a character
    # class.  No "|" in the class: it would be a literal there and turn the
    # bit-or expression "2|3" into a (malformed) localized number.
    separators = re.escape(decimal) + re.escape(group)
    # digits followed by one or more separator+digits groups, e.g. 1,234,567.89
    query = re.sub(f"[0-9]+(?:[{separators}][0-9]+)+", _decimal, query)

    # in python, powers are calculated via **
    query_py_formatted = query.replace("^", "**")

    # Prevent the runtime from being longer than 50 ms
    res = self.timeout_func(0.05, _eval_expr, query_py_formatted)
    if res is None or res[0] == "":
        return results

    res, is_boolean = res
    if is_boolean:
        res = "True" if res != 0 else "False"
    else:
        res = babel.numbers.format_decimal(res, locale=ui_locale)
    results.add(results.types.Answer(answer=f"{search.search_query.query} = {res}"))

    return results
103
104
105def _compare(ops: list[ast.cmpop], values: list[int | float]) -> int:
106 """
107 2 < 3 becomes ops=[ast.Lt] and values=[2,3]
108 2 < 3 <= 4 becomes ops=[ast.Lt, ast.LtE] and values=[2,3, 4]
109 """
110 for op, a, b in zip(ops, values, values[1:]): # pylint: disable=invalid-name
111 if isinstance(op, ast.Eq) and a == b:
112 continue
113 if isinstance(op, ast.NotEq) and a != b:
114 continue
115 if isinstance(op, ast.Lt) and a < b:
116 continue
117 if isinstance(op, ast.LtE) and a <= b:
118 continue
119 if isinstance(op, ast.Gt) and a > b:
120 continue
121 if isinstance(op, ast.GtE) and a >= b:
122 continue
123
124 # Ignore impossible ops:
125 # * ast.Is
126 # * ast.IsNot
127 # * ast.In
128 # * ast.NotIn
129
130 # the result is False for a and b and operation op
131 return 0
132 # the results for all the ops are True
133 return 1
134
135
# Maps AST operator node types to the callable implementing them.  ``_eval``
# looks up the entry for binary and unary operations by ``type(node.op)``;
# comparison nodes are evaluated by ``_eval`` through ``_compare`` directly.
operators: dict[type, typing.Callable] = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Pow: operator.pow,
    ast.BitXor: operator.xor,
    ast.BitOr: operator.or_,
    ast.BitAnd: operator.and_,
    ast.USub: operator.neg,
    ast.RShift: operator.rshift,
    ast.LShift: operator.lshift,
    ast.Mod: operator.mod,
    ast.Compare: _compare,
}
151
152
# Names that ``_eval`` accepts inside an expression, with the value each one
# resolves to.
math_constants = {
    'e': math.e,
    'pi': math.pi,
}
157
158
# with multiprocessing.get_context("fork") we are ready for Py3.14 (by emulating
# the old behavior "fork") but it will not solve the core problem of fork, nor
# will it remove the deprecation warnings in py3.12 & py3.13.  Issue is
# discussed here: https://github.com/searxng/searxng/issues/4159
mp_fork = multiprocessing.get_context("fork")
164
165
166def _eval_expr(expr):
167 """
168 Evaluates the given textual expression.
169
170 Returns a tuple of (numericResult, isBooleanResult).
171
172 >>> _eval_expr('2^6')
173 64, False
174 >>> _eval_expr('2**6')
175 64, False
176 >>> _eval_expr('1 + 2*3**(4^5) / (6 + -7)')
177 -5.0, False
178 >>> _eval_expr('1 < 3')
179 1, True
180 >>> _eval_expr('5 < 3')
181 0, True
182 >>> _eval_expr('17 == 11+1+5 == 7+5+5')
183 1, True
184 """
185 try:
186 root_expr = ast.parse(expr, mode='eval').body
187 return _eval(root_expr), isinstance(root_expr, ast.Compare)
188
189 except (SyntaxError, TypeError, ZeroDivisionError):
190 # Expression that can't be evaluated (i.e. not a math expression)
191 return "", False
192
193
194def _eval(node):
195 if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
196 return node.value
197
198 if isinstance(node, ast.BinOp):
199 return operators[type(node.op)](_eval(node.left), _eval(node.right))
200
201 if isinstance(node, ast.UnaryOp):
202 return operators[type(node.op)](_eval(node.operand))
203
204 if isinstance(node, ast.Compare):
205 return _compare(node.ops, [_eval(node.left)] + [_eval(c) for c in node.comparators])
206
207 if isinstance(node, ast.Name) and node.id in math_constants:
208 return math_constants[node.id]
209
210 raise TypeError(node)
211
212
def handler(q: multiprocessing.Queue, func, args, **kwargs):  # pylint:disable=invalid-name
    """Call ``func(*args, **kwargs)`` and put its return value on queue ``q``.

    When anything is raised, ``None`` is put on the queue instead and the
    exception is re-raised.
    """
    try:
        outcome = func(*args, **kwargs)
        q.put(outcome)
    except BaseException:
        # always deliver something, then let the error propagate
        q.put(None)
        raise
None __init__(self, "PluginCfg" plg_cfg)
Definition calculator.py:33
EngineResults post_search(self, "SXNG_Request" request, "SearchWithPlugins" search)
Definition calculator.py:59
timeout_func(self, timeout, func, *args, **kwargs)
Definition calculator.py:43
int _compare(list[ast.cmpop] ops, list[int|float] values)
handler(multiprocessing.Queue q, func, args, **kwargs)