2"""Utility functions for the engines"""
12from collections.abc
import MutableMapping, Callable
14from numbers
import Number
15from os.path
import splitext, join
16from random
import choice
17from html.parser
import HTMLParser
18from html
import escape
19from urllib.parse
import urljoin, urlparse, parse_qs, urlencode
20from datetime
import timedelta
21from markdown_it
import MarkdownIt
24from lxml.etree
import ElementBase, XPath, XPathError, XPathSyntaxError
26from searx
import settings
30from searx.exceptions import SearxXPathSyntaxException, SearxEngineXPathException
31from searx
import logger
34 import fasttext.FastText

logger = logger.getChild('utils')

XPathSpecType: t.TypeAlias = str | XPath
"""Type alias used by :py:obj:`searx.utils.get_xpath`,
:py:obj:`searx.utils.eval_xpath` and other XPath selectors."""

_BLOCKED_TAGS = ('script', 'style')

_ECMA_UNESCAPE4_RE = re.compile(r'%u([0-9a-fA-F]{4})', re.UNICODE)
_ECMA_UNESCAPE2_RE = re.compile(r'%([0-9a-fA-F]{2})', re.UNICODE)

_JS_QUOTE_KEYS_RE = re.compile(r'([\{\s,])(\w+)(:)')
_JS_VOID_RE = re.compile(r'void\s+[0-9]+|void\s*\([0-9]+\)')
_JS_DECIMAL_RE = re.compile(r":\s*\.")

_XPATH_CACHE: dict[str, XPath] = {}
_LANG_TO_LC_CACHE: dict[str, dict[str, str]] = {}

_FASTTEXT_MODEL: "fasttext.FastText._FastText | None" = None
"""fasttext model to predict language of a search term"""

SEARCH_LANGUAGE_CODES = frozenset([searxng_locale[0].split('-')[0] for searxng_locale in sxng_locales])
"""Languages supported by most searxng engines (:py:obj:`searx.sxng_locales.sxng_locales`)."""
63 """Internal class for this module, do not create instance of this class.
64 Replace the None value, allow explicitly pass None as a function argument"""
67_NOTSET = _NotSetClass()
71 """Return the SearXNG User Agent"""
72 return f
"SearXNG/{VERSION_TAG} {settings['outgoing']['useragent_suffix']}".strip()
76 """Return a random browser User Agent
78 See searx/data/useragents.json
80 return USER_AGENTS[
'ua'].format(
81 os=os_string
or choice(USER_AGENTS[
'os']),
82 version=choice(USER_AGENTS[
'versions']),
87 """Internal class to extract text from HTML"""
90 HTMLParser.__init__(self)
103 if tag != self.
tags[-1]:
104 self.
result.append(f
"</{tag}>")
110 return not self.
tags or self.
tags[-1]
not in _BLOCKED_TAGS
120 if name[0]
in (
'x',
'X'):
121 codepoint = int(name[1:], 16)
123 codepoint = int(name)
124 self.
result.append(chr(codepoint))
134 return ''.join(self.
result).strip()
136 def error(self, message: str) ->
None:
139 raise AssertionError(message)
143 """Extract text from a HTML string
146 * html_str (str): string HTML
149 * str: extracted text
152 >>> html_to_text('Example <span id="42">#2</span>')
155 >>> html_to_text('<style>.span { color: red; }</style><span>Example</span>')
158 >>> html_to_text(r'regexp: (?<![a-zA-Z]')
159 'regexp: (?<![a-zA-Z]'
161 >>> html_to_text(r'<p><b>Lorem ipsum </i>dolor sit amet</p>')
162 'Lorem ipsum </i>dolor sit amet</p>'
164 >>> html_to_text(r'> < a')
170 html_str = html_str.replace(
'\n',
' ').replace(
'\r',
' ')
171 html_str =
' '.join(html_str.split())
176 except AssertionError:
178 s.feed(escape(html_str, quote=
True))
184 """Extract text from a Markdown string
187 * markdown_str (str): string Markdown
190 * str: extracted text
193 >>> markdown_to_text('[example](https://example.com)')
196 >>> markdown_to_text('## Headline')
201 MarkdownIt(
"commonmark", {
"typographer":
True}).enable([
"replacements",
"smartquotes"]).render(markdown_str)


def extract_text(
    xpath_results: list[ElementBase] | ElementBase | str | Number | bool | None,
    allow_none: bool = False,
) -> str | None:
    """Extract text from a lxml result

    * if xpath_results is a list, extract the text from each result and concat the list
    * if xpath_results is an xml element, extract all the text nodes from it
      ( text_content() method from lxml )
    * if xpath_results is a string element, then it's already done
    """
    if isinstance(xpath_results, list):
        # it's a list of results: concat everything using a recursive call
        result = ''
        for e in xpath_results:
            result = result + (extract_text(e) or '')
        return result.strip()
    if isinstance(xpath_results, ElementBase):
        # it's an element
        text: str = html.tostring(xpath_results, encoding='unicode', method='text', with_tail=False)
        text = text.strip().replace('\n', ' ')
        return ' '.join(text.split())
    if isinstance(xpath_results, (str, Number, bool)):
        return str(xpath_results)
    if xpath_results is None and allow_none:
        return None
    if xpath_results is None and not allow_none:
        raise ValueError('extract_text(None, allow_none=False)')
    raise ValueError('unsupported type')
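

# Usage sketch (editorial illustration, not part of the upstream module):
# extract_text accepts elements, lists of elements and plain strings alike.
#
#   from lxml import html as lxml_html
#   dom = lxml_html.fromstring('<p>Hello <b>world</b></p>')
#   extract_text(dom)                    # -> 'Hello world'
#   extract_text(dom.xpath('//b'))       # -> 'world'
#   extract_text(None, allow_none=True)  # -> None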
243 """Normalize URL: add protocol, join URL with base_url, add trailing slash if there is no path
246 * url (str): Relative URL
247 * base_url (str): Base URL, it must be an absolute URL.
250 >>> normalize_url('https://example.com', 'http://example.com/')
251 'https://example.com/'
252 >>> normalize_url('//example.com', 'http://example.com/')
253 'http://example.com/'
254 >>> normalize_url('//example.com', 'https://example.com/')
255 'https://example.com/'
256 >>> normalize_url('/path?a=1', 'https://example.com')
257 'https://example.com/path?a=1'
258 >>> normalize_url('', 'https://example.com')
259 'https://example.com/'
260 >>> normalize_url('/test', '/path')
264 * lxml.etree.ParserError
267 * str: normalized URL
269 if url.startswith(
'//'):
271 parsed_search_url = urlparse(base_url)
272 url =
'{0}:{1}'.format(parsed_search_url.scheme
or 'http', url)
273 elif url.startswith(
'/'):
275 url = urljoin(base_url, url)
279 url = urljoin(base_url, url)
281 parsed_url = urlparse(url)
284 if not parsed_url.netloc:
285 raise ValueError(
'Cannot parse url')
286 if not parsed_url.path:


def extract_url(xpath_results: list[ElementBase] | ElementBase | str | Number | bool | None, base_url: str) -> str:
    """Extract and normalize URL from lxml Element

    Example:
        >>> def f(s, search_url):
        >>>     return searx.utils.extract_url(html.fromstring(s), search_url)
        >>> f('<span id="42">https://example.com</span>', 'http://example.com/')
        'https://example.com/'
        >>> f('https://example.com', 'http://example.com/')
        'https://example.com/'
        >>> f('//example.com', 'http://example.com/')
        'http://example.com/'
        >>> f('//example.com', 'https://example.com/')
        'https://example.com/'
        >>> f('/path?a=1', 'https://example.com')
        'https://example.com/path?a=1'
        >>> f('', 'https://example.com')
        raise lxml.etree.ParserError
        >>> searx.utils.extract_url([], 'https://example.com')
        raise ValueError

    Raises:
        * ValueError
        * lxml.etree.ParserError

    Returns:
        * str: normalized URL
    """
    if xpath_results == []:
        raise ValueError('Empty url resultset')

    url = extract_text(xpath_results)
    if url:
        return normalize_url(url, base_url)
    raise ValueError('URL not found')


def dict_subset(dictionary: MutableMapping[t.Any, t.Any], properties: set[str]) -> MutableMapping[str, t.Any]:
    """Extract a subset of a dict

    Examples:
        >>> dict_subset({'A': 'a', 'B': 'b', 'C': 'c'}, ['A', 'C'])
        {'A': 'a', 'C': 'c'}
        >>> dict_subset({'A': 'a', 'B': 'b', 'C': 'c'}, ['A', 'D'])
        {'A': 'a'}
    """
    return {k: dictionary[k] for k in properties if k in dictionary}
342 """Determine the *human readable* value of bytes on 1024 base (1KB=1024B)."""
343 s = [
'B ',
'KB',
'MB',
'GB',
'TB']
347 while size > 1024
and p < x:
350 return "%.*f %s" % (precision, size, s[p])
354 """Determine the *human readable* value of a decimal number."""
355 s = [
'',
'K',
'M',
'B',
'T']
359 while size > 1000
and p < x:
362 return "%.*f%s" % (precision, size, s[p])
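

# Usage sketch (editorial illustration, not part of the upstream module):
#
#   humanize_bytes(2048)        # -> '2.00 KB'
#   humanize_number(1_200_000)  # -> '1M'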
366 """Convert number_str to int or 0 if number_str is not a number."""
367 if number_str.isdigit():
368 return int(number_str)


def extr(txt: str, begin: str, end: str, default: str = ""):
    """Extract the string between ``begin`` and ``end`` from ``txt``

    :param txt:     String to search in
    :param begin:   First string to be searched for
    :param end:     Second string to be searched for after ``begin``
    :param default: Default value if one of ``begin`` or ``end`` is not
                    found.  Defaults to an empty string.
    :return: The string between the two search-strings ``begin`` and ``end``.
             If at least one of ``begin`` or ``end`` is not found, the value of
             ``default`` is returned.

    Examples:
        >>> extr("abcde", "a", "e")
        "bcd"
        >>> extr("abcde", "a", "z", default="nothing")
        "nothing"

    """
    try:
        first = txt.index(begin) + len(begin)
        return txt[first : txt.index(end, first)]
    except ValueError:
        return default
402 """Convert num to int or 0. num can be either a str or a list.
403 If num is a list, the first element is converted to int (or return 0 if the list is empty).
404 If num is a str, see convert_str_to_int
406 if isinstance(num, list):
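

# Usage sketch (editorial illustration, not part of the upstream module):
#
#   int_or_zero('42')   # -> 42
#   int_or_zero(['7'])  # -> 7
#   int_or_zero([])     # -> 0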
414 """Return language code and name if lang describe a language.
417 >>> is_valid_lang('zz')
419 >>> is_valid_lang('uk')
420 (True, 'uk', 'ukrainian')
421 >>> is_valid_lang(b'uk')
422 (True, 'uk', 'ukrainian')
423 >>> is_valid_lang('en')
424 (True, 'en', 'english')
425 >>> searx.utils.is_valid_lang('Español')
426 (True, 'es', 'spanish')
427 >>> searx.utils.is_valid_lang('Spanish')
428 (True, 'es', 'spanish')
430 if isinstance(lang, bytes):
432 is_abbr = len(lang) == 2
435 for l
in sxng_locales:
437 return (
True, l[0][:2], l[3].lower())
439 for l
in sxng_locales:
440 if l[1].lower() == lang
or l[3].lower() == lang:
441 return (
True, l[0][:2], l[3].lower())


def load_module(filename: str, module_dir: str) -> types.ModuleType:
    modname = splitext(filename)[0]
    modpath = join(module_dir, filename)
    # see https://docs.python.org/3/library/importlib.html#importing-a-source-file-directly
    spec = importlib.util.spec_from_file_location(modname, modpath)
    if not spec:
        raise ValueError(f"Error loading '{modpath}' module")
    module = importlib.util.module_from_spec(spec)
    if not spec.loader:
        raise ValueError(f"Error loading '{modpath}' module")
    spec.loader.exec_module(module)
    return module
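

# Usage sketch (editorial illustration; the file name and directory are made
# up for the example):
#
#   engine = load_module('example_engine.py', '/usr/local/searxng/searx/engines')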
460 """Convert obj to its string representation."""
461 if isinstance(obj, str):
463 if hasattr(obj,
'__str__'):
469 """Python implementation of the unescape javascript function
471 https://www.ecma-international.org/ecma-262/6.0/#sec-unescape-string
472 https://developer.mozilla.org/fr/docs/Web/JavaScript/Reference/Objets_globaux/unescape
475 >>> ecma_unescape('%u5409')
477 >>> ecma_unescape('%20')
479 >>> ecma_unescape('%F3')
483 string = _ECMA_UNESCAPE4_RE.sub(
lambda e: chr(int(e.group(1), 16)), string)
485 string = _ECMA_UNESCAPE2_RE.sub(
lambda e: chr(int(e.group(1), 16)), string)
490 """Removes unicode's "PRIVATE USE CHARACTER"s (PUA_) from a string.
492 .. _PUA: https://en.wikipedia.org/wiki/Private_Use_Areas
494 pua_ranges = ((0xE000, 0xF8FF), (0xF0000, 0xFFFFD), (0x100000, 0x10FFFD))
498 if any(a <= i <= b
for (a, b)
in pua_ranges):


def get_string_replaces_function(replaces: dict[str, str]) -> Callable[[str], str]:
    rep = {re.escape(k): v for k, v in replaces.items()}
    pattern = re.compile("|".join(rep.keys()))

    def func(text: str):
        return pattern.sub(lambda m: rep[re.escape(m.group(0))], text)

    return func
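

# Usage sketch (editorial illustration, not part of the upstream module): the
# returned closure applies all replacements in a single regex pass.
#
#   strip_bold = get_string_replaces_function({'<b>': '', '</b>': ''})
#   strip_bold('<b>bold</b> text')  # -> 'bold text'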
515 """Return engine configuration from settings.yml of a given engine name"""
517 if 'engines' not in settings:
520 for engine
in settings[
'engines']:
521 if 'name' not in engine:
523 if name == engine[
'name']:
530 """Return cached compiled :py:obj:`lxml.etree.XPath` object.
533 Raised when ``xpath_spec`` is neither a :py:obj:`str` nor a
534 :py:obj:`lxml.etree.XPath`.
536 ``SearxXPathSyntaxException``:
537 Raised when there is a syntax error in the *XPath* selector (``str``).
539 if isinstance(xpath_spec, str):
540 result = _XPATH_CACHE.get(xpath_spec,
None)
543 result = XPath(xpath_spec)
544 except XPathSyntaxError
as e:
546 _XPATH_CACHE[xpath_spec] = result
549 if isinstance(xpath_spec, XPath):
552 raise TypeError(
'xpath_spec must be either a str or a lxml.etree.XPath')


def eval_xpath(element: ElementBase, xpath_spec: XPathSpecType) -> t.Any:
    """Equivalent of ``element.xpath(xpath_str)`` but compile ``xpath_str`` into
    a :py:obj:`lxml.etree.XPath` object once for all. The return value of
    ``xpath(..)`` is complex, read `XPath return values`_ for more details.

    .. _XPath return values:
       https://lxml.de/xpathxslt.html#xpath-return-values

    ``TypeError``:
        Raised when ``xpath_spec`` is neither a :py:obj:`str` nor a
        :py:obj:`lxml.etree.XPath`.

    ``SearxXPathSyntaxException``:
        Raised when there is a syntax error in the *XPath* selector (``str``).

    ``SearxEngineXPathException``:
        Raised when the XPath can't be evaluated (masked
        :py:obj:`lxml.etree.XPathError`).
    """
    xpath = get_xpath(xpath_spec)
    try:
        return xpath(element)
    except XPathError as e:
        arg = ' '.join([str(i) for i in e.args])
        raise SearxEngineXPathException(xpath_spec, arg) from e


def eval_xpath_list(element: ElementBase, xpath_spec: XPathSpecType, min_len: int | None = None) -> list[t.Any]:
    """Same as :py:obj:`searx.utils.eval_xpath`, but additionally ensures the
    return value is a :py:obj:`list`. The minimum length of the list is also
    checked (if ``min_len`` is set)."""

    result = eval_xpath(element, xpath_spec)
    if not isinstance(result, list):
        raise SearxEngineXPathException(xpath_spec, 'the result is not a list')
    if min_len is not None and min_len > len(result):
        raise SearxEngineXPathException(xpath_spec, 'len(xpath_str) < ' + str(min_len))
    return result


def eval_xpath_getindex(
    element: ElementBase,
    xpath_spec: XPathSpecType,
    index: int,
    default: t.Any = _NOTSET,
) -> t.Any:
    """Same as :py:obj:`searx.utils.eval_xpath_list`, but returns item on
    position ``index`` from the list (index starts with ``0``).

    The exceptions known from :py:obj:`searx.utils.eval_xpath` are thrown. If a
    default is specified, this is returned if an element at position ``index``
    could not be determined.
    """
    result = eval_xpath_list(element, xpath_spec)
    if -len(result) <= index < len(result):
        return result[index]
    if default == _NOTSET:
        # raise a SearxEngineXPathException instead of an IndexError
        # to record xpath_spec
        raise SearxEngineXPathException(xpath_spec, 'index ' + str(index) + ' not found')
    return default
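

# Usage sketch (editorial illustration, not part of the upstream module); the
# HTML snippet is made up:
#
#   from lxml import html as lxml_html
#   dom = lxml_html.fromstring('<div><a href="/1">one</a><a href="/2">two</a></div>')
#   eval_xpath(dom, '//a/@href')                    # -> ['/1', '/2']
#   eval_xpath_list(dom, '//a', min_len=2)          # -> [<Element a>, <Element a>]
#   eval_xpath_getindex(dom, '//a/@href', 0)        # -> '/1'
#   eval_xpath_getindex(dom, '//a/@href', 9, None)  # -> None (the default)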


def _get_fasttext_model() -> "fasttext.FastText._FastText":
    global _FASTTEXT_MODEL  # pylint: disable=global-statement
    if _FASTTEXT_MODEL is None:
        import fasttext  # pylint: disable=import-outside-toplevel

        # Monkey patch: prevent fasttext from showing a (useless) warning when loading a model.
        fasttext.FastText.eprint = lambda x: None
        _FASTTEXT_MODEL = fasttext.load_model(str(data_dir / 'lid.176.ftz'))
    return _FASTTEXT_MODEL


def get_embeded_stream_url(url: str) -> str | None:
    """
    Converts a standard video URL into its embed format. Supported services include Youtube,
    Facebook, Instagram, TikTok, Dailymotion, and Bilibili.
    """
    parsed_url = urlparse(url)
    iframe_src = None

    # YouTube
    if parsed_url.netloc in ['www.youtube.com', 'youtube.com'] and parsed_url.path == '/watch' and parsed_url.query:
        video_id = parse_qs(parsed_url.query).get('v', [])
        if video_id:
            iframe_src = 'https://www.youtube-nocookie.com/embed/' + video_id[0]

    # Facebook
    elif parsed_url.netloc in ['www.facebook.com', 'facebook.com']:
        encoded_href = urlencode({'href': url})
        iframe_src = 'https://www.facebook.com/plugins/video.php?allowfullscreen=true&' + encoded_href

    # Instagram
    elif parsed_url.netloc in ['www.instagram.com', 'instagram.com'] and parsed_url.path.startswith('/p/'):
        if parsed_url.path.endswith('/'):
            iframe_src = url + 'embed'
        else:
            iframe_src = url + '/embed'

    # TikTok
    elif (
        parsed_url.netloc in ['www.tiktok.com', 'tiktok.com']
        and parsed_url.path.startswith('/@')
        and '/video/' in parsed_url.path
    ):
        path_parts = parsed_url.path.split('/video/')
        video_id = path_parts[1]
        iframe_src = 'https://www.tiktok.com/embed/' + video_id

    # Dailymotion
    elif parsed_url.netloc in ['www.dailymotion.com', 'dailymotion.com'] and parsed_url.path.startswith('/video/'):
        path_parts = parsed_url.path.split('/')
        if len(path_parts) == 3:
            video_id = path_parts[2]
            iframe_src = 'https://www.dailymotion.com/embed/video/' + video_id

    # Bilibili
    elif parsed_url.netloc in ['www.bilibili.com', 'bilibili.com'] and parsed_url.path.startswith('/video/'):
        path_parts = parsed_url.path.split('/')

        video_id = path_parts[2]
        param_key = None
        if video_id.startswith('av'):
            video_id = video_id[2:]
            param_key = 'aid'
        elif video_id.startswith('BV'):
            param_key = 'bvid'

        iframe_src = (
            f'https://player.bilibili.com/player.html?{param_key}={video_id}&high_quality=1&autoplay=false&danmaku=0'
        )

    return iframe_src
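

# Usage sketch (editorial illustration; the URLs are made up):
#
#   get_embeded_stream_url('https://www.youtube.com/watch?v=aaaaaaaaaaa')
#   # -> 'https://www.youtube-nocookie.com/embed/aaaaaaaaaaa'
#   get_embeded_stream_url('https://example.com/video/1')  # -> None (unsupported)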


def detect_language(text: str, threshold: float = 0.3, only_search_languages: bool = False) -> str | None:
    """Detect the language of the ``text`` parameter.

    :param str text: The string whose language is to be detected.

    :param float threshold: Threshold filters the returned labels by a threshold
        on probability. A choice of 0.3 will return labels with at least 0.3
        probability.

    :param bool only_search_languages: If ``True``, returns only supported
        SearXNG search languages. See :py:obj:`searx.languages`

    :returns:
        The detected language code or ``None``. See below.

    :raises ValueError: If ``text`` is not a string.

    The language detection is done by using `a fork`_ of the fastText_ library
    (`python fasttext`_). fastText_ distributes the `language identification
    model`_, for reference:

    - `FastText.zip: Compressing text classification models`_
    - `Bag of Tricks for Efficient Text Classification`_

    The `language identification model`_ supports the language codes
    (ISO-639-3)::

        af als am an ar arz as ast av az azb ba bar bcl be bg bh bn bo bpy br bs
        bxr ca cbk ce ceb ckb co cs cv cy da de diq dsb dty dv el eml en eo es
        et eu fa fi fr frr fy ga gd gl gn gom gu gv he hi hif hr hsb ht hu hy ia
        id ie ilo io is it ja jbo jv ka kk km kn ko krc ku kv kw ky la lb lez li
        lmo lo lrc lt lv mai mg mhr min mk ml mn mr mrj ms mt mwl my myv mzn nah
        nap nds ne new nl nn no oc or os pa pam pfl pl pms pnb ps pt qu rm ro ru
        rue sa sah sc scn sco sd sh si sk sl so sq sr su sv sw ta te tg th tk tl
        tr tt tyv ug uk ur uz vec vep vi vls vo wa war wuu xal xmf yi yo yue zh

    By using ``only_search_languages=True`` the `language identification model`_
    is harmonized with the SearXNG's language (locale) model. General
    conditions of SearXNG's locale model are:

    a. SearXNG's locale of a query is passed to the
       :py:obj:`searx.locales.get_engine_locale` to get a language and/or region
       code that is used by an engine.

    b. Most of SearXNG's engines do not support all the languages from the
       `language identification model`_, and there is also a discrepancy in the
       ISO-639-3 (fasttext) and ISO-639-2 (SearXNG) handling. Furthermore, in
       SearXNG the locales like ``zh-TW`` (``zh-CN``) are mapped to ``zh_Hant``
       (``zh_Hans``) while the `language identification model`_ reduces both to
       ``zh``.

    .. _a fork: https://github.com/searxng/fasttext-predict
    .. _fastText: https://fasttext.cc/
    .. _python fasttext: https://pypi.org/project/fasttext/
    .. _language identification model: https://fasttext.cc/docs/en/language-identification.html
    .. _Bag of Tricks for Efficient Text Classification: https://arxiv.org/abs/1607.01759
    .. _`FastText.zip: Compressing text classification models`: https://arxiv.org/abs/1612.03651

    """
    if not isinstance(text, str):
        raise ValueError('text must be a str')
    r = _get_fasttext_model().predict(text.replace('\n', ' '), k=1, threshold=threshold)
    if isinstance(r, tuple) and len(r) == 2 and len(r[0]) > 0 and len(r[1]) > 0:
        language = r[0][0].split('__label__')[1]
        if only_search_languages and language not in SEARCH_LANGUAGE_CODES:
            return None
        return language
    return None
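

# Usage sketch (editorial illustration, not part of the upstream module; the
# prediction shown is the expected, not a guaranteed, outcome):
#
#   detect_language('Un, deux, trois')  # -> 'fr'
#   detect_language(42)                 # raises ValueError: text must be a str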
765 """Convert a javascript variable into JSON and then load the value
767 It does not deal with all cases, but it is good enough for now.
768 chompjs has a better implementation.
777 parts = re.split(
r'(["\'])', js_variable)
780 for i, p
in enumerate(parts):
785 parts[i] = parts[i].replace(
':', chr(1))
791 parts[i] = parts[i].replace(
'"',
r'\"')
794 if not in_string
and p
in (
'"',
"'"):
805 if len(previous_p) > 0
and previous_p[-1] ==
'\\':
817 parts[i] = _JS_VOID_RE.sub(
"null", p)
826 s = _JS_QUOTE_KEYS_RE.sub(
r'\1"\2"\3', s)
827 s = _JS_DECIMAL_RE.sub(
":0.", s)
829 s = s.replace(chr(1),
':')
834 s = s.replace(
"',",
"\",")
840 """Parse a time string in format MM:SS or HH:MM:SS and convert it to a `timedelta` object.
842 Returns None if the provided string doesn't match any of the formats.
844 duration_str = duration_str.strip()
851 time_parts = ([
"00"] + duration_str.split(
":"))[:3]
852 hours, minutes, seconds = map(int, time_parts)
853 return timedelta(hours=hours, minutes=minutes, seconds=seconds)
855 except (ValueError, TypeError):
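

# Usage sketch (editorial illustration, not part of the upstream module):
#
#   parse_duration_string('03:14')    # -> timedelta(minutes=3, seconds=14)
#   parse_duration_string('1:02:03')  # -> timedelta(hours=1, minutes=2, seconds=3)
#   parse_duration_string('n/a')      # -> None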