"""Utility functions for the engines"""

from __future__ import annotations

import re
import importlib
import importlib.util
import json
import types

from typing import Optional, Union, Any, Set, List, Dict, MutableMapping, Tuple, Callable
from numbers import Number
from os.path import splitext, join
from random import choice
from html.parser import HTMLParser
from html import escape
from urllib.parse import urljoin, urlparse, parse_qs, urlencode
from datetime import timedelta
from markdown_it import MarkdownIt

from lxml import html
from lxml.etree import ElementBase, XPath, XPathError, XPathSyntaxError

from searx import settings
from searx.data import USER_AGENTS, data_dir
from searx.version import VERSION_TAG
from searx.sxng_locales import sxng_locales
from searx.exceptions import SearxXPathSyntaxException, SearxEngineXPathException
from searx import logger
# module-scoped child logger: messages appear as "searx.utils"
logger = logger.getChild('utils')

# An XPath expression may be given either as source string or pre-compiled.
XPathSpecType = Union[str, XPath]

# Tags whose text content must never leak into extracted text.
_BLOCKED_TAGS = ('script', 'style')

# ECMA `unescape()` escape forms: "%uXXXX" (4 hex digits) and "%XX" (2 hex digits).
_ECMA_UNESCAPE4_RE = re.compile(r'%u([0-9a-fA-F]{4})', re.UNICODE)
_ECMA_UNESCAPE2_RE = re.compile(r'%([0-9a-fA-F]{2})', re.UNICODE)

# Helpers for js_variable_to_python(): quote bare object keys, replace
# "void 0"/"void(0)" and add a leading zero to ":.5"-style decimals.
_JS_QUOTE_KEYS_RE = re.compile(r'([\{\s,])(\w+)(:)')
_JS_VOID_RE = re.compile(r'void\s+[0-9]+|void\s*\([0-9]+\)')
_JS_DECIMAL_RE = re.compile(r":\s*\.")

# Caches; see get_xpath().  There is deliberately no thread lock — worst case
# an XPath is compiled twice.
_XPATH_CACHE: Dict[str, XPath] = {}
_LANG_TO_LC_CACHE: Dict[str, Dict[str, str]] = {}

_FASTTEXT_MODEL: Optional["fasttext.FastText._FastText"] = None  # type: ignore
"""fasttext model to predict language of a search term"""

SEARCH_LANGUAGE_CODES = frozenset([searxng_locale[0].split('-')[0] for searxng_locale in sxng_locales])
"""Languages supported by most searxng engines (:py:obj:`searx.sxng_locales.sxng_locales`)."""
57 """Internal class for this module, do not create instance of this class.
58 Replace the None value, allow explicitly pass None as a function argument"""
61_NOTSET = _NotSetClass()
def searx_useragent() -> str:
    """Return the searx User Agent"""
    return 'searx/{searx_version} {suffix}'.format(
        searx_version=VERSION_TAG, suffix=settings['outgoing']['useragent_suffix']
    ).strip()
def gen_useragent(os_string: Optional[str] = None) -> str:
    """Return a random browser User Agent

    See searx/data/useragents.json
    """
    # os_string overrides the random OS pick; version is always random
    return USER_AGENTS['ua'].format(os=os_string or choice(USER_AGENTS['os']), version=choice(USER_AGENTS['versions']))
80 """Internal exception raised when the HTML is invalid"""
class _HTMLTextExtractor(HTMLParser):
    """Internal class to extract text from HTML"""

    def __init__(self):
        HTMLParser.__init__(self)
        self.result = []  # collected text fragments
        self.tags = []  # stack of currently open tags

    def handle_starttag(self, tag, attrs):
        self.tags.append(tag)
        if tag == 'br':
            self.result.append(' ')

    def handle_endtag(self, tag):
        if not self.tags:
            return

        if tag != self.tags[-1]:
            # closing tag does not match the last opened one: broken HTML
            raise _HTMLTextExtractorException()

        self.tags.pop()

    def is_valid_tag(self):
        # text is kept unless we are inside a blocked tag (script/style)
        return not self.tags or self.tags[-1] not in _BLOCKED_TAGS

    def handle_data(self, data):
        if not self.is_valid_tag():
            return
        self.result.append(data)

    def handle_charref(self, name):
        if not self.is_valid_tag():
            return
        # numeric character reference: hexadecimal (&#xA9;) or decimal (&#169;)
        if name[0] in ('x', 'X'):
            codepoint = int(name[1:], 16)
        else:
            codepoint = int(name)
        self.result.append(chr(codepoint))

    def handle_entityref(self, name):
        if not self.is_valid_tag():
            return
        # keep the raw entity name; it is not resolved to a codepoint
        self.result.append(name)

    def get_text(self):
        return ''.join(self.result).strip()

    def error(self, message):
        # error handler is needed in <py3.10
        # https://github.com/python/cpython/pull/8562/files
        raise AssertionError(message)
def html_to_text(html_str: str) -> str:
    """Extract text from a HTML string

    Args:
        * html_str (str): string HTML

    Returns:
        * str: extracted text

    Examples:
        >>> html_to_text('Example <span id="42">#2</span>')
        'Example #2'

        >>> html_to_text('<style>.span { color: red; }</style><span>Example</span>')
        'Example'

        >>> html_to_text(r'regexp: (?<![a-zA-Z]')
        'regexp: (?<![a-zA-Z]'
    """
    # collapse all whitespace runs (incl. newlines) to single spaces
    html_str = html_str.replace('\n', ' ').replace('\r', ' ')
    html_str = ' '.join(html_str.split())
    s = _HTMLTextExtractor()
    try:
        s.feed(html_str)
    except AssertionError:
        # the parser choked on raw input; retry with the string HTML-escaped
        s = _HTMLTextExtractor()
        s.feed(escape(html_str, quote=True))
    except _HTMLTextExtractorException:
        # mismatched tags: best effort — log and return what was collected
        logger.debug("HTMLTextExtractor: invalid HTML\n%s", html_str)
    return s.get_text()
def markdown_to_text(markdown_str: str) -> str:
    """Extract text from a Markdown string

    Args:
        * markdown_str (str): string Markdown

    Returns:
        * str: extracted text

    Examples:
        >>> markdown_to_text('[example](https://example.com)')
        'example'

        >>> markdown_to_text('## Headline')
        'Headline'
    """
    # render Markdown to HTML first, then reuse the HTML text extractor
    html_str = (
        MarkdownIt("commonmark", {"typographer": True}).enable(["replacements", "smartquotes"]).render(markdown_str)
    )
    return html_to_text(html_str)
def extract_text(xpath_results, allow_none: bool = False) -> Optional[str]:
    """Extract text from a lxml result

    * if xpath_results is list, extract the text from each result and concat the list
    * if xpath_results is a xml element, extract all the text node from it
      ( text_content() method from lxml )
    * if xpath_results is a string element, then it's already done
    """
    if isinstance(xpath_results, list):
        # it's a list of results: concat everything using a recursive call
        result = ''
        for e in xpath_results:
            result = result + (extract_text(e) or '')
        return result.strip()
    if isinstance(xpath_results, ElementBase):
        # it's an element: serialize its text nodes only (no markup, no tail)
        text: str = html.tostring(xpath_results, encoding='unicode', method='text', with_tail=False)
        text = text.strip().replace('\n', ' ')
        return ' '.join(text.split())
    if isinstance(xpath_results, (str, Number, bool)):
        return str(xpath_results)
    if xpath_results is None and allow_none:
        return None
    if xpath_results is None and not allow_none:
        raise ValueError('extract_text(None, allow_none=False)')
    raise ValueError('unsupported type')
def normalize_url(url: str, base_url: str) -> str:
    """Normalize URL: add protocol, join URL with base_url, add trailing slash if there is no path

    Args:
        * url (str): Relative URL
        * base_url (str): Base URL, it must be an absolute URL.

    Example:
        >>> normalize_url('https://example.com', 'http://example.com/')
        'https://example.com/'
        >>> normalize_url('//example.com', 'http://example.com/')
        'http://example.com/'
        >>> normalize_url('//example.com', 'https://example.com/')
        'https://example.com/'
        >>> normalize_url('/path?a=1', 'https://example.com')
        'https://example.com/path?a=1'
        >>> normalize_url('', 'https://example.com')
        'https://example.com/'
        >>> normalize_url('/test', '/path')
        raise ValueError

    Raises:
        * lxml.etree.ParserError

    Returns:
        * str: normalized URL
    """
    if url.startswith('//'):
        # protocol-relative URL: inherit the scheme of the base URL
        parsed_search_url = urlparse(base_url)
        url = '{0}:{1}'.format(parsed_search_url.scheme or 'http', url)
    elif url.startswith('/'):
        # fix relative url to the search engine
        url = urljoin(base_url, url)

    # fix relative urls that fall through the crack
    if '://' not in url:
        url = urljoin(base_url, url)

    parsed_url = urlparse(url)

    # the url without a netloc cannot be normalized any further
    if not parsed_url.netloc:
        raise ValueError('Cannot parse url')
    if not parsed_url.path:
        # add a / at the end of the url if there is no path
        url += '/'

    return url
def extract_url(xpath_results, base_url) -> str:
    """Extract and normalize URL from lxml Element

    Args:
        * xpath_results (Union[List[html.HtmlElement], html.HtmlElement]): lxml Element(s)
        * base_url (str): Base URL

    Example:
        >>> def f(s, search_url):
        >>>    return searx.utils.extract_url(html.fromstring(s), search_url)
        >>> f('<span id="42">https://example.com</span>', 'http://example.com/')
        'https://example.com/'
        >>> f('https://example.com', 'http://example.com/')
        'https://example.com/'
        >>> f('//example.com', 'http://example.com/')
        'http://example.com/'
        >>> f('//example.com', 'https://example.com/')
        'https://example.com/'
        >>> f('/path?a=1', 'https://example.com')
        'https://example.com/path?a=1'
        >>> f('', 'https://example.com')
        raise lxml.etree.ParserError
        >>> searx.utils.extract_url([], 'https://example.com')
        raise ValueError

    Raises:
        * ValueError
        * lxml.etree.ParserError

    Returns:
        * str: normalized URL
    """
    if xpath_results == []:
        raise ValueError('Empty url resultset')

    url = extract_text(xpath_results)
    if url:
        return normalize_url(url, base_url)
    raise ValueError('URL not found')
def dict_subset(dictionary: MutableMapping, properties: Set[str]) -> Dict:
    """Extract a subset of a dict

    Examples:
        >>> dict_subset({'A': 'a', 'B': 'b', 'C': 'c'}, ['A', 'C'])
        {'A': 'a', 'C': 'c'}
        >>> dict_subset({'A': 'a', 'B': 'b', 'C': 'c'}, ['A', 'D'])
        {'A': 'a'}
    """
    # keys absent from the dictionary are silently skipped
    return {k: dictionary[k] for k in properties if k in dictionary}
327 """Determine the *human readable* value of bytes on 1024 base (1KB=1024B)."""
328 s = [
'B ',
'KB',
'MB',
'GB',
'TB']
332 while size > 1024
and p < x:
335 return "%.*f %s" % (precision, size, s[p])
def humanize_number(size, precision=0):
    """Determine the *human readable* value of a decimal number."""
    s = ['', 'K', 'M', 'B', 'T']

    x = len(s) - 1
    p = 0
    # scale down by 1000 until below the threshold or out of suffixes
    while size > 1000 and p < x:
        p += 1
        size = size / 1000.0
    return "%.*f%s" % (precision, size, s[p])
def convert_str_to_int(number_str: str) -> int:
    """Convert number_str to int or 0 if number_str is not a number."""
    if number_str.isdigit():
        return int(number_str)
    return 0
def extr(txt: str, begin: str, end: str, default: str = ""):
    """Extract the string between ``begin`` and ``end`` from ``txt``

    :param txt: String to search in
    :param begin: First string to be searched for
    :param end: Second string to be searched for after ``begin``
    :param default: Default value if one of ``begin`` or ``end`` is not
                    found.  Defaults to an empty string.
    :return: The string between the two search-strings ``begin`` and ``end``.
             If at least one of ``begin`` or ``end`` is not found, the value of
             ``default`` is returned.

    Examples:
        >>> extr("abcde", "a", "e")
        'bcd'
        >>> extr("abcde", "a", "z", default="nothing")
        'nothing'
    """
    # str.index raises ValueError when a marker is missing
    try:
        first = txt.index(begin) + len(begin)
        return txt[first : txt.index(end, first)]
    except ValueError:
        return default
def int_or_zero(num: Union[List[str], str]) -> int:
    """Convert num to int or 0. num can be either a str or a list.
    If num is a list, the first element is converted to int (or return 0 if the list is empty).
    If num is a str, see convert_str_to_int
    """
    if isinstance(num, list):
        if len(num) < 1:
            return 0
        num = num[0]
    return convert_str_to_int(num)
def is_valid_lang(lang) -> Optional[Tuple[bool, str, str]]:
    """Return language code and name if lang describe a language.

    Examples:
        >>> is_valid_lang('zz')
        None
        >>> is_valid_lang('uk')
        (True, 'uk', 'ukrainian')
        >>> is_valid_lang(b'uk')
        (True, 'uk', 'ukrainian')
        >>> is_valid_lang('en')
        (True, 'en', 'english')
        >>> searx.utils.is_valid_lang('Español')
        (True, 'es', 'spanish')
        >>> searx.utils.is_valid_lang('Spanish')
        (True, 'es', 'spanish')
    """
    if isinstance(lang, bytes):
        lang = lang.decode()
    # a 2-character value is treated as an ISO-639-1 abbreviation
    is_abbr = len(lang) == 2
    lang = lang.lower()
    if is_abbr:
        for l in sxng_locales:
            if l[0][:2] == lang:
                return (True, l[0][:2], l[3].lower())
        return None
    # otherwise match against the native or the English language name
    for l in sxng_locales:
        if l[1].lower() == lang or l[3].lower() == lang:
            return (True, l[0][:2], l[3].lower())
    return None
def load_module(filename: str, module_dir: str) -> types.ModuleType:
    """Load the Python module ``module_dir``/``filename`` and return it.

    Raises ValueError when no import spec or loader can be built for the
    module path.
    """
    modname = splitext(filename)[0]
    modpath = join(module_dir, filename)
    # and only json files in a directory can be loaded
    spec = importlib.util.spec_from_file_location(modname, modpath)
    if not spec:
        raise ValueError(f"Error loading '{modpath}' module")
    module = importlib.util.module_from_spec(spec)
    if not spec.loader:
        raise ValueError(f"Error loading '{modpath}' module")
    spec.loader.exec_module(module)
    return module
def to_string(obj: Any) -> str:
    """Convert obj to its string representation."""
    if isinstance(obj, str):
        return obj
    if hasattr(obj, '__str__'):
        return str(obj)
    return repr(obj)
def ecma_unescape(string: str) -> str:
    """Python implementation of the unescape javascript function

    https://www.ecma-international.org/ecma-262/6.0/#sec-unescape-string
    https://developer.mozilla.org/fr/docs/Web/JavaScript/Reference/Objets_globaux/unescape

    Examples:
        >>> ecma_unescape('%u5409')
        '吉'
        >>> ecma_unescape('%20')
        ' '
        >>> ecma_unescape('%F3')
        'ó'
    """
    # "%u5409" becomes "吉"
    string = _ECMA_UNESCAPE4_RE.sub(lambda e: chr(int(e.group(1), 16)), string)
    # "%20" becomes " ", "%F3" becomes "ó"
    string = _ECMA_UNESCAPE2_RE.sub(lambda e: chr(int(e.group(1), 16)), string)
    return string
def remove_pua_from_str(string):
    """Removes unicode's "PRIVATE USE CHARACTER"s (PUA_) from a string.

    .. _PUA: https://en.wikipedia.org/wiki/Private_Use_Areas
    """
    # the three PUA blocks defined by the Unicode standard
    pua_ranges = ((0xE000, 0xF8FF), (0xF0000, 0xFFFFD), (0x100000, 0x10FFFD))
    s = []
    for c in string:
        i = ord(c)
        if any(a <= i <= b for (a, b) in pua_ranges):
            continue
        s.append(c)
    return "".join(s)
def get_string_replaces_function(replaces: Dict[str, str]) -> Callable[[str], str]:
    """Return a function replacing every key of ``replaces`` found in its
    argument by the corresponding value, using one compiled regex pass."""
    # escape the keys so they are matched literally by the pattern
    rep = {re.escape(k): v for k, v in replaces.items()}
    pattern = re.compile("|".join(rep.keys()))

    def func(text):
        return pattern.sub(lambda m: rep[re.escape(m.group(0))], text)

    return func
def get_engine_from_settings(name: str) -> Dict:
    """Return engine configuration from settings.yml of a given engine name"""

    if 'engines' not in settings:
        return {}

    for engine in settings['engines']:
        if 'name' not in engine:
            continue
        if name == engine['name']:
            return engine

    # unknown engine name: empty configuration
    return {}
def get_xpath(xpath_spec: XPathSpecType) -> XPath:
    """Return cached compiled XPath

    There is no thread lock.
    Worst case scenario, xpath_str is compiled more than one time.

    Args:
        * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath

    Returns:
        * result (bool, float, list, str): Results.

    Raises:
        * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
        * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
    """
    if isinstance(xpath_spec, str):
        result = _XPATH_CACHE.get(xpath_spec, None)
        if result is None:
            try:
                result = XPath(xpath_spec)
            except XPathSyntaxError as e:
                raise SearxXPathSyntaxException(xpath_spec, str(e.msg)) from e
            _XPATH_CACHE[xpath_spec] = result
        return result

    if isinstance(xpath_spec, XPath):
        # already compiled: return as-is, do not cache
        return xpath_spec

    raise TypeError('xpath_spec must be either a str or a lxml.etree.XPath')
def eval_xpath(element: ElementBase, xpath_spec: XPathSpecType):
    """Equivalent of element.xpath(xpath_str) but compile xpath_str once for all.
    See https://lxml.de/xpathxslt.html#xpath-return-values

    Args:
        * element (ElementBase): [description]
        * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath

    Returns:
        * result (bool, float, list, str): Results.

    Raises:
        * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
        * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
        * SearxEngineXPathException: Raise when the XPath can't be evaluated.
    """
    xpath = get_xpath(xpath_spec)
    try:
        return xpath(element)
    except XPathError as e:
        # wrap the lxml error, recording the offending xpath_spec
        arg = ' '.join([str(i) for i in e.args])
        raise SearxEngineXPathException(xpath_spec, arg) from e
def eval_xpath_list(element: ElementBase, xpath_spec: XPathSpecType, min_len: Optional[int] = None):
    """Same as eval_xpath, check if the result is a list

    Args:
        * element (ElementBase): [description]
        * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath
        * min_len (int, optional): [description]. Defaults to None.

    Raises:
        * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
        * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
        * SearxEngineXPathException: raise if the result is not a list

    Returns:
        * result (bool, float, list, str): Results.
    """
    result = eval_xpath(element, xpath_spec)
    if not isinstance(result, list):
        raise SearxEngineXPathException(xpath_spec, 'the result is not a list')
    if min_len is not None and min_len > len(result):
        raise SearxEngineXPathException(xpath_spec, 'len(xpath_str) < ' + str(min_len))
    return result
def eval_xpath_getindex(elements: ElementBase, xpath_spec: XPathSpecType, index: int, default=_NOTSET):
    """Call eval_xpath_list then get one element using the index parameter.
    If the index does not exist, either raise an exception is default is not set,
    other return the default value (can be None).

    Args:
        * elements (ElementBase): lxml element to apply the xpath.
        * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath.
        * index (int): index to get
        * default (Object, optional): Defaults if index doesn't exist.

    Raises:
        * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
        * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
        * SearxEngineXPathException: if the index is not found. Also see eval_xpath.

    Returns:
        * result (bool, float, list, str): Results.
    """
    result = eval_xpath_list(elements, xpath_spec)
    # negative indexes are accepted the same way as Python list indexing
    if -len(result) <= index < len(result):
        return result[index]
    if default == _NOTSET:
        # raise an SearxEngineXPathException instead of IndexError
        # to record xpath_spec
        raise SearxEngineXPathException(xpath_spec, 'index ' + str(index) + ' not found')
    return default
def _get_fasttext_model() -> "fasttext.FastText._FastText":  # type: ignore
    """Lazily load and cache the fasttext language-identification model."""
    global _FASTTEXT_MODEL  # pylint: disable=global-statement
    if _FASTTEXT_MODEL is None:
        import fasttext  # pylint: disable=import-outside-toplevel

        # Monkey patch: prevent fasttext from showing a (useless) warning
        # when loading a model.
        fasttext.FastText.eprint = lambda x: None
        _FASTTEXT_MODEL = fasttext.load_model(str(data_dir / 'lid.176.ftz'))
    return _FASTTEXT_MODEL
def get_embeded_stream_url(url):
    """
    Converts a standard video URL into its embed format. Supported services include Youtube,
    Facebook, Instagram, TikTok, Dailymotion, and Bilibili.

    Returns the embed URL as str, or None when the URL matches no supported
    service pattern.
    """
    parsed_url = urlparse(url)
    iframe_src = None

    # YouTube
    if parsed_url.netloc in ['www.youtube.com', 'youtube.com'] and parsed_url.path == '/watch' and parsed_url.query:
        video_id = parse_qs(parsed_url.query).get('v', [])
        if video_id:
            iframe_src = 'https://www.youtube-nocookie.com/embed/' + video_id[0]

    # Facebook
    elif parsed_url.netloc in ['www.facebook.com', 'facebook.com']:
        encoded_href = urlencode({'href': url})
        iframe_src = 'https://www.facebook.com/plugins/video.php?allowfullscreen=true&' + encoded_href

    # Instagram
    elif parsed_url.netloc in ['www.instagram.com', 'instagram.com'] and parsed_url.path.startswith('/p/'):
        if parsed_url.path.endswith('/'):
            iframe_src = url + 'embed'
        else:
            iframe_src = url + '/embed'

    # TikTok
    elif (
        parsed_url.netloc in ['www.tiktok.com', 'tiktok.com']
        and parsed_url.path.startswith('/@')
        and '/video/' in parsed_url.path
    ):
        path_parts = parsed_url.path.split('/video/')
        video_id = path_parts[1]
        iframe_src = 'https://www.tiktok.com/embed/' + video_id

    # Dailymotion
    elif parsed_url.netloc in ['www.dailymotion.com', 'dailymotion.com'] and parsed_url.path.startswith('/video/'):
        path_parts = parsed_url.path.split('/')
        if len(path_parts) == 3:
            video_id = path_parts[2]
            iframe_src = 'https://www.dailymotion.com/embed/video/' + video_id

    # Bilibili
    elif parsed_url.netloc in ['www.bilibili.com', 'bilibili.com'] and parsed_url.path.startswith('/video/'):
        path_parts = parsed_url.path.split('/')

        video_id = path_parts[2]
        param_key = None
        # "av" ids are numeric (aid); "BV" ids are the newer alphanumeric form (bvid)
        if video_id.startswith('av'):
            video_id = video_id[2:]
            param_key = 'aid'
        elif video_id.startswith('BV'):
            param_key = 'bvid'

        iframe_src = (
            f'https://player.bilibili.com/player.html?{param_key}={video_id}&high_quality=1&autoplay=false&danmaku=0'
        )

    return iframe_src
def detect_language(text: str, threshold: float = 0.3, only_search_languages: bool = False) -> Optional[str]:
    """Detect the language of the ``text`` parameter.

    :param str text: The string whose language is to be detected.

    :param float threshold: Threshold filters the returned labels by a threshold
        on probability.  A choice of 0.3 will return labels with at least 0.3
        probability.

    :param bool only_search_languages: If ``True``, returns only supported
        SearXNG search languages.  see :py:obj:`searx.languages`

    :rtype: str, None
    :returns:
        The detected language code or ``None``. See below.

    :raises ValueError: If ``text`` is not a string.

    The language detection is done by using `a fork`_ of the fastText_ library
    (`python fasttext`_).  fastText_ distributes the `language identification
    model`_.  By using ``only_search_languages=True`` the `language
    identification model`_ is harmonized with SearXNG's language (locale)
    model.

    .. _a fork: https://github.com/searxng/fasttext-predict
    .. _fastText: https://fasttext.cc/
    .. _python fasttext: https://pypi.org/project/fasttext/
    .. _language identification model: https://fasttext.cc/docs/en/language-identification.html
    """
    if not isinstance(text, str):
        raise ValueError('text must a str')
    # fasttext can't handle newlines in the input
    r = _get_fasttext_model().predict(text.replace('\n', ' '), k=1, threshold=threshold)
    if isinstance(r, tuple) and len(r) == 2 and len(r[0]) > 0 and len(r[1]) > 0:
        # labels look like "__label__en"
        language = r[0][0].split('__label__')[1]
        if only_search_languages and language not in SEARCH_LANGUAGE_CODES:
            return None
        return language
    return None
def js_variable_to_python(js_variable):
    """Convert a javascript variable into JSON and then load the value

    It does not deal with all cases, but it is good enough for now.
    chompjs has a better implementation.
    """
    # when in_string is not None, it contains the character that has opened
    # the string: either a simple or a double quote
    in_string = None
    # split on quote characters, keeping the delimiters as list items
    parts = re.split(r'(["\'])', js_variable)
    # previous part (to check the escape character antislash)
    previous_p = ""
    for i, p in enumerate(parts):
        # parse characters inside a ECMA string
        if in_string:
            # hide colons inside JS strings behind a placeholder so the
            # quote-keys regex below does not touch them
            parts[i] = parts[i].replace(':', chr(1))
            if in_string == "'":
                # simple-quoted JS strings become double-quoted JSON strings,
                # so inner double quotes must be escaped
                parts[i] = parts[i].replace('"', r'\"')

        # deal with delimiters and escape character
        if not in_string and p in ('"', "'"):
            # start of a new string: JSON only knows double quotes
            parts[i] = '"'
            in_string = p
            continue
        if p == in_string:
            # the current part MAY close the string
            if len(previous_p) > 0 and previous_p[-1] == '\\':
                # preceding antislash: the ECMA string continues
                continue
            # the string is closed here; normalize the quote
            parts[i] = '"'
            in_string = None

        if not in_string:
            # outside strings: replace "void 0" / "void(0)" by null
            parts[i] = _JS_VOID_RE.sub("null", p)
        # update previous_p
        previous_p = p
    s = ''.join(parts)
    # add quotes around bare object keys: { a: 12 } becomes { "a": 12 }
    s = _JS_QUOTE_KEYS_RE.sub(r'\1"\2"\3', s)
    # ":.5" style decimals need a leading zero in JSON
    s = _JS_DECIMAL_RE.sub(":0.", s)
    # restore the colons hidden behind the placeholder character
    s = s.replace(chr(1), ':')
    # normalize a single quote followed by a comma to a double quote
    s = s.replace("',", "\",")
    # load the JSON and return the result
    return json.loads(s)
def parse_duration_string(duration_str: str) -> Optional[timedelta]:
    """Parse a time string in format MM:SS or HH:MM:SS and convert it to a `timedelta` object.

    Returns None if the provided string doesn't match any of the formats.
    """
    duration_str = duration_str.strip()

    if not duration_str:
        return None

    try:
        # prepend "00" so MM:SS gains an hours field, then keep the LAST three
        # fields — [:3] would drop the seconds of an HH:MM:SS input
        time_parts = (["00"] + duration_str.split(":"))[-3:]
        hours, minutes, seconds = map(int, time_parts)
        return timedelta(hours=hours, minutes=minutes, seconds=seconds)
    except (ValueError, TypeError):
        pass

    return None
extr(str txt, str begin, str end, str default="")
Callable[[str], str] get_string_replaces_function(Dict[str, str] replaces)
js_variable_to_python(js_variable)
Optional[str] detect_language(str text, float threshold=0.3, bool only_search_languages=False)
XPath get_xpath(XPathSpecType xpath_spec)
humanize_bytes(size, precision=2)
Optional[str] extract_text(xpath_results, bool allow_none=False)
str ecma_unescape(str string)
eval_xpath_getindex(ElementBase elements, XPathSpecType xpath_spec, int index, default=_NOTSET)
int convert_str_to_int(str number_str)
eval_xpath(ElementBase element, XPathSpecType xpath_spec)
str gen_useragent(Optional[str] os_string=None)
int int_or_zero(Union[List[str], str] num)
Optional[Tuple[bool, str, str]] is_valid_lang(lang)
"fasttext.FastText._FastText" _get_fasttext_model()
str extract_url(xpath_results, base_url)
eval_xpath_list(ElementBase element, XPathSpecType xpath_spec, Optional[int] min_len=None)
types.ModuleType load_module(str filename, str module_dir)
str normalize_url(str url, str base_url)
get_embeded_stream_url(url)
str html_to_text(str html_str)
humanize_number(size, precision=0)
Dict dict_subset(MutableMapping dictionary, Set[str] properties)
timedelta|None parse_duration_string(str duration_str)
Dict get_engine_from_settings(str name)
str markdown_to_text(str markdown_str)
remove_pua_from_str(string)