.oO SearXNG Developer Documentation Oo.
Loading...
Searching...
No Matches
impl.py
Go to the documentation of this file.
1# SPDX-License-Identifier: AGPL-3.0-or-later
2# pylint: disable=missing-module-docstring, invalid-name
3
4import gc
5import typing
6import types
7import functools
8import itertools
9from time import time
10from timeit import default_timer
11from urllib.parse import urlparse
12
13import re
14import httpx
15
16from searx import network, logger
17from searx.utils import gen_useragent, detect_language
18from searx.results import ResultContainer
19from searx.search.models import SearchQuery, EngineRef
20from searx.search.processors import EngineProcessor
21from searx.metrics import counter_inc
22
23
24logger = logger.getChild('searx.search.checker')
25
# Names of the HTML elements searched for in titles, contents, answers and
# infoboxes.  NOTE: the list previously contained 'kdb', a typo for the
# keyboard-input element 'kbd'.
HTML_TAGS = [
    # fmt: off
    'embed', 'iframe', 'object', 'param', 'picture', 'source', 'svg', 'math', 'canvas', 'noscript', 'script',
    'del', 'ins', 'area', 'audio', 'img', 'map', 'track', 'video', 'a', 'abbr', 'b', 'bdi', 'bdo', 'br', 'cite',
    'code', 'data', 'dfn', 'em', 'i', 'kbd', 'mark', 'q', 'rb', 'rp', 'rt', 'rtc', 'ruby', 's', 'samp', 'small',
    'span', 'strong', 'sub', 'sup', 'time', 'u', 'var', 'wbr', 'style', 'blockquote', 'dd', 'div', 'dl', 'dt',
    'figcaption', 'figure', 'hr', 'li', 'ol', 'p', 'pre', 'ul', 'button', 'datalist', 'fieldset', 'form', 'input',
    'label', 'legend', 'meter', 'optgroup', 'option', 'output', 'progress', 'select', 'textarea', 'applet',
    'frame', 'frameset'
    # fmt: on
]
37
38
def get_check_no_html(tags=None):
    """Build a predicate that tells whether a text is free of HTML tags.

    The regular expression is compiled once, here, so the returned function
    can be called repeatedly without rebuilding it.

    :param tags: iterable of tag names to look for; when ``None`` (the
        default) the module-level ``HTML_TAGS`` list is used.
    :return: a function ``f(text)`` returning ``True`` when ``text`` contains
        neither an opening (``<tag ...>``) nor a closing (``</tag>``) form of
        any of the tags.  The text is lower-cased before matching, so the
        check is case-insensitive.
    """
    if tags is None:
        tags = HTML_TAGS

    # The text is lower-cased before matching, so the tag names must be too.
    names = [re.escape(tag.lower()) for tag in tags]
    if not names:
        # An empty alternation would match every string; with no tag to look
        # for, every text is HTML-free.
        def always_clean(text):  # pylint: disable=unused-argument
            return True

        return always_clean

    rep = ['<' + name + r'[^>]*>' for name in names]
    rep += ['</' + name + '>' for name in names]
    pattern = re.compile('|'.join(rep))

    def f(text):
        return pattern.search(text.lower()) is None

    return f
48
49
50_check_no_html = get_check_no_html()
51
52
53def _is_url(url):
54 try:
55 result = urlparse(url)
56 except ValueError:
57 return False
58 if result.scheme not in ('http', 'https'):
59 return False
60 return True
61
62
@functools.lru_cache(maxsize=8192)
def _download_and_check_if_image(image_url: str) -> bool:
    """Request ``image_url`` and report whether the response is an image.

    The answer is ``True`` only for a 200 response whose ``Content-Type``
    header starts with ``image/``.  A timeout is retried once; any other
    ``httpx.HTTPError`` ends the check with ``False``.

    Do not call this function directly, go through ``_is_url_image``:
    otherwise the ``functools.lru_cache`` of this function would also store
    ``data:`` URLs, which might be huge.
    """
    for _attempt in range(2):
        started_at = time()
        try:
            # use "image_proxy" (avoid HTTP/2)
            network.set_context_network_name('image_proxy')
            request_headers = {
                'User-Agent': gen_useragent(),
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US;q=0.5,en;q=0.3',
                'Accept-Encoding': 'gzip, deflate, br',
                'DNT': '1',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'Sec-GPC': '1',
                'Cache-Control': 'max-age=0',
            }
            response, stream = network.stream(
                'GET',
                image_url,
                timeout=10.0,
                allow_redirects=True,
                headers=request_headers,
            )
            # only the status and the headers are needed, not the body
            response.close()
            is_image = response.status_code == 200 and response.headers.get('content-type', '').startswith(
                'image/'
            )
            del response
            del stream
            return is_image
        except httpx.TimeoutException:
            # log the elapsed time, then let the loop try again
            logger.error('Timeout for %s: %i', image_url, int(time() - started_at))
        except httpx.HTTPError:
            logger.exception('Exception for %s', image_url)
            return False
    # every attempt timed out
    return False
108
109
110def _is_url_image(image_url) -> bool:
111 """Normalize image_url"""
112 if not isinstance(image_url, str):
113 return False
114
115 if image_url.startswith('//'):
116 image_url = 'https:' + image_url
117
118 if image_url.startswith('data:'):
119 return image_url.startswith('data:image/')
120
121 if not _is_url(image_url):
122 return False
123
124 return _download_and_check_if_image(image_url)
125
126
def _search_query_to_dict(search_query: SearchQuery) -> typing.Dict[str, typing.Any]:
    """Return the ``query``, ``lang``, ``pageno``, ``safesearch`` and
    ``time_range`` attributes of ``search_query`` as a dict, in that order."""
    field_names = ('query', 'lang', 'pageno', 'safesearch', 'time_range')
    return {name: getattr(search_query, name) for name in field_names}
135
136
def _search_query_diff(
    sq1: SearchQuery, sq2: SearchQuery
) -> typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, typing.Any]]:
    """Compare the parameters of two search queries.

    Both queries are converted with ``_search_query_to_dict`` and compared
    key by key.

    :return: a ``(common, diff)`` tuple.  ``common`` maps each parameter that
        has the same value in both queries to that value; ``diff`` maps each
        differing parameter to the pair ``(value in sq1, value in sq2)``.
    """
    param1 = _search_query_to_dict(sq1)
    param2 = _search_query_to_dict(sq2)
    common = {}
    diff = {}
    for k, value1 in param1.items():
        value2 = param2[k]
        if value1 == value2:
            common[k] = value1
        else:
            diff[k] = (value1, value2)
    return (common, diff)
151
152
class TestResults:  # pylint: disable=missing-class-docstring
    """Collect the errors, the detailed logs and the languages seen by the tests."""

    __slots__ = 'errors', 'logs', 'languages'

    def __init__(self):
        # test name -> distinct error messages, in insertion order
        self.errors: typing.Dict[str, typing.List[str]] = {}
        # test name -> distinct (message, *args) tuples, in insertion order
        self.logs: typing.Dict[str, typing.List[typing.Any]] = {}
        self.languages: typing.Set[str] = set()

    def add_error(self, test, message, *args):
        """Register ``message`` for ``test``; duplicates are ignored.

        The bare message goes to ``self.errors``, the message together with
        its arguments goes to ``self.logs``.
        """
        known_messages = self.errors.setdefault(test, [])
        if message not in known_messages:
            known_messages.append(message)

        log_entry = (message, *args)
        known_entries = self.logs.setdefault(test, [])
        if log_entry not in known_entries:
            known_entries.append(log_entry)

    def add_language(self, language):
        """Remember that ``language`` has been seen."""
        self.languages.add(language)

    @property
    def successful(self):
        """``True`` as long as no error has been registered."""
        return not self.errors

    def __iter__(self):
        """Yield ``(test_name, error)`` pairs, the errors of each test sorted."""
        for test_name, messages in self.errors.items():
            yield from ((test_name, message) for message in sorted(messages))
183
184
class ResultContainerTests:  # pylint: disable=missing-class-docstring
    """Checks applied to the ResultContainer obtained for one SearchQuery.

    Every failed check is registered in the shared ``TestResults`` under
    ``test_name``, followed by a textual dump of the search query.
    """

    __slots__ = 'test_name', 'search_query', 'result_container', 'languages', 'stop_test', 'test_results'

    def __init__(
        self, test_results: TestResults, test_name: str, search_query: SearchQuery, result_container: ResultContainer
    ):
        self.test_name = test_name
        self.search_query = search_query
        self.result_container = result_container
        # languages detected in the titles / contents of this container
        self.languages: typing.Set[str] = set()
        self.test_results = test_results
        # set by check_basic() when an engine is unresponsive
        self.stop_test = False

    @property
    def result_urls(self):
        """URLs of the ordered results (results without a 'url' key are skipped)."""
        results = self.result_container.get_ordered_results()
        return [result['url'] for result in results if 'url' in result]

    def _record_error(self, message: str, *args) -> None:
        """Register an error for this test, followed by the search query parameters."""
        sq = _search_query_to_dict(self.search_query)
        sqstr = ' '.join(['{}={!r}'.format(k, v) for k, v in sq.items()])
        self.test_results.add_error(self.test_name, message, *args, '(' + sqstr + ')')

    def _add_language(self, text: str) -> None:
        """Detect the language of ``text`` and, if one is found, remember it
        both locally and in the shared test results."""
        langStr = detect_language(text)
        if langStr:
            self.languages.add(langStr)
            self.test_results.add_language(langStr)

    def _check_result(self, result):
        """Check one result: no HTML in title / content, a URL is present and,
        for image and video templates, the image URLs are valid."""
        if not _check_no_html(result.get('title', '')):
            self._record_error('HTML in title', repr(result.get('title', '')))
        if not _check_no_html(result.get('content', '')):
            self._record_error('HTML in content', repr(result.get('content', '')))
        if result.get('url') is None:
            self._record_error('url is None')

        self._add_language(result.get('title', ''))
        self._add_language(result.get('content', ''))

        template = result.get('template', 'default.html')
        # templates without any additional check
        if template in ('default.html', 'code.html', 'torrent.html', 'map.html'):
            return
        if template == 'images.html':
            thumbnail_src = result.get('thumbnail_src')
            if thumbnail_src is not None:
                if not _is_url_image(thumbnail_src):
                    self._record_error('thumbnail_src URL is invalid', thumbnail_src)
            elif not _is_url_image(result.get('img_src')):
                # img_src is only checked when there is no thumbnail_src
                self._record_error('img_src URL is invalid', result.get('img_src'))
        if template == 'videos.html' and not _is_url_image(result.get('thumbnail')):
            # report the value that was actually checked (was: 'img_src')
            self._record_error('thumbnail URL is invalid', result.get('thumbnail'))

    def _check_results(self, results: list):
        """Apply ``_check_result`` to every result."""
        for result in results:
            self._check_result(result)

    def _check_answers(self, answers):
        """Check that no answer contains HTML."""
        for answer in answers:
            if not _check_no_html(answer):
                self._record_error('HTML in answer', answer)

    def _check_infoboxes(self, infoboxes):
        """Check that neither the content nor the attribute values of the
        infoboxes contain HTML; the content language is recorded."""
        for infobox in infoboxes:
            if not _check_no_html(infobox.get('content', '')):
                self._record_error('HTML in infobox content', infobox.get('content', ''))
            self._add_language(infobox.get('content', ''))
            for attribute in infobox.get('attributes', {}):
                if not _check_no_html(attribute.get('value', '')):
                    self._record_error('HTML in infobox attribute value', attribute.get('value', ''))

    def check_basic(self):
        """Run the checks that apply to every container.

        When at least one engine is unresponsive, the engine messages are
        registered as errors, ``stop_test`` is set and nothing else is checked.
        """
        if len(self.result_container.unresponsive_engines) > 0:
            for message in self.result_container.unresponsive_engines:
                self._record_error(message[1] + ' ' + (message[2] or ''))
            self.stop_test = True
            return

        results = self.result_container.get_ordered_results()
        if len(results) > 0:
            self._check_results(results)

        if len(self.result_container.answers) > 0:
            self._check_answers(self.result_container.answers)

        if len(self.result_container.infoboxes) > 0:
            self._check_infoboxes(self.result_container.infoboxes)

    def has_infobox(self):
        """Check the ResultContainer has at least one infobox"""
        if len(self.result_container.infoboxes) == 0:
            self._record_error('No infobox')

    def has_answer(self):
        """Check the ResultContainer has at least one answer"""
        if len(self.result_container.answers) == 0:
            self._record_error('No answer')

    def has_language(self, lang):
        """Check at least one title or content of the results is written in the `lang`.

        Language detection may be inaccurate."""
        if lang not in self.languages:
            self._record_error(lang + ' not found')

    def not_empty(self):
        """Check the ResultContainer has at least one answer or infobox or result"""
        result_types = set()
        results = self.result_container.get_ordered_results()
        if len(results) > 0:
            result_types.add('results')

        if len(self.result_container.answers) > 0:
            result_types.add('answers')

        if len(self.result_container.infoboxes) > 0:
            result_types.add('infoboxes')

        if len(result_types) == 0:
            self._record_error('No result')

    def one_title_contains(self, title: str):
        """Check one of the title contains `title` (case insensitive comparison)"""
        title = title.lower()
        for result in self.result_container.get_ordered_results():
            # a result without a title (or with a None title) simply does not match
            if title in (result.get('title') or '').lower():
                return
        self._record_error(('{!r} not found in the title'.format(title)))
320
321
class CheckerTests:  # pylint: disable=missing-class-docstring, too-few-public-methods
    """Checks that compare the result containers of one test with each other."""

    __slots__ = 'test_results', 'test_name', 'result_container_tests_list'

    def __init__(
        self, test_results: TestResults, test_name: str, result_container_tests_list: typing.List[ResultContainerTests]
    ):
        self.test_results = test_results
        self.test_name = test_name
        self.result_container_tests_list = result_container_tests_list

    def unique_results(self):
        """Check the results of each ResultContainer is unique

        An error is registered for every pair of containers whose URL lists
        are equal.  Nothing is compared when there is no container at all or
        when the first container has no URL.
        """
        urls_list = [rct.result_urls for rct in self.result_container_tests_list]
        # guard against an empty list: urls_list[0] would raise IndexError
        if not urls_list or len(urls_list[0]) == 0:
            return

        # results on the first page: compare each unordered pair (i < j) once
        for (i, urls_i), (j, urls_j) in itertools.combinations(enumerate(urls_list), 2):
            if urls_i != urls_j:
                continue
            common, diff = _search_query_diff(
                self.result_container_tests_list[i].search_query,
                self.result_container_tests_list[j].search_query,
            )
            common_str = ' '.join(['{}={!r}'.format(k, v) for k, v in common.items()])
            diff1_str = ', '.join(['{}={!r}'.format(k, v1) for (k, (v1, v2)) in diff.items()])
            diff2_str = ', '.join(['{}={!r}'.format(k, v2) for (k, (v1, v2)) in diff.items()])
            self.test_results.add_error(
                self.test_name,
                'results are identical for {} and {} ({})'.format(diff1_str, diff2_str, common_str),
            )
352
353
class Checker:  # pylint: disable=missing-class-docstring
    """Run the tests declared by an engine processor and collect the outcome
    in ``test_results``."""

    __slots__ = 'processor', 'tests', 'test_results'

    def __init__(self, processor: EngineProcessor):
        self.processor = processor
        self.tests = self.processor.get_tests()
        self.test_results = TestResults()

    @property
    def engineref_list(self):
        """One-element list with the EngineRef of the processor's engine,
        built from the engine name and the first category of the engine."""
        processor = self.processor
        return [EngineRef(processor.engine_name, processor.engine.categories[0])]

    @staticmethod
    def search_query_matrix_iterator(engineref_list, matrix):
        """Yield one SearchQuery per combination of the values in ``matrix``.

        A tuple or list value contributes each of its items, any other value
        contributes itself.  The ``query`` entry becomes the query of the
        SearchQuery, the remaining entries are passed as keyword arguments.
        """
        axes = [
            [(name, value) for value in values] if isinstance(values, (tuple, list)) else [(name, values)]
            for name, values in matrix.items()
        ]

        for combination in itertools.product(*axes):
            params = dict(combination)
            query = params.pop('query')
            yield SearchQuery(query, engineref_list, **params)

    def call_test(self, obj, test_description):
        """Run one test on ``obj``.

        ``test_description`` is either a method (a name or a function) or a
        tuple / list whose first item is the method and whose remaining items
        are its arguments.  An error is registered when the method is neither
        the name of an attribute of ``obj`` nor a function.
        """
        if isinstance(test_description, (tuple, list)):
            method = test_description[0]
            args = test_description[1:]
        else:
            method, args = test_description, ()

        if isinstance(method, str) and hasattr(obj, method):
            getattr(obj, method)(*args)
            return
        if isinstance(method, types.FunctionType):
            method(*args)
            return
        self.test_results.add_error(
            obj.test_name,
            'method {!r} ({}) not found for {}'.format(method, method.__class__.__name__, obj.__class__.__name__),
        )

    def call_tests(self, obj, test_descriptions):
        """Run every test description on ``obj``, in order."""
        for description in test_descriptions:
            self.call_test(obj, description)

    def search(self, search_query: SearchQuery) -> ResultContainer:
        """Send ``search_query`` to the processor and return the filled
        ResultContainer (left empty when the processor gives no params)."""
        container = ResultContainer()
        first_engineref = search_query.engineref_list[0]
        params = self.processor.get_params(search_query, first_engineref.category)
        if params is None:
            return container
        counter_inc('engine', search_query.engineref_list[0].name, 'search', 'count', 'sent')
        self.processor.search(search_query.query, params, container, default_timer(), 5)
        return container

    def get_result_container_tests(self, test_name: str, search_query: SearchQuery) -> ResultContainerTests:
        """Run the search, wrap the outcome in a ResultContainerTests and
        apply the basic checks to it."""
        checks = ResultContainerTests(self.test_results, test_name, search_query, self.search(search_query))
        checks.check_basic()
        return checks

    def run_test(self, test_name):
        """Run the test ``test_name``: one search per combination of its
        matrix, then the 'result_container' tests on each container and
        finally the 'test' tests over all containers.

        The 'test' tests are skipped when a container seen by the
        'result_container' loop has ``stop_test`` set.
        """
        test_parameters = self.tests[test_name]
        search_query_list = list(Checker.search_query_matrix_iterator(self.engineref_list, test_parameters['matrix']))
        rct_list = [self.get_result_container_tests(test_name, sq) for sq in search_query_list]

        stop_test = False
        if 'result_container' in test_parameters:
            for rct in rct_list:
                if rct.stop_test:
                    stop_test = True
                else:
                    self.call_tests(rct, test_parameters['result_container'])

        if not stop_test and 'test' in test_parameters:
            self.call_tests(CheckerTests(self.test_results, test_name, rct_list), test_parameters['test'])

    def run(self):
        """Run every declared test; after each one the image check cache is
        cleared and a garbage collection is forced."""
        for name in self.tests:
            self.run_test(name)
            # clear cache
            _download_and_check_if_image.cache_clear()
            # force a garbage collector
            gc.collect()
None _record_error(self, str message, *args)
Definition impl.py:204
typing.Optional[str] _add_language(self, str text)
Definition impl.py:209
__init__(self, TestResults test_results, str test_name, SearchQuery search_query, ResultContainer result_container)
Definition impl.py:191
add_error(self, test, message, *args)
Definition impl.py:162
typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, typing.Any]] _search_query_diff(SearchQuery sq1, SearchQuery sq2)
Definition impl.py:139
typing.Dict[str, typing.Any] _search_query_to_dict(SearchQuery search_query)
Definition impl.py:127
bool _is_url_image(image_url)
Definition impl.py:110
bool _download_and_check_if_image(str image_url)
Definition impl.py:64