from __future__ import annotations

import os
import pathlib
import csv
import hashlib
import hmac
import re
import inspect
import itertools
import json

from datetime import datetime, timedelta
from typing import Iterable, List, Tuple, Dict, TYPE_CHECKING
from io import StringIO
from codecs import getincrementalencoder

from flask_babel import gettext, format_date

from searx import logger, settings
from searx.engines import DEFAULT_CATEGORY

if TYPE_CHECKING:
    from searx.enginelib import Engine
VALID_LANGUAGE_CODE = re.compile(r'^[a-z]{2,3}(-[a-zA-Z]{2})?$')

logger = logger.getChild('webutils')

timeout_text = gettext('timeout')
parsing_error_text = gettext('parsing error')
http_protocol_error_text = gettext('HTTP protocol error')
network_error_text = gettext('network error')
ssl_cert_error_text = gettext("SSL error: certificate validation has failed")
exception_classname_to_text = {
    None: gettext('unexpected crash'),
    'timeout': timeout_text,
    'asyncio.TimeoutError': timeout_text,
    'httpx.TimeoutException': timeout_text,
    'httpx.ConnectTimeout': timeout_text,
    'httpx.ReadTimeout': timeout_text,
    'httpx.WriteTimeout': timeout_text,
    'httpx.HTTPStatusError': gettext('HTTP error'),
    'httpx.ConnectError': gettext("HTTP connection error"),
    'httpx.RemoteProtocolError': http_protocol_error_text,
    'httpx.LocalProtocolError': http_protocol_error_text,
    'httpx.ProtocolError': http_protocol_error_text,
    'httpx.ReadError': network_error_text,
    'httpx.WriteError': network_error_text,
    'httpx.ProxyError': gettext("proxy error"),
    'searx.exceptions.SearxEngineCaptchaException': gettext("CAPTCHA"),
    'searx.exceptions.SearxEngineTooManyRequestsException': gettext("too many requests"),
    'searx.exceptions.SearxEngineAccessDeniedException': gettext("access denied"),
    'searx.exceptions.SearxEngineAPIException': gettext("server API error"),
    'searx.exceptions.SearxEngineXPathException': parsing_error_text,
    'KeyError': parsing_error_text,
    'json.decoder.JSONDecodeError': parsing_error_text,
    'lxml.etree.ParserError': parsing_error_text,
    'ssl.SSLCertVerificationError': ssl_cert_error_text,
    'ssl.CertificateError': ssl_cert_error_text,
}

def get_translated_errors(unresponsive_engines: Iterable[UnresponsiveEngine]):
    translated_errors = []

    for unresponsive_engine in unresponsive_engines:
        error_user_text = exception_classname_to_text.get(unresponsive_engine.error_type)
        if not error_user_text:
            error_user_text = exception_classname_to_text[None]
        error_msg = gettext(error_user_text)
        if unresponsive_engine.suspended:
            error_msg = gettext('Suspended') + ': ' + error_msg
        translated_errors.append((unresponsive_engine.engine, error_msg))

    return sorted(translated_errors, key=lambda e: e[0])
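
# Usage sketch (hypothetical data; an UnresponsiveEngine record only needs the
# ``engine``, ``error_type`` and ``suspended`` fields used above):
#
#   from collections import namedtuple
#   UE = namedtuple('UE', ['engine', 'error_type', 'suspended'])
#   get_translated_errors([
#       UE('bing', 'httpx.ReadTimeout', False),
#       UE('qwant', 'SomeUnknownError', True),
#   ])
#   # -> [('bing', 'timeout'), ('qwant', 'Suspended: unexpected crash')]
#   # (untranslated fallback strings; the actual text depends on the active locale)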
86 """A CSV writer which will write rows to CSV file "f", which is encoded in
87 the given encoding."""
89 def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
94 self.
encoder = getincrementalencoder(encoding)()
99 data = self.
queue.getvalue()
100 data = data.strip(
'\x00')
102 data = self.
encoder.encode(data)
104 self.
stream.write(data.decode())
106 self.
queue.truncate(0)
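
# Usage sketch (illustrative only): any object with a ``write(str)`` method can
# serve as the target stream "f", e.g. a StringIO buffer:
#
#   buf = StringIO()
#   writer = CSVWriter(buf)
#   writer.writerow(['title', 'url'])
#   writer.writerow(['SearXNG', 'https://docs.searxng.org'])
#   csv_text = buf.getvalue()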
114 """Write rows of the results to a query (``application/csv``) into a CSV
115 table (:py:obj:`CSVWriter`). First line in the table contain the column
116 names. The column "type" specifies the type, the following types are
117 included in the table:
126 results = rc.get_ordered_results()
127 keys = (
'title',
'url',
'content',
'host',
'engine',
'score',
'type')
131 row[
'host'] = row[
'parsed_url'].netloc
132 row[
'type'] =
'result'
133 csv.writerow([row.get(key,
'')
for key
in keys])
136 row = {
'title': a,
'type':
'answer'}
137 csv.writerow([row.get(key,
'')
for key
in keys])
139 for a
in rc.suggestions:
140 row = {
'title': a,
'type':
'suggestion'}
141 csv.writerow([row.get(key,
'')
for key
in keys])
143 for a
in rc.corrections:
144 row = {
'title': a,
'type':
'correction'}
145 csv.writerow([row.get(key,
'')
for key
in keys])
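
# Sketch of how the CSV export can be assembled (``rc`` stands for a
# searx.results.ResultContainer instance; names here are illustrative):
#
#   csv_buffer = StringIO()
#   write_csv_response(CSVWriter(csv_buffer), rc)
#   csv_text = csv_buffer.getvalue()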

class JSONEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, datetime):
            return o.isoformat()
        if isinstance(o, timedelta):
            return o.total_seconds()
        if isinstance(o, set):
            return list(o)
        return super().default(o)
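
# What the encoder adds on top of plain json.dumps (values are illustrative):
#
#   json.dumps({'when': datetime(2023, 1, 1), 'took': timedelta(seconds=1.5), 'tags': {'a'}}, cls=JSONEncoder)
#   # -> '{"when": "2023-01-01T00:00:00", "took": 1.5, "tags": ["a"]}'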
160 """Returns the JSON string of the results to a query (``application/json``)"""
161 results = rc.number_of_results
164 'number_of_results': results,
165 'results': rc.get_ordered_results(),
166 'answers': list(rc.answers),
167 'corrections': list(rc.corrections),
168 'infoboxes': rc.infoboxes,
169 'suggestions': list(rc.suggestions),
172 response = json.dumps(x, cls=JSONEncoder)
177 """Returns available themes list."""
178 return os.listdir(templates_path)

def get_hash_for_file(file: pathlib.Path) -> str:
    m = hashlib.sha1()
    with file.open('rb') as f:
        m.update(f.read())
    return m.hexdigest()

def get_static_files(static_path: str) -> Dict[str, str]:
    static_files: Dict[str, str] = {}
    static_path_path = pathlib.Path(static_path)

    def walk(path: pathlib.Path):
        for file in path.iterdir():
            if file.name.startswith('.'):
                # ignore hidden files
                continue
            if file.is_dir() and file.name not in ('node_modules', 'src'):
                # recurse into sub-directories, ignoring "node_modules" and "src"
                walk(file)
            elif file.is_file():
                # map the relative file path to a hash of its content
                static_files[str(file.relative_to(static_path_path))] = get_hash_for_file(file)

    walk(static_path_path)
    return static_files
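
# Usage sketch (path and keys are illustrative):
#
#   static_files = get_static_files('searx/static')
#   # -> {'themes/simple/css/searxng.min.css': '<content hash>', ...}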

def get_result_templates(templates_path):
    result_templates = set()
    templates_path_length = len(templates_path) + 1
    for directory, _, files in os.walk(templates_path):
        if directory.endswith('result_templates'):
            for filename in files:
                f = os.path.join(directory[templates_path_length:], filename)
                result_templates.add(f)
    return result_templates
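
# Usage sketch (theme and file names are illustrative):
#
#   get_result_templates('searx/templates')
#   # -> {'simple/result_templates/default.html', 'simple/result_templates/images.html', ...}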

def new_hmac(secret_key, url):
    return hmac.new(secret_key.encode(), url, hashlib.sha256).hexdigest()

def is_hmac_of(secret_key, value, hmac_to_check):
    hmac_of_value = new_hmac(secret_key, value)
    return len(hmac_of_value) == len(hmac_to_check) and hmac.compare_digest(hmac_of_value, hmac_to_check)
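
# Usage sketch: ``value`` must be bytes, it is handed to hmac.new() unchanged:
#
#   token = new_hmac('my-secret', b'https://example.org/image.png')
#   assert is_hmac_of('my-secret', b'https://example.org/image.png', token)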

def prettify_url(url, max_length=74):
    if len(url) > max_length:
        chunk_len = int(max_length / 2 + 1)
        return '{0}[...]{1}'.format(url[:chunk_len], url[-chunk_len:])
    return url
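
# Example: URLs longer than max_length are shortened around a '[...]' marker:
#
#   prettify_url('0123456789abcdefghij', max_length=10)
#   # -> '012345[...]efghij'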
235 """This function check whether or not a string contains Chinese, Japanese,
236 or Korean characters. It employs regex and uses the u escape sequence to
237 match any character in a set of Unicode ranges.
240 s (str): string to be checked.
243 bool: True if the input s contains the characters and False otherwise.
253 return bool(re.search(fr
'[{unicode_ranges}]', s))
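
# Examples:
#
#   contains_cjko('SearXNG')       # -> False
#   contains_cjko('検索エンジン')    # -> True (Japanese)
#   contains_cjko('검색 엔진')       # -> True (Korean)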
257 """Generate the regex pattern to match for a given word according
258 to whether or not the word contains CJK characters or not.
259 If the word is and/or contains CJK character, the regex pattern
260 will match standalone word by taking into account the presence
261 of whitespace before and after it; if not, it will match any presence
262 of the word throughout the text, ignoring the whitespace.
265 word (str): the word to be matched with regex pattern.
268 str: the regex pattern for the word.
270 rword = re.escape(word)
273 return fr
'\b({rword})(?!\w)'
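
# Examples (the return values are regex pattern strings, not matches):
#
#   regex_highlight_cjk('rust')   # -> r'\b(rust)(?!\w)'
#   regex_highlight_cjk('검색')    # -> '(검색)'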

def highlight_content(content, query):
    if not content:
        return None

    # ignore HTML content
    if content.find('<') != -1:
        return content

    querysplit = query.split()
    queries = []
    for qs in querysplit:
        qs = qs.replace("'", "").replace('"', '').replace(" ", "")
        queries.append(qs)
    regex = re.compile("|".join(map(regex_highlight_cjk, queries)))
    return regex.sub(lambda match: f'<span class="highlight">{match.group(0)}</span>'.replace('\\', r'\\'), content)
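
# Example (plain-text content; anything containing '<' is returned unchanged):
#
#   highlight_content('SearXNG is a metasearch engine', 'metasearch')
#   # -> 'SearXNG is a <span class="highlight">metasearch</span> engine'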
298 """Returns a human-readable and translated string indicating how long ago
299 a date was in the past / the time span of the date to the present.
301 On January 1st, midnight, the returned string only indicates how many years
307 if d.month == 1
and d.day == 1
and t.hour == 0
and t.minute == 0
and t.second == 0:
309 if dt.replace(tzinfo=
None) >= datetime.now() - timedelta(days=1):
310 timedifference = datetime.now() - dt.replace(tzinfo=
None)
311 minutes = int((timedifference.seconds / 60) % 60)
312 hours = int(timedifference.seconds / 60 / 60)
314 return gettext(
'{minutes} minute(s) ago').
format(minutes=minutes)
315 return gettext(
'{hours} hour(s), {minutes} minute(s) ago').
format(hours=hours, minutes=minutes)
316 return format_date(dt)
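
# Behaviour sketch (assumes an active Flask/Babel context so gettext() and
# format_date() can resolve the active locale; fallback strings shown):
#
#   searxng_l10n_timespan(datetime.now() - timedelta(minutes=5))   # -> '5 minute(s) ago'
#   searxng_l10n_timespan(datetime(2020, 1, 1))                    # -> '2020'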
320 """Check if the application was started using "flask run" command line
322 Inspect the callstack.
323 See https://github.com/pallets/flask/blob/master/src/flask/__main__.py
326 bool: True if the application was started using "flask run".
328 frames = inspect.stack()
331 return frames[-2].filename.endswith(
'flask/cli.py')

NO_SUBGROUPING = 'without further subgrouping'
338 """Groups an Iterable of engines by their first non tab category (first subgroup)"""
340 def get_subgroup(eng):
341 non_tab_categories = [c
for c
in eng.categories
if c
not in tabs + [DEFAULT_CATEGORY]]
342 return non_tab_categories[0]
if len(non_tab_categories) > 0
else NO_SUBGROUPING
344 def group_sort_key(group):
345 return (group[0] == NO_SUBGROUPING, group[0].lower())
347 def engine_sort_key(engine):
348 return (engine.about.get(
'language',
''), engine.name)
350 tabs = list(settings[
'categories_as_tabs'].keys())
351 subgroups = itertools.groupby(sorted(engines, key=get_subgroup), get_subgroup)
352 sorted_groups = sorted(((name, list(engines))
for name, engines
in subgroups), key=group_sort_key)
355 for groupname, _engines
in sorted_groups:
356 group_bang =
'!' + groupname.replace(
' ',
'_')
if groupname != NO_SUBGROUPING
else ''
357 ret_val.append((groupname, group_bang, sorted(_engines, key=engine_sort_key)))
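
# Usage sketch (engine objects come from searx.engines; names are illustrative):
#
#   for group_name, group_bang, group_engines in group_engines_in_tab(engines):
#       print(group_name, group_bang, [e.name for e in group_engines])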