.oO SearXNG Developer Documentation Oo.
Loading...
Searching...
No Matches
webutils.py
Go to the documentation of this file.
1# SPDX-License-Identifier: AGPL-3.0-or-later
2# pylint: disable=missing-module-docstring, invalid-name
3
4from __future__ import annotations
5
6import os
7import pathlib
8import csv
9import hashlib
10import hmac
11import re
12import itertools
13import json
14from datetime import datetime, timedelta
15from typing import Iterable, List, Tuple, TYPE_CHECKING
16
17from io import StringIO
18from codecs import getincrementalencoder
19
20from flask_babel import gettext, format_date # type: ignore
21
22from searx import logger, get_setting
23
24from searx.engines import DEFAULT_CATEGORY
25
26if TYPE_CHECKING:
27 from searx.enginelib import Engine
28 from searx.results import ResultContainer
29 from searx.search import SearchQuery
30 from searx.results import UnresponsiveEngine
31
# Matches language codes like "en", "fra", "en-US" (2-3 lowercase letters,
# optional 2-letter region part).
VALID_LANGUAGE_CODE = re.compile(r'^[a-z]{2,3}(-[a-zA-Z]{2})?$')

logger = logger.getChild('webutils')

# Shared translatable message fragments, reused by several entries of
# exception_classname_to_text below.
timeout_text = gettext('timeout')
parsing_error_text = gettext('parsing error')
http_protocol_error_text = gettext('HTTP protocol error')
network_error_text = gettext('network error')
ssl_cert_error_text = gettext("SSL error: certificate validation has failed")

# Maps a (dotted) exception class name, as recorded for an unresponsive
# engine, to a short user-facing message.  The ``None`` key is the fallback
# for unknown error types (see get_translated_errors).
exception_classname_to_text = {
    None: gettext('unexpected crash'),
    'timeout': timeout_text,
    'asyncio.TimeoutError': timeout_text,
    'httpx.TimeoutException': timeout_text,
    'httpx.ConnectTimeout': timeout_text,
    'httpx.ReadTimeout': timeout_text,
    'httpx.WriteTimeout': timeout_text,
    'httpx.HTTPStatusError': gettext('HTTP error'),
    'httpx.ConnectError': gettext("HTTP connection error"),
    'httpx.RemoteProtocolError': http_protocol_error_text,
    'httpx.LocalProtocolError': http_protocol_error_text,
    'httpx.ProtocolError': http_protocol_error_text,
    'httpx.ReadError': network_error_text,
    'httpx.WriteError': network_error_text,
    'httpx.ProxyError': gettext("proxy error"),
    'searx.exceptions.SearxEngineCaptchaException': gettext("CAPTCHA"),
    'searx.exceptions.SearxEngineTooManyRequestsException': gettext("too many requests"),
    'searx.exceptions.SearxEngineAccessDeniedException': gettext("access denied"),
    'searx.exceptions.SearxEngineAPIException': gettext("server API error"),
    'searx.exceptions.SearxEngineXPathException': parsing_error_text,
    'KeyError': parsing_error_text,
    'json.decoder.JSONDecodeError': parsing_error_text,
    'lxml.etree.ParserError': parsing_error_text,
    'ssl.SSLCertVerificationError': ssl_cert_error_text,  # for Python > 3.7
    'ssl.CertificateError': ssl_cert_error_text,  # for Python 3.7
}
68
69
def get_translated_errors(unresponsive_engines: Iterable[UnresponsiveEngine]):
    """Build ``(engine name, translated error message)`` tuples for all
    unresponsive engines, sorted by engine name.
    """
    errors = []

    for engine in unresponsive_engines:
        text = exception_classname_to_text.get(engine.error_type)
        if not text:
            # unknown error type --> generic fallback message
            text = exception_classname_to_text[None]
        # NOTE(review): the dict values already went through gettext() at
        # import time; calling gettext() again here presumably resolves the
        # message for the active request locale -- keep the double call
        message = gettext(text)
        if engine.suspended:
            message = gettext('Suspended') + ': ' + message
        errors.append((engine.engine, message))

    return sorted(errors, key=lambda entry: entry[0])
83
84
86 """A CSV writer which will write rows to CSV file "f", which is encoded in
87 the given encoding."""
88
89 def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
90 # Redirect output to a queue
91 self.queue = StringIO()
92 self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
93 self.stream = f
94 self.encoder = getincrementalencoder(encoding)()
95
96 def writerow(self, row):
97 self.writer.writerow(row)
98 # Fetch UTF-8 output from the queue ...
99 data = self.queue.getvalue()
100 data = data.strip('\x00')
101 # ... and re-encode it into the target encoding
102 data = self.encoder.encode(data)
103 # write to the target stream
104 self.stream.write(data.decode())
105 # empty queue
106 self.queue.truncate(0)
107
108 def writerows(self, rows):
109 for row in rows:
110 self.writerow(row)
111
112
def write_csv_response(csv: CSVWriter, rc: ResultContainer) -> None:  # pylint: disable=redefined-outer-name
    """Write rows of the results to a query (``application/csv``) into a CSV
    table (:py:obj:`CSVWriter`).  First line in the table contain the column
    names.  The column "type" specifies the type, the following types are
    included in the table:

    - result
    - answer
    - suggestion
    - correction

    """

    keys = ('title', 'url', 'content', 'host', 'engine', 'score', 'type')
    csv.writerow(keys)

    for res in rc.get_ordered_results():
        row = res.as_dict()
        row['host'] = row['parsed_url'].netloc
        row['type'] = 'result'
        csv.writerow([row.get(key, '') for key in keys])

    for a in rc.answers:
        row = a.as_dict()
        row['host'] = row['parsed_url'].netloc
        # bugfix: answer rows previously had an empty "type" column, while
        # the docstring promises the type 'answer'
        row['type'] = 'answer'
        csv.writerow([row.get(key, '') for key in keys])

    for a in rc.suggestions:
        row = {'title': a, 'type': 'suggestion'}
        csv.writerow([row.get(key, '') for key in keys])

    for a in rc.corrections:
        row = {'title': a, 'type': 'correction'}
        csv.writerow([row.get(key, '') for key in keys])
147
148
class JSONEncoder(json.JSONEncoder):
    """:py:obj:`json.JSONEncoder` that additionally serializes
    :py:obj:`datetime.datetime` (ISO format), :py:obj:`datetime.timedelta`
    (total seconds) and :py:obj:`set` (as list) values.
    """

    # (type, converter) pairs tried in order by default()
    _CONVERTERS = (
        (datetime, datetime.isoformat),
        (timedelta, timedelta.total_seconds),
        (set, list),
    )

    def default(self, o):
        for type_, convert in self._CONVERTERS:
            if isinstance(o, type_):
                return convert(o)
        # unknown type --> let the base class raise TypeError
        return super().default(o)
158
159
def get_json_response(sq: SearchQuery, rc: ResultContainer) -> str:
    """Returns the JSON string of the results to a query (``application/json``)"""
    payload = {
        'query': sq.query,
        'number_of_results': rc.number_of_results,
        'results': [result.as_dict() for result in rc.get_ordered_results()],
        'answers': [answer.as_dict() for answer in rc.answers],
        'corrections': list(rc.corrections),
        'infoboxes': rc.infoboxes,
        'suggestions': list(rc.suggestions),
        'unresponsive_engines': get_translated_errors(rc.unresponsive_engines),
    }
    # JSONEncoder handles datetime / timedelta / set values in the payload
    return json.dumps(payload, cls=JSONEncoder)
174
175
def get_themes(templates_path):
    """Returns available themes list (the entry names in *templates_path*)."""
    return [entry.name for entry in pathlib.Path(templates_path).iterdir()]
179
180
def get_static_file_list() -> list[str]:
    """Return the paths (relative to the ``ui.static_path`` setting) of all
    static files.  Hidden files and directories (leading dot) are skipped,
    including their contents.
    """
    static_path = pathlib.Path(str(get_setting("ui.static_path")))

    def _collect(directory: pathlib.Path) -> list[str]:
        collected: list[str] = []
        for entry in directory.iterdir():
            if entry.name.startswith('.'):
                # ignore hidden file / hidden directory
                continue
            if entry.is_file():
                collected.append(str(entry.relative_to(static_path)))
            if entry.is_dir():
                collected.extend(_collect(entry))
        return collected

    return _collect(static_path)
197
198
def get_result_templates(templates_path):
    """Collect the relative paths of all files located in a directory named
    ``result_templates`` anywhere below *templates_path*.
    """
    templates = set()
    # +1 strips the path separator following templates_path
    prefix_len = len(templates_path) + 1
    for directory, _dirs, files in os.walk(templates_path):
        if not directory.endswith('result_templates'):
            continue
        for filename in files:
            templates.add(os.path.join(directory[prefix_len:], filename))
    return templates
208
209
def new_hmac(secret_key, url):
    """Return the hex digest of the SHA-256 HMAC of *url* (bytes) keyed with
    *secret_key* (str).
    """
    digest = hmac.new(secret_key.encode(), msg=url, digestmod=hashlib.sha256)
    return digest.hexdigest()
212
213
def is_hmac_of(secret_key, value, hmac_to_check):
    """Check *hmac_to_check* against a freshly computed HMAC of *value*."""
    expected = new_hmac(secret_key, value)
    # constant-time comparison; the explicit length check preserves the
    # original short-circuit for differently sized inputs
    return len(expected) == len(hmac_to_check) and hmac.compare_digest(expected, hmac_to_check)
217
218
def prettify_url(url, max_length=74):
    """Return *url* unchanged when it fits into *max_length* characters;
    otherwise keep the head and tail of the URL and insert a ``[...]``
    marker in between.
    """
    if len(url) <= max_length:
        return url
    half = int(max_length / 2 + 1)
    return f'{url[:half]}[...]{url[-half:]}'
224
225
def contains_cjko(s: str) -> bool:
    """Return ``True`` when *s* contains at least one Chinese, Japanese or
    Korean character, ``False`` otherwise.

    Args:
        s (str): string to be checked.

    Returns:
        bool: True if the input s contains the characters and False otherwise.
    """
    # a single regex character class covering the relevant Unicode blocks
    cjk_class = (
        '['
        '\u4e00-\u9fff'  # Chinese characters
        '\u3040-\u309f'  # Japanese hiragana
        '\u30a0-\u30ff'  # Japanese katakana
        '\u4e00-\u9faf'  # Japanese kanji
        '\uac00-\ud7af'  # Korean hangul syllables
        '\u1100-\u11ff'  # Korean hangul jamo
        ']'
    )
    return re.search(cjk_class, s) is not None
246
247
def regex_highlight_cjk(word: str) -> str:
    """Build the regex pattern that matches *word* for highlighting.

    For words containing CJK characters the pattern matches the word
    anywhere; otherwise it matches at word boundaries only (``\\b`` prefix,
    not followed by a word character).

    Args:
        word (str): the word to be matched with regex pattern.

    Returns:
        str: the regex pattern for the word.
    """
    escaped = re.escape(word)
    if contains_cjko(escaped):
        return f'({escaped})'
    return fr'\b({escaped})(?!\w)'
266
267
def highlight_content(content, query):
    """Wrap the occurrences of the *query* terms found in *content* in
    ``<span class="highlight">…</span>`` markup.

    Returns ``None`` for empty *content*; content that contains ``<`` is
    assumed to embed HTML and is returned unmodified.
    """

    if not content:
        return None

    # ignoring html contents
    if content.find('<') != -1:
        return content

    querysplit = query.split()
    queries = []
    # first pass: collect the actual (case-insensitive) matches of every
    # query term, so the final pattern highlights the text exactly as it is
    # spelled in *content*
    for qs in querysplit:
        qs = qs.replace("'", "").replace('"', '').replace(" ", "")
        if len(qs) > 0:
            queries.extend(re.findall(regex_highlight_cjk(qs), content, flags=re.I | re.U))
    if len(queries) > 0:
        # second pass: one alternation pattern over all collected matches
        regex = re.compile("|".join(map(regex_highlight_cjk, queries)))
        # NOTE(review): re.sub does not process backslash escapes when the
        # replacement is a callable, so .replace('\\', r'\\') doubles any
        # backslash in the matched text -- presumably harmless for typical
        # queries, but confirm the intent before touching it
        return regex.sub(lambda match: f'<span class="highlight">{match.group(0)}</span>'.replace('\\', r'\\'), content)
    return content
287
288
def searxng_l10n_timespan(dt: datetime) -> str:  # pylint: disable=invalid-name
    """Return a human-readable, translated description of how far in the past
    *dt* lies (or a localized date for older values).

    A date of January 1st at exactly midnight is rendered as the year only.
    """
    # TODO, check if timezone is calculated right  # pylint: disable=fixme
    if (dt.month, dt.day) == (1, 1) and (dt.hour, dt.minute, dt.second) == (0, 0, 0):
        # only the year carries information for such dates
        return str(dt.year)
    naive = dt.replace(tzinfo=None)
    if naive >= datetime.now() - timedelta(days=1):
        elapsed = datetime.now() - naive
        minutes = int((elapsed.seconds / 60) % 60)
        hours = int(elapsed.seconds / 60 / 60)
        if hours == 0:
            return gettext('{minutes} minute(s) ago').format(minutes=minutes)
        return gettext('{hours} hour(s), {minutes} minute(s) ago').format(hours=hours, minutes=minutes)
    return format_date(dt)
309
310
NO_SUBGROUPING = 'without further subgrouping'


def group_engines_in_tab(engines: Iterable[Engine]) -> List[Tuple[str, str, Iterable[Engine]]]:
    """Groups an Iterable of engines by their first non tab category (first
    subgroup).

    Returns a list of ``(groupname, group bang, engines)`` tuples: groups
    sorted by name with the :py:obj:`NO_SUBGROUPING` group last, engines
    inside a group sorted by language and name.

    .. note::
       bugfix: the return annotation previously declared 2-tuples
       (``Tuple[str, Iterable[Engine]]``) although 3-tuples are returned.
    """

    def get_subgroup(eng):
        # first category that is neither a tab nor the default category
        non_tab_categories = [c for c in eng.categories if c not in tabs + [DEFAULT_CATEGORY]]
        return non_tab_categories[0] if len(non_tab_categories) > 0 else NO_SUBGROUPING

    def group_sort_key(group):
        # False < True --> the NO_SUBGROUPING group sorts last
        return (group[0] == NO_SUBGROUPING, group[0].lower())

    def engine_sort_key(engine):
        return (engine.about.get('language', ''), engine.name)

    tabs = list(get_setting('categories_as_tabs').keys())
    # itertools.groupby requires its input sorted by the grouping key
    subgroups = itertools.groupby(sorted(engines, key=get_subgroup), get_subgroup)
    sorted_groups = sorted(((name, list(engines)) for name, engines in subgroups), key=group_sort_key)

    ret_val = []
    for groupname, _engines in sorted_groups:
        # bang shortcut to search only this group, e.g. '!dictionaries'
        group_bang = '!' + groupname.replace(' ', '_') if groupname != NO_SUBGROUPING else ''
        ret_val.append((groupname, group_bang, sorted(_engines, key=engine_sort_key)))

    return ret_val
__init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds)
Definition webutils.py:89
::1337x
Definition 1337x.py:1
get_themes(templates_path)
Definition webutils.py:176
List[Tuple[str, Iterable[Engine]]] group_engines_in_tab(Iterable[Engine] engines)
Definition webutils.py:314
prettify_url(url, max_length=74)
Definition webutils.py:219
str regex_highlight_cjk(str word)
Definition webutils.py:248
get_result_templates(templates_path)
Definition webutils.py:199
new_hmac(secret_key, url)
Definition webutils.py:210
is_hmac_of(secret_key, value, hmac_to_check)
Definition webutils.py:214
get_translated_errors(Iterable[UnresponsiveEngine] unresponsive_engines)
Definition webutils.py:70
str searxng_l10n_timespan(datetime dt)
Definition webutils.py:289
highlight_content(content, query)
Definition webutils.py:268
str get_json_response(SearchQuery sq, ResultContainer rc)
Definition webutils.py:160
None write_csv_response(CSVWriter csv, ResultContainer rc)
Definition webutils.py:113
bool contains_cjko(str s)
Definition webutils.py:226
list[str] get_static_file_list()
Definition webutils.py:181
get_setting(name, default=_unset)
Definition __init__.py:69