webutils.py
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring, invalid-name

from __future__ import annotations

import os
import pathlib
import csv
import hashlib
import hmac
import re
import itertools
import json
from datetime import datetime, timedelta
from typing import Iterable, List, Tuple, Dict, TYPE_CHECKING

from io import StringIO
from codecs import getincrementalencoder

from flask_babel import gettext, format_date  # type: ignore

from searx import logger, settings
from searx.engines import DEFAULT_CATEGORY

if TYPE_CHECKING:
    from searx.enginelib import Engine
    from searx.results import ResultContainer
    from searx.search import SearchQuery
    from searx.results import UnresponsiveEngine
VALID_LANGUAGE_CODE = re.compile(r'^[a-z]{2,3}(-[a-zA-Z]{2})?$')

logger = logger.getChild('webutils')

timeout_text = gettext('timeout')
parsing_error_text = gettext('parsing error')
http_protocol_error_text = gettext('HTTP protocol error')
network_error_text = gettext('network error')
ssl_cert_error_text = gettext("SSL error: certificate validation has failed")
exception_classname_to_text = {
    None: gettext('unexpected crash'),
    'timeout': timeout_text,
    'asyncio.TimeoutError': timeout_text,
    'httpx.TimeoutException': timeout_text,
    'httpx.ConnectTimeout': timeout_text,
    'httpx.ReadTimeout': timeout_text,
    'httpx.WriteTimeout': timeout_text,
    'httpx.HTTPStatusError': gettext('HTTP error'),
    'httpx.ConnectError': gettext("HTTP connection error"),
    'httpx.RemoteProtocolError': http_protocol_error_text,
    'httpx.LocalProtocolError': http_protocol_error_text,
    'httpx.ProtocolError': http_protocol_error_text,
    'httpx.ReadError': network_error_text,
    'httpx.WriteError': network_error_text,
    'httpx.ProxyError': gettext("proxy error"),
    'searx.exceptions.SearxEngineCaptchaException': gettext("CAPTCHA"),
    'searx.exceptions.SearxEngineTooManyRequestsException': gettext("too many requests"),
    'searx.exceptions.SearxEngineAccessDeniedException': gettext("access denied"),
    'searx.exceptions.SearxEngineAPIException': gettext("server API error"),
    'searx.exceptions.SearxEngineXPathException': parsing_error_text,
    'KeyError': parsing_error_text,
    'json.decoder.JSONDecodeError': parsing_error_text,
    'lxml.etree.ParserError': parsing_error_text,
    'ssl.SSLCertVerificationError': ssl_cert_error_text,  # for Python > 3.7
    'ssl.CertificateError': ssl_cert_error_text,  # for Python 3.7
}

def get_translated_errors(unresponsive_engines: Iterable[UnresponsiveEngine]):
    translated_errors = []

    for unresponsive_engine in unresponsive_engines:
        error_user_text = exception_classname_to_text.get(unresponsive_engine.error_type)
        if not error_user_text:
            error_user_text = exception_classname_to_text[None]
        error_msg = gettext(error_user_text)
        if unresponsive_engine.suspended:
            error_msg = gettext('Suspended') + ': ' + error_msg
        translated_errors.append((unresponsive_engine.engine, error_msg))

    return sorted(translated_errors, key=lambda e: e[0])


class CSVWriter:
    """A CSV writer which will write rows to CSV file "f", which is encoded in
    the given encoding."""

    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        # Redirect output to a queue
        self.queue = StringIO()
        self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
        self.stream = f
        self.encoder = getincrementalencoder(encoding)()

    def writerow(self, row):
        self.writer.writerow(row)
        # Fetch UTF-8 output from the queue ...
        data = self.queue.getvalue()
        data = data.strip('\x00')
        # ... and re-encode it into the target encoding
        data = self.encoder.encode(data)
        # write to the target stream
        self.stream.write(data.decode())
        # empty queue
        self.queue.truncate(0)

    def writerows(self, rows):
        for row in rows:
            self.writerow(row)


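# Illustrative usage of CSVWriter (an editor's sketch, not part of the upstream
# module): rows are buffered in the internal StringIO queue, re-encoded with
# the incremental encoder, and written to the wrapped stream.
#
#   >>> from io import StringIO
#   >>> out = StringIO()
#   >>> writer = CSVWriter(out)
#   >>> writer.writerow(['title', 'url'])
#   >>> out.getvalue()
#   'title,url\r\n'
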
def write_csv_response(csv: CSVWriter, rc: ResultContainer) -> None:  # pylint: disable=redefined-outer-name
    """Write the results of a query (``application/csv``) into a CSV table
    (:py:obj:`CSVWriter`).  The first line of the table contains the column
    names.  The column "type" specifies the type of the row; the following
    types are included in the table:

    - result
    - answer
    - suggestion
    - correction

    """

    keys = ('title', 'url', 'content', 'host', 'engine', 'score', 'type')
    csv.writerow(keys)

    for res in rc.get_ordered_results():
        row = res.as_dict()
        row['host'] = row['parsed_url'].netloc
        row['type'] = 'result'
        csv.writerow([row.get(key, '') for key in keys])

    for a in rc.answers:
        row = a.as_dict()
        row['host'] = row['parsed_url'].netloc
        row['type'] = 'answer'
        csv.writerow([row.get(key, '') for key in keys])

    for a in rc.suggestions:
        row = {'title': a, 'type': 'suggestion'}
        csv.writerow([row.get(key, '') for key in keys])

    for a in rc.corrections:
        row = {'title': a, 'type': 'correction'}
        csv.writerow([row.get(key, '') for key in keys])


class JSONEncoder(json.JSONEncoder):  # pylint: disable=missing-class-docstring
    def default(self, o):
        if isinstance(o, datetime):
            return o.isoformat()
        if isinstance(o, timedelta):
            return o.total_seconds()
        if isinstance(o, set):
            return list(o)
        return super().default(o)


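# Illustrative usage of JSONEncoder (an editor's sketch, not part of the
# upstream module): datetime, timedelta and set values become serializable.
#
#   >>> json.dumps({'ts': datetime(2024, 1, 1), 'tags': {'a'}}, cls=JSONEncoder)
#   '{"ts": "2024-01-01T00:00:00", "tags": ["a"]}'
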
def get_json_response(sq: SearchQuery, rc: ResultContainer) -> str:
    """Returns the JSON string of the results to a query (``application/json``)"""
    data = {
        'query': sq.query,
        'number_of_results': rc.number_of_results,
        'results': [_.as_dict() for _ in rc.get_ordered_results()],
        'answers': [_.as_dict() for _ in rc.answers],
        'corrections': list(rc.corrections),
        'infoboxes': rc.infoboxes,
        'suggestions': list(rc.suggestions),
        'unresponsive_engines': get_translated_errors(rc.unresponsive_engines),
    }
    response = json.dumps(data, cls=JSONEncoder)
    return response


def get_themes(templates_path):
    """Returns available themes list."""
    return os.listdir(templates_path)


def get_hash_for_file(file: pathlib.Path) -> str:
    m = hashlib.sha1()
    with file.open('rb') as f:
        m.update(f.read())
    return m.hexdigest()


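# Illustrative usage of get_hash_for_file (an editor's sketch, not part of the
# upstream module); b'abc' is the well-known SHA-1 test vector:
#
#   >>> import tempfile
#   >>> p = pathlib.Path(tempfile.gettempdir()) / 'example.txt'
#   >>> _ = p.write_bytes(b'abc')
#   >>> get_hash_for_file(p)
#   'a9993e364706816aba3e25717850c26c9cd0d89d'
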
def get_static_files(static_path: str) -> Dict[str, str]:
    static_files: Dict[str, str] = {}
    static_path_path = pathlib.Path(static_path)

    def walk(path: pathlib.Path):
        for file in path.iterdir():
            if file.name.startswith('.'):
                # ignore hidden file
                continue
            if file.is_file():
                static_files[str(file.relative_to(static_path_path))] = get_hash_for_file(file)
            if file.is_dir() and file.name not in ('node_modules', 'src'):
                # ignore "src" and "node_modules" directories
                walk(file)

    walk(static_path_path)
    return static_files


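# Illustrative result shape of get_static_files (an editor's sketch with
# hypothetical file names, not part of the upstream module): paths relative
# to static_path mapped to their SHA-1 hex digests, e.g.
#
#   {'css/searxng.min.css': 'a9993e...', 'img/favicon.png': '1f8ac1...'}
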
def get_result_templates(templates_path):
    result_templates = set()
    templates_path_length = len(templates_path) + 1
    for directory, _, files in os.walk(templates_path):
        if directory.endswith('result_templates'):
            for filename in files:
                f = os.path.join(directory[templates_path_length:], filename)
                result_templates.add(f)
    return result_templates


def new_hmac(secret_key, url):
    return hmac.new(secret_key.encode(), url, hashlib.sha256).hexdigest()


def is_hmac_of(secret_key, value, hmac_to_check):
    hmac_of_value = new_hmac(secret_key, value)
    return len(hmac_of_value) == len(hmac_to_check) and hmac.compare_digest(hmac_of_value, hmac_to_check)


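# Illustrative usage of new_hmac / is_hmac_of (an editor's sketch, not part of
# the upstream module): the secret is a str, the signed value is bytes.
#
#   >>> token = new_hmac('server-secret', b'https://example.org/image.png')
#   >>> is_hmac_of('server-secret', b'https://example.org/image.png', token)
#   True
#   >>> is_hmac_of('server-secret', b'https://evil.example/image.png', token)
#   False
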
def prettify_url(url, max_length=74):
    if len(url) > max_length:
        chunk_len = int(max_length / 2 + 1)
        return '{0}[...]{1}'.format(url[:chunk_len], url[-chunk_len:])
    return url


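# Illustrative usage of prettify_url (an editor's sketch, not part of the
# upstream module): keeps max_length/2 + 1 characters from each end of the URL.
#
#   >>> prettify_url('https://example.org/' + 'a' * 100, max_length=20)
#   'https://exa[...]aaaaaaaaaaa'
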
def contains_cjko(s: str) -> bool:
    """This function checks whether a string contains Chinese, Japanese, or
    Korean characters.  It employs a regex that uses Unicode escape sequences
    to match any character in a set of Unicode ranges.

    Args:
        s (str): string to be checked.

    Returns:
        bool: True if the input s contains the characters and False otherwise.
    """
    unicode_ranges = (
        '\u4e00-\u9fff'  # Chinese characters
        '\u3040-\u309f'  # Japanese hiragana
        '\u30a0-\u30ff'  # Japanese katakana
        '\u4e00-\u9faf'  # Japanese kanji
        '\uac00-\ud7af'  # Korean hangul syllables
        '\u1100-\u11ff'  # Korean hangul jamo
    )
    return bool(re.search(fr'[{unicode_ranges}]', s))


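# Illustrative usage of contains_cjko (an editor's sketch, not part of the
# upstream module):
#
#   >>> contains_cjko('searx 日本語')
#   True
#   >>> contains_cjko('searx')
#   False
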
def regex_highlight_cjk(word: str) -> str:
    """Generate the regex pattern to match for a given word, depending on
    whether or not the word contains CJK characters.  If the word is or
    contains a CJK character, the regex pattern matches any occurrence of the
    word in the text; otherwise, it matches the word only when it stands
    alone, i.e. when it is delimited by word boundaries.

    Args:
        word (str): the word to be matched with regex pattern.

    Returns:
        str: the regex pattern for the word.
    """
    rword = re.escape(word)
    if contains_cjko(rword):
        return fr'({rword})'
    return fr'\b({rword})(?!\w)'


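# Illustrative usage of regex_highlight_cjk (an editor's sketch, not part of
# the upstream module):
#
#   >>> regex_highlight_cjk('searx')
#   '\\b(searx)(?!\\w)'
#   >>> regex_highlight_cjk('漢字')
#   '(漢字)'
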
def highlight_content(content, query):

    if not content:
        return None

    # ignoring html contents
    if content.find('<') != -1:
        return content

    querysplit = query.split()
    queries = []
    for qs in querysplit:
        qs = qs.replace("'", "").replace('"', '').replace(" ", "")
        if len(qs) > 0:
            queries.extend(re.findall(regex_highlight_cjk(qs), content, flags=re.I | re.U))
    if len(queries) > 0:
        regex = re.compile("|".join(map(regex_highlight_cjk, queries)))
        return regex.sub(lambda match: f'<span class="highlight">{match.group(0)}</span>'.replace('\\', r'\\'), content)
    return content


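# Illustrative usage of highlight_content (an editor's sketch, not part of the
# upstream module): matched query terms are wrapped in a highlight span.
#
#   >>> highlight_content('hello world', 'world')
#   'hello <span class="highlight">world</span>'
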
def searxng_l10n_timespan(dt: datetime) -> str:  # pylint: disable=invalid-name
    """Returns a human-readable and translated string indicating how long ago
    a date was in the past / the time span of the date to the present.

    On January 1st, midnight, the returned string only indicates how many years
    ago the date was.
    """
    # TODO, check if timezone is calculated right  # pylint: disable=fixme
    d = dt.date()
    t = dt.time()
    if d.month == 1 and d.day == 1 and t.hour == 0 and t.minute == 0 and t.second == 0:
        return str(d.year)
    if dt.replace(tzinfo=None) >= datetime.now() - timedelta(days=1):
        timedifference = datetime.now() - dt.replace(tzinfo=None)
        minutes = int((timedifference.seconds / 60) % 60)
        hours = int(timedifference.seconds / 60 / 60)
        if hours == 0:
            return gettext('{minutes} minute(s) ago').format(minutes=minutes)
        return gettext('{hours} hour(s), {minutes} minute(s) ago').format(hours=hours, minutes=minutes)
    return format_date(dt)


NO_SUBGROUPING = 'without further subgrouping'


def group_engines_in_tab(engines: Iterable[Engine]) -> List[Tuple[str, str, List[Engine]]]:
    """Groups an Iterable of engines by their first non-tab category (first subgroup)"""

    def get_subgroup(eng):
        non_tab_categories = [c for c in eng.categories if c not in tabs + [DEFAULT_CATEGORY]]
        return non_tab_categories[0] if len(non_tab_categories) > 0 else NO_SUBGROUPING

    def group_sort_key(group):
        return (group[0] == NO_SUBGROUPING, group[0].lower())

    def engine_sort_key(engine):
        return (engine.about.get('language', ''), engine.name)

    tabs = list(settings['categories_as_tabs'].keys())
    subgroups = itertools.groupby(sorted(engines, key=get_subgroup), get_subgroup)
    sorted_groups = sorted(((name, list(engines)) for name, engines in subgroups), key=group_sort_key)

    ret_val = []
    for groupname, _engines in sorted_groups:
        group_bang = '!' + groupname.replace(' ', '_') if groupname != NO_SUBGROUPING else ''
        ret_val.append((groupname, group_bang, sorted(_engines, key=engine_sort_key)))

    return ret_val
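

# Illustrative output shape of group_engines_in_tab (an editor's sketch with
# hypothetical engine objects, not part of the upstream module): a list of
# (groupname, group bang, sorted engines) tuples, with the NO_SUBGROUPING
# group sorted last, e.g.
#
#   [('translate', '!translate', [<Engine dictzone>, ...]),
#    ('without further subgrouping', '', [<Engine currency>, ...])]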