"""Utility functions for the engines"""
from __future__ import annotations

import importlib.util
import re
import types

from typing import Optional, Union, Any, Set, List, Dict, MutableMapping, Tuple, Callable
from numbers import Number
from os.path import splitext, join
from random import choice
from html.parser import HTMLParser
from html import escape
from urllib.parse import urljoin, urlparse, parse_qs, urlencode
from markdown_it import MarkdownIt

from lxml import html
from lxml.etree import ElementBase, XPath, XPathError, XPathSyntaxError

from searx import settings
from searx.sxng_locales import sxng_locales
from searx.exceptions import SearxXPathSyntaxException, SearxEngineXPathException
from searx import logger

# NOTE(review): USER_AGENTS, VERSION_TAG and data_dir are referenced further
# down in this module; their import lines are not visible here -- confirm.
# Module-local child logger.
logger = logger.getChild('utils')

# An XPath may be given either as source text or as a compiled object.
XPathSpecType = Union[str, XPath]

# Tags whose text content is skipped when extracting text from HTML.
_BLOCKED_TAGS = ('script', 'style')

# ``%uXXXX`` and ``%XX`` escape sequences handled by ecma_unescape().
_ECMA_UNESCAPE4_RE = re.compile(r'%u([0-9a-fA-F]{4})', re.UNICODE)
_ECMA_UNESCAPE2_RE = re.compile(r'%([0-9a-fA-F]{2})', re.UNICODE)

# Patterns used when rewriting a JavaScript literal into JSON.
_JS_QUOTE_KEYS_RE = re.compile(r'([\{\s,])(\w+)(:)')
_JS_VOID_RE = re.compile(r'void\s+[0-9]+|void\s*\([0-9]+\)')
_JS_DECIMAL_RE = re.compile(r":\s*\.")

# Compiled XPath objects keyed by their source string (see get_xpath()).
_XPATH_CACHE: Dict[str, XPath] = {}
_LANG_TO_LC_CACHE: Dict[str, Dict[str, str]] = {}

_FASTTEXT_MODEL: Optional["fasttext.FastText._FastText"] = None  # type: ignore
"""fasttext model to predict language of a search term"""

SEARCH_LANGUAGE_CODES = frozenset([searxng_locale[0].split('-')[0] for searxng_locale in sxng_locales])
"""Languages supported by most searxng engines (:py:obj:`searx.sxng_locales.sxng_locales`)."""
58 """Internal class for this module, do not create instance of this class.
59 Replace the None value, allow explicitly pass None as a function argument"""
62_NOTSET = _NotSetClass()
66 """Return the searx User Agent"""
67 return 'searx/{searx_version} {suffix}'.
format(
68 searx_version=VERSION_TAG, suffix=settings[
'outgoing'][
'useragent_suffix']
def gen_useragent(os_string: Optional[str] = None) -> str:
    """Return a random browser User Agent

    See searx/data/useragents.json

    Args:
        * os_string (str, optional): operating-system part of the User Agent.
          When empty or ``None``, one is picked at random from
          ``USER_AGENTS['os']``.

    Returns:
        * str: ``USER_AGENTS['ua']`` formatted with the OS string and a random
          entry of ``USER_AGENTS['versions']``.
    """
    return USER_AGENTS['ua'].format(
        os=os_string or choice(USER_AGENTS['os']),
        version=choice(USER_AGENTS['versions']),
    )
81 """Internal exception raised when the HTML is invalid"""
84class _HTMLTextExtractor(HTMLParser):
85 """Internal class to extract text from HTML"""
88 HTMLParser.__init__(self)
101 if tag != self.
tags[-1]:
107 return not self.
tags or self.
tags[-1]
not in _BLOCKED_TAGS
117 if name[0]
in (
'x',
'X'):
118 codepoint = int(name[1:], 16)
120 codepoint = int(name)
121 self.
result.append(chr(codepoint))
131 return ''.join(self.
result).strip()
136 raise AssertionError(message)
def html_to_text(html_str: str) -> str:
    """Extract text from a HTML string

    Args:
        * html_str (str): string HTML

    Returns:
        * str: extracted text

    Examples:
        >>> html_to_text('Example <span id="42">#2</span>')
        'Example #2'

        >>> html_to_text('<style>.span { color: red; }</style><span>Example</span>')
        'Example'

        >>> html_to_text(r'regexp: (?<![a-zA-Z]')
        'regexp: (?<![a-zA-Z]'
    """
    # collapse every run of whitespace (including line breaks) into one space
    html_str = html_str.replace('\n', ' ').replace('\r', ' ')
    html_str = ' '.join(html_str.split())
    extractor = _HTMLTextExtractor()
    try:
        extractor.feed(html_str)
        # close() flushes text the parser is still buffering, e.g. a trailing
        # "&" that could have been the start of a character reference
        extractor.close()
    except AssertionError:
        # the parser rejected the markup: treat the input as plain text
        extractor = _HTMLTextExtractor()
        extractor.feed(escape(html_str, quote=True))
        extractor.close()
    except _HTMLTextExtractorException:
        logger.debug("HTMLTextExtractor: invalid HTML\n%s", html_str)
    return extractor.get_text()
def markdown_to_text(markdown_str: str) -> str:
    """Extract text from a Markdown string

    The Markdown is rendered to HTML first, then the text is extracted from
    that HTML.

    Args:
        * markdown_str (str): string Markdown

    Returns:
        * str: extracted text

    Examples:
        >>> markdown_to_text('[example](https://example.com)')
        'example'

        >>> markdown_to_text('## Headline')
        'Headline'
    """
    html_str = (
        MarkdownIt("commonmark", {"typographer": True}).enable(["replacements", "smartquotes"]).render(markdown_str)
    )
    return html_to_text(html_str)
def extract_text(xpath_results, allow_none: bool = False) -> Optional[str]:
    """Extract text from a lxml result

    * if xpath_results is list, extract the text from each result and concat the list
    * if xpath_results is a xml element, extract all the text node from it
      ( text_content() method from lxml )
    * if xpath_results is a string element, then it's already done

    Args:
        * xpath_results: a list, a lxml element, a ``str``, a number or a bool.
        * allow_none (bool): when True, ``None`` is accepted and returned as is.
          It is not forwarded to the items of a list.

    Returns:
        * str: the extracted text, or ``None`` for ``None`` input with
          ``allow_none=True``.

    Raises:
        * ValueError: for ``None`` without ``allow_none``, or an unsupported type.
    """
    if isinstance(xpath_results, list):
        # it's a list of results: concat everything using recursive calls
        return ''.join(extract_text(e) or '' for e in xpath_results).strip()
    if isinstance(xpath_results, ElementBase):
        # it's an element
        text: str = html.tostring(xpath_results, encoding='unicode', method='text', with_tail=False)
        # split() without argument also drops line breaks and outer whitespace
        return ' '.join(text.split())
    if isinstance(xpath_results, (str, Number, bool)):
        return str(xpath_results)
    if xpath_results is None and allow_none:
        return None
    if xpath_results is None and not allow_none:
        raise ValueError('extract_text(None, allow_none=False)')
    raise ValueError('unsupported type')
def normalize_url(url: str, base_url: str) -> str:
    """Normalize URL: add protocol, join URL with base_url, add trailing slash if there is no path

    Args:
        * url (str): Relative URL
        * base_url (str): Base URL, it must be an absolute URL.

    Example:
        >>> normalize_url('https://example.com', 'http://example.com/')
        'https://example.com/'
        >>> normalize_url('//example.com', 'http://example.com/')
        'http://example.com/'
        >>> normalize_url('//example.com', 'https://example.com/')
        'https://example.com/'
        >>> normalize_url('/path?a=1', 'https://example.com')
        'https://example.com/path?a=1'
        >>> normalize_url('', 'https://example.com')
        'https://example.com/'
        >>> normalize_url('/test', '/path')
        raise ValueError

    Raises:
        * ValueError: when the resulting URL has no network location.

    Returns:
        * str: normalized URL
    """
    if url.startswith('//'):
        # protocol-relative URL (//example.com/): reuse the scheme of base_url
        parsed_search_url = urlparse(base_url)
        url = '{0}:{1}'.format(parsed_search_url.scheme or 'http', url)
    elif url.startswith('/'):
        # URL relative to the root of base_url
        url = urljoin(base_url, url)

    # fix relative URLs that fall through the cracks
    if '://' not in url:
        url = urljoin(base_url, url)

    parsed_url = urlparse(url)

    if not parsed_url.netloc:
        raise ValueError('Cannot parse url')
    if not parsed_url.path:
        # Set the path itself instead of appending to the string, so that the
        # slash lands before a query or fragment if there is one.
        url = parsed_url._replace(path='/').geturl()

    return url
def extract_url(xpath_results, base_url) -> str:
    """Extract and normalize URL from lxml Element

    Args:
        * xpath_results (Union[List[html.HtmlElement], html.HtmlElement]): lxml Element(s)
        * base_url (str): Base URL

    Example:
        >>> def f(s, search_url):
        >>>    return searx.utils.extract_url(html.fromstring(s), search_url)
        >>> f('<span id="42">https://example.com</span>', 'http://example.com/')
        'https://example.com/'
        >>> f('https://example.com', 'http://example.com/')
        'https://example.com/'
        >>> f('//example.com', 'http://example.com/')
        'http://example.com/'
        >>> f('//example.com', 'https://example.com/')
        'https://example.com/'
        >>> f('/path?a=1', 'https://example.com')
        'https://example.com/path?a=1'
        >>> f('', 'https://example.com')
        raise lxml.etree.ParserError
        >>> searx.utils.extract_url([], 'https://example.com')
        raise ValueError

    Raises:
        * ValueError: when ``xpath_results`` is an empty list or no text could
          be extracted from it.
        * lxml.etree.ParserError: listed by the original documentation; in the
          example above it comes from ``html.fromstring('')``.

    Returns:
        * str: normalized URL
    """
    if xpath_results == []:
        raise ValueError('Empty url resultset')

    url = extract_text(xpath_results)
    if url:
        return normalize_url(url, base_url)
    raise ValueError('URL not found')
def dict_subset(dictionary: MutableMapping, properties: Set[str]) -> Dict:
    """Extract a subset of a dict

    Keys listed in ``properties`` that are missing from ``dictionary`` are
    silently skipped.

    Examples:
        >>> dict_subset({'A': 'a', 'B': 'b', 'C': 'c'}, ['A', 'C'])
        {'A': 'a', 'C': 'c'}
        >>> dict_subset({'A': 'a', 'B': 'b', 'C': 'c'}, ['A', 'D'])
        {'A': 'a'}
    """
    return {k: dictionary[k] for k in properties if k in dictionary}
def humanize_bytes(size, precision=2):
    """Determine the *human readable* value of bytes on 1024 base (1KB=1024B).

    ``size`` is divided by 1024 while it is larger than 1024 and a bigger unit
    is available; values beyond the largest unit stay expressed in ``TB``.
    """
    s = ['B ', 'KB', 'MB', 'GB', 'TB']

    # stop at the last suffix so that s[p] can never run past the list
    last = len(s) - 1
    p = 0
    while size > 1024 and p < last:
        p += 1
        size = size / 1024.0
    return "%.*f %s" % (precision, size, s[p])
def humanize_number(size, precision=0):
    """Determine the *human readable* value of a decimal number.

    ``size`` is divided by 1000 while it is larger than 1000 and a bigger
    suffix is available; values beyond the largest suffix stay expressed in
    ``T``.
    """
    s = ['', 'K', 'M', 'B', 'T']

    # stop at the last suffix so that s[p] can never run past the list
    last = len(s) - 1
    p = 0
    while size > 1000 and p < last:
        p += 1
        size = size / 1000.0
    return "%.*f%s" % (precision, size, s[p])
def convert_str_to_int(number_str: str) -> int:
    """Convert number_str to int or 0 if number_str is not a number.

    Only strings made of decimal digits are converted; anything else (empty
    string, sign, decimal point, ...) yields 0.
    """
    # isdecimal() rather than isdigit(): the latter also accepts characters
    # such as '²' that int() refuses to parse.
    if number_str.isdecimal():
        return int(number_str)
    return 0
def extr(txt: str, begin: str, end: str, default: str = ""):
    """Extract the string between ``begin`` and ``end`` from ``txt``

    :param txt:     String to search in
    :param begin:   First string to be searched for
    :param end:     Second string to be searched for after ``begin``
    :param default: Default value if one of ``begin`` or ``end`` is not
                    found.  Defaults to an empty string.
    :return: The string between the two search-strings ``begin`` and ``end``.
             If at least one of ``begin`` or ``end`` is not found, the value of
             ``default`` is returned.

    Examples:
      >>> extr("abcde", "a", "e")
      "bcd"
      >>> extr("abcde", "a", "z", default="nothing")
      "nothing"

    """
    try:
        first = txt.index(begin) + len(begin)
        return txt[first : txt.index(end, first)]
    except ValueError:
        # str.index() raises ValueError when the substring is missing
        return default
def int_or_zero(num: Union[List[str], str]) -> int:
    """Convert num to int or 0. num can be either a str or a list.
    If num is a list, the first element is converted to int (or return 0 if the list is empty).
    If num is a str, see convert_str_to_int
    """
    if isinstance(num, list):
        if len(num) < 1:
            return 0
        num = num[0]
    return convert_str_to_int(num)
def is_valid_lang(lang) -> Optional[Tuple[bool, str, str]]:
    """Return language code and name if lang describe a language.

    ``lang`` may be ``str`` or ``bytes``.  A two-character value is looked up
    as a language code, anything else as a language name.  ``None`` is returned
    when nothing matches.

    Examples:
        >>> is_valid_lang('zz')
        None
        >>> is_valid_lang('uk')
        (True, 'uk', 'ukrainian')
        >>> is_valid_lang(b'uk')
        (True, 'uk', 'ukrainian')
        >>> is_valid_lang('en')
        (True, 'en', 'english')
        >>> searx.utils.is_valid_lang('Español')
        (True, 'es', 'spanish')
        >>> searx.utils.is_valid_lang('Spanish')
        (True, 'es', 'spanish')
    """
    if isinstance(lang, bytes):
        lang = lang.decode()
    is_abbr = len(lang) == 2
    lang = lang.lower()
    if is_abbr:
        # two characters: compare with the language part of the locale tag
        for locale in sxng_locales:
            if locale[0][:2] == lang:
                return (True, locale[0][:2], locale[3].lower())
        return None
    # otherwise compare with the two name fields of each locale entry
    for locale in sxng_locales:
        if locale[1].lower() == lang or locale[3].lower() == lang:
            return (True, locale[0][:2], locale[3].lower())
    return None
def load_module(filename: str, module_dir: str) -> types.ModuleType:
    """Load and execute the Python file ``filename`` located in ``module_dir``.

    The module is named after ``filename`` without its extension.

    Raises:
        * ValueError: when no import spec or no loader can be obtained for the
          file (for instance when the extension is not a Python one).

    Returns:
        * types.ModuleType: the executed module object.
    """
    modname = splitext(filename)[0]
    modpath = join(module_dir, filename)
    spec = importlib.util.spec_from_file_location(modname, modpath)
    if not spec:
        raise ValueError(f"Error loading '{modpath}' module")
    module = importlib.util.module_from_spec(spec)
    if not spec.loader:
        raise ValueError(f"Error loading '{modpath}' module")
    spec.loader.exec_module(module)
    return module
444 """Convert obj to its string representation."""
445 if isinstance(obj, str):
447 if hasattr(obj,
'__str__'):
def ecma_unescape(string: str) -> str:
    """Python implementation of the unescape javascript function

    https://www.ecma-international.org/ecma-262/6.0/#sec-unescape-string
    https://developer.mozilla.org/fr/docs/Web/JavaScript/Reference/Objets_globaux/unescape

    Examples:
        >>> ecma_unescape('%u5409')
        '吉'
        >>> ecma_unescape('%20')
        ' '
        >>> ecma_unescape('%F3')
        'ó'
    """
    # "%u5409" becomes "吉"; done first so that the two-digit pass below does
    # not consume part of a four-digit sequence
    string = _ECMA_UNESCAPE4_RE.sub(lambda e: chr(int(e.group(1), 16)), string)
    # "%20" becomes " ", "%F3" becomes "ó"
    string = _ECMA_UNESCAPE2_RE.sub(lambda e: chr(int(e.group(1), 16)), string)
    return string
def get_string_replaces_function(replaces: Dict[str, str]) -> Callable[[str], str]:
    """Return a function that applies all replacements of ``replaces`` at once.

    Args:
        * replaces (Dict[str, str]): maps each literal substring to search for
          to its replacement.

    Returns:
        * Callable[[str], str]: function that returns its argument with every
          occurrence of a key replaced by the matching value.  With an empty
          mapping the text is returned unchanged.
    """
    # private copy: later changes to the caller's dict must not desynchronise
    # the mapping from the compiled pattern
    mapping = dict(replaces)
    if not mapping:
        # an empty alternation would match the empty string everywhere and
        # then fail on the lookup
        return lambda text: text

    pattern = re.compile("|".join(re.escape(key) for key in mapping))

    def func(text):
        return pattern.sub(lambda m: mapping[m.group(0)], text)

    return func
def get_engine_from_settings(name: str) -> Dict:
    """Return engine configuration from settings.yml of a given engine name

    An empty dict is returned when the settings have no ``engines`` section or
    no engine carries that name.
    """
    if 'engines' not in settings:
        return {}

    for engine in settings['engines']:
        # entries without a name cannot match
        if 'name' not in engine:
            continue
        if name == engine['name']:
            return engine

    return {}
def get_xpath(xpath_spec: XPathSpecType) -> XPath:
    """Return cached compiled XPath

    There is no thread lock.
    Worst case scenario, xpath_str is compiled more than one time.

    Args:
        * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath

    Returns:
        * lxml.etree.XPath: the compiled XPath.  An XPath object given as
          argument is returned unchanged.

    Raises:
        * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
        * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
    """
    if isinstance(xpath_spec, str):
        result = _XPATH_CACHE.get(xpath_spec, None)
        if result is None:
            try:
                result = XPath(xpath_spec)
            except XPathSyntaxError as e:
                raise SearxXPathSyntaxException(xpath_spec, str(e.msg)) from e
            _XPATH_CACHE[xpath_spec] = result
        return result

    if isinstance(xpath_spec, XPath):
        return xpath_spec

    raise TypeError('xpath_spec must be either a str or a lxml.etree.XPath')
def eval_xpath(element: ElementBase, xpath_spec: XPathSpecType):
    """Equivalent of element.xpath(xpath_str) but compile xpath_str once for all.
    See https://lxml.de/xpathxslt.html#xpath-return-values

    Args:
        * element (ElementBase): lxml element the XPath is evaluated on.
        * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath

    Returns:
        * result (bool, float, list, str): Results.

    Raises:
        * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
        * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
        * SearxEngineXPathException: Raise when the XPath can't be evaluated.
    """
    xpath = get_xpath(xpath_spec)
    try:
        return xpath(element)
    except XPathError as e:
        # flatten the exception arguments into a single message
        arg = ' '.join([str(i) for i in e.args])
        raise SearxEngineXPathException(xpath_spec, arg) from e
def eval_xpath_list(element: ElementBase, xpath_spec: XPathSpecType, min_len: Optional[int] = None):
    """Same as eval_xpath, check if the result is a list

    Args:
        * element (ElementBase): lxml element the XPath is evaluated on.
        * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath
        * min_len (int, optional): minimum number of items the result must
          hold. Defaults to None (no check).

    Raises:
        * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
        * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
        * SearxEngineXPathException: raise if the result is not a list, or if it
          holds fewer than ``min_len`` items

    Returns:
        * result (list): Results.
    """
    result = eval_xpath(element, xpath_spec)
    if not isinstance(result, list):
        raise SearxEngineXPathException(xpath_spec, 'the result is not a list')
    if min_len is not None and min_len > len(result):
        raise SearxEngineXPathException(xpath_spec, 'len(xpath_str) < ' + str(min_len))
    return result
def eval_xpath_getindex(elements: ElementBase, xpath_spec: XPathSpecType, index: int, default=_NOTSET):
    """Call eval_xpath_list then get one element using the index parameter.
    If the index does not exist, either raise an exception if default is not set,
    otherwise return the default value (can be None).

    Args:
        * elements (ElementBase): lxml element to apply the xpath.
        * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath.
        * index (int): index to get; negative values count from the end.
        * default (Object, optional): Defaults if index doesn't exist.

    Raises:
        * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
        * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
        * SearxEngineXPathException: if the index is not found. Also see eval_xpath.

    Returns:
        * result (bool, float, list, str): Results.
    """
    result = eval_xpath_list(elements, xpath_spec)
    if -len(result) <= index < len(result):
        return result[index]
    # identity test: the sentinel is a singleton, and "is" never triggers a
    # custom __eq__ of the caller's default value
    if default is _NOTSET:
        raise SearxEngineXPathException(xpath_spec, 'index ' + str(index) + ' not found')
    return default
def _get_fasttext_model() -> "fasttext.FastText._FastText":
    """Return the fasttext model, loading it on first use.

    The model is read from ``lid.176.ftz`` in ``data_dir`` and kept in the
    module-level ``_FASTTEXT_MODEL``, so later calls reuse the same object.
    """
    global _FASTTEXT_MODEL  # pylint: disable=global-statement
    if _FASTTEXT_MODEL is None:
        # imported here so that the library is only needed when the model is
        # actually requested
        import fasttext  # pylint: disable=import-outside-toplevel

        # Monkey patch: replace fasttext's eprint with a no-op to silence it
        fasttext.FastText.eprint = lambda x: None
        _FASTTEXT_MODEL = fasttext.load_model(str(data_dir / 'lid.176.ftz'))
    return _FASTTEXT_MODEL
def get_embeded_stream_url(url):
    """
    Converts a standard video URL into its embed format. Supported services include Youtube,
    Facebook, Instagram, TikTok, and Dailymotion.

    Returns the URL to use as ``src`` of an iframe, or ``None`` when ``url``
    matches none of the supported patterns.
    """
    parsed_url = urlparse(url)
    iframe_src = None

    # YouTube
    if parsed_url.netloc in ['www.youtube.com', 'youtube.com'] and parsed_url.path == '/watch' and parsed_url.query:
        video_id = parse_qs(parsed_url.query).get('v', [])
        if video_id:
            iframe_src = 'https://www.youtube-nocookie.com/embed/' + video_id[0]

    # Facebook
    elif parsed_url.netloc in ['www.facebook.com', 'facebook.com']:
        encoded_href = urlencode({'href': url})
        iframe_src = 'https://www.facebook.com/plugins/video.php?allowfullscreen=true&' + encoded_href

    # Instagram
    elif parsed_url.netloc in ['www.instagram.com', 'instagram.com'] and parsed_url.path.startswith('/p/'):
        if parsed_url.path.endswith('/'):
            iframe_src = url + 'embed'
        else:
            iframe_src = url + '/embed'

    # TikTok
    elif (
        parsed_url.netloc in ['www.tiktok.com', 'tiktok.com']
        and parsed_url.path.startswith('/@')
        and '/video/' in parsed_url.path
    ):
        path_parts = parsed_url.path.split('/video/')
        video_id = path_parts[1]
        iframe_src = 'https://www.tiktok.com/embed/' + video_id

    # Dailymotion
    elif parsed_url.netloc in ['www.dailymotion.com', 'dailymotion.com'] and parsed_url.path.startswith('/video/'):
        path_parts = parsed_url.path.split('/')
        # expect exactly ['', 'video', '<id>']
        if len(path_parts) == 3:
            video_id = path_parts[2]
            iframe_src = 'https://www.dailymotion.com/embed/video/' + video_id

    return iframe_src
def detect_language(text: str, threshold: float = 0.3, only_search_languages: bool = False) -> Optional[str]:
    """Detect the language of the ``text`` parameter.

    :param str text: The string whose language is to be detected.

    :param float threshold: Threshold filters the returned labels by a threshold
        on probability.  A choice of 0.3 will return labels with at least 0.3
        probability.

    :param bool only_search_languages: If ``True``, returns only supported
        SearXNG search languages.  see :py:obj:`searx.languages`

    :rtype: str, None
    :returns:
        The detected language code or ``None``. See below.

    :raises ValueError: If ``text`` is not a string.

    The language detection is done by using `a fork`_ of the fastText_ library
    (`python fasttext`_). fastText_ distributes the `language identification
    model`_, for reference:

    - `FastText.zip: Compressing text classification models`_
    - `Bag of Tricks for Efficient Text Classification`_

    The `language identification model`_ support the language codes
    (ISO-639-3)::

        af als am an ar arz as ast av az azb ba bar bcl be bg bh bn bo bpy br bs
        bxr ca cbk ce ceb ckb co cs cv cy da de diq dsb dty dv el eml en eo es
        et eu fa fi fr frr fy ga gd gl gn gom gu gv he hi hif hr hsb ht hu hy ia
        id ie ilo io is it ja jbo jv ka kk km kn ko krc ku kv kw ky la lb lez li
        lmo lo lrc lt lv mai mg mhr min mk ml mn mr mrj ms mt mwl my myv mzn nah
        nap nds ne new nl nn no oc or os pa pam pfl pl pms pnb ps pt qu rm ro ru
        rue sa sah sc scn sco sd sh si sk sl so sq sr su sv sw ta te tg th tk tl
        tr tt tyv ug uk ur uz vec vep vi vls vo wa war wuu xal xmf yi yo yue zh

    By using ``only_search_languages=True`` the `language identification model`_
    is harmonized with the SearXNG's language (locale) model.  General
    conditions of SearXNG's locale model are:

    a. SearXNG's locale of a query is passed to the
       :py:obj:`searx.locales.get_engine_locale` to get a language and/or region
       code that is used by an engine.

    b. Most of SearXNG's engines do not support all the languages from `language
       identification model`_ and there is also a discrepancy in the ISO-639-3
       (fasttext) and ISO-639-2 (SearXNG) handling.  Further more, in SearXNG the
       locales like ``zh-TH`` (``zh-CN``) are mapped to ``zh_Hant``
       (``zh_Hans``) while the `language identification model`_ reduce both to
       ``zh``.

    .. _a fork: https://github.com/searxng/fasttext-predict
    .. _fastText: https://fasttext.cc/
    .. _python fasttext: https://pypi.org/project/fasttext/
    .. _language identification model: https://fasttext.cc/docs/en/language-identification.html
    .. _Bag of Tricks for Efficient Text Classification: https://arxiv.org/abs/1607.01759
    .. _`FastText.zip: Compressing text classification models`: https://arxiv.org/abs/1612.03651

    """
    if not isinstance(text, str):
        raise ValueError('text must be a str')
    # line breaks are replaced because predict() works on a single line;
    # k=1 asks for the best label only
    r = _get_fasttext_model().predict(text.replace('\n', ' '), k=1, threshold=threshold)
    if isinstance(r, tuple) and len(r) == 2 and len(r[0]) > 0 and len(r[1]) > 0:
        # labels look like "__label__en"
        language = r[0][0].split('__label__')[1]
        if only_search_languages and language not in SEARCH_LANGUAGE_CODES:
            return None
        return language
    return None
736 """Convert a javascript variable into JSON and then load the value
738 It does not deal with all cases, but it is good enough for now.
739 chompjs has a better implementation.
748 parts = re.split(
r'(["\'])', js_variable)
751 for i, p
in enumerate(parts):
756 parts[i] = parts[i].replace(
':', chr(1))
762 parts[i] = parts[i].replace(
'"',
r'\"')
765 if not in_string
and p
in (
'"',
"'"):
776 if len(previous_p) > 0
and previous_p[-1] ==
'\\':
788 parts[i] = _JS_VOID_RE.sub(
"null", p)
797 s = _JS_QUOTE_KEYS_RE.sub(
r'\1"\2"\3', s)
798 s = _JS_DECIMAL_RE.sub(
":0.", s)
800 s = s.replace(chr(1),
':')
extr(str txt, str begin, str end, str default="")
Callable[[str], str] get_string_replaces_function(Dict[str, str] replaces)
js_variable_to_python(js_variable)
Optional[str] detect_language(str text, float threshold=0.3, bool only_search_languages=False)
XPath get_xpath(XPathSpecType xpath_spec)
humanize_bytes(size, precision=2)
Optional[str] extract_text(xpath_results, bool allow_none=False)
str ecma_unescape(str string)
eval_xpath_getindex(ElementBase elements, XPathSpecType xpath_spec, int index, default=_NOTSET)
int convert_str_to_int(str number_str)
eval_xpath(ElementBase element, XPathSpecType xpath_spec)
str gen_useragent(Optional[str] os_string=None)
int int_or_zero(Union[List[str], str] num)
Optional[Tuple[bool, str, str]] is_valid_lang(lang)
"fasttext.FastText._FastText" _get_fasttext_model()
str extract_url(xpath_results, base_url)
eval_xpath_list(ElementBase element, XPathSpecType xpath_spec, Optional[int] min_len=None)
types.ModuleType load_module(str filename, str module_dir)
str normalize_url(str url, str base_url)
get_embeded_stream_url(url)
str html_to_text(str html_str)
humanize_number(size, precision=0)
Dict dict_subset(MutableMapping dictionary, Set[str] properties)
Dict get_engine_from_settings(str name)
str markdown_to_text(str markdown_str)