.oO SearXNG Developer Documentation Oo.
Loading...
Searching...
No Matches
abstract.py
Go to the documentation of this file.
1# SPDX-License-Identifier: AGPL-3.0-or-later
2"""Abstract base classes for engine request processors.
3
4"""
5
6import typing as t
7
8import logging
9import threading
10from abc import abstractmethod, ABC
11from timeit import default_timer
12
13from searx import settings, logger
14from searx.engines import engines
15from searx.network import get_time_for_thread, get_network
16from searx.metrics import histogram_observe, counter_inc, count_exception, count_error
17from searx.exceptions import SearxEngineAccessDeniedException, SearxEngineResponseException
18from searx.utils import get_engine_from_settings
19
if t.TYPE_CHECKING:
    # Imported for annotations only; both names appear solely inside string
    # annotations, so nothing is needed at runtime.
    from types import ModuleType

    from searx.enginelib import Engine

# Child logger of the package logger imported above (rebinds the name).
logger = logger.getChild('searx.search.processor')

# Registry of suspend states.  EngineProcessor.__init__ looks entries up with
# ``setdefault`` using either ``id(network)`` (int) or the engine name (str)
# as key, so processors resolving to the same key share one status object.
SUSPENDED_STATUS: dict[int | str, 'SuspendedStatus'] = {}
25
26
28 """Class to handle suspend state."""
29
30 __slots__ = 'suspend_end_time', 'suspend_reason', 'continuous_errors', 'lock'
31
32 def __init__(self):
33 self.lock: threading.Lock = threading.Lock()
34 self.continuous_errors: int = 0
35 self.suspend_end_time: float = 0
36 self.suspend_reason: str = ""
37
38 @property
39 def is_suspended(self):
40 return self.suspend_end_time >= default_timer()
41
42 def suspend(self, suspended_time: int, suspend_reason: str):
43 with self.lock:
44 # update continuous_errors / suspend_end_time
45 self.continuous_errors += 1
46 if suspended_time is None:
47 suspended_time = min(
48 settings['search']['max_ban_time_on_fail'],
49 self.continuous_errors * settings['search']['ban_time_on_fail'],
50 )
51 self.suspend_end_time = default_timer() + suspended_time
52 self.suspend_reason = suspend_reason
53 logger.debug('Suspend for %i seconds', suspended_time)
54
55 def resume(self):
56 with self.lock:
57 # reset the suspend variables
58 self.continuous_errors = 0
59 self.suspend_end_time = 0
60 self.suspend_reason = ""
61
62
63class EngineProcessor(ABC):
64 """Base classes used for all types of request processors."""
65
66 __slots__ = 'engine', 'engine_name', 'suspended_status', 'logger'
67
68 def __init__(self, engine: "Engine|ModuleType", engine_name: str):
69 self.engine: "Engine" = engine
70 self.engine_name: str = engine_name
71 self.logger: logging.Logger = engines[engine_name].logger
72 key = get_network(self.engine_name)
73 key = id(key) if key else self.engine_name
74 self.suspended_status: SuspendedStatus = SUSPENDED_STATUS.setdefault(key, SuspendedStatus())
75
76 def initialize(self):
77 try:
78 self.engine.init(get_engine_from_settings(self.engine_name))
79 except SearxEngineResponseException as exc:
80 self.logger.warning('Fail to initialize // %s', exc)
81 except Exception: # pylint: disable=broad-except
82 self.logger.exception('Fail to initialize')
83 else:
84 self.logger.debug('Initialized')
85
86 @property
88 return hasattr(self.engine, 'init')
89
90 def handle_exception(self, result_container, exception_or_message, suspend=False):
91 # update result_container
92 if isinstance(exception_or_message, BaseException):
93 exception_class = exception_or_message.__class__
94 module_name = getattr(exception_class, '__module__', 'builtins')
95 module_name = '' if module_name == 'builtins' else module_name + '.'
96 error_message = module_name + exception_class.__qualname__
97 else:
98 error_message = exception_or_message
99 result_container.add_unresponsive_engine(self.engine_name, error_message)
100 # metrics
101 counter_inc('engine', self.engine_name, 'search', 'count', 'error')
102 if isinstance(exception_or_message, BaseException):
103 count_exception(self.engine_name, exception_or_message)
104 else:
105 count_error(self.engine_name, exception_or_message)
106 # suspend the engine ?
107 if suspend:
108 suspended_time = None
109 if isinstance(exception_or_message, SearxEngineAccessDeniedException):
110 suspended_time = exception_or_message.suspended_time
111 self.suspended_status.suspend(suspended_time, error_message) # pylint: disable=no-member
112
113 def _extend_container_basic(self, result_container, start_time, search_results):
114 # update result_container
115 result_container.extend(self.engine_name, search_results)
116 engine_time = default_timer() - start_time
117 page_load_time = get_time_for_thread()
118 result_container.add_timing(self.engine_name, engine_time, page_load_time)
119 # metrics
120 counter_inc('engine', self.engine_name, 'search', 'count', 'successful')
121 histogram_observe(engine_time, 'engine', self.engine_name, 'time', 'total')
122 if page_load_time is not None:
123 histogram_observe(page_load_time, 'engine', self.engine_name, 'time', 'http')
124
125 def extend_container(self, result_container, start_time, search_results):
126 if getattr(threading.current_thread(), '_timeout', False):
127 # the main thread is not waiting anymore
128 self.handle_exception(result_container, 'timeout', None)
129 else:
130 # check if the engine accepted the request
131 if search_results is not None:
132 self._extend_container_basic(result_container, start_time, search_results)
133 self.suspended_status.resume()
134
135 def extend_container_if_suspended(self, result_container):
136 if self.suspended_status.is_suspended:
137 result_container.add_unresponsive_engine(
138 self.engine_name, self.suspended_status.suspend_reason, suspended=True
139 )
140 return True
141 return False
142
143 def get_params(self, search_query, engine_category) -> dict[str, t.Any]:
144 """Returns a set of (see :ref:`request params <engine request arguments>`) or
145 ``None`` if request is not supported.
146
147 Not supported conditions (``None`` is returned):
148
149 - A page-number > 1 when engine does not support paging.
150 - A time range when the engine does not support time range.
151 """
152 # if paging is not supported, skip
153 if search_query.pageno > 1 and not self.engine.paging:
154 return None
155
156 # if max page is reached, skip
157 max_page = self.engine.max_page or settings['search']['max_page']
158 if max_page and max_page < search_query.pageno:
159 return None
160
161 # if time_range is not supported, skip
162 if search_query.time_range and not self.engine.time_range_support:
163 return None
164
165 params = {}
166 params["query"] = search_query.query
167 params['category'] = engine_category
168 params['pageno'] = search_query.pageno
169 params['safesearch'] = search_query.safesearch
170 params['time_range'] = search_query.time_range
171 params['engine_data'] = search_query.engine_data.get(self.engine_name, {})
172 params['searxng_locale'] = search_query.lang
173
174 # deprecated / vintage --> use params['searxng_locale']
175 #
176 # Conditions related to engine's traits are implemented in engine.traits
177 # module. Don't do 'locale' decisions here in the abstract layer of the
178 # search processor, just pass the value from user's choice unchanged to
179 # the engine request.
180
181 if hasattr(self.engine, 'language') and self.engine.language:
182 params['language'] = self.engine.language
183 else:
184 params['language'] = search_query.lang
185
186 return params
187
188 @abstractmethod
189 def search(self, query, params, result_container, start_time, timeout_limit):
190 pass
191
192 def get_tests(self):
193 tests = getattr(self.engine, 'tests', None)
194 if tests is None:
195 tests = getattr(self.engine, 'additional_tests', {})
196 tests.update(self.get_default_tests())
197 return tests
198
200 return {}
__init__(self, "Engine|ModuleType" engine, str engine_name)
Definition abstract.py:68
handle_exception(self, result_container, exception_or_message, suspend=False)
Definition abstract.py:90
dict[str, t.Any] get_params(self, search_query, engine_category)
Definition abstract.py:143
extend_container_if_suspended(self, result_container)
Definition abstract.py:135
_extend_container_basic(self, result_container, start_time, search_results)
Definition abstract.py:113
extend_container(self, result_container, start_time, search_results)
Definition abstract.py:125
suspend(self, int suspended_time, str suspend_reason)
Definition abstract.py:42
::1337x
Definition 1337x.py:1