.oO SearXNG Developer Documentation Oo.
Loading...
Searching...
No Matches
__init__.py
Go to the documentation of this file.
1# SPDX-License-Identifier: AGPL-3.0-or-later
2# pylint: disable=missing-module-docstring, global-statement
3
4__all__ = ["get_network", "initialize", "check_network_configuration", "raise_for_httperror"]
5
6import typing as t
7
8import asyncio
9import threading
10import concurrent.futures
11from queue import SimpleQueue
12from types import MethodType
13from timeit import default_timer
14from collections.abc import Iterable
15from contextlib import contextmanager
16
17import httpx
18import anyio
19
20from searx.extended_types import SXNG_Response
21from .network import get_network, initialize, check_network_configuration # pylint:disable=cyclic-import
22from .client import get_loop
23from .raise_for_httperror import raise_for_httperror
24
25if t.TYPE_CHECKING:
26 from searx.network.network import Network
27
THREADLOCAL = threading.local()
"""Per-thread storage: every thread sees its own attribute values.

The functions of this module keep ``total_time``, ``timeout``, ``start_time``
and ``network`` on this object.
"""
30
31
def reset_time_for_thread():
    """Reset the calling thread's accumulated HTTP time to zero.

    Setting ``total_time`` also switches the accounting on: the HTTP time
    recording in this module only updates the counter once the attribute
    exists for the thread.
    """
    THREADLOCAL.total_time = 0
34
35
def get_time_for_thread() -> float | None:
    """Return the HTTP time accumulated by the calling thread.

    ``None`` is returned when ``total_time`` was never set for this thread.
    """
    return getattr(THREADLOCAL, 'total_time', None)
39
40
def set_timeout_for_thread(timeout: float, start_time: float | None = None):
    """Store the timeout and the (optional) start time for the calling thread.

    The stored timeout is what ``_get_timeout`` falls back to when a request
    is issued without an explicit ``timeout`` keyword.
    """
    THREADLOCAL.start_time = start_time
    THREADLOCAL.timeout = timeout
44
45
def set_context_network_name(network_name: str):
    """Bind the network returned by ``get_network(network_name)`` to the calling thread."""
    selected_network = get_network(network_name)
    THREADLOCAL.network = selected_network
48
49
def get_context_network() -> "Network":
    """Return the network bound to the calling thread.

    When the thread has no network (or a falsy one), the value of
    :py:obj:`get_network` called without arguments is returned instead.
    """
    thread_network = getattr(THREADLOCAL, 'network', None)
    if thread_network:
        return thread_network
    return get_network()
56
57
@contextmanager
def _record_http_time():
    """Context manager measuring the time spent inside its ``with`` body.

    Yields the ``start_time`` stored on ``THREADLOCAL`` for the calling thread
    (which may be ``None`` if it was stored as such), or the current timer
    value when the thread has no ``start_time`` attribute.

    On exit -- also when the body raises -- the elapsed time is added to
    ``THREADLOCAL.total_time``, but only if that counter exists for the thread.
    """
    # pylint: disable=too-many-branches
    time_before_request = default_timer()
    start_time = getattr(THREADLOCAL, 'start_time', time_before_request)
    try:
        yield start_time
    finally:
        # update total_time.
        # See get_time_for_thread() and reset_time_for_thread()
        if hasattr(THREADLOCAL, 'total_time'):
            time_after_request = default_timer()
            THREADLOCAL.total_time += time_after_request - time_before_request
71
72
73def _get_timeout(start_time: float, kwargs: t.Any) -> float:
74 # pylint: disable=too-many-branches
75
76 timeout: float | None
77 # timeout (httpx)
78 if 'timeout' in kwargs:
79 timeout = kwargs['timeout']
80 else:
81 timeout = getattr(THREADLOCAL, 'timeout', None)
82 if timeout is not None:
83 kwargs['timeout'] = timeout
84
85 # 2 minutes timeout for the requests without timeout
86 timeout = timeout or 120
87
88 # adjust actual timeout
89 timeout += 0.2 # overhead
90 if start_time:
91 timeout -= default_timer() - start_time
92
93 return timeout
94
95
def request(method: str, url: str, **kwargs: t.Any) -> SXNG_Response:
    """same as requests/requests/api.py request(...)

    The request coroutine of the thread's context network is scheduled on the
    loop returned by ``get_loop()`` and this function blocks until the result
    is available.  If it is not available within the computed timeout, a
    ``httpx.TimeoutException`` is raised.
    """
    with _record_http_time() as start_time:
        network = get_context_network()
        wait_limit = _get_timeout(start_time, kwargs)
        coroutine = network.request(method, url, **kwargs)
        future = asyncio.run_coroutine_threadsafe(coroutine, get_loop())
        try:
            response = future.result(wait_limit)
        except concurrent.futures.TimeoutError as exc:
            raise httpx.TimeoutException('Timeout', request=None) from exc
        return response
109
110
def multi_requests(request_list: list["Request"]) -> list[httpx.Response | Exception]:
    """Send multiple HTTP requests in parallel. Wait for all requests to finish.

    All requests are scheduled first, then their results are collected in the
    order of ``request_list``.  The returned list has one entry per request:
    the response, a ``httpx.TimeoutException`` when the result was not
    available in time, or the exception raised by the request.

    The ``kwargs`` of the given :py:class:`Request` objects are not modified.
    """
    with _record_http_time() as start_time:
        # send the requests
        network = get_context_network()
        loop = get_loop()
        future_list = []
        for request_desc in request_list:
            # Work on a copy: _get_timeout() may write the thread's timeout into
            # the dict.  Mutating the caller's Request would pin that timeout on
            # the object, so a later call with a different thread timeout would
            # silently reuse the stale value.
            kwargs = dict(request_desc.kwargs)
            timeout = _get_timeout(start_time, kwargs)
            future = asyncio.run_coroutine_threadsafe(
                network.request(request_desc.method, request_desc.url, **kwargs), loop
            )
            future_list.append((future, timeout))

        # read the responses
        responses: list[httpx.Response | Exception] = []
        for future, timeout in future_list:
            try:
                responses.append(future.result(timeout))
            except concurrent.futures.TimeoutError:
                responses.append(httpx.TimeoutException('Timeout', request=None))
            except Exception as e:  # pylint: disable=broad-except
                # errors are reported in the result list instead of being raised
                responses.append(e)
        return responses
135
136
137class Request(t.NamedTuple):
138 """Request description for the multi_requests function"""
139
140 method: str
141 url: str
142 kwargs: dict[str, str] = {}
143
144 @staticmethod
145 def get(url: str, **kwargs: t.Any):
146 return Request('GET', url, kwargs)
147
148 @staticmethod
149 def options(url: str, **kwargs: t.Any):
150 return Request('OPTIONS', url, kwargs)
151
152 @staticmethod
153 def head(url: str, **kwargs: t.Any):
154 return Request('HEAD', url, kwargs)
155
156 @staticmethod
157 def post(url: str, **kwargs: t.Any):
158 return Request('POST', url, kwargs)
159
160 @staticmethod
161 def put(url: str, **kwargs: t.Any):
162 return Request('PUT', url, kwargs)
163
164 @staticmethod
165 def patch(url: str, **kwargs: t.Any):
166 return Request('PATCH', url, kwargs)
167
168 @staticmethod
169 def delete(url: str, **kwargs: t.Any):
170 return Request('DELETE', url, kwargs)
171
172
def get(url: str, **kwargs: t.Any) -> SXNG_Response:
    """Send a GET request via :py:obj:`request`; ``allow_redirects`` defaults to ``True``."""
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = True
    return request('get', url, **kwargs)
176
177
def options(url: str, **kwargs: t.Any) -> SXNG_Response:
    """Send an OPTIONS request via :py:obj:`request`; ``allow_redirects`` defaults to ``True``."""
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = True
    return request('options', url, **kwargs)
181
182
def head(url: str, **kwargs: t.Any) -> SXNG_Response:
    """Send a HEAD request via :py:obj:`request`; ``allow_redirects`` defaults to ``False``."""
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = False
    return request('head', url, **kwargs)
186
187
def post(url: str, data: dict[str, t.Any] | None = None, **kwargs: t.Any) -> SXNG_Response:
    """Send a POST request via :py:obj:`request`, forwarding ``data`` as the ``data`` keyword."""
    return request('post', url, data=data, **kwargs)
190
191
def put(url: str, data: dict[str, t.Any] | None = None, **kwargs: t.Any) -> SXNG_Response:
    """Send a PUT request via :py:obj:`request`, forwarding ``data`` as the ``data`` keyword."""
    return request('put', url, data=data, **kwargs)
194
195
def patch(url: str, data: dict[str, t.Any] | None = None, **kwargs: t.Any) -> SXNG_Response:
    """Send a PATCH request via :py:obj:`request`, forwarding ``data`` as the ``data`` keyword."""
    return request('patch', url, data=data, **kwargs)
198
199
def delete(url: str, **kwargs: t.Any) -> SXNG_Response:
    """Send a DELETE request via :py:obj:`request`."""
    return request('delete', url, **kwargs)
202
203
async def stream_chunk_to_queue(network, queue, method: str, url: str, **kwargs: t.Any):
    """Stream a response and hand it over to another thread through ``queue``.

    Items are put on ``queue`` in this order: the response object, every
    non-empty raw chunk of the body, then -- if an unexpected error occurred --
    the exception, and finally always ``None`` as end marker.
    """
    try:
        stream_context = await network.stream(method, url, **kwargs)
        async with stream_context as response:
            queue.put(response)
            # aiter_raw: access the raw bytes on the response without applying any HTTP content decoding
            # https://www.python-httpx.org/quickstart/#streaming-responses
            async for chunk in response.aiter_raw(65536):
                if chunk:
                    queue.put(chunk)
    except (httpx.StreamClosed, anyio.ClosedResourceError):
        # the response was queued before the exception.
        # the exception was raised on aiter_raw.
        # we do nothing here: in the finally block, None will be queued
        # so the stream(method, url, **kwargs) generator can stop
        pass
    except Exception as e:  # pylint: disable=broad-except
        # broad except to avoid this scenario:
        # exception in network.stream(method, url, **kwargs)
        # -> the exception is not caught here
        # -> queue None (in finally)
        # -> the function below, stream(method, url, **kwargs), has nothing to return
        queue.put(e)
    finally:
        # end marker: tells the consumer that nothing more will be queued
        queue.put(None)
228
229
def _stream_generator(method: str, url: str, **kwargs: t.Any):
    """Generator yielding what ``stream_chunk_to_queue`` puts on its queue.

    The streaming coroutine is scheduled on the loop returned by ``get_loop()``
    with the thread's context network.  Queued items are yielded until the
    ``None`` end marker is read; a queued exception is raised instead of being
    yielded.  After the end marker, the result of the coroutine is awaited.
    """
    queue = SimpleQueue()
    network = get_context_network()
    producer = stream_chunk_to_queue(network, queue, method, url, **kwargs)
    future = asyncio.run_coroutine_threadsafe(producer, get_loop())

    # yield chunks until the end marker (None) shows up
    while (item := queue.get()) is not None:
        if isinstance(item, Exception):
            raise item
        yield item
    future.result()
243
244
def _close_response_method(self):
    """``close()`` replacement bound by ``stream`` to the responses it returns.

    Schedules ``self.aclose()`` on the loop returned by ``get_loop()`` and then
    consumes the rest of ``self._generator``.
    """
    asyncio.run_coroutine_threadsafe(self.aclose(), get_loop())
    # reach the end of self._generator ( _stream_generator ) to avoid a memory leak.
    # it makes sure that:
    # * the httpx response is closed (see the stream_chunk_to_queue function)
    # * future.result() is called in _stream_generator
    for _ in self._generator:  # pylint: disable=protected-access
        continue
253
254
def stream(method: str, url: str, **kwargs: t.Any) -> tuple[SXNG_Response, Iterable[bytes]]:
    """Replace httpx.stream.

    Usage:
    response, stream = searx.network.stream(...)
    for chunk in stream:
        ...

    httpx.Client.stream is not used here: it would require writing the
    httpx.HTTPTransport version of the httpx.AsyncHTTPTransport.

    The returned response gets a ``close()`` method that also exhausts the
    chunk generator.
    """
    chunks = _stream_generator(method, url, **kwargs)

    # the first item of the generator is the response itself
    response = next(chunks)  # pylint: disable=stop-iteration-return
    if isinstance(response, Exception):
        raise response

    # let response.close() drain the generator (see _close_response_method)
    response._generator = chunks  # pylint: disable=protected-access
    response.close = MethodType(_close_response_method, response)

    return response, chunks
delete(str url, **t.Any kwargs)
Definition __init__.py:169
post(str url, **t.Any kwargs)
Definition __init__.py:157
head(str url, **t.Any kwargs)
Definition __init__.py:153
put(str url, **t.Any kwargs)
Definition __init__.py:161
patch(str url, **t.Any kwargs)
Definition __init__.py:165
get(str url, **t.Any kwargs)
Definition __init__.py:145
options(str url, **t.Any kwargs)
Definition __init__.py:149
tuple[SXNG_Response, Iterable[bytes]] stream(str method, str url, **t.Any kwargs)
Definition __init__.py:255
set_context_network_name(str network_name)
Definition __init__.py:46
float _get_timeout(float start_time, t.Any kwargs)
Definition __init__.py:73
SXNG_Response patch(str url, dict[str, t.Any]|None data=None, **t.Any kwargs)
Definition __init__.py:196
_stream_generator(str method, str url, **t.Any kwargs)
Definition __init__.py:230
stream_chunk_to_queue(network, queue, str method, str url, **t.Any kwargs)
Definition __init__.py:204
SXNG_Response head(str url, **t.Any kwargs)
Definition __init__.py:183
_record_http_time()
Definition __init__.py:59
SXNG_Response put(str url, dict[str, t.Any]|None data=None, **t.Any kwargs)
Definition __init__.py:192
SXNG_Response get(str url, **t.Any kwargs)
Definition __init__.py:173
SXNG_Response options(str url, **t.Any kwargs)
Definition __init__.py:178
set_timeout_for_thread(float timeout, float|None start_time=None)
Definition __init__.py:41
"Network" get_context_network()
Definition __init__.py:50
SXNG_Response delete(str url, **t.Any kwargs)
Definition __init__.py:200
list[httpx.Response|Exception] multi_requests(list["Request"] request_list)
Definition __init__.py:111
SXNG_Response post(str url, dict[str, t.Any]|None data=None, **t.Any kwargs)
Definition __init__.py:188
SXNG_Response request(str method, str url, **t.Any kwargs)
Definition __init__.py:96
_close_response_method(self)
Definition __init__.py:245
float|None get_time_for_thread()
Definition __init__.py:36
reset_time_for_thread()
Definition __init__.py:32