.oO SearXNG Developer Documentation Oo.
__init__.py
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring, global-statement

import asyncio
import threading
import concurrent.futures
from queue import SimpleQueue
from types import MethodType
from timeit import default_timer
from typing import Iterable, NamedTuple, Tuple, List, Dict, Union
from contextlib import contextmanager

import httpx
import anyio

from .network import get_network, initialize, check_network_configuration  # pylint:disable=cyclic-import
from .client import get_loop
from .raise_for_httperror import raise_for_httperror


THREADLOCAL = threading.local()
22"""Thread-local data is data for thread specific values."""
23
24
26 THREADLOCAL.total_time = 0
27
28
30 """returns thread's total time or None"""
31 return THREADLOCAL.__dict__.get('total_time')
32
33
34def set_timeout_for_thread(timeout, start_time=None):
35 THREADLOCAL.timeout = timeout
36 THREADLOCAL.start_time = start_time
37
38
39def set_context_network_name(network_name):
40 THREADLOCAL.network = get_network(network_name)
41
42
44 """If set return thread's network.
45
46 If unset, return value from :py:obj:`get_network`.
47 """
48 return THREADLOCAL.__dict__.get('network') or get_network()
49
50
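# Illustrative sketch (not part of the module): how a worker thread would
# typically use the thread-local helpers above.  The network name 'my_engine'
# and the 3.0 s timeout are placeholder values.
#
#   reset_time_for_thread()                       # start measuring HTTP time
#   set_timeout_for_thread(3.0, default_timer())  # budget for this thread
#   set_context_network_name('my_engine')         # pick a named network
#   ...                                           # perform requests via get()/post()/...
#   spent = get_time_for_thread()                 # total seconds spent in HTTP

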
@contextmanager
def _record_http_time():
    # pylint: disable=too-many-branches
    time_before_request = default_timer()
    start_time = getattr(THREADLOCAL, 'start_time', time_before_request)
    try:
        yield start_time
    finally:
        # update total_time.
        # See get_time_for_thread() and reset_time_for_thread()
        if hasattr(THREADLOCAL, 'total_time'):
            time_after_request = default_timer()
            THREADLOCAL.total_time += time_after_request - time_before_request


def _get_timeout(start_time, kwargs):
    # pylint: disable=too-many-branches

    # timeout (httpx)
    if 'timeout' in kwargs:
        timeout = kwargs['timeout']
    else:
        timeout = getattr(THREADLOCAL, 'timeout', None)
        if timeout is not None:
            kwargs['timeout'] = timeout

    # 2 minutes timeout for requests without a timeout
    timeout = timeout or 120

    # adjust actual timeout
    timeout += 0.2  # overhead
    if start_time:
        timeout -= default_timer() - start_time

    return timeout


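# Worked example (illustrative only): with a thread timeout of 3.0 s set via
# set_timeout_for_thread() and 1.2 s already elapsed since start_time, the
# wait passed to future.result() below is 3.0 + 0.2 - 1.2 = 2.0 s, so the
# thread's overall budget is respected across successive requests.

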
def request(method, url, **kwargs):
    """same as requests/requests/api.py request(...)"""
    with _record_http_time() as start_time:
        network = get_context_network()
        timeout = _get_timeout(start_time, kwargs)
        future = asyncio.run_coroutine_threadsafe(network.request(method, url, **kwargs), get_loop())
        try:
            return future.result(timeout)
        except concurrent.futures.TimeoutError as e:
            raise httpx.TimeoutException('Timeout', request=None) from e


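# Illustrative usage (not part of the module): request() blocks the calling
# thread while the coroutine runs on the shared event loop.  The URL and
# timeout below are placeholders.
#
#   response = request('GET', 'https://example.org/', timeout=5.0)
#   response.raise_for_status()

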
def multi_requests(request_list: List["Request"]) -> List[Union[httpx.Response, Exception]]:
    """send multiple HTTP requests in parallel. Wait for all requests to finish."""
    with _record_http_time() as start_time:
        # send the requests
        network = get_context_network()
        loop = get_loop()
        future_list = []
        for request_desc in request_list:
            timeout = _get_timeout(start_time, request_desc.kwargs)
            future = asyncio.run_coroutine_threadsafe(
                network.request(request_desc.method, request_desc.url, **request_desc.kwargs), loop
            )
            future_list.append((future, timeout))

        # read the responses
        responses = []
        for future, timeout in future_list:
            try:
                responses.append(future.result(timeout))
            except concurrent.futures.TimeoutError:
                responses.append(httpx.TimeoutException('Timeout', request=None))
            except Exception as e:  # pylint: disable=broad-except
                responses.append(e)
        return responses


class Request(NamedTuple):
    """Request description for the multi_requests function"""

    method: str
    url: str
    kwargs: Dict[str, str] = {}

    @staticmethod
    def get(url, **kwargs):
        return Request('GET', url, kwargs)

    @staticmethod
    def options(url, **kwargs):
        return Request('OPTIONS', url, kwargs)

    @staticmethod
    def head(url, **kwargs):
        return Request('HEAD', url, kwargs)

    @staticmethod
    def post(url, **kwargs):
        return Request('POST', url, kwargs)

    @staticmethod
    def put(url, **kwargs):
        return Request('PUT', url, kwargs)

    @staticmethod
    def patch(url, **kwargs):
        return Request('PATCH', url, kwargs)

    @staticmethod
    def delete(url, **kwargs):
        return Request('DELETE', url, kwargs)


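# Illustrative usage (not part of the module): build Request descriptions and
# send them in parallel with multi_requests().  Each slot of the result list
# is either an httpx.Response or the Exception raised for that request, so
# callers have to check the type.  The URLs below are placeholders.
#
#   results = multi_requests([
#       Request.get('https://example.org/a'),
#       Request.post('https://example.org/b', data={'q': 'test'}),
#   ])
#   for result in results:
#       if isinstance(result, Exception):
#           continue  # or log the error
#       print(result.status_code)

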
def get(url, **kwargs):
    kwargs.setdefault('allow_redirects', True)
    return request('get', url, **kwargs)


def options(url, **kwargs):
    kwargs.setdefault('allow_redirects', True)
    return request('options', url, **kwargs)


def head(url, **kwargs):
    kwargs.setdefault('allow_redirects', False)
    return request('head', url, **kwargs)


def post(url, data=None, **kwargs):
    return request('post', url, data=data, **kwargs)


def put(url, data=None, **kwargs):
    return request('put', url, data=data, **kwargs)


def patch(url, data=None, **kwargs):
    return request('patch', url, data=data, **kwargs)


def delete(url, **kwargs):
    return request('delete', url, **kwargs)


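# Illustrative usage (not part of the module): the wrappers above mirror the
# requests-style helpers.  Note the redirect defaults visible in the code:
# get() and options() set allow_redirects=True, head() sets it to False.
# The URLs below are placeholders.
#
#   resp = get('https://example.org/', timeout=2.0)             # follows redirects
#   resp = head('https://example.org/', allow_redirects=True)   # override the default

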
async def stream_chunk_to_queue(network, queue, method, url, **kwargs):
    try:
        async with await network.stream(method, url, **kwargs) as response:
            queue.put(response)
            # aiter_raw: access the raw bytes on the response without applying any HTTP content decoding
            # https://www.python-httpx.org/quickstart/#streaming-responses
            async for chunk in response.aiter_raw(65536):
                if len(chunk) > 0:
                    queue.put(chunk)
    except (httpx.StreamClosed, anyio.ClosedResourceError):
        # the response was queued before the exception.
        # the exception was raised on aiter_raw.
        # we do nothing here: in the finally block, None will be queued
        # so the stream(method, url, **kwargs) generator can stop
        pass
    except Exception as e:  # pylint: disable=broad-except
        # broad except to avoid this scenario:
        # exception in network.stream(method, url, **kwargs)
        # -> the exception is not caught here
        # -> queue None (in finally)
        # -> the stream(method, url, **kwargs) function below has nothing to return
        queue.put(e)
    finally:
        queue.put(None)


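# The queue filled by stream_chunk_to_queue() carries, in order: the
# httpx.Response object, then zero or more raw byte chunks, possibly an
# Exception, and finally None as a sentinel that lets _stream_generator()
# below stop iterating.

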
def _stream_generator(method, url, **kwargs):
    queue = SimpleQueue()
    network = get_context_network()
    future = asyncio.run_coroutine_threadsafe(stream_chunk_to_queue(network, queue, method, url, **kwargs), get_loop())

    # yield chunks
    obj_or_exception = queue.get()
    while obj_or_exception is not None:
        if isinstance(obj_or_exception, Exception):
            raise obj_or_exception
        yield obj_or_exception
        obj_or_exception = queue.get()
    future.result()


def _close_response_method(self):
    asyncio.run_coroutine_threadsafe(self.aclose(), get_loop())
    # reach the end of self._generator (_stream_generator) to avoid a memory leak.
    # it makes sure that:
    # * the httpx response is closed (see the stream_chunk_to_queue function)
    # * future.result() is called in _stream_generator
    for _ in self._generator:  # pylint: disable=protected-access
        continue


def stream(method, url, **kwargs) -> Tuple[httpx.Response, Iterable[bytes]]:
    """Replacement for httpx.stream.

    Usage:
    response, stream = searx.network.stream(...)
    for chunk in stream:
        ...

    httpx.Client.stream would require writing an httpx.HTTPTransport version of
    the httpx.AsyncHTTPTransport declared above.
    """
    generator = _stream_generator(method, url, **kwargs)

    # yield response
    response = next(generator)  # pylint: disable=stop-iteration-return
    if isinstance(response, Exception):
        raise response

    response._generator = generator  # pylint: disable=protected-access
    response.close = MethodType(_close_response_method, response)

    return response, generator
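

# Illustrative usage (not part of the module): stream a large download in
# bounded memory.  The URL is a placeholder; response.close() drains the
# generator so the underlying httpx response is released (see
# _close_response_method above).
#
#   response, chunks = stream('GET', 'https://example.org/big.bin', timeout=30.0)
#   try:
#       for chunk in chunks:
#           process(chunk)  # hypothetical consumer
#   finally:
#       response.close()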