.oO SearXNG Developer Documentation Oo.
Loading...
Searching...
No Matches
webutils.py
Go to the documentation of this file.
1# SPDX-License-Identifier: AGPL-3.0-or-later
2# pylint: disable=missing-module-docstring, invalid-name
3
4
5import os
6import pathlib
7import csv
8import hashlib
9import hmac
10import re
11import itertools
12import json
13from datetime import datetime, timedelta
14from typing import Iterable, List, Tuple, TYPE_CHECKING
15
16from io import StringIO
17from codecs import getincrementalencoder
18
19from flask_babel import gettext, format_date # type: ignore
20
21from searx import logger, get_setting
22
23from searx.engines import DEFAULT_CATEGORY
24
25if TYPE_CHECKING:
26 from searx.enginelib import Engine
27 from searx.results import ResultContainer
28 from searx.search import SearchQuery
29 from searx.results import UnresponsiveEngine
30
# Matches language codes of the form "xx", "xxx" or "xx-XX" / "xxx-XX".
VALID_LANGUAGE_CODE = re.compile(r'^[a-z]{2,3}(-[a-zA-Z]{2})?$')

logger = logger.getChild('webutils')

# Translated texts shared by several entries of the
# exception_classname_to_text mapping below.
timeout_text = gettext('timeout')
parsing_error_text = gettext('parsing error')
http_protocol_error_text = gettext('HTTP protocol error')
network_error_text = gettext('network error')
ssl_cert_error_text = gettext("SSL error: certificate validation has failed")
# Maps the (fully qualified) class name of an exception recorded for an
# unresponsive engine to a short, translated message shown to the user.
# The ``None`` key is the fallback for unknown exception types.
exception_classname_to_text = {
    None: gettext('unexpected crash'),
    'timeout': timeout_text,
    'asyncio.TimeoutError': timeout_text,
    'httpx.TimeoutException': timeout_text,
    'httpx.ConnectTimeout': timeout_text,
    'httpx.ReadTimeout': timeout_text,
    'httpx.WriteTimeout': timeout_text,
    'httpx.HTTPStatusError': gettext('HTTP error'),
    'httpx.ConnectError': gettext("HTTP connection error"),
    'httpx.RemoteProtocolError': http_protocol_error_text,
    'httpx.LocalProtocolError': http_protocol_error_text,
    'httpx.ProtocolError': http_protocol_error_text,
    'httpx.ReadError': network_error_text,
    'httpx.WriteError': network_error_text,
    'httpx.ProxyError': gettext("proxy error"),
    'searx.exceptions.SearxEngineCaptchaException': gettext("CAPTCHA"),
    'searx.exceptions.SearxEngineTooManyRequestsException': gettext("too many requests"),
    'searx.exceptions.SearxEngineAccessDeniedException': gettext("access denied"),
    'searx.exceptions.SearxEngineAPIException': gettext("server API error"),
    'searx.exceptions.SearxEngineXPathException': parsing_error_text,
    'KeyError': parsing_error_text,
    'json.decoder.JSONDecodeError': parsing_error_text,
    'lxml.etree.ParserError': parsing_error_text,
    'ssl.SSLCertVerificationError': ssl_cert_error_text,  # for Python > 3.7
    'ssl.CertificateError': ssl_cert_error_text,  # for Python 3.7
}
67
68
def get_translated_errors(unresponsive_engines: "Iterable[UnresponsiveEngine]"):
    """Map unresponsive engines to ``(engine name, translated error message)``
    pairs, sorted by engine name.

    Unknown error types fall back to the generic message registered under the
    ``None`` key of :py:obj:`exception_classname_to_text`; suspended engines
    get a "Suspended: " prefix.
    """
    translated = []

    for engine in unresponsive_engines:
        text = exception_classname_to_text.get(engine.error_type) or exception_classname_to_text[None]
        message = gettext(text)
        if engine.suspended:
            message = gettext('Suspended') + ': ' + message
        translated.append((engine.engine, message))

    return sorted(translated, key=lambda item: item[0])
82
83
class CSVWriter:
    """A CSV writer which will write rows to CSV file "f", which is encoded in
    the given encoding."""

    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        # Rows are first written to an in-memory queue, then re-encoded into
        # the target encoding and flushed to the target stream ``f``.
        self.queue = StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f
        self.encoder = getincrementalencoder(encoding)()

    def writerow(self, row):
        """Encode *row* and write it to the target stream."""
        self.writer.writerow(row)
        # Fetch output from the queue ...
        data = self.queue.getvalue()
        # Defensive: drop NUL padding.  Before the seek(0) below was added,
        # truncate(0) left the stream position at the end, so StringIO padded
        # every following row with NUL characters.
        data = self.strip_nul(data)
        # ... re-encode it into the target encoding ...
        data = self.encoder.encode(data)
        # ... and write it to the target stream.
        self.stream.write(data.decode())
        # Empty the queue: truncate AND rewind -- truncate(0) alone does not
        # reset the position, which caused ever-growing NUL padding.
        self.queue.truncate(0)
        self.queue.seek(0)

    @staticmethod
    def strip_nul(data):
        """Remove leading/trailing NUL characters from *data*."""
        return data.strip('\x00')

    def writerows(self, rows):
        """Write all *rows* to the target stream."""
        for row in rows:
            self.writerow(row)
110
111
def write_csv_response(csv: "CSVWriter", rc: "ResultContainer") -> None:  # pylint: disable=redefined-outer-name
    """Write rows of the results to a query (``application/csv``) into a CSV
    table (:py:obj:`CSVWriter`). First line in the table contain the column
    names. The column "type" specifies the type, the following types are
    included in the table:

    - result
    - answer
    - suggestion
    - correction

    """

    keys = ('title', 'url', 'content', 'host', 'engine', 'score', 'type')
    csv.writerow(keys)

    for res in rc.get_ordered_results():
        row = res.as_dict()
        row['host'] = row['parsed_url'].netloc
        row['type'] = 'result'
        csv.writerow([row.get(key, '') for key in keys])

    for a in rc.answers:
        row = a.as_dict()
        row['host'] = row['parsed_url'].netloc
        # the docstring above promises an "answer" type: fill it in when the
        # answer's dict does not already carry one (previously the column was
        # left empty for answers)
        row.setdefault('type', 'answer')
        csv.writerow([row.get(key, '') for key in keys])

    for a in rc.suggestions:
        row = {'title': a, 'type': 'suggestion'}
        csv.writerow([row.get(key, '') for key in keys])

    for a in rc.corrections:
        row = {'title': a, 'type': 'correction'}
        csv.writerow([row.get(key, '') for key in keys])
146
147
class JSONEncoder(json.JSONEncoder):
    """JSON encoder that additionally serializes :py:obj:`datetime` (ISO
    format), :py:obj:`timedelta` (total seconds) and :py:obj:`set` (list)."""

    def default(self, o):
        converters = (
            (datetime, datetime.isoformat),
            (timedelta, timedelta.total_seconds),
            (set, list),
        )
        for cls, convert in converters:
            if isinstance(o, cls):
                return convert(o)
        # anything else is left to the base class (raises TypeError)
        return super().default(o)
157
158
def get_json_response(sq: "SearchQuery", rc: "ResultContainer") -> str:
    """Returns the JSON string of the results to a query (``application/json``)"""
    return json.dumps(
        {
            'query': sq.query,
            'number_of_results': rc.number_of_results,
            'results': [res.as_dict() for res in rc.get_ordered_results()],
            'answers': [answer.as_dict() for answer in rc.answers],
            'corrections': list(rc.corrections),
            'infoboxes': rc.infoboxes,
            'suggestions': list(rc.suggestions),
            'unresponsive_engines': get_translated_errors(rc.unresponsive_engines),
        },
        cls=JSONEncoder,
    )
173
174
def get_themes(templates_path):
    """Returns available themes list (the entry names below *templates_path*)."""
    return [entry.name for entry in pathlib.Path(templates_path).iterdir()]
178
179
def get_static_file_list() -> list[str]:
    """Return the paths (relative to the folder named by setting
    ``ui.static_path``) of all files below that folder, skipping hidden
    entries."""
    static_path = pathlib.Path(str(get_setting("ui.static_path")))
    file_list = []

    def _scan(folder: pathlib.Path):
        for entry in folder.iterdir():
            # ignore hidden entries (leading dot); hidden directories are not
            # descended into either
            if entry.name.startswith('.'):
                continue
            if entry.is_file():
                file_list.append(str(entry.relative_to(static_path)))
            if entry.is_dir():
                _scan(entry)

    _scan(static_path)
    return file_list
196
197
def get_result_templates(templates_path):
    """Return the set of template paths (relative to *templates_path*) found
    in any folder named ``result_templates``."""
    prefix_len = len(templates_path) + 1
    templates = set()
    for directory, _dirs, files in os.walk(templates_path):
        if not directory.endswith('result_templates'):
            continue
        relative_dir = directory[prefix_len:]
        templates.update(os.path.join(relative_dir, name) for name in files)
    return templates
207
208
def new_hmac(secret_key, url):
    """Return the hex digest of the SHA-256 HMAC of *url* (bytes) keyed with
    *secret_key* (str)."""
    return hmac.new(secret_key.encode(), url, digestmod=hashlib.sha256).hexdigest()
211
212
def is_hmac_of(secret_key, value, hmac_to_check):
    """Tell whether *hmac_to_check* is the HMAC of *value* under *secret_key*.

    Uses a constant-time comparison to avoid timing side channels.
    """
    expected = new_hmac(secret_key, value)
    if len(expected) != len(hmac_to_check):
        return False
    return hmac.compare_digest(expected, hmac_to_check)
216
217
def prettify_url(url, max_length=74):
    """Shorten *url* for display: URLs longer than *max_length* keep their
    head and tail with an ``[...]`` ellipsis in between."""
    if len(url) <= max_length:
        return url
    chunk_len = max_length // 2 + 1
    return f'{url[:chunk_len]}[...]{url[-chunk_len:]}'
223
224
def contains_cjko(s: str) -> bool:
    """Check whether *s* contains any Chinese, Japanese or Korean character,
    by matching against the relevant Unicode ranges.

    Args:
        s (str): string to be checked.

    Returns:
        bool: True if the input s contains the characters and False otherwise.
    """
    cjk_class = (
        '['
        '\u4e00-\u9fff'  # Chinese characters
        '\u3040-\u309f'  # Japanese hiragana
        '\u30a0-\u30ff'  # Japanese katakana
        '\u4e00-\u9faf'  # Japanese kanji
        '\uac00-\ud7af'  # Korean hangul syllables
        '\u1100-\u11ff'  # Korean hangul jamo
        ']'
    )
    return re.search(cjk_class, s) is not None
245
246
def regex_highlight_cjk(word: str) -> str:
    """Generate the regex pattern to match for a given word according
    to whether or not the word contains CJK characters or not.
    If the word is and/or contains CJK character, the regex pattern
    will match standalone word by taking into account the presence
    of whitespace before and after it; if not, it will match any presence
    of the word throughout the text, ignoring the whitespace.

    Args:
        word (str): the word to be matched with regex pattern.

    Returns:
        str: the regex pattern for the word.
    """
    escaped = re.escape(word)
    if contains_cjko(escaped):
        # CJK text is not separated by word boundaries: match anywhere
        return f'({escaped})'
    # non-CJK: require a word boundary before and no word character after
    return fr'\b({escaped})(?!\w)'
265
266
def highlight_content(content, query):
    """Wrap the occurrences of the *query* terms found in *content* in
    ``<span class="highlight">`` elements.

    Returns ``None`` when *content* is empty; *content* is returned unchanged
    when it contains ``<`` (treated as HTML) or when no query term matches.
    """

    if not content:
        return None

    # ignoring html contents
    if content.find('<') != -1:
        return content

    querysplit = query.split()
    queries = []
    for qs in querysplit:
        # strip quotes and spaces from the query term
        qs = qs.replace("'", "").replace('"', '').replace(" ", "")
        if len(qs) > 0:
            # collect the spellings of the term as they actually occur in the
            # content (case-insensitive search); the substitution below is
            # built from these exact spellings and is case-sensitive
            queries.extend(re.findall(regex_highlight_cjk(qs), content, flags=re.I | re.U))
    if len(queries) > 0:
        # one alternation over all matched spellings
        regex = re.compile("|".join(map(regex_highlight_cjk, queries)))
        # NOTE(review): with a callable replacement, re.sub inserts the return
        # value literally (no group-reference processing), so this replace()
        # doubles any backslash that appears in the match -- verify intended
        return regex.sub(lambda match: f'<span class="highlight">{match.group(0)}</span>'.replace('\\', r'\\'), content)
    return content
286
287
def searxng_l10n_timespan(dt: datetime) -> str:  # pylint: disable=invalid-name
    """Returns a human-readable and translated string indicating how long ago
    a date was in the past / the time span of the date to the present.

    On January 1st, midnight, the returned string only indicates how many years
    ago the date was.
    """
    # TODO, check if timezone is calculated right # pylint: disable=fixme
    if (dt.month, dt.day, dt.hour, dt.minute, dt.second) == (1, 1, 0, 0, 0):
        # exactly New Year's midnight: show only the year
        return str(dt.year)
    naive = dt.replace(tzinfo=None)
    if naive >= datetime.now() - timedelta(days=1):
        # within the last 24 hours: show a relative "ago" string
        delta = datetime.now() - naive
        hours, remainder = divmod(delta.seconds, 3600)
        minutes = remainder // 60
        if hours == 0:
            return gettext('{minutes} minute(s) ago').format(minutes=minutes)
        return gettext('{hours} hour(s), {minutes} minute(s) ago').format(hours=hours, minutes=minutes)
    return format_date(dt)
308
309
# Group name used by group_engines_in_tab() for engines that have no category
# beyond the tab (and default) categories.
NO_SUBGROUPING = 'without further subgrouping'
311
312
def group_engines_in_tab(engines: "Iterable[Engine]") -> List[Tuple[str, str, "Iterable[Engine]"]]:
    """Groups an Iterable of engines by their first non tab category (first
    subgroup).

    Returns a list of ``(group name, group bang, engines)`` tuples (the
    previous return annotation omitted the bang element).  Groups are sorted
    alphabetically with the :py:obj:`NO_SUBGROUPING` group last; engines inside
    a group are sorted by language and name.
    """

    def get_subgroup(eng):
        # first category that is neither a tab nor the default category
        non_tab_categories = [c for c in eng.categories if c not in tabs + [DEFAULT_CATEGORY]]
        return non_tab_categories[0] if len(non_tab_categories) > 0 else NO_SUBGROUPING

    def group_sort_key(group):
        # alphabetical, but the catch-all NO_SUBGROUPING group sorts last
        return (group[0] == NO_SUBGROUPING, group[0].lower())

    def engine_sort_key(engine):
        return (engine.about.get('language', ''), engine.name)

    tabs = list(get_setting('categories_as_tabs').keys())
    subgroups = itertools.groupby(sorted(engines, key=get_subgroup), get_subgroup)
    # (avoid shadowing the ``engines`` parameter inside the generator)
    sorted_groups = sorted(((name, list(grouped)) for name, grouped in subgroups), key=group_sort_key)

    ret_val = []
    for groupname, group_engines in sorted_groups:
        # bang that selects this subgroup, e.g. "!web_search"; empty for the
        # catch-all group
        group_bang = '!' + groupname.replace(' ', '_') if groupname != NO_SUBGROUPING else ''
        ret_val.append((groupname, group_bang, sorted(group_engines, key=engine_sort_key)))

    return ret_val
__init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds)
Definition webutils.py:88
::1337x
Definition 1337x.py:1
get_themes(templates_path)
Definition webutils.py:175
prettify_url(url, max_length=74)
Definition webutils.py:218
List[Tuple[str, "Iterable[Engine]"]] group_engines_in_tab("Iterable[Engine]" engines)
Definition webutils.py:313
str regex_highlight_cjk(str word)
Definition webutils.py:247
get_result_templates(templates_path)
Definition webutils.py:198
new_hmac(secret_key, url)
Definition webutils.py:209
str get_json_response("SearchQuery" sq, "ResultContainer" rc)
Definition webutils.py:159
is_hmac_of(secret_key, value, hmac_to_check)
Definition webutils.py:213
str searxng_l10n_timespan(datetime dt)
Definition webutils.py:288
highlight_content(content, query)
Definition webutils.py:267
None write_csv_response(CSVWriter csv, "ResultContainer" rc)
Definition webutils.py:112
get_translated_errors("Iterable[UnresponsiveEngine]" unresponsive_engines)
Definition webutils.py:69
bool contains_cjko(str s)
Definition webutils.py:225
list[str] get_static_file_list()
Definition webutils.py:180
t.Any get_setting(str name, t.Any default=_unset)
Definition __init__.py:74