.oO SearXNG Developer Documentation Oo.
Loading...
Searching...
No Matches
__init__.py
Go to the documentation of this file.
1# SPDX-License-Identifier: AGPL-3.0-or-later
2# pylint: disable=missing-module-docstring, global-statement
3
4import asyncio
5import threading
6import concurrent.futures
7from queue import SimpleQueue
8from types import MethodType
9from timeit import default_timer
10from typing import Iterable, NamedTuple, Tuple, List, Dict, Union
11from contextlib import contextmanager
12
13import httpx
14import anyio
15
16from searx.extended_types import SXNG_Response
17from .network import get_network, initialize, check_network_configuration # pylint:disable=cyclic-import
18from .client import get_loop
19from .raise_for_httperror import raise_for_httperror
20
21
# Storage whose attributes are private to each thread; this module keeps the
# per-thread ``timeout``, ``start_time``, ``total_time`` and ``network`` values in it.
THREADLOCAL = threading.local()
"""Thread-local data: per-thread ``timeout``, ``start_time``, ``total_time`` and
``network`` values used by the HTTP helpers of this module."""
24
25
def reset_time_for_thread():
    """Start (or restart) the current thread's HTTP-time counter at zero.

    Once the counter exists, every request made through this module adds its
    duration to it (see ``get_time_for_thread``).
    """
    setattr(THREADLOCAL, 'total_time', 0)
28
29
def get_time_for_thread():
    """Return the HTTP time accumulated by the current thread.

    Returns ``None`` when the counter was never initialised in this thread
    (i.e. ``reset_time_for_thread`` has not been called here).
    """
    return getattr(THREADLOCAL, 'total_time', None)
33
34
def set_timeout_for_thread(timeout, start_time=None):
    """Store the current thread's request timeout and its reference start time.

    Both values are read back when a request's remaining timeout is computed:
    ``timeout`` is used when the request itself has none, and ``start_time``
    (which may stay ``None``) is the moment the time budget started.
    """
    THREADLOCAL.start_time = start_time
    THREADLOCAL.timeout = timeout
38
39
def set_context_network_name(network_name):
    """Select the network named ``network_name`` for the current thread.

    The network object is resolved once with ``get_network`` and kept in
    the thread-local storage, where ``get_context_network`` finds it.
    """
    selected_network = get_network(network_name)
    THREADLOCAL.network = selected_network
42
43
def get_context_network():
    """Return the network selected for the current thread.

    If no (truthy) network was set with :py:obj:`set_context_network_name`,
    the value of :py:obj:`get_network` called without argument is returned.
    """
    thread_network = getattr(THREADLOCAL, 'network', None)
    if thread_network:
        return thread_network
    return get_network()
50
51
@contextmanager
def _record_http_time():
    """Context manager measuring the time spent inside it.

    Yields the reference start time of the request: the thread's ``start_time``
    attribute when it exists (it may be ``None``, see ``set_timeout_for_thread``),
    otherwise the moment the context was entered.

    On exit -- normal or exceptional -- the elapsed time is added to the
    thread's ``total_time``, but only if that counter was initialised for the
    current thread (see ``reset_time_for_thread`` / ``get_time_for_thread``).
    """
    entered_at = default_timer()
    reference_start = getattr(THREADLOCAL, 'start_time', entered_at)
    try:
        yield reference_start
    finally:
        # Accounting is opt-in: nothing is recorded for threads that never
        # initialised total_time.
        if hasattr(THREADLOCAL, 'total_time'):
            THREADLOCAL.total_time += default_timer() - entered_at
65
66
67def _get_timeout(start_time, kwargs):
68 # pylint: disable=too-many-branches
69
70 # timeout (httpx)
71 if 'timeout' in kwargs:
72 timeout = kwargs['timeout']
73 else:
74 timeout = getattr(THREADLOCAL, 'timeout', None)
75 if timeout is not None:
76 kwargs['timeout'] = timeout
77
78 # 2 minutes timeout for the requests without timeout
79 timeout = timeout or 120
80
81 # adjust actual timeout
82 timeout += 0.2 # overhead
83 if start_time:
84 timeout -= default_timer() - start_time
85
86 return timeout
87
88
def request(method, url, **kwargs) -> SXNG_Response:
    """Send one HTTP request through the current thread's network and wait for it.

    Same call signature as ``requests/requests/api.py`` ``request(...)``: the
    keyword arguments are forwarded to the network's ``request`` coroutine,
    which runs on the event loop returned by ``get_loop()``.

    The wait is bounded by :py:obj:`_get_timeout` (the request's own
    ``timeout``, else the thread's timeout, else 120 seconds; minus the time
    already spent since the thread's start time).

    Raises:
        httpx.TimeoutException: no response arrived within that time.  Any
            exception raised by the network request itself propagates as is.
    """
    with _record_http_time() as start_time:
        network = get_context_network()
        timeout = _get_timeout(start_time, kwargs)
        future = asyncio.run_coroutine_threadsafe(network.request(method, url, **kwargs), get_loop())
        try:
            return future.result(timeout)
        except concurrent.futures.TimeoutError as e:
            # Without cancellation the coroutine would keep running in the event
            # loop although nobody will ever read its result.
            future.cancel()
            raise httpx.TimeoutException('Timeout', request=None) from e
99
100
def multi_requests(request_list: List["Request"]) -> List[Union[httpx.Response, Exception]]:
    """Send multiple HTTP requests in parallel and wait for all of them to finish.

    Every :py:obj:`Request` of ``request_list`` is scheduled on the event loop
    returned by ``get_loop()`` before any result is awaited.  Each request gets
    its own deadline computed by :py:obj:`_get_timeout` at submission time, so
    waiting for a slow request does not extend the time allowed to the others.

    Returns:
        One entry per request, in the order of ``request_list``: the response,
        or the exception raised for that request (an ``httpx.TimeoutException``
        when the request missed its deadline).  Per-request exceptions are
        returned in the list, not raised.
    """
    with _record_http_time() as start_time:
        network = get_context_network()
        loop = get_loop()

        # send the requests
        pending = []
        for request_desc in request_list:
            # Work on a copy: _get_timeout() may add a 'timeout' entry, and neither
            # the caller's Request nor the dict shared by Request's ``kwargs``
            # default must be modified.
            kwargs = dict(request_desc.kwargs)
            timeout = _get_timeout(start_time, kwargs)
            deadline = default_timer() + timeout
            future = asyncio.run_coroutine_threadsafe(
                network.request(request_desc.method, request_desc.url, **kwargs), loop
            )
            pending.append((future, deadline))

        # read the responses
        responses = []
        for future, deadline in pending:
            try:
                # Remaining time until this request's own deadline; a future that
                # is already done is returned even when no time is left.
                responses.append(future.result(max(0.0, deadline - default_timer())))
            except concurrent.futures.TimeoutError:
                # Stop the abandoned coroutine in the event loop.
                future.cancel()
                responses.append(httpx.TimeoutException('Timeout', request=None))
            except Exception as e:  # pylint: disable=broad-except
                responses.append(e)
        return responses
125
126
class Request(NamedTuple):
    """Description of one HTTP request, as consumed by :py:obj:`multi_requests`.

    ``kwargs`` holds the keyword arguments forwarded to the network's
    ``request`` call.  The static constructors (``Request.get(url, **kwargs)``,
    ``Request.post(...)``, ...) set ``method`` to the upper-case HTTP verb.
    """

    method: str
    url: str
    # NOTE(review): despite the ``Dict[str, str]`` annotation, any keyword
    # argument (e.g. a numeric ``timeout``) is stored here.  The ``{}`` default
    # is one dict shared by every Request built without ``kwargs``: never mutate it.
    kwargs: Dict[str, str] = {}

    @staticmethod
    def get(url, **kwargs):
        return Request(method='GET', url=url, kwargs=kwargs)

    @staticmethod
    def options(url, **kwargs):
        return Request(method='OPTIONS', url=url, kwargs=kwargs)

    @staticmethod
    def head(url, **kwargs):
        return Request(method='HEAD', url=url, kwargs=kwargs)

    @staticmethod
    def post(url, **kwargs):
        return Request(method='POST', url=url, kwargs=kwargs)

    @staticmethod
    def put(url, **kwargs):
        return Request(method='PUT', url=url, kwargs=kwargs)

    @staticmethod
    def patch(url, **kwargs):
        return Request(method='PATCH', url=url, kwargs=kwargs)

    @staticmethod
    def delete(url, **kwargs):
        return Request(method='DELETE', url=url, kwargs=kwargs)
161
162
def get(url, **kwargs) -> SXNG_Response:
    """Send a GET request with :py:obj:`request`; ``allow_redirects`` defaults to True."""
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = True
    return request('get', url, **kwargs)
166
167
def options(url, **kwargs) -> SXNG_Response:
    """Send an OPTIONS request with :py:obj:`request`; ``allow_redirects`` defaults to True."""
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = True
    return request('options', url, **kwargs)
171
172
def head(url, **kwargs) -> SXNG_Response:
    """Send a HEAD request with :py:obj:`request`; ``allow_redirects`` defaults to False."""
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = False
    return request('head', url, **kwargs)
176
177
def post(url, data=None, **kwargs) -> SXNG_Response:
    """Send a POST request with :py:obj:`request`; ``data`` is always forwarded (default None)."""
    call_kwargs = {'data': data, **kwargs}
    return request('post', url, **call_kwargs)
180
181
def put(url, data=None, **kwargs) -> SXNG_Response:
    """Send a PUT request with :py:obj:`request`; ``data`` is always forwarded (default None)."""
    call_kwargs = {'data': data, **kwargs}
    return request('put', url, **call_kwargs)
184
185
def patch(url, data=None, **kwargs) -> SXNG_Response:
    """Send a PATCH request with :py:obj:`request`; ``data`` is always forwarded (default None)."""
    call_kwargs = {'data': data, **kwargs}
    return request('patch', url, **call_kwargs)
188
189
def delete(url, **kwargs) -> SXNG_Response:
    """Send a DELETE request; every keyword argument is forwarded to :py:obj:`request`."""
    http_method = 'delete'
    return request(http_method, url, **kwargs)
192
193
async def stream_chunk_to_queue(network, queue, method, url, **kwargs):
    """Coroutine streaming an HTTP response into ``queue``.

    Items put on ``queue``, in order:

    1. the response object, once ``network.stream(...)`` has been entered;
    2. every non-empty raw body chunk, read with ``aiter_raw(65536)``;
    3. ``None`` as end marker -- always, whatever happened before.

    Any exception other than ``httpx.StreamClosed`` /
    ``anyio.ClosedResourceError`` is put on the queue (before the ``None``
    marker) instead of being raised, so the consumer can re-raise it in its
    own thread.
    """
    try:
        async with await network.stream(method, url, **kwargs) as response:
            queue.put(response)
            # aiter_raw gives the raw bytes of the body, without any HTTP
            # content decoding: https://www.python-httpx.org/quickstart/#streaming-responses
            async for raw_chunk in response.aiter_raw(65536):
                if raw_chunk:
                    queue.put(raw_chunk)
    except (httpx.StreamClosed, anyio.ClosedResourceError):
        # Raised while iterating aiter_raw (the response was already queued),
        # presumably because the response got closed by the consumer.  Nothing
        # to report: the end marker queued below lets the consumer stop.
        pass
    except Exception as e:  # pylint: disable=broad-except
        # Broad on purpose: if network.stream() itself fails and the error were
        # not queued, the consumer would only receive the end marker and would
        # have no response to return.
        queue.put(e)
    finally:
        queue.put(None)
218
219
def _stream_generator(method, url, **kwargs):
    """Generator relaying, in the calling thread, what ``stream_chunk_to_queue`` produces.

    The coroutine is started on the event loop returned by ``get_loop()``.
    Items are taken from the shared queue: the first one is the response,
    the next ones are raw body chunks.  A queued exception is re-raised here.
    On the ``None`` end marker, the coroutine's result is collected with
    ``future.result()`` and the generator ends.
    """
    chunk_queue = SimpleQueue()
    network = get_context_network()
    producer = stream_chunk_to_queue(network, chunk_queue, method, url, **kwargs)
    future = asyncio.run_coroutine_threadsafe(producer, get_loop())

    while True:
        item = chunk_queue.get()
        if item is None:
            break
        if isinstance(item, Exception):
            raise item
        yield item
    future.result()
233
234
def _close_response_method(self):
    """Replacement for ``close()`` on the responses returned by ``stream``.

    Schedules ``self.aclose()`` on the event loop (without waiting for it),
    then exhausts ``self._generator`` (the ``_stream_generator`` instance).
    """
    asyncio.run_coroutine_threadsafe(self.aclose(), get_loop())
    # Running the generator to its end avoids a memory leak: it guarantees that
    # the httpx response is closed (see stream_chunk_to_queue) and that
    # _stream_generator reaches its final future.result() call.
    for _pending_chunk in self._generator:  # pylint: disable=protected-access
        pass
243
244
def stream(method, url, **kwargs) -> Tuple[httpx.Response, Iterable[bytes]]:
    """Replace httpx.stream.

    Usage::

        response, chunks = stream('GET', url)
        for chunk in chunks:
            ...

    The body is read on the event loop by ``stream_chunk_to_queue`` and handed
    over to the calling thread; the chunks are raw bytes (``aiter_raw``),
    without HTTP content decoding.  ``response.close()`` is replaced so that
    it also drains the chunk iterator.

    httpx.Client.stream is not used because it would require a synchronous
    httpx.HTTPTransport counterpart of the httpx.AsyncHTTPTransport this
    package relies on.
    """
    chunk_iter = _stream_generator(method, url, **kwargs)

    # The first item is the response.  NOTE(review): if the producer queued
    # only the end marker, next() lets StopIteration escape to the caller.
    first_item = next(chunk_iter)  # pylint: disable=stop-iteration-return
    if isinstance(first_item, Exception):
        # _stream_generator already raises queued exceptions; kept as a safeguard.
        raise first_item

    first_item._generator = chunk_iter  # pylint: disable=protected-access
    first_item.close = MethodType(_close_response_method, first_item)
    return first_item, chunk_iter
patch(url, **kwargs)
Definition __init__.py:155
post(url, **kwargs)
Definition __init__.py:147
delete(url, **kwargs)
Definition __init__.py:159
options(url, **kwargs)
Definition __init__.py:139
get(url, **kwargs)
Definition __init__.py:135
head(url, **kwargs)
Definition __init__.py:143
put(url, **kwargs)
Definition __init__.py:151
Tuple[httpx.Response, Iterable[bytes]] stream(method, url, **kwargs)
Definition __init__.py:245
SXNG_Response delete(url, **kwargs)
Definition __init__.py:190
get_context_network()
Definition __init__.py:44
List[Union[httpx.Response, Exception]] multi_requests(List["Request"] request_list)
Definition __init__.py:101
set_context_network_name(network_name)
Definition __init__.py:40
stream_chunk_to_queue(network, queue, method, url, **kwargs)
Definition __init__.py:194
get_time_for_thread()
Definition __init__.py:30
_record_http_time()
Definition __init__.py:53
SXNG_Response patch(url, data=None, **kwargs)
Definition __init__.py:186
SXNG_Response request(method, url, **kwargs)
Definition __init__.py:89
SXNG_Response options(url, **kwargs)
Definition __init__.py:168
set_timeout_for_thread(timeout, start_time=None)
Definition __init__.py:35
SXNG_Response put(url, data=None, **kwargs)
Definition __init__.py:182
SXNG_Response post(url, data=None, **kwargs)
Definition __init__.py:178
_stream_generator(method, url, **kwargs)
Definition __init__.py:220
SXNG_Response head(url, **kwargs)
Definition __init__.py:173
_close_response_method(self)
Definition __init__.py:235
SXNG_Response get(url, **kwargs)
Definition __init__.py:163
reset_time_for_thread()
Definition __init__.py:26
_get_timeout(start_time, kwargs)
Definition __init__.py:67