"""Utility functions for the engines"""
from __future__ import annotations

import re
import importlib
import importlib.util
import json
import types

from typing import Optional, Union, Any, Set, List, Dict, MutableMapping, Tuple, Callable
from numbers import Number
from os.path import splitext, join
from random import choice
from html.parser import HTMLParser
from html import escape
from urllib.parse import urljoin, urlparse, parse_qs, urlencode
from datetime import timedelta
from markdown_it import MarkdownIt

from lxml import html
from lxml.etree import ElementBase, XPath, XPathError, XPathSyntaxError

from searx import settings
from searx.data import USER_AGENTS, data_dir
from searx.version import VERSION_TAG
from searx.sxng_locales import sxng_locales
from searx.exceptions import SearxXPathSyntaxException, SearxEngineXPathException
from searx import logger
# Child logger: messages from this module are tagged with the "utils" suffix.
logger = logger.getChild('utils')
# An XPath may be passed around either as its source string or pre-compiled.
XPathSpecType = Union[str, XPath]

# Text inside these tags is never extracted.
_BLOCKED_TAGS = ('script', 'style')

# ECMA ``unescape`` sequences: "%uXXXX" (4 hex digits) and "%XX" (2 hex digits).
_ECMA_UNESCAPE4_RE = re.compile(r'%u([0-9a-fA-F]{4})', re.UNICODE)
_ECMA_UNESCAPE2_RE = re.compile(r'%([0-9a-fA-F]{2})', re.UNICODE)

# Helpers used to turn a JavaScript object literal into valid JSON.
_JS_QUOTE_KEYS_RE = re.compile(r'([\{\s,])(\w+)(:)')
_JS_VOID_RE = re.compile(r'void\s+[0-9]+|void\s*\([0-9]+\)')
_JS_DECIMAL_RE = re.compile(r":\s*\.")

# Caches; not thread locked: worst case an entry is computed more than once.
_XPATH_CACHE: Dict[str, XPath] = {}
_LANG_TO_LC_CACHE: Dict[str, Dict[str, str]] = {}

_FASTTEXT_MODEL: Optional["fasttext.FastText._FastText"] = None  # type: ignore
"""fasttext model to predict language of a search term"""

SEARCH_LANGUAGE_CODES = frozenset(searxng_locale[0].split('-')[0] for searxng_locale in sxng_locales)
"""Languages supported by most searxng engines (:py:obj:`searx.sxng_locales.sxng_locales`)."""
57 """Internal class for this module, do not create instance of this class.
58 Replace the None value, allow explicitly pass None as a function argument"""
61_NOTSET = _NotSetClass()
def searx_useragent() -> str:
    """Return the searx User Agent"""
    suffix = settings['outgoing']['useragent_suffix']
    return 'searx/{searx_version} {suffix}'.format(searx_version=VERSION_TAG, suffix=suffix)
def gen_useragent(os_string: Optional[str] = None) -> str:
    """Return a random browser User Agent

    See searx/data/useragents.json
    """
    os_info = os_string or choice(USER_AGENTS['os'])
    browser_version = choice(USER_AGENTS['versions'])
    return USER_AGENTS['ua'].format(os=os_info, version=browser_version)
80 """Internal exception raised when the HTML is invalid"""
class _HTMLTextExtractor(HTMLParser):
    """Internal class to extract text from HTML"""

    def __init__(self):
        HTMLParser.__init__(self)
        # result: collected text fragments; tags: stack of currently open tags
        self.result = []
        self.tags = []

    def handle_starttag(self, tag, attrs):
        self.tags.append(tag)
        if tag == 'br':
            self.result.append(' ')

    def handle_endtag(self, tag):
        if not self.tags:
            return

        # mismatched close tag: the HTML is invalid
        if tag != self.tags[-1]:
            raise _HTMLTextExtractorException()

        self.tags.pop()

    def is_valid_tag(self):
        # no text is extracted from inside <script> / <style>
        return not self.tags or self.tags[-1] not in _BLOCKED_TAGS

    def handle_data(self, data):
        if not self.is_valid_tag():
            return
        self.result.append(data)

    def handle_charref(self, name):
        if not self.is_valid_tag():
            return
        # numeric character reference: hex ("&#x41;") or decimal ("&#65;")
        if name[0] in ('x', 'X'):
            codepoint = int(name[1:], 16)
        else:
            codepoint = int(name)
        self.result.append(chr(codepoint))

    def handle_entityref(self, name):
        if not self.is_valid_tag():
            return
        # keep the raw entity name instead of resolving it
        self.result.append(name)

    def get_text(self):
        return ''.join(self.result).strip()

    def error(self, message):
        # error handler is needed in < py3.10
        # https://github.com/python/cpython/pull/8562/files
        raise AssertionError(message)
def html_to_text(html_str: str) -> str:
    """Extract text from a HTML string

    Args:
        * html_str (str): string HTML

    Returns:
        * str: extracted text

    Examples:
        >>> html_to_text('Example <span id="42">#2</span>')
        'Example #2'

        >>> html_to_text('<style>.span { color: red; }</style><span>Example</span>')
        'Example'

        >>> html_to_text(r'regexp: (?<![a-zA-Z]')
        'regexp: (?<![a-zA-Z]'
    """
    # collapse CR/LF and runs of whitespace into single blanks
    html_str = html_str.replace('\n', ' ').replace('\r', ' ')
    html_str = ' '.join(html_str.split())

    s = _HTMLTextExtractor()
    try:
        s.feed(html_str)
    except AssertionError:
        # the parser rejected the raw input: retry with all markup escaped
        s = _HTMLTextExtractor()
        s.feed(escape(html_str, quote=True))
    except _HTMLTextExtractorException:
        logger.debug("HTMLTextExtractor: invalid HTML\n%s", html_str)
    return s.get_text()
def markdown_to_text(markdown_str: str) -> str:
    """Extract text from a Markdown string

    Args:
        * markdown_str (str): string Markdown

    Returns:
        * str: extracted text

    Examples:
        >>> markdown_to_text('[example](https://example.com)')
        'example'

        >>> markdown_to_text('## Headline')
        'Headline'
    """
    # render Markdown to HTML first, then strip the markup
    html_str = (
        MarkdownIt("commonmark", {"typographer": True}).enable(["replacements", "smartquotes"]).render(markdown_str)
    )
    return html_to_text(html_str)
def extract_text(xpath_results, allow_none: bool = False) -> Optional[str]:
    """Extract text from a lxml result

    * if xpath_results is list, extract the text from each result and concat the list
    * if xpath_results is a xml element, extract all the text node from it
      ( text_content() method from lxml )
    * if xpath_results is a string element, then it's already done
    """
    if isinstance(xpath_results, list):
        # list of results: concatenate everything using a recursive call
        result = ''
        for e in xpath_results:
            result = result + (extract_text(e) or '')
        return result.strip()
    if isinstance(xpath_results, ElementBase):
        # it's an element: serialize its text nodes, then normalize whitespace
        text: str = html.tostring(xpath_results, encoding='unicode', method='text', with_tail=False)
        text = text.strip().replace('\n', ' ')
        return ' '.join(text.split())
    if isinstance(xpath_results, (str, Number, bool)):
        return str(xpath_results)
    if xpath_results is None and allow_none:
        return None
    if xpath_results is None and not allow_none:
        raise ValueError('extract_text(None, allow_none=False)')
    raise ValueError('unsupported type')
def normalize_url(url: str, base_url: str) -> str:
    """Normalize URL: add protocol, join URL with base_url, add trailing slash if there is no path

    Args:
        * url (str): Relative URL
        * base_url (str): Base URL, it must be an absolute URL.

    Example:
        >>> normalize_url('https://example.com', 'http://example.com/')
        'https://example.com/'
        >>> normalize_url('//example.com', 'http://example.com/')
        'http://example.com/'
        >>> normalize_url('//example.com', 'https://example.com/')
        'https://example.com/'
        >>> normalize_url('/path?a=1', 'https://example.com')
        'https://example.com/path?a=1'
        >>> normalize_url('', 'https://example.com')
        'https://example.com/'
        >>> normalize_url('/test', '/path')
        raise ValueError

    Raises:
        * lxml.etree.ParserError

    Returns:
        * str: normalized URL
    """
    if url.startswith('//'):
        # protocol-relative URL: borrow the scheme from base_url
        parsed_search_url = urlparse(base_url)
        url = '{0}:{1}'.format(parsed_search_url.scheme or 'http', url)
    elif url.startswith('/'):
        # relative to the search engine root
        url = urljoin(base_url, url)

    # catch any remaining relative URL
    if '://' not in url:
        url = urljoin(base_url, url)

    parsed_url = urlparse(url)

    if not parsed_url.netloc:
        raise ValueError('Cannot parse url')
    if not parsed_url.path:
        # add a / at the end of the URL when there is no path
        url += '/'

    return url
def extract_url(xpath_results, base_url) -> str:
    """Extract and normalize URL from lxml Element

    Args:
        * xpath_results (Union[List[html.HtmlElement], html.HtmlElement]): lxml Element(s)
        * base_url (str): Base URL

    Example:
        >>> def f(s, search_url):
        >>>    return searx.utils.extract_url(html.fromstring(s), search_url)
        >>> f('<span id="42">https://example.com</span>', 'http://example.com/')
        'https://example.com/'
        >>> f('https://example.com', 'http://example.com/')
        'https://example.com/'
        >>> f('', 'https://example.com')
        raise lxml.etree.ParserError
        >>> searx.utils.extract_url([], 'https://example.com')
        raise ValueError

    Raises:
        * ValueError
        * lxml.etree.ParserError

    Returns:
        * str: normalized URL
    """
    if xpath_results == []:
        raise ValueError('Empty url resultset')

    url = extract_text(xpath_results)
    if url:
        return normalize_url(url, base_url)
    raise ValueError('URL not found')
def dict_subset(dictionary: MutableMapping, properties: Set[str]) -> Dict:
    """Extract a subset of a dict

    Examples:
        >>> dict_subset({'A': 'a', 'B': 'b', 'C': 'c'}, ['A', 'C'])
        {'A': 'a', 'C': 'c'}
        >>> dict_subset({'A': 'a', 'B': 'b', 'C': 'c'}, ['A', 'D'])
        {'A': 'a'}
    """
    # keys listed in properties but absent from dictionary are silently skipped
    return {key: dictionary[key] for key in properties if key in dictionary}
def humanize_bytes(size, precision=2):
    """Determine the *human readable* value of bytes on 1024 base (1KB=1024B).

    Args:
        * size: number of bytes
        * precision (int): fractional digits in the formatted result

    Returns:
        * str: e.g. ``"2.00 KB"`` (values beyond TB are reported in TB)
    """
    s = ['B ', 'KB', 'MB', 'GB', 'TB']

    x = len(s)
    p = 0
    # `p < x - 1` keeps p a valid index of s: the previous `p < x` bound let p
    # reach len(s) and raised IndexError for sizes beyond 1024 TB.
    while size > 1024 and p < x - 1:
        p += 1
        size = size / 1024.0
    return "%.*f %s" % (precision, size, s[p])
def humanize_number(size, precision=0):
    """Determine the *human readable* value of a decimal number.

    Args:
        * size: the number to format
        * precision (int): fractional digits in the formatted result

    Returns:
        * str: e.g. ``"2K"`` (values beyond trillions are reported with ``T``)
    """
    s = ['', 'K', 'M', 'B', 'T']

    x = len(s)
    p = 0
    # `p < x - 1` keeps p a valid index of s: the previous `p < x` bound let p
    # reach len(s) and raised IndexError for numbers >= 1000**5.
    while size > 1000 and p < x - 1:
        p += 1
        size = size / 1000.0
    return "%.*f%s" % (precision, size, s[p])
def convert_str_to_int(number_str: str) -> int:
    """Convert number_str to int or 0 if number_str is not a number."""
    # str.isdigit() rejects signs, decimals and non-numeric text
    return int(number_str) if number_str.isdigit() else 0
def extr(txt: str, begin: str, end: str, default: str = ""):
    """Extract the string between ``begin`` and ``end`` from ``txt``

    :param txt: String to search in
    :param begin: First string to be searched for
    :param end: Second string to be searched for after ``begin``
    :param default: Default value if one of ``begin`` or ``end`` is not
        found.  Defaults to an empty string.
    :return: The string between the two search-strings ``begin`` and ``end``.
        If at least one of ``begin`` or ``end`` is not found, the value of
        ``default`` is returned.

    Examples:
        >>> extr("abcde", "a", "e")
        "bcd"
        >>> extr("abcde", "a", "z", default="nothing")
        "nothing"
    """
    # str.index raises ValueError when a marker is missing -- that is exactly
    # the documented "not found" case, so map it to ``default``.  Without this
    # handler the function would raise instead of honoring ``default``.
    try:
        first = txt.index(begin) + len(begin)
        return txt[first : txt.index(end, first)]
    except ValueError:
        return default
def int_or_zero(num: Union[List[str], str]) -> int:
    """Convert num to int or 0. num can be either a str or a list.
    If num is a list, the first element is converted to int (or return 0 if the list is empty).
    If num is a str, see convert_str_to_int
    """
    if isinstance(num, list):
        if not num:
            return 0
        num = num[0]
    return convert_str_to_int(num)
def is_valid_lang(lang) -> Optional[Tuple[bool, str, str]]:
    """Return language code and name if lang describe a language.

    Examples:
        >>> is_valid_lang('zz')
        None
        >>> is_valid_lang('uk')
        (True, 'uk', 'ukrainian')
        >>> is_valid_lang(b'uk')
        (True, 'uk', 'ukrainian')
        >>> is_valid_lang('en')
        (True, 'en', 'english')
        >>> searx.utils.is_valid_lang('Español')
        (True, 'es', 'spanish')
        >>> searx.utils.is_valid_lang('Spanish')
        (True, 'es', 'spanish')
    """
    if isinstance(lang, bytes):
        lang = lang.decode()
    # a two letter input is treated as an ISO-639-1 style abbreviation
    is_abbr = len(lang) == 2
    lang = lang.lower()
    if is_abbr:
        for l in sxng_locales:
            if l[0][:2] == lang:
                return (True, l[0][:2], l[3].lower())
        return None
    # otherwise match against native name (l[1]) or English name (l[3])
    for l in sxng_locales:
        if l[1].lower() == lang or l[3].lower() == lang:
            return (True, l[0][:2], l[3].lower())
    return None
def load_module(filename: str, module_dir: str) -> types.ModuleType:
    """Load the Python source file ``module_dir``/``filename`` and return the module object.

    Raises ValueError when the file cannot be loaded as a module.
    """
    modname = splitext(filename)[0]
    modpath = join(module_dir, filename)
    # https://docs.python.org/3/library/importlib.html#importing-a-source-file-directly
    spec = importlib.util.spec_from_file_location(modname, modpath)
    if not spec:
        raise ValueError(f"Error loading '{modpath}' module")
    module = importlib.util.module_from_spec(spec)
    if not spec.loader:
        raise ValueError(f"Error loading '{modpath}' module")
    spec.loader.exec_module(module)
    return module
def to_string(obj: Any) -> str:
    """Convert obj to its string representation."""
    if isinstance(obj, str):
        return obj
    # fall back to repr() only for objects without __str__
    if hasattr(obj, '__str__'):
        return str(obj)
    return repr(obj)
def ecma_unescape(string: str) -> str:
    """Python implementation of the unescape javascript function

    https://www.ecma-international.org/ecma-262/6.0/#sec-unescape-string
    https://developer.mozilla.org/fr/docs/Web/JavaScript/Reference/Objets_globaux/unescape

    Examples:
        >>> ecma_unescape('%u5409')
        '吉'
        >>> ecma_unescape('%20')
        ' '
        >>> ecma_unescape('%F3')
        'ó'
    """
    # "%u5409" becomes "吉" (4 hex digit form first, so "%uXXXX" is not
    # misread as "%uX" + remainder by the 2 digit pattern)
    string = _ECMA_UNESCAPE4_RE.sub(lambda e: chr(int(e.group(1), 16)), string)
    # "%20" becomes " ", "%F3" becomes "ó"
    string = _ECMA_UNESCAPE2_RE.sub(lambda e: chr(int(e.group(1), 16)), string)
    return string
def remove_pua_from_str(string):
    """Removes unicode's "PRIVATE USE CHARACTER"s (PUA_) from a string.

    .. _PUA: https://en.wikipedia.org/wiki/Private_Use_Areas
    """
    # the three Unicode Private Use Areas (BMP, plane 15, plane 16)
    pua_ranges = ((0xE000, 0xF8FF), (0xF0000, 0xFFFFD), (0x100000, 0x10FFFD))
    kept = []
    for char in string:
        code = ord(char)
        if any(low <= code <= high for (low, high) in pua_ranges):
            continue
        kept.append(char)
    return "".join(kept)
def get_string_replaces_function(replaces: Dict[str, str]) -> Callable[[str], str]:
    """Return a function applying every ``replaces`` mapping (old -> new) in one regex pass."""
    # keys are regex-escaped so they match literally inside the alternation
    rep = {re.escape(k): v for k, v in replaces.items()}
    pattern = re.compile("|".join(rep.keys()))

    def func(text):
        return pattern.sub(lambda m: rep[re.escape(m.group(0))], text)

    return func
def get_engine_from_settings(name: str) -> Dict:
    """Return engine configuration from settings.yml of a given engine name"""
    if 'engines' not in settings:
        return {}

    for engine in settings['engines']:
        # entries without a name cannot be matched
        if 'name' not in engine:
            continue
        if name == engine['name']:
            return engine

    # unknown engine name: empty configuration
    return {}
def get_xpath(xpath_spec: XPathSpecType) -> XPath:
    """Return cached compiled XPath

    There is no thread lock.
    Worst case scenario, xpath_str is compiled more than one time.

    Args:
        * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath

    Returns:
        * result (bool, float, list, str): Results.

    Raises:
        * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
        * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
    """
    if isinstance(xpath_spec, str):
        result = _XPATH_CACHE.get(xpath_spec, None)
        if result is None:
            try:
                result = XPath(xpath_spec)
            except XPathSyntaxError as e:
                raise SearxXPathSyntaxException(xpath_spec, str(e.msg)) from e
            _XPATH_CACHE[xpath_spec] = result
        return result

    # an already compiled XPath is returned unchanged
    if isinstance(xpath_spec, XPath):
        return xpath_spec

    raise TypeError('xpath_spec must be either a str or a lxml.etree.XPath')
def eval_xpath(element: ElementBase, xpath_spec: XPathSpecType):
    """Equivalent of element.xpath(xpath_str) but compile xpath_str once for all.
    See https://lxml.de/xpathxslt.html#xpath-return-values

    Args:
        * element (ElementBase): [description]
        * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath

    Returns:
        * result (bool, float, list, str): Results.

    Raises:
        * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
        * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
        * SearxEngineXPathException: Raise when the XPath can't be evaluated.
    """
    xpath = get_xpath(xpath_spec)
    try:
        return xpath(element)
    except XPathError as e:
        # wrap the lxml error so the failing xpath_spec is recorded
        arg = ' '.join([str(i) for i in e.args])
        raise SearxEngineXPathException(xpath_spec, arg) from e
def eval_xpath_list(element: ElementBase, xpath_spec: XPathSpecType, min_len: Optional[int] = None):
    """Same as eval_xpath, check if the result is a list

    Args:
        * element (ElementBase): [description]
        * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath
        * min_len (int, optional): [description]. Defaults to None.

    Raises:
        * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
        * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
        * SearxEngineXPathException: raise if the result is not a list

    Returns:
        * result (bool, float, list, str): Results.
    """
    result = eval_xpath(element, xpath_spec)
    if not isinstance(result, list):
        raise SearxEngineXPathException(xpath_spec, 'the result is not a list')
    if min_len is not None and min_len > len(result):
        raise SearxEngineXPathException(xpath_spec, 'len(xpath_str) < ' + str(min_len))
    return result
def eval_xpath_getindex(elements: ElementBase, xpath_spec: XPathSpecType, index: int, default=_NOTSET):
    """Call eval_xpath_list then get one element using the index parameter.
    If the index does not exist, either raise an exception is default is not set,
    other return the default value (can be None).

    Args:
        * elements (ElementBase): lxml element to apply the xpath.
        * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath.
        * index (int): index to get
        * default (Object, optional): Defaults if index doesn't exist.

    Raises:
        * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
        * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
        * SearxEngineXPathException: if the index is not found. Also see eval_xpath.

    Returns:
        * result (bool, float, list, str): Results.
    """
    result = eval_xpath_list(elements, xpath_spec)
    # negative indexes are allowed, like with plain Python lists
    if -len(result) <= index < len(result):
        return result[index]
    if default == _NOTSET:
        # raise an SearxEngineXPathException instead of IndexError
        # to record xpath_spec
        raise SearxEngineXPathException(xpath_spec, 'index ' + str(index) + ' not found')
    return default
def _get_fasttext_model() -> "fasttext.FastText._FastText":  # type: ignore
    """Lazily load (and cache in :py:obj:`_FASTTEXT_MODEL`) the fasttext language model."""
    global _FASTTEXT_MODEL  # pylint: disable=global-statement
    if _FASTTEXT_MODEL is None:
        import fasttext  # pylint: disable=import-outside-toplevel

        # Monkey patch: avoid the (useless) warning fasttext prints when a model is loaded.
        fasttext.FastText.eprint = lambda x: None
        _FASTTEXT_MODEL = fasttext.load_model(str(data_dir / 'lid.176.ftz'))
    return _FASTTEXT_MODEL
def get_embeded_stream_url(url):
    """
    Converts a standard video URL into its embed format. Supported services include Youtube,
    Facebook, Instagram, TikTok, Dailymotion, and Bilibili.

    Returns the embed URL as str, or None when the URL matches no supported service.
    """
    parsed_url = urlparse(url)
    iframe_src = None

    # YouTube
    if parsed_url.netloc in ['www.youtube.com', 'youtube.com'] and parsed_url.path == '/watch' and parsed_url.query:
        video_id = parse_qs(parsed_url.query).get('v', [])
        if video_id:
            iframe_src = 'https://www.youtube-nocookie.com/embed/' + video_id[0]

    # Facebook
    elif parsed_url.netloc in ['www.facebook.com', 'facebook.com']:
        encoded_href = urlencode({'href': url})
        iframe_src = 'https://www.facebook.com/plugins/video.php?allowfullscreen=true&' + encoded_href

    # Instagram
    elif parsed_url.netloc in ['www.instagram.com', 'instagram.com'] and parsed_url.path.startswith('/p/'):
        if parsed_url.path.endswith('/'):
            iframe_src = url + 'embed'
        else:
            iframe_src = url + '/embed'

    # TikTok
    elif (
        parsed_url.netloc in ['www.tiktok.com', 'tiktok.com']
        and parsed_url.path.startswith('/@')
        and '/video/' in parsed_url.path
    ):
        path_parts = parsed_url.path.split('/video/')
        video_id = path_parts[1]
        iframe_src = 'https://www.tiktok.com/embed/' + video_id

    # Dailymotion
    elif parsed_url.netloc in ['www.dailymotion.com', 'dailymotion.com'] and parsed_url.path.startswith('/video/'):
        path_parts = parsed_url.path.split('/')
        if len(path_parts) == 3:
            video_id = path_parts[2]
            iframe_src = 'https://www.dailymotion.com/embed/video/' + video_id

    # Bilibili
    elif parsed_url.netloc in ['www.bilibili.com', 'bilibili.com'] and parsed_url.path.startswith('/video/'):
        path_parts = parsed_url.path.split('/')

        video_id = path_parts[2]
        param_key = None
        # "av..." ids use the numeric `aid` parameter, "BV..." ids use `bvid`
        if video_id.startswith('av'):
            video_id = video_id[2:]
            param_key = 'aid'
        elif video_id.startswith('BV'):
            param_key = 'bvid'

        iframe_src = (
            f'https://player.bilibili.com/player.html?{param_key}={video_id}&high_quality=1&autoplay=false&danmaku=0'
        )

    return iframe_src
def detect_language(text: str, threshold: float = 0.3, only_search_languages: bool = False) -> Optional[str]:
    """Detect the language of the ``text`` parameter.

    :param str text: The string whose language is to be detected.

    :param float threshold: Threshold filters the returned labels by a threshold
        on probability.  A choice of 0.3 will return labels with at least 0.3
        probability.

    :param bool only_search_languages: If ``True``, returns only supported
        SearXNG search languages.  see :py:obj:`searx.languages`

    :rtype: str, None
    :returns: The detected language code or ``None``.

    :raises ValueError: If ``text`` is not a string.

    The language detection is done by using `a fork`_ of the fastText_ library
    (`python fasttext`_).  fastText_ distributes the `language identification
    model`_, for reference:

    - `FastText.zip: Compressing text classification models`_
    - `Bag of Tricks for Efficient Text Classification`_

    By using ``only_search_languages=True`` the `language identification
    model`_ is harmonized with the SearXNG's language (locale) model: fasttext
    labels that are not in :py:obj:`SEARCH_LANGUAGE_CODES` are mapped to
    ``None``.  Note the discrepancy in the ISO-639-3 (fasttext) and ISO-639-2
    (SearXNG) handling, e.g. locales like ``zh-TW`` / ``zh-CN`` are both
    reduced to ``zh`` by the model.

    .. _a fork: https://github.com/searxng/fasttext-predict
    .. _fastText: https://fasttext.cc/
    .. _python fasttext: https://pypi.org/project/fasttext/
    .. _language identification model: https://fasttext.cc/docs/en/language-identification.html
    .. _Bag of Tricks for Efficient Text Classification: https://arxiv.org/abs/1607.01759
    .. _`FastText.zip: Compressing text classification models`: https://arxiv.org/abs/1612.03651
    """
    if not isinstance(text, str):
        raise ValueError('text must a str')
    # fasttext cannot handle newlines in the input
    r = _get_fasttext_model().predict(text.replace('\n', ' '), k=1, threshold=threshold)
    if isinstance(r, tuple) and len(r) == 2 and len(r[0]) > 0 and len(r[1]) > 0:
        # labels look like "__label__en"
        language = r[0][0].split('__label__')[1]
        if only_search_languages and language not in SEARCH_LANGUAGE_CODES:
            return None
        return language
    return None
def js_variable_to_python(js_variable):
    """Convert a javascript variable into JSON and then load the value

    It does not deal with all cases, but it is good enough for now.
    chompjs has a better implementation.
    """
    # when in_string is not None, it contains the character that has opened
    # the string: either a simple quote or a double quote
    in_string = None
    # split on quote characters, keeping the delimiters:
    # r"""{ a:"f\"irst", c:'sec"ond'}"""  becomes
    # ['{ a:', '"', 'f\\', '"', 'irst', '"', ', c:', "'", 'sec', '"', 'ond', "'", '}']
    parts = re.split(r'(["\'])', js_variable)
    # previous part (to check for the escape character antislash)
    previous_p = ""
    for i, p in enumerate(parts):
        if in_string:
            # inside a JS string: replace the colon by a temporary character so
            # _JS_QUOTE_KEYS_RE doesn't have to deal with colons inside strings
            parts[i] = parts[i].replace(':', chr(1))
            if in_string == "'":
                # simple-quote delimited strings are not supported by JSON;
                # they become double-quote delimited, so escape inner '"'
                parts[i] = parts[i].replace('"', r'\"')

        # deal with delimiters and the escape character
        if not in_string and p in ('"', "'"):
            # a new string starts here: normalize the delimiter to '"'
            # (JSON doesn't support simple quotes)
            parts[i] = '"'
            in_string = p
            continue
        if p == in_string:
            # the current part MAY close the string
            if len(previous_p) > 0 and previous_p[-1] == '\\':
                # an antislash just before: the ECMA string continues
                continue
            # the string is closed here: normalize the delimiter to '"'
            parts[i] = '"'
            in_string = None

        if not in_string:
            # outside strings: replace "void 0" / "void(0)" by null
            # https://stackoverflow.com/questions/7452341/what-does-void-0-mean
            parts[i] = _JS_VOID_RE.sub("null", p)
        previous_p = p

    s = ''.join(parts)
    # add quotes around bare keys: { a: 12 } becomes { "a": 12 }
    s = _JS_QUOTE_KEYS_RE.sub(r'\1"\2"\3', s)
    # ":.5" becomes ":0.5" (JSON needs a leading digit)
    s = _JS_DECIMAL_RE.sub(":0.", s)
    # restore the colons hidden inside strings
    s = s.replace(chr(1), ':')
    # load the JSON and return the result
    return json.loads(s)
def parse_duration_string(duration_str: str) -> Optional[timedelta]:
    """Parse a time string in format MM:SS or HH:MM:SS and convert it to a `timedelta` object.

    Returns None if the provided string doesn't match any of the formats.
    """
    duration_str = duration_str.strip()

    if not duration_str:
        return None

    try:
        # Pad on the left so "MM:SS" becomes ["00", "MM", "SS"].  Taking the
        # LAST three items ([-3:]) keeps "HH:MM:SS" intact; a [:3] slice would
        # silently drop the seconds field and shift hours into minutes.
        time_parts = (["00"] + duration_str.split(":"))[-3:]
        hours, minutes, seconds = map(int, time_parts)
        return timedelta(hours=hours, minutes=minutes, seconds=seconds)
    except (ValueError, TypeError):
        # non-numeric fields or a wrong number of fields: not a duration
        pass

    return None
extr(str txt, str begin, str end, str default="")
Callable[[str], str] get_string_replaces_function(Dict[str, str] replaces)
js_variable_to_python(js_variable)
Optional[str] detect_language(str text, float threshold=0.3, bool only_search_languages=False)
XPath get_xpath(XPathSpecType xpath_spec)
humanize_bytes(size, precision=2)
Optional[str] extract_text(xpath_results, bool allow_none=False)
str ecma_unescape(str string)
eval_xpath_getindex(ElementBase elements, XPathSpecType xpath_spec, int index, default=_NOTSET)
int convert_str_to_int(str number_str)
eval_xpath(ElementBase element, XPathSpecType xpath_spec)
str gen_useragent(Optional[str] os_string=None)
int int_or_zero(Union[List[str], str] num)
Optional[Tuple[bool, str, str]] is_valid_lang(lang)
"fasttext.FastText._FastText" _get_fasttext_model()
str extract_url(xpath_results, base_url)
eval_xpath_list(ElementBase element, XPathSpecType xpath_spec, Optional[int] min_len=None)
types.ModuleType load_module(str filename, str module_dir)
str normalize_url(str url, str base_url)
get_embeded_stream_url(url)
str html_to_text(str html_str)
humanize_number(size, precision=0)
Dict dict_subset(MutableMapping dictionary, Set[str] properties)
timedelta|None parse_duration_string(str duration_str)
Dict get_engine_from_settings(str name)
str markdown_to_text(str markdown_str)
remove_pua_from_str(string)