.oO SearXNG Developer Documentation Oo.
Loading...
Searching...
No Matches
__init__.py
Go to the documentation of this file.
1# SPDX-License-Identifier: AGPL-3.0-or-later
2# pylint: disable=missing-module-docstring, global-statement
3
4__all__ = ["initialize", "check_network_configuration", "raise_for_httperror"]
5
6import typing as t
7
8import asyncio
9import threading
10import concurrent.futures
11from queue import SimpleQueue
12from types import MethodType
13from timeit import default_timer
14from collections.abc import Iterable
15from contextlib import contextmanager
16
17import httpx
18import anyio
19
20from searx.extended_types import SXNG_Response
21from .network import get_network, initialize, check_network_configuration # pylint:disable=cyclic-import
22from .client import get_loop
23from .raise_for_httperror import raise_for_httperror
24
25
26THREADLOCAL = threading.local()
27"""Thread-local data is data for thread specific values."""
28
29
def reset_time_for_thread():
    """Reset the HTTP time accumulated for the current thread to zero.

    Once ``total_time`` exists on the thread-local storage, the time spent in
    the HTTP helpers of this module is added to it (see ``_record_http_time``).
    """
    THREADLOCAL.total_time = 0
32
33
def get_time_for_thread():
    """Return the current thread's total time, or ``None`` if it was never set."""
    return getattr(THREADLOCAL, 'total_time', None)
37
38
def set_timeout_for_thread(timeout: float, start_time: float | None = None):
    """Store the timeout and the start time on the current thread.

    :param timeout: value stored as the thread's ``timeout``; ``_get_timeout``
        falls back to it when a request carries no explicit ``timeout``.
    :param start_time: value stored as the thread's ``start_time`` (may be
        ``None``).
    """
    THREADLOCAL.start_time = start_time
    THREADLOCAL.timeout = timeout
42
43
def set_context_network_name(network_name: str):
    """Attach the network returned by ``get_network(network_name)`` to the
    current thread."""
    network = get_network(network_name)
    THREADLOCAL.network = network
46
47
def get_context_network():
    """If set, return the thread's network.

    If unset (or falsy), return the value from :py:obj:`get_network`.
    """
    return getattr(THREADLOCAL, 'network', None) or get_network()
54
55
@contextmanager
def _record_http_time():
    """Context manager adding the time spent in its body to the thread's total.

    Yields the start time to use for timeout computations: the thread's
    ``start_time`` attribute when it exists (it may be ``None``), otherwise the
    moment the context was entered.
    """
    entered_at = default_timer()
    start_time = getattr(THREADLOCAL, 'start_time', entered_at)
    try:
        yield start_time
    finally:
        # Update total_time only when it has been initialised for this thread.
        # See get_time_for_thread() and reset_time_for_thread()
        if hasattr(THREADLOCAL, 'total_time'):
            THREADLOCAL.total_time += default_timer() - entered_at
69
70
71def _get_timeout(start_time: float, kwargs):
72 # pylint: disable=too-many-branches
73
74 timeout: float | None
75 # timeout (httpx)
76 if 'timeout' in kwargs:
77 timeout = kwargs['timeout']
78 else:
79 timeout = getattr(THREADLOCAL, 'timeout', None)
80 if timeout is not None:
81 kwargs['timeout'] = timeout
82
83 # 2 minutes timeout for the requests without timeout
84 timeout = timeout or 120
85
86 # adjust actual timeout
87 timeout += 0.2 # overhead
88 if start_time:
89 timeout -= default_timer() - start_time
90
91 return timeout
92
93
def request(method, url, **kwargs) -> SXNG_Response:
    """Send one HTTP request on the thread's network and wait for the response.

    Same calling convention as requests/requests/api.py request(...).  The
    coroutine ``network.request(...)`` is scheduled on the loop returned by
    ``get_loop()`` and the calling thread blocks until the result is available
    or the timeout computed by ``_get_timeout`` expires.

    :raises httpx.TimeoutException: when no result is available in time.
    """
    with _record_http_time() as start_time:
        network = get_context_network()
        timeout = _get_timeout(start_time, kwargs)
        future = asyncio.run_coroutine_threadsafe(
            network.request(method, url, **kwargs),
            get_loop(),
        )
        try:
            return future.result(timeout)
        except concurrent.futures.TimeoutError as e:
            # nobody will read the result any more: stop the coroutine instead
            # of letting it run to completion in the background
            future.cancel()
            raise httpx.TimeoutException('Timeout', request=None) from e
107
108
def multi_requests(request_list: list["Request"]) -> list[httpx.Response | Exception]:
    """Send multiple HTTP requests in parallel. Wait for all requests to finish.

    :param request_list: descriptions of the requests to send.
    :return: one entry per request, in the order of *request_list*: the
        response, or the exception raised for that request
        (``httpx.TimeoutException`` when its result was not available in time).
    """
    with _record_http_time() as start_time:
        # send the requests
        network = get_context_network()
        loop = get_loop()
        pending = []
        for request_desc in request_list:
            # _get_timeout() may write the thread's timeout into the kwargs:
            # work on a copy so neither the caller's dict nor the default dict
            # shared between Request instances is modified.
            kwargs = dict(request_desc.kwargs)
            timeout = _get_timeout(start_time, kwargs)
            future = asyncio.run_coroutine_threadsafe(
                network.request(request_desc.method, request_desc.url, **kwargs), loop
            )
            pending.append((future, timeout))

        # read the responses
        responses: list[httpx.Response | Exception] = []
        for future, timeout in pending:
            try:
                responses.append(future.result(timeout))
            except concurrent.futures.TimeoutError:
                # the result is abandoned: stop the coroutine as well
                future.cancel()
                responses.append(httpx.TimeoutException('Timeout', request=None))
            except Exception as e:  # pylint: disable=broad-except
                responses.append(e)
        return responses
133
134
class Request(t.NamedTuple):
    """Request description for the multi_requests function.

    The static factories (``get``, ``post``, ...) build a description for the
    corresponding HTTP method; their keyword arguments are stored unchanged in
    ``kwargs``.
    """

    # HTTP method name, e.g. 'GET'
    method: str
    # URL of the request
    url: str
    # Keyword arguments of the request.
    # NOTE: the default ``{}`` is one single dict shared by every Request that
    # is created without kwargs -- never modify it in place.
    kwargs: dict[str, t.Any] = {}

    @staticmethod
    def get(url: str, **kwargs: t.Any) -> "Request":
        """Describe a GET request."""
        return Request('GET', url, kwargs)

    @staticmethod
    def options(url: str, **kwargs: t.Any) -> "Request":
        """Describe an OPTIONS request."""
        return Request('OPTIONS', url, kwargs)

    @staticmethod
    def head(url: str, **kwargs: t.Any) -> "Request":
        """Describe a HEAD request."""
        return Request('HEAD', url, kwargs)

    @staticmethod
    def post(url: str, **kwargs: t.Any) -> "Request":
        """Describe a POST request."""
        return Request('POST', url, kwargs)

    @staticmethod
    def put(url: str, **kwargs: t.Any) -> "Request":
        """Describe a PUT request."""
        return Request('PUT', url, kwargs)

    @staticmethod
    def patch(url: str, **kwargs: t.Any) -> "Request":
        """Describe a PATCH request."""
        return Request('PATCH', url, kwargs)

    @staticmethod
    def delete(url: str, **kwargs: t.Any) -> "Request":
        """Describe a DELETE request."""
        return Request('DELETE', url, kwargs)
169
170
def get(url: str, **kwargs: t.Any) -> SXNG_Response:
    """Send a ``get`` request via :py:obj:`request`.

    ``allow_redirects`` is set to ``True`` unless the caller provides it.
    """
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = True
    return request('get', url, **kwargs)
174
175
def options(url: str, **kwargs: t.Any) -> SXNG_Response:
    """Send an ``options`` request via :py:obj:`request`.

    ``allow_redirects`` is set to ``True`` unless the caller provides it.
    """
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = True
    return request('options', url, **kwargs)
179
180
def head(url: str, **kwargs: t.Any) -> SXNG_Response:
    """Send a ``head`` request via :py:obj:`request`.

    ``allow_redirects`` is set to ``False`` unless the caller provides it.
    """
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = False
    return request('head', url, **kwargs)
184
185
def post(url: str, data=None, **kwargs: t.Any) -> SXNG_Response:
    """Send a ``post`` request via :py:obj:`request`, passing *data* as the
    ``data`` keyword argument."""
    response = request('post', url, data=data, **kwargs)
    return response
188
189
def put(url: str, data=None, **kwargs: t.Any) -> SXNG_Response:
    """Send a ``put`` request via :py:obj:`request`, passing *data* as the
    ``data`` keyword argument."""
    response = request('put', url, data=data, **kwargs)
    return response
192
193
def patch(url: str, data=None, **kwargs: t.Any) -> SXNG_Response:
    """Send a ``patch`` request via :py:obj:`request`, passing *data* as the
    ``data`` keyword argument."""
    response = request('patch', url, data=data, **kwargs)
    return response
196
197
def delete(url: str, **kwargs: t.Any) -> SXNG_Response:
    """Send a ``delete`` request via :py:obj:`request`."""
    response = request('delete', url, **kwargs)
    return response
200
201
async def stream_chunk_to_queue(network, queue, method: str, url: str, **kwargs: t.Any):
    """Stream an HTTP response into *queue*.

    Items are put on the queue in this order:

    1. the response object returned by ``network.stream(...)``,
    2. every non-empty raw chunk of the response body,
    3. the exception, if one was raised (except for a closed stream),
    4. ``None``, always, to mark the end of the stream.

    :param network: object whose ``stream(method, url, **kwargs)`` coroutine
        returns an async context manager yielding the response.
    :param queue: queue receiving the items (only ``put`` is used).
    """
    try:
        async with await network.stream(method, url, **kwargs) as response:
            queue.put(response)
            # aiter_raw: access the raw bytes on the response without applying any HTTP content decoding
            # https://www.python-httpx.org/quickstart/#streaming-responses
            async for chunk in response.aiter_raw(65536):
                if chunk:
                    queue.put(chunk)
    except (httpx.StreamClosed, anyio.ClosedResourceError):
        # the response was queued before the exception.
        # the exception was raised on aiter_raw.
        # we do nothing here: in the finally block, None will be queued
        # so the stream(method, url, **kwargs) generator can stop
        pass
    except Exception as e:  # pylint: disable=broad-except
        # broad except to avoid this scenario:
        # exception in network.stream(method, url, **kwargs)
        # -> the exception is not caught here
        # -> queue None (in finally)
        # -> the function below, stream(method, url, **kwargs), has nothing to return
        queue.put(e)
    finally:
        queue.put(None)
226
227
def _stream_generator(method: str, url: str, **kwargs: t.Any):
    """Generator over the items queued by ``stream_chunk_to_queue``.

    Schedules ``stream_chunk_to_queue`` on the loop returned by ``get_loop()``
    and yields every item it queues until the ``None`` end marker is read.  A
    queued exception is raised instead of being yielded.  Once the end marker
    is reached, the result of the scheduled coroutine is collected.
    """
    chunk_queue = SimpleQueue()
    network = get_context_network()
    future = asyncio.run_coroutine_threadsafe(
        stream_chunk_to_queue(network, chunk_queue, method, url, **kwargs),
        get_loop(),
    )

    # yield the response, then the chunks
    while (item := chunk_queue.get()) is not None:
        if isinstance(item, Exception):
            raise item
        yield item
    future.result()
241
242
def _close_response_method(self):
    """Replacement for the ``close`` method of a response returned by ``stream``.

    Schedules ``self.aclose()`` on the loop returned by ``get_loop()`` and then
    drains the generator attached to the response.
    """
    asyncio.run_coroutine_threadsafe(self.aclose(), get_loop())
    # reach the end of self._generator ( _stream_generator ) to avoid a memory leak.
    # it makes sure that:
    # * the httpx response is closed (see the stream_chunk_to_queue function)
    # * future.result() is called in _stream_generator
    for _ in self._generator:  # pylint: disable=protected-access
        continue
251
252
def stream(method: str, url: str, **kwargs: t.Any) -> tuple[httpx.Response, Iterable[bytes]]:
    """Replace httpx.stream.

    Usage:
    response, chunks = stream(...)
    for chunk in chunks:
        ...

    The first item produced by ``_stream_generator`` is the response; the
    remaining items are the chunks of its body.  The ``close`` method of the
    returned response is replaced by ``_close_response_method``, which drains
    the generator.

    httpx.Client.stream would require writing the httpx.HTTPTransport version
    of the httpx.AsyncHTTPTransport used by this package.
    """
    chunks = _stream_generator(method, url, **kwargs)

    # the first item is the response
    response = next(chunks)  # pylint: disable=stop-iteration-return
    if isinstance(response, Exception):
        raise response

    # let close() drain the generator
    response._generator = chunks  # pylint: disable=protected-access
    response.close = MethodType(_close_response_method, response)

    return response, chunks
delete(str url, **t.Any kwargs)
Definition __init__.py:167
post(str url, **t.Any kwargs)
Definition __init__.py:155
head(str url, **t.Any kwargs)
Definition __init__.py:151
put(str url, **t.Any kwargs)
Definition __init__.py:159
patch(str url, **t.Any kwargs)
Definition __init__.py:163
get(str url, **t.Any kwargs)
Definition __init__.py:143
options(str url, **t.Any kwargs)
Definition __init__.py:147
get_context_network()
Definition __init__.py:48
set_context_network_name(str network_name)
Definition __init__.py:44
_get_timeout(float start_time, kwargs)
Definition __init__.py:71
_stream_generator(str method, str url, **t.Any kwargs)
Definition __init__.py:228
stream_chunk_to_queue(network, queue, str method, str url, **t.Any kwargs)
Definition __init__.py:202
SXNG_Response head(str url, **t.Any kwargs)
Definition __init__.py:181
get_time_for_thread()
Definition __init__.py:34
_record_http_time()
Definition __init__.py:57
SXNG_Response request(method, url, **kwargs)
Definition __init__.py:94
SXNG_Response get(str url, **t.Any kwargs)
Definition __init__.py:171
SXNG_Response options(str url, **t.Any kwargs)
Definition __init__.py:176
set_timeout_for_thread(float timeout, float|None start_time=None)
Definition __init__.py:39
SXNG_Response delete(str url, **t.Any kwargs)
Definition __init__.py:198
tuple[httpx.Response, Iterable[bytes]] stream(str method, str url, **t.Any kwargs)
Definition __init__.py:253
list[httpx.Response|Exception] multi_requests(list["Request"] request_list)
Definition __init__.py:109
SXNG_Response patch(str url, data=None, **t.Any kwargs)
Definition __init__.py:194
SXNG_Response put(str url, data=None, **t.Any kwargs)
Definition __init__.py:190
_close_response_method(self)
Definition __init__.py:243
SXNG_Response post(str url, data=None, **t.Any kwargs)
Definition __init__.py:186
reset_time_for_thread()
Definition __init__.py:30