webutils.py
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring, invalid-name

from __future__ import annotations

import os
import pathlib
import csv
import hashlib
import hmac
import re
import inspect
import itertools
import json
from datetime import datetime, timedelta
from typing import Iterable, List, Tuple, Dict, TYPE_CHECKING

from io import StringIO
from codecs import getincrementalencoder

from flask_babel import gettext, format_date  # type: ignore

from searx import logger, settings
from searx.engines import DEFAULT_CATEGORY

if TYPE_CHECKING:
    from searx.enginelib import Engine
    from searx.results import ResultContainer
    from searx.search import SearchQuery
    from searx.results import UnresponsiveEngine

VALID_LANGUAGE_CODE = re.compile(r'^[a-z]{2,3}(-[a-zA-Z]{2})?$')

logger = logger.getChild('webutils')

timeout_text = gettext('timeout')
parsing_error_text = gettext('parsing error')
http_protocol_error_text = gettext('HTTP protocol error')
network_error_text = gettext('network error')
ssl_cert_error_text = gettext("SSL error: certificate validation has failed")
exception_classname_to_text = {
    None: gettext('unexpected crash'),
    'timeout': timeout_text,
    'asyncio.TimeoutError': timeout_text,
    'httpx.TimeoutException': timeout_text,
    'httpx.ConnectTimeout': timeout_text,
    'httpx.ReadTimeout': timeout_text,
    'httpx.WriteTimeout': timeout_text,
    'httpx.HTTPStatusError': gettext('HTTP error'),
    'httpx.ConnectError': gettext("HTTP connection error"),
    'httpx.RemoteProtocolError': http_protocol_error_text,
    'httpx.LocalProtocolError': http_protocol_error_text,
    'httpx.ProtocolError': http_protocol_error_text,
    'httpx.ReadError': network_error_text,
    'httpx.WriteError': network_error_text,
    'httpx.ProxyError': gettext("proxy error"),
    'searx.exceptions.SearxEngineCaptchaException': gettext("CAPTCHA"),
    'searx.exceptions.SearxEngineTooManyRequestsException': gettext("too many requests"),
    'searx.exceptions.SearxEngineAccessDeniedException': gettext("access denied"),
    'searx.exceptions.SearxEngineAPIException': gettext("server API error"),
    'searx.exceptions.SearxEngineXPathException': parsing_error_text,
    'KeyError': parsing_error_text,
    'json.decoder.JSONDecodeError': parsing_error_text,
    'lxml.etree.ParserError': parsing_error_text,
    'ssl.SSLCertVerificationError': ssl_cert_error_text,  # for Python > 3.7
    'ssl.CertificateError': ssl_cert_error_text,  # for Python 3.7
}


def get_translated_errors(unresponsive_engines: Iterable[UnresponsiveEngine]):
    translated_errors = []

    for unresponsive_engine in unresponsive_engines:
        error_user_text = exception_classname_to_text.get(unresponsive_engine.error_type)
        if not error_user_text:
            error_user_text = exception_classname_to_text[None]
        error_msg = gettext(error_user_text)
        if unresponsive_engine.suspended:
            error_msg = gettext('Suspended') + ': ' + error_msg
        translated_errors.append((unresponsive_engine.engine, error_msg))

    return sorted(translated_errors, key=lambda e: e[0])

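# Minimal usage sketch: UnresponsiveEngine (searx.results) carries the
# ``engine``, ``error_type`` and ``suspended`` attributes read above. The
# values below are hypothetical and assume the field order
# (engine, error_type, suspended):
#
#   get_translated_errors([UnresponsiveEngine('bing', 'timeout', False)])
#   # -> [('bing', 'timeout')]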

class CSVWriter:
    """A CSV writer which will write rows to CSV file "f", which is encoded in
    the given encoding."""

    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        # Redirect output to a queue
        self.queue = StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f
        self.encoder = getincrementalencoder(encoding)()

    def writerow(self, row):
        self.writer.writerow(row)
        # Fetch UTF-8 output from the queue ...
        data = self.queue.getvalue()
        data = data.strip('\x00')
        # ... and re-encode it into the target encoding
        data = self.encoder.encode(data)
        # write to the target stream
        self.stream.write(data.decode())
        # empty queue
        self.queue.truncate(0)

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)

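# Usage sketch: CSVWriter wraps any writable text stream; with the default
# ``excel`` dialect each row is terminated by '\r\n'.
#
#   import io
#   buf = io.StringIO()
#   writer = CSVWriter(buf)
#   writer.writerow(['title', 'url'])
#   # buf.getvalue() -> 'title,url\r\n'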

def write_csv_response(csv: CSVWriter, rc: ResultContainer) -> None:  # pylint: disable=redefined-outer-name
    """Write the results of a query (``application/csv``) into a CSV table
    (:py:obj:`CSVWriter`). The first line of the table contains the column
    names; the column "type" specifies the row type, one of:

    - result
    - answer
    - suggestion
    - correction

    """

    results = rc.get_ordered_results()
    keys = ('title', 'url', 'content', 'host', 'engine', 'score', 'type')
    csv.writerow(keys)

    for row in results:
        row['host'] = row['parsed_url'].netloc
        row['type'] = 'result'
        csv.writerow([row.get(key, '') for key in keys])

    for a in rc.answers:
        row = {'title': a, 'type': 'answer'}
        csv.writerow([row.get(key, '') for key in keys])

    for a in rc.suggestions:
        row = {'title': a, 'type': 'suggestion'}
        csv.writerow([row.get(key, '') for key in keys])

    for a in rc.corrections:
        row = {'title': a, 'type': 'correction'}
        csv.writerow([row.get(key, '') for key in keys])

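# Usage sketch (``result_container`` is a hypothetical ResultContainer from a
# finished search):
#
#   import io
#   out = io.StringIO()
#   write_csv_response(CSVWriter(out), result_container)
#   # out.getvalue() now holds the CSV table, first row = column names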

class JSONEncoder(json.JSONEncoder):  # pylint: disable=missing-class-docstring
    def default(self, o):
        if isinstance(o, datetime):
            return o.isoformat()
        if isinstance(o, timedelta):
            return o.total_seconds()
        if isinstance(o, set):
            return list(o)
        return super().default(o)

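# Example: the encoder makes datetime, timedelta and set values JSON-safe.
#
#   json.dumps({'when': datetime(2024, 1, 1), 'tags': {'a'}}, cls=JSONEncoder)
#   # -> '{"when": "2024-01-01T00:00:00", "tags": ["a"]}'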

def get_json_response(sq: SearchQuery, rc: ResultContainer) -> str:
    """Returns the results of a query as a JSON string (``application/json``)."""
    results = rc.number_of_results
    x = {
        'query': sq.query,
        'number_of_results': results,
        'results': rc.get_ordered_results(),
        'answers': list(rc.answers),
        'corrections': list(rc.corrections),
        'infoboxes': rc.infoboxes,
        'suggestions': list(rc.suggestions),
        'unresponsive_engines': get_translated_errors(rc.unresponsive_engines),
    }
    response = json.dumps(x, cls=JSONEncoder)
    return response

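# Usage sketch (``search_query`` and ``result_container`` are hypothetical
# objects from a finished search; values illustrative):
#
#   payload = get_json_response(search_query, result_container)
#   # -> '{"query": "...", "number_of_results": 42, "results": [...], ...}'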

def get_themes(templates_path):
    """Returns the list of available themes."""
    return os.listdir(templates_path)


def get_hash_for_file(file: pathlib.Path) -> str:
    m = hashlib.sha1()
    with file.open('rb') as f:
        m.update(f.read())
    return m.hexdigest()


def get_static_files(static_path: str) -> Dict[str, str]:
    static_files: Dict[str, str] = {}
    static_path_path = pathlib.Path(static_path)

    def walk(path: pathlib.Path):
        for file in path.iterdir():
            if file.name.startswith('.'):
                # ignore hidden files
                continue
            if file.is_file():
                static_files[str(file.relative_to(static_path_path))] = get_hash_for_file(file)
            if file.is_dir() and file.name not in ('node_modules', 'src'):
                # ignore "src" and "node_modules" directories
                walk(file)

    walk(static_path_path)
    return static_files

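# Example: map every non-hidden file below the static directory to the SHA-1
# digest of its content, e.g. for cache-busting URLs (path and digest below
# are illustrative):
#
#   static_files = get_static_files('searx/static')
#   # -> {'themes/simple/css/searxng.min.css': '0a1b2c...', ...}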

def get_result_templates(templates_path):
    result_templates = set()
    templates_path_length = len(templates_path) + 1
    for directory, _, files in os.walk(templates_path):
        if directory.endswith('result_templates'):
            for filename in files:
                f = os.path.join(directory[templates_path_length:], filename)
                result_templates.add(f)
    return result_templates


def new_hmac(secret_key, url):
    return hmac.new(secret_key.encode(), url, hashlib.sha256).hexdigest()


def is_hmac_of(secret_key, value, hmac_to_check):
    hmac_of_value = new_hmac(secret_key, value)
    return len(hmac_of_value) == len(hmac_to_check) and hmac.compare_digest(hmac_of_value, hmac_to_check)

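# Sketch: ``url``/``value`` must already be bytes, they are passed to
# hmac.new() unencoded; only the secret is a str. Values below hypothetical.
#
#   token = new_hmac('server_secret', b'https://example.org/image.png')
#   assert is_hmac_of('server_secret', b'https://example.org/image.png', token)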

def prettify_url(url, max_length=74):
    if len(url) > max_length:
        chunk_len = int(max_length / 2 + 1)
        return '{0}[...]{1}'.format(url[:chunk_len], url[-chunk_len:])
    return url

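# Example: over-long URLs keep chunk_len = max_length // 2 + 1 characters on
# each side of an elision marker.
#
#   prettify_url('x' * 100, max_length=10)
#   # -> 'xxxxxx[...]xxxxxx'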

def contains_cjko(s: str) -> bool:
    """Check whether a string contains Chinese, Japanese, or Korean
    characters, using a regex character class built from a set of Unicode
    ranges.

    Args:
        s (str): string to be checked.

    Returns:
        bool: True if the input s contains the characters and False otherwise.
    """
    unicode_ranges = (
        '\u4e00-\u9fff'  # Chinese characters
        '\u3040-\u309f'  # Japanese hiragana
        '\u30a0-\u30ff'  # Japanese katakana
        '\u4e00-\u9faf'  # Japanese kanji
        '\uac00-\ud7af'  # Korean hangul syllables
        '\u1100-\u11ff'  # Korean hangul jamo
    )
    return bool(re.search(fr'[{unicode_ranges}]', s))

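# Example:
#
#   contains_cjko('searxng')  # -> False
#   contains_cjko('検索')      # -> True (kanji)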

def regex_highlight_cjk(word: str) -> str:
    """Generate the regex pattern to match a given word, depending on whether
    the word contains CJK characters. Since CJK scripts do not separate words
    with whitespace, a word containing CJK characters is matched wherever it
    occurs in the text; a word without them is only matched as a standalone
    token, starting at a word boundary and not followed by a word character.

    Args:
        word (str): the word to be matched with regex pattern.

    Returns:
        str: the regex pattern for the word.
    """
    rword = re.escape(word)
    if contains_cjko(rword):
        return fr'({rword})'
    return fr'\b({rword})(?!\w)'

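# Example:
#
#   regex_highlight_cjk('book')  # -> r'\b(book)(?!\w)'
#   regex_highlight_cjk('書')    # -> r'(書)'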

def highlight_content(content, query):

    if not content:
        return None

    # ignore HTML content
    if content.find('<') != -1:
        return content

    querysplit = query.split()
    queries = []
    for qs in querysplit:
        qs = qs.replace("'", "").replace('"', '').replace(" ", "")
        if len(qs) > 0:
            queries.extend(re.findall(regex_highlight_cjk(qs), content, flags=re.I | re.U))
    if len(queries) > 0:
        regex = re.compile("|".join(map(regex_highlight_cjk, queries)))
        return regex.sub(lambda match: f'<span class="highlight">{match.group(0)}</span>'.replace('\\', r'\\'), content)
    return content

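# Example (plain text only; content containing '<' is returned unchanged):
#
#   highlight_content('free software search', 'software')
#   # -> 'free <span class="highlight">software</span> search'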

def searxng_l10n_timespan(dt: datetime) -> str:  # pylint: disable=invalid-name
    """Returns a human-readable and translated string indicating how long ago
    a date was in the past / the time span of the date to the present.

    On January 1st, midnight, the returned string only indicates how many years
    ago the date was.
    """
    # TODO, check if timezone is calculated right  # pylint: disable=fixme
    d = dt.date()
    t = dt.time()
    if d.month == 1 and d.day == 1 and t.hour == 0 and t.minute == 0 and t.second == 0:
        return str(d.year)
    if dt.replace(tzinfo=None) >= datetime.now() - timedelta(days=1):
        timedifference = datetime.now() - dt.replace(tzinfo=None)
        minutes = int((timedifference.seconds / 60) % 60)
        hours = int(timedifference.seconds / 60 / 60)
        if hours == 0:
            return gettext('{minutes} minute(s) ago').format(minutes=minutes)
        return gettext('{hours} hour(s), {minutes} minute(s) ago').format(hours=hours, minutes=minutes)
    return format_date(dt)

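# Example behaviour (English locale; outputs follow the branches above):
#
#   searxng_l10n_timespan(datetime(2020, 1, 1))                   # -> '2020'
#   searxng_l10n_timespan(datetime.now() - timedelta(minutes=5))
#   # -> '5 minute(s) ago'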

def is_flask_run_cmdline() -> bool:
    """Check if the application was started using the "flask run" command line.

    Inspect the callstack.
    See https://github.com/pallets/flask/blob/master/src/flask/__main__.py

    Returns:
        bool: True if the application was started using "flask run".
    """
    frames = inspect.stack()
    if len(frames) < 2:
        return False
    return frames[-2].filename.endswith('flask/cli.py')

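# Sketch of a hypothetical call site: skip the application's own server setup
# when the flask CLI drives the process.
#
#   if not is_flask_run_cmdline():
#       configure_own_server()  # hypothetical helper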

NO_SUBGROUPING = 'without further subgrouping'


def group_engines_in_tab(engines: Iterable[Engine]) -> List[Tuple[str, str, Iterable[Engine]]]:
    """Groups an Iterable of engines by their first non-tab category (first subgroup)"""

    def get_subgroup(eng):
        non_tab_categories = [c for c in eng.categories if c not in tabs + [DEFAULT_CATEGORY]]
        return non_tab_categories[0] if len(non_tab_categories) > 0 else NO_SUBGROUPING

    def group_sort_key(group):
        return (group[0] == NO_SUBGROUPING, group[0].lower())

    def engine_sort_key(engine):
        return (engine.about.get('language', ''), engine.name)

    tabs = list(settings['categories_as_tabs'].keys())
    subgroups = itertools.groupby(sorted(engines, key=get_subgroup), get_subgroup)
    sorted_groups = sorted(((name, list(engines)) for name, engines in subgroups), key=group_sort_key)

    ret_val = []
    for groupname, _engines in sorted_groups:
        group_bang = '!' + groupname.replace(' ', '_') if groupname != NO_SUBGROUPING else ''
        ret_val.append((groupname, group_bang, sorted(_engines, key=engine_sort_key)))

    return ret_val
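# Illustrative sketch (engine objects hypothetical): engines whose first
# non-tab category is 'wikimedia' form one group with a bang shortcut, while
# engines with only tab categories fall into NO_SUBGROUPING, which sorts last:
#
#   group_engines_in_tab([wikipedia, wikidata, duckduckgo])
#   # -> [('wikimedia', '!wikimedia', [wikidata, wikipedia]),
#   #     ('without further subgrouping', '', [duckduckgo])]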