2"""Utility functions for the engines"""
4from __future__
import annotations
12from typing
import Optional, Union, Any, Set, List, Dict, MutableMapping, Tuple, Callable
13from numbers
import Number
14from os.path
import splitext, join
15from random
import choice
16from html.parser
import HTMLParser
17from html
import escape
18from urllib.parse
import urljoin, urlparse, parse_qs, urlencode
19from datetime
import timedelta
20from markdown_it
import MarkdownIt
23from lxml.etree
import ElementBase, XPath, XPathError, XPathSyntaxError
25from searx
import settings
29from searx.exceptions import SearxXPathSyntaxException, SearxEngineXPathException
30from searx
import logger

logger = logger.getChild('utils')

XPathSpecType = Union[str, XPath]

_BLOCKED_TAGS = ('script', 'style')

_ECMA_UNESCAPE4_RE = re.compile(r'%u([0-9a-fA-F]{4})', re.UNICODE)
_ECMA_UNESCAPE2_RE = re.compile(r'%([0-9a-fA-F]{2})', re.UNICODE)

_JS_QUOTE_KEYS_RE = re.compile(r'([\{\s,])(\w+)(:)')
_JS_VOID_RE = re.compile(r'void\s+[0-9]+|void\s*\([0-9]+\)')
_JS_DECIMAL_RE = re.compile(r":\s*\.")

_XPATH_CACHE: Dict[str, XPath] = {}
_LANG_TO_LC_CACHE: Dict[str, Dict[str, str]] = {}

_FASTTEXT_MODEL: Optional["fasttext.FastText._FastText"] = None  # type: ignore
"""fasttext model to predict language of a search term"""

SEARCH_LANGUAGE_CODES = frozenset([searxng_locale[0].split('-')[0] for searxng_locale in sxng_locales])
"""Languages supported by most searxng engines (:py:obj:`searx.sxng_locales.sxng_locales`)."""


class _NotSetClass:  # pylint: disable=too-few-public-methods
    """Internal class for this module, do not create instances of this class.
    Replaces the ``None`` value, allows to explicitly pass ``None`` as a function argument."""


_NOTSET = _NotSetClass()


def searx_useragent() -> str:
    """Return the searx User Agent"""
    return 'searx/{searx_version} {suffix}'.format(
        searx_version=VERSION_TAG, suffix=settings['outgoing']['useragent_suffix']
    ).strip()


def gen_useragent(os_string: Optional[str] = None) -> str:
    """Return a random browser User Agent

    See searx/data/useragents.json
    """
    return USER_AGENTS['ua'].format(os=os_string or choice(USER_AGENTS['os']), version=choice(USER_AGENTS['versions']))


class _HTMLTextExtractorException(Exception):
    """Internal exception raised when the HTML is invalid"""


class _HTMLTextExtractor(HTMLParser):
    """Internal class to extract text from HTML"""

    def __init__(self):
        HTMLParser.__init__(self)
        self.result = []
        self.tags = []

    def handle_starttag(self, tag, attrs):
        self.tags.append(tag)
        if tag == 'br':
            self.result.append(' ')

    def handle_endtag(self, tag):
        if not self.tags:
            return
        if tag != self.tags[-1]:
            raise _HTMLTextExtractorException()
        self.tags.pop()

    def is_valid_tag(self):
        return not self.tags or self.tags[-1] not in _BLOCKED_TAGS

    def handle_data(self, data):
        if not self.is_valid_tag():
            return
        self.result.append(data)

    def handle_charref(self, name):
        if not self.is_valid_tag():
            return
        if name[0] in ('x', 'X'):
            codepoint = int(name[1:], 16)
        else:
            codepoint = int(name)
        self.result.append(chr(codepoint))

    def get_text(self):
        return ''.join(self.result).strip()

    def error(self, message):
        # an error handler is needed in <py3.10, see https://github.com/python/cpython/pull/8562
        raise AssertionError(message)


def html_to_text(html_str: str) -> str:
    """Extract text from a HTML string

    Args:
        * html_str (str): string HTML

    Returns:
        * str: extracted text

    Examples:
        >>> html_to_text('Example <span id="42">#2</span>')
        'Example #2'

        >>> html_to_text('<style>.span { color: red; }</style><span>Example</span>')
        'Example'

        >>> html_to_text(r'regexp: (?<![a-zA-Z]')
        'regexp: (?<![a-zA-Z]'
    """
    if not html_str:
        return ""
    html_str = html_str.replace('\n', ' ').replace('\r', ' ')
    html_str = ' '.join(html_str.split())
    s = _HTMLTextExtractor()
    try:
        s.feed(html_str)
    except AssertionError:
        s = _HTMLTextExtractor()
        s.feed(escape(html_str, quote=True))
    except _HTMLTextExtractorException:
        logger.debug("HTMLTextExtractor: invalid HTML\n%s", html_str)
    return s.get_text()


def markdown_to_text(markdown_str: str) -> str:
    """Extract text from a Markdown string

    Args:
        * markdown_str (str): string Markdown

    Returns:
        * str: extracted text

    Examples:
        >>> markdown_to_text('[example](https://example.com)')
        'example'

        >>> markdown_to_text('## Headline')
        'Headline'
    """
    html_str = (
        MarkdownIt("commonmark", {"typographer": True}).enable(["replacements", "smartquotes"]).render(markdown_str)
    )
    return html_to_text(html_str)


def extract_text(xpath_results, allow_none: bool = False) -> Optional[str]:
    """Extract text from a lxml result

    * if xpath_results is a list, extract the text from each result and concat the list
    * if xpath_results is a xml element, extract all the text nodes from it
      ( text_content() method from lxml )
    * if xpath_results is a string element, then it's already done
    """
    if isinstance(xpath_results, list):
        # it's a list of results: concat everything using a recursive call
        result = ''
        for e in xpath_results:
            result = result + (extract_text(e) or '')
        return result.strip()
    if isinstance(xpath_results, ElementBase):
        # it's an element
        text: str = html.tostring(xpath_results, encoding='unicode', method='text', with_tail=False)
        text = text.strip().replace('\n', ' ')
        return ' '.join(text.split())
    if isinstance(xpath_results, (str, Number, bool)):
        return str(xpath_results)
    if xpath_results is None and allow_none:
        return None
    if xpath_results is None and not allow_none:
        raise ValueError('extract_text(None, allow_none=False)')
    raise ValueError('unsupported type')
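
# Usage sketch (illustrative, not part of the module): extract_text flattens
# whatever an XPath query returns, e.g. with lxml:
#
#   >>> from lxml import html as lxml_html
#   >>> doc = lxml_html.fromstring('<p>Hello <b>world</b>!</p>')
#   >>> extract_text(doc.xpath('//p'))
#   'Hello world!'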


def normalize_url(url: str, base_url: str) -> str:
    """Normalize URL: add protocol, join URL with base_url, add trailing slash if there is no path

    Args:
        * url (str): Relative URL
        * base_url (str): Base URL, it must be an absolute URL.

    Example:
        >>> normalize_url('https://example.com', 'http://example.com/')
        'https://example.com/'
        >>> normalize_url('//example.com', 'http://example.com/')
        'http://example.com/'
        >>> normalize_url('//example.com', 'https://example.com/')
        'https://example.com/'
        >>> normalize_url('/path?a=1', 'https://example.com')
        'https://example.com/path?a=1'
        >>> normalize_url('', 'https://example.com')
        'https://example.com/'
        >>> normalize_url('/test', '/path')
        raise ValueError

    Raises:
        * ValueError

    Returns:
        * str: normalized URL
    """
    if url.startswith('//'):
        # add http or https to this kind of url //example.com/
        parsed_search_url = urlparse(base_url)
        url = '{0}:{1}'.format(parsed_search_url.scheme or 'http', url)
    elif url.startswith('/'):
        # fix relative url to the search engine
        url = urljoin(base_url, url)

    # fix relative urls that fall through the crack
    if '://' not in url:
        url = urljoin(base_url, url)

    parsed_url = urlparse(url)

    if not parsed_url.netloc:
        raise ValueError('Cannot parse url')
    # add a / at the end of the url if there is no path
    if not parsed_url.path:
        url += '/'

    return url


def extract_url(xpath_results, base_url) -> str:
    """Extract and normalize URL from lxml Element

    Args:
        * xpath_results (Union[List[html.HtmlElement], html.HtmlElement]): lxml Element(s)
        * base_url (str): Base URL

    Example:
        >>> def f(s, search_url):
        >>>     return searx.utils.extract_url(html.fromstring(s), search_url)
        >>> f('<span id="42">https://example.com</span>', 'http://example.com/')
        'https://example.com/'
        >>> f('https://example.com', 'http://example.com/')
        'https://example.com/'
        >>> f('//example.com', 'http://example.com/')
        'http://example.com/'
        >>> f('//example.com', 'https://example.com/')
        'https://example.com/'
        >>> f('/path?a=1', 'https://example.com')
        'https://example.com/path?a=1'
        >>> f('', 'https://example.com')
        raise lxml.etree.ParserError
        >>> searx.utils.extract_url([], 'https://example.com')
        raise ValueError

    Raises:
        * ValueError
        * lxml.etree.ParserError

    Returns:
        * str: normalized URL
    """
    if xpath_results == []:
        raise ValueError('Empty url resultset')

    url = extract_text(xpath_results)
    if url:
        return normalize_url(url, base_url)
    raise ValueError('URL not found')


def dict_subset(dictionary: MutableMapping, properties: Set[str]) -> Dict:
    """Extract a subset of a dict

    Examples:
        >>> dict_subset({'A': 'a', 'B': 'b', 'C': 'c'}, ['A', 'C'])
        {'A': 'a', 'C': 'c'}
        >>> dict_subset({'A': 'a', 'B': 'b', 'C': 'c'}, ['A', 'D'])
        {'A': 'a'}
    """
    return {k: dictionary[k] for k in properties if k in dictionary}


def humanize_bytes(size, precision=2):
    """Determine the *human readable* value of bytes on 1024 base (1KB=1024B)."""
    s = ['B ', 'KB', 'MB', 'GB', 'TB']

    x = len(s)
    p = 0
    while size > 1024 and p < x:
        p += 1
        size = size / 1024.0
    return "%.*f %s" % (precision, size, s[p])
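
# Worked examples (values checked by hand against the loop above):
#
#   >>> humanize_bytes(1337)
#   '1.31 KB'
#   >>> humanize_bytes(5 * 1024 * 1024)
#   '5.00 MB'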


def humanize_number(size, precision=0):
    """Determine the *human readable* value of a decimal number."""
    s = ['', 'K', 'M', 'B', 'T']

    x = len(s)
    p = 0
    while size > 1000 and p < x:
        p += 1
        size = size / 1000.0
    return "%.*f%s" % (precision, size, s[p])
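
# Worked examples (values checked by hand against the loop above):
#
#   >>> humanize_number(14350)
#   '14K'
#   >>> humanize_number(1_800_000, precision=1)
#   '1.8M'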


def convert_str_to_int(number_str: str) -> int:
    """Convert number_str to int or 0 if number_str is not a number."""
    if number_str.isdigit():
        return int(number_str)
    return 0


def extr(txt: str, begin: str, end: str, default: str = ""):
    """Extract the string between ``begin`` and ``end`` from ``txt``

    :param txt: String to search in
    :param begin: First string to be searched for
    :param end: Second string to be searched for after ``begin``
    :param default: Default value if one of ``begin`` or ``end`` is not
        found.  Defaults to an empty string.
    :return: The string between the two search-strings ``begin`` and ``end``.
        If at least one of ``begin`` or ``end`` is not found, the value of
        ``default`` is returned.

    Examples:
        >>> extr("abcde", "a", "e")
        "bcd"
        >>> extr("abcde", "a", "z", default="nothing")
        "nothing"
    """
    try:
        first = txt.index(begin) + len(begin)
        return txt[first : txt.index(end, first)]
    except ValueError:
        return default


def int_or_zero(num: Union[List[str], str]) -> int:
    """Convert num to int or 0. num can be either a str or a list.
    If num is a list, the first element is converted to int (or return 0 if the list is empty).
    If num is a str, see convert_str_to_int
    """
    if isinstance(num, list):
        if len(num) < 1:
            return 0
        num = num[0]
    return convert_str_to_int(num)


def is_valid_lang(lang) -> Optional[Tuple[bool, str, str]]:
    """Return language code and name if lang describes a language.

    Examples:
        >>> is_valid_lang('zz')
        None
        >>> is_valid_lang('uk')
        (True, 'uk', 'ukrainian')
        >>> is_valid_lang(b'uk')
        (True, 'uk', 'ukrainian')
        >>> is_valid_lang('en')
        (True, 'en', 'english')
        >>> searx.utils.is_valid_lang('Español')
        (True, 'es', 'spanish')
        >>> searx.utils.is_valid_lang('Spanish')
        (True, 'es', 'spanish')
    """
    if isinstance(lang, bytes):
        lang = lang.decode()
    is_abbr = len(lang) == 2
    lang = lang.lower()
    if is_abbr:
        for l in sxng_locales:
            if l[0][:2] == lang:
                return (True, l[0][:2], l[3].lower())
        return None
    for l in sxng_locales:
        if l[1].lower() == lang or l[3].lower() == lang:
            return (True, l[0][:2], l[3].lower())
    return None


def load_module(filename: str, module_dir: str) -> types.ModuleType:
    modname = splitext(filename)[0]
    modpath = join(module_dir, filename)
    # see https://docs.python.org/3/library/importlib.html#importing-a-source-file-directly
    spec = importlib.util.spec_from_file_location(modname, modpath)
    if not spec:
        raise ValueError(f"Error loading '{modpath}' module")
    module = importlib.util.module_from_spec(spec)
    if not spec.loader:
        raise ValueError(f"Error loading '{modpath}' module")
    spec.loader.exec_module(module)
    return module
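
# Usage sketch (the path below is hypothetical): this is how an engine
# implementation can be imported from a plain file, e.g.
#
#   engine = load_module('demo_offline.py', '/usr/local/searxng/searx/engines')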


def to_string(obj: Any) -> str:
    """Convert obj to its string representation."""
    if isinstance(obj, str):
        return obj
    if hasattr(obj, '__str__'):
        return str(obj)
    return repr(obj)


def ecma_unescape(string: str) -> str:
    """Python implementation of the unescape javascript function

    https://www.ecma-international.org/ecma-262/6.0/#sec-unescape-string
    https://developer.mozilla.org/fr/docs/Web/JavaScript/Reference/Objets_globaux/unescape

    Examples:
        >>> ecma_unescape('%u5409')
        '吉'
        >>> ecma_unescape('%20')
        ' '
        >>> ecma_unescape('%F3')
        'ó'
    """
    # "%u5409" becomes "吉"
    string = _ECMA_UNESCAPE4_RE.sub(lambda e: chr(int(e.group(1), 16)), string)
    # "%20" becomes " ", "%F3" becomes "ó"
    string = _ECMA_UNESCAPE2_RE.sub(lambda e: chr(int(e.group(1), 16)), string)
    return string


def remove_pua_from_str(string):
    """Removes unicode's "PRIVATE USE CHARACTER"s (PUA_) from a string.

    .. _PUA: https://en.wikipedia.org/wiki/Private_Use_Areas
    """
    pua_ranges = ((0xE000, 0xF8FF), (0xF0000, 0xFFFFD), (0x100000, 0x10FFFD))
    s = []
    for c in string:
        i = ord(c)
        if any(a <= i <= b for (a, b) in pua_ranges):
            continue
        s.append(c)
    return "".join(s)
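
# Example: U+E000 lies in the first private use range and is dropped:
#
#   >>> remove_pua_from_str('A\ue000B')
#   'AB'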


def get_string_replaces_function(replaces: Dict[str, str]) -> Callable[[str], str]:
    # compile one regex that matches any key of ``replaces`` and substitute
    # each match with the associated value
    rep = {re.escape(k): v for k, v in replaces.items()}
    pattern = re.compile("|".join(rep.keys()))

    def func(text):
        return pattern.sub(lambda m: rep[re.escape(m.group(0))], text)

    return func
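
# Usage sketch (the mapping is illustrative): build the replacement function
# once, then apply it to many strings:
#
#   >>> clean = get_string_replaces_function({'&#x27;': "'", '&amp;': '&'})
#   >>> clean('it&#x27;s black &amp; white')
#   "it's black & white"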


def get_engine_from_settings(name: str) -> Dict:
    """Return engine configuration from settings.yml of a given engine name"""
    if 'engines' not in settings:
        return {}

    for engine in settings['engines']:
        if 'name' not in engine:
            continue
        if name == engine['name']:
            return engine

    return {}


def get_xpath(xpath_spec: XPathSpecType) -> XPath:
    """Return cached compiled XPath

    There is no thread lock.
    Worst case scenario, xpath_str is compiled more than one time.

    Args:
        * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath

    Returns:
        * result (lxml.etree.XPath): compiled XPath

    Raises:
        * TypeError: Raised when xpath_spec is neither a str nor a lxml.etree.XPath
        * SearxXPathSyntaxException: Raised when there is a syntax error in the XPath
    """
    if isinstance(xpath_spec, str):
        result = _XPATH_CACHE.get(xpath_spec, None)
        if result is None:
            try:
                result = XPath(xpath_spec)
            except XPathSyntaxError as e:
                raise SearxXPathSyntaxException(xpath_spec, str(e.msg)) from e
            _XPATH_CACHE[xpath_spec] = result
        return result

    if isinstance(xpath_spec, XPath):
        return xpath_spec

    raise TypeError('xpath_spec must be either a str or a lxml.etree.XPath')
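
# Usage sketch: the first call compiles and caches the expression; subsequent
# calls (e.g. once per result in a parser loop) return the cached object:
#
#   >>> get_xpath('//a/@href') is get_xpath('//a/@href')
#   True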


def eval_xpath(element: ElementBase, xpath_spec: XPathSpecType):
    """Equivalent of element.xpath(xpath_str) but compiles xpath_str once for all.
    See https://lxml.de/xpathxslt.html#xpath-return-values

    Args:
        * element (ElementBase): lxml element the XPath is applied to
        * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath

    Returns:
        * result (bool, float, list, str): Results.

    Raises:
        * TypeError: Raised when xpath_spec is neither a str nor a lxml.etree.XPath
        * SearxXPathSyntaxException: Raised when there is a syntax error in the XPath
        * SearxEngineXPathException: Raised when the XPath can't be evaluated.
    """
    xpath = get_xpath(xpath_spec)
    try:
        return xpath(element)
    except XPathError as e:
        arg = ' '.join([str(i) for i in e.args])
        raise SearxEngineXPathException(xpath_spec, arg) from e


def eval_xpath_list(element: ElementBase, xpath_spec: XPathSpecType, min_len: Optional[int] = None):
    """Same as eval_xpath, check if the result is a list

    Args:
        * element (ElementBase): lxml element the XPath is applied to
        * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath
        * min_len (int, optional): minimal length of the expected result list. Defaults to None.

    Raises:
        * TypeError: Raised when xpath_spec is neither a str nor a lxml.etree.XPath
        * SearxXPathSyntaxException: Raised when there is a syntax error in the XPath
        * SearxEngineXPathException: Raised if the result is not a list

    Returns:
        * result (list): Results.
    """
    result = eval_xpath(element, xpath_spec)
    if not isinstance(result, list):
        raise SearxEngineXPathException(xpath_spec, 'the result is not a list')
    if min_len is not None and min_len > len(result):
        raise SearxEngineXPathException(xpath_spec, 'len(xpath_str) < ' + str(min_len))
    return result


def eval_xpath_getindex(elements: ElementBase, xpath_spec: XPathSpecType, index: int, default=_NOTSET):
    """Call eval_xpath_list then get one element using the index parameter.
    If the index does not exist, either raise an exception if default is not set,
    otherwise return the default value (can be None).

    Args:
        * elements (ElementBase): lxml element to apply the xpath.
        * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath.
        * index (int): index to get
        * default (Object, optional): Defaults if index doesn't exist.

    Raises:
        * TypeError: Raised when xpath_spec is neither a str nor a lxml.etree.XPath
        * SearxXPathSyntaxException: Raised when there is a syntax error in the XPath
        * SearxEngineXPathException: Raised if the index is not found. Also see eval_xpath.

    Returns:
        * result (bool, float, list, str): Results.
    """
    result = eval_xpath_list(elements, xpath_spec)
    if -len(result) <= index < len(result):
        return result[index]
    if default == _NOTSET:
        # raise a SearxEngineXPathException instead of an IndexError to record xpath_spec
        raise SearxEngineXPathException(xpath_spec, 'index ' + str(index) + ' not found')
    return default
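
# Usage sketch for the eval_xpath* helpers (the HTML snippet is illustrative):
#
#   >>> from lxml import html as lxml_html
#   >>> dom = lxml_html.fromstring('<div><a href="/a">a</a><a href="/b">b</a></div>')
#   >>> eval_xpath(dom, '//a/@href')
#   ['/a', '/b']
#   >>> len(eval_xpath_list(dom, '//a', min_len=1))
#   2
#   >>> eval_xpath_getindex(dom, '//a/@href', 0)
#   '/a'
#   >>> eval_xpath_getindex(dom, '//a/@href', 9, default=None) is None
#   True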


def _get_fasttext_model() -> "fasttext.FastText._FastText":  # type: ignore
    global _FASTTEXT_MODEL  # pylint: disable=global-statement
    if _FASTTEXT_MODEL is None:
        import fasttext  # pylint: disable=import-outside-toplevel

        # Monkey patch: prevent fasttext from showing a (useless) warning when loading a model.
        fasttext.FastText.eprint = lambda x: None
        _FASTTEXT_MODEL = fasttext.load_model(str(data_dir / 'lid.176.ftz'))
    return _FASTTEXT_MODEL


def get_embeded_stream_url(url):
    """
    Converts a standard video URL into its embed format. Supported services include YouTube,
    Facebook, Instagram, TikTok, Dailymotion, and Bilibili.
    """
    parsed_url = urlparse(url)
    iframe_src = None

    # YouTube
    if parsed_url.netloc in ['www.youtube.com', 'youtube.com'] and parsed_url.path == '/watch' and parsed_url.query:
        video_id = parse_qs(parsed_url.query).get('v', [])
        if video_id:
            iframe_src = 'https://www.youtube-nocookie.com/embed/' + video_id[0]

    # Facebook
    elif parsed_url.netloc in ['www.facebook.com', 'facebook.com']:
        encoded_href = urlencode({'href': url})
        iframe_src = 'https://www.facebook.com/plugins/video.php?allowfullscreen=true&' + encoded_href

    # Instagram
    elif parsed_url.netloc in ['www.instagram.com', 'instagram.com'] and parsed_url.path.startswith('/p/'):
        if parsed_url.path.endswith('/'):
            iframe_src = url + 'embed'
        else:
            iframe_src = url + '/embed'

    # TikTok
    elif (
        parsed_url.netloc in ['www.tiktok.com', 'tiktok.com']
        and parsed_url.path.startswith('/@')
        and '/video/' in parsed_url.path
    ):
        path_parts = parsed_url.path.split('/video/')
        video_id = path_parts[1]
        iframe_src = 'https://www.tiktok.com/embed/' + video_id

    # Dailymotion
    elif parsed_url.netloc in ['www.dailymotion.com', 'dailymotion.com'] and parsed_url.path.startswith('/video/'):
        path_parts = parsed_url.path.split('/')
        if len(path_parts) == 3:
            video_id = path_parts[2]
            iframe_src = 'https://www.dailymotion.com/embed/video/' + video_id

    # Bilibili
    elif parsed_url.netloc in ['www.bilibili.com', 'bilibili.com'] and parsed_url.path.startswith('/video/'):
        path_parts = parsed_url.path.split('/')

        video_id = path_parts[2]
        param_key = None
        if video_id.startswith('av'):
            video_id = video_id[2:]
            param_key = 'aid'
        elif video_id.startswith('BV'):
            param_key = 'bvid'

        iframe_src = (
            f'https://player.bilibili.com/player.html?{param_key}={video_id}&high_quality=1&autoplay=false&danmaku=0'
        )

    return iframe_src
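
# Example conversions, derived from the branches above (video ids are
# illustrative):
#
#   >>> get_embeded_stream_url('https://www.youtube.com/watch?v=dQw4w9WgXcQ')
#   'https://www.youtube-nocookie.com/embed/dQw4w9WgXcQ'
#   >>> get_embeded_stream_url('https://www.dailymotion.com/video/x8abc12')
#   'https://www.dailymotion.com/embed/video/x8abc12'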


def detect_language(text: str, threshold: float = 0.3, only_search_languages: bool = False) -> Optional[str]:
    """Detect the language of the ``text`` parameter.

    :param str text: The string whose language is to be detected.

    :param float threshold: Threshold filters the returned labels by a threshold
        on probability.  A choice of 0.3 will return labels with at least 0.3
        probability.

    :param bool only_search_languages: If ``True``, returns only supported
        SearXNG search languages.  see :py:obj:`searx.languages`

    :rtype: str, None
    :returns:
        The detected language code or ``None``. See below.

    :raises ValueError: If ``text`` is not a string.

    The language detection is done by using `a fork`_ of the fastText_ library
    (`python fasttext`_). fastText_ distributes the `language identification
    model`_, for reference:

    - `FastText.zip: Compressing text classification models`_
    - `Bag of Tricks for Efficient Text Classification`_

    The `language identification model`_ supports the language codes
    (ISO-639-3)::

        af als am an ar arz as ast av az azb ba bar bcl be bg bh bn bo bpy br bs
        bxr ca cbk ce ceb ckb co cs cv cy da de diq dsb dty dv el eml en eo es
        et eu fa fi fr frr fy ga gd gl gn gom gu gv he hi hif hr hsb ht hu hy ia
        id ie ilo io is it ja jbo jv ka kk km kn ko krc ku kv kw ky la lb lez li
        lmo lo lrc lt lv mai mg mhr min mk ml mn mr mrj ms mt mwl my myv mzn nah
        nap nds ne new nl nn no oc or os pa pam pfl pl pms pnb ps pt qu rm ro ru
        rue sa sah sc scn sco sd sh si sk sl so sq sr su sv sw ta te tg th tk tl
        tr tt tyv ug uk ur uz vec vep vi vls vo wa war wuu xal xmf yi yo yue zh

    By using ``only_search_languages=True`` the `language identification model`_
    is harmonized with SearXNG's language (locale) model.  General conditions of
    SearXNG's locale model are:

    a. SearXNG's locale of a query is passed to the
       :py:obj:`searx.locales.get_engine_locale` to get a language and/or region
       code that is used by an engine.

    b. Most of SearXNG's engines do not support all the languages from the
       `language identification model`_ and there is also a discrepancy in the
       ISO-639-3 (fasttext) and ISO-639-2 (SearXNG) handling.  Furthermore, in
       SearXNG the locales like ``zh-TH`` (``zh-CN``) are mapped to ``zh_Hant``
       (``zh_Hans``) while the `language identification model`_ reduces both to
       ``zh``.

    .. _a fork: https://github.com/searxng/fasttext-predict
    .. _fastText: https://fasttext.cc/
    .. _python fasttext: https://pypi.org/project/fasttext/
    .. _language identification model: https://fasttext.cc/docs/en/language-identification.html
    .. _Bag of Tricks for Efficient Text Classification: https://arxiv.org/abs/1607.01759
    .. _`FastText.zip: Compressing text classification models`: https://arxiv.org/abs/1612.03651

    """
    if not isinstance(text, str):
        raise ValueError('text must be a str')
    r = _get_fasttext_model().predict(text.replace('\n', ' '), k=1, threshold=threshold)
    if isinstance(r, tuple) and len(r) == 2 and len(r[0]) > 0 and len(r[1]) > 0:
        language = r[0][0].split('__label__')[1]
        if only_search_languages and language not in SEARCH_LANGUAGE_CODES:
            return None
        return language
    return None
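
# Usage sketch -- the returned code depends on the fastText model's prediction,
# so treat this output as indicative only:
#
#   >>> detect_language('Mein Name ist Hase und ich weiß von nichts.')
#   'de'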


def js_variable_to_python(js_variable):
    """Convert a javascript variable into JSON and then load the value

    It does not deal with all cases, but it is good enough for now.
    chompjs has a better implementation.
    """
    # when in_string is not None, it contains the character that opened the
    # string: either a simple quote or a double quote
    in_string = None
    # cut the string at each quote character
    parts = re.split(r'(["\'])', js_variable)
    # previous part (to check the antislash escape character)
    previous_p = ""
    for i, p in enumerate(parts):
        # parse characters inside a ECMA string
        if in_string:
            # replace the colon by a temporary character so _JS_QUOTE_KEYS_RE
            # doesn't have to deal with colons inside the JS strings
            parts[i] = parts[i].replace(':', chr(1))
            if in_string == "'":
                # the JS string is delimited by simple quotes, which JSON does
                # not support: escape the double quotes inside the string
                parts[i] = parts[i].replace('"', r'\"')

        # deal with delimiters and the escape character
        if not in_string and p in ('"', "'"):
            # start of a new string: JSON only supports double quotes
            parts[i] = '"'
            in_string = p
            continue
        if p == in_string:
            # the current part MAY close the string
            if len(previous_p) > 0 and previous_p[-1] == '\\':
                # there is an antislash just before: the ECMA string continues
                continue
            # the current p closes the string: replace simple by double quote
            parts[i] = '"'
            in_string = None

        if not in_string:
            # replace "void 0" by "null" (we are sure there is no string in p)
            parts[i] = _JS_VOID_RE.sub("null", p)
        previous_p = p

    s = ''.join(parts)
    # add quotes around the keys: { a: 12 } becomes { "a": 12 }
    s = _JS_QUOTE_KEYS_RE.sub(r'\1"\2"\3', s)
    # "key":.5 becomes "key":0.5
    s = _JS_DECIMAL_RE.sub(":0.", s)
    # replace the temporary character by a colon again
    s = s.replace(chr(1), ':')
    # replace a single quote followed by a comma with a double quote and a comma
    s = s.replace("',", "\",")
    # load the JSON and return the result
    return json.loads(s)


def parse_duration_string(duration_str: str) -> timedelta | None:
    """Parse a time string in format MM:SS or HH:MM:SS and convert it to a `timedelta` object.

    Returns None if the provided string doesn't match any of the formats.
    """
    duration_str = duration_str.strip()

    if not duration_str:
        return None

    try:
        # prepending "00" pads a MM:SS string to three parts; taking the last
        # three parts keeps HH:MM:SS intact
        time_parts = (["00"] + duration_str.split(":"))[-3:]
        hours, minutes, seconds = map(int, time_parts)
        return timedelta(hours=hours, minutes=minutes, seconds=seconds)
    except (ValueError, TypeError):
        pass

    return None
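
# Usage sketches for the two converters above (outputs checked by hand):
#
#   >>> js_variable_to_python("{a: 12, b: 'hello'}")
#   {'a': 12, 'b': 'hello'}
#
#   >>> parse_duration_string('02:36')
#   datetime.timedelta(seconds=156)
#   >>> parse_duration_string('1:02:03')
#   datetime.timedelta(seconds=3723)
#   >>> parse_duration_string('P1D') is None
#   True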