4__all__ = [
"get_network",
"initialize",
"check_network_configuration",
"raise_for_httperror"]
10import concurrent.futures
11from queue
import SimpleQueue
12from types
import MethodType
13from timeit
import default_timer
14from collections.abc
import Iterable
15from contextlib
import contextmanager
21from .network
import get_network, initialize, check_network_configuration
22from .client
import get_loop
23from .raise_for_httperror
import raise_for_httperror
28THREADLOCAL = threading.local()
29"""Thread-local data is data for thread specific values."""
33 THREADLOCAL.total_time = 0
37 """returns thread's total time or None"""
38 return THREADLOCAL.__dict__.get(
'total_time')
42 THREADLOCAL.timeout = timeout
43 THREADLOCAL.start_time = start_time
47 THREADLOCAL.network = get_network(network_name)
51 """If set return thread's network.
53 If unset, return value from :py:obj:`get_network`.
55 return THREADLOCAL.__dict__.get(
'network')
or get_network()
61 time_before_request = default_timer()
62 start_time = getattr(THREADLOCAL,
'start_time', time_before_request)
68 if hasattr(THREADLOCAL,
'total_time'):
69 time_after_request = default_timer()
70 THREADLOCAL.total_time += time_after_request - time_before_request
78 if 'timeout' in kwargs:
79 timeout = kwargs[
'timeout']
81 timeout = getattr(THREADLOCAL,
'timeout',
None)
82 if timeout
is not None:
83 kwargs[
'timeout'] = timeout
86 timeout = timeout
or 120
91 timeout -= default_timer() - start_time
96def request(method: str, url: str, **kwargs: t.Any) -> SXNG_Response:
97 """same as requests/requests/api.py request(...)"""
101 future = asyncio.run_coroutine_threadsafe(
102 network.request(method, url, **kwargs),
106 return future.result(timeout)
107 except concurrent.futures.TimeoutError
as e:
108 raise httpx.TimeoutException(
'Timeout', request=
None)
from e
111def multi_requests(request_list: list[
"Request"]) -> list[httpx.Response | Exception]:
112 """send multiple HTTP requests in parallel. Wait for all requests to finish."""
118 for request_desc
in request_list:
120 future = asyncio.run_coroutine_threadsafe(
121 network.request(request_desc.method, request_desc.url, **request_desc.kwargs), loop
123 future_list.append((future, timeout))
127 for future, timeout
in future_list:
129 responses.append(future.result(timeout))
130 except concurrent.futures.TimeoutError:
131 responses.append(httpx.TimeoutException(
'Timeout', request=
None))
132 except Exception
as e:
138 """Request description for the multi_requests function"""
142 kwargs: dict[str, str] = {}
145 def get(url: str, **kwargs: t.Any):
146 return Request(
'GET', url, kwargs)
150 return Request(
'OPTIONS', url, kwargs)
153 def head(url: str, **kwargs: t.Any):
154 return Request(
'HEAD', url, kwargs)
157 def post(url: str, **kwargs: t.Any):
158 return Request(
'POST', url, kwargs)
161 def put(url: str, **kwargs: t.Any):
162 return Request(
'PUT', url, kwargs)
165 def patch(url: str, **kwargs: t.Any):
166 return Request(
'PATCH', url, kwargs)
170 return Request(
'DELETE', url, kwargs)
173def get(url: str, **kwargs: t.Any) -> SXNG_Response:
174 kwargs.setdefault(
'allow_redirects',
True)
175 return request(
'get', url, **kwargs)
178def options(url: str, **kwargs: t.Any) -> SXNG_Response:
179 kwargs.setdefault(
'allow_redirects',
True)
180 return request(
'options', url, **kwargs)
183def head(url: str, **kwargs: t.Any) -> SXNG_Response:
184 kwargs.setdefault(
'allow_redirects',
False)
185 return request(
'head', url, **kwargs)
188def post(url: str, data: dict[str, t.Any] |
None =
None, **kwargs: t.Any) -> SXNG_Response:
189 return request(
'post', url, data=data, **kwargs)
192def put(url: str, data: dict[str, t.Any] |
None =
None, **kwargs: t.Any) -> SXNG_Response:
193 return request(
'put', url, data=data, **kwargs)
196def patch(url: str, data: dict[str, t.Any] |
None =
None, **kwargs: t.Any) -> SXNG_Response:
197 return request(
'patch', url, data=data, **kwargs)
200def delete(url: str, **kwargs: t.Any) -> SXNG_Response:
201 return request(
'delete', url, **kwargs)
206 async with await network.stream(method, url, **kwargs)
as response:
210 async for chunk
in response.aiter_raw(65536):
213 except (httpx.StreamClosed, anyio.ClosedResourceError):
219 except Exception
as e:
231 queue = SimpleQueue()
233 future = asyncio.run_coroutine_threadsafe(
stream_chunk_to_queue(network, queue, method, url, **kwargs), get_loop())
236 obj_or_exception = queue.get()
237 while obj_or_exception
is not None:
238 if isinstance(obj_or_exception, Exception):
239 raise obj_or_exception
240 yield obj_or_exception
241 obj_or_exception = queue.get()
246 asyncio.run_coroutine_threadsafe(self.aclose(), get_loop())
251 for _
in self._generator:
255def stream(method: str, url: str, **kwargs: t.Any) -> tuple[SXNG_Response, Iterable[bytes]]:
256 """Replace httpx.stream.
259 response, stream = poolrequests.stream(...)
263 httpx.Client.stream requires to write the httpx.HTTPTransport version of the
264 the httpx.AsyncHTTPTransport declared above.
269 response = next(generator)
270 if isinstance(response, Exception):
273 response._generator = generator
274 response.close = MethodType(_close_response_method, response)
276 return response, generator
delete(str url, **t.Any kwargs)
post(str url, **t.Any kwargs)
head(str url, **t.Any kwargs)
put(str url, **t.Any kwargs)
patch(str url, **t.Any kwargs)
get(str url, **t.Any kwargs)
options(str url, **t.Any kwargs)
tuple[SXNG_Response, Iterable[bytes]] stream(str method, str url, **t.Any kwargs)
set_context_network_name(str network_name)
float _get_timeout(float start_time, t.Any kwargs)
SXNG_Response patch(str url, dict[str, t.Any]|None data=None, **t.Any kwargs)
_stream_generator(str method, str url, **t.Any kwargs)
stream_chunk_to_queue(network, queue, str method, str url, **t.Any kwargs)
SXNG_Response head(str url, **t.Any kwargs)
SXNG_Response put(str url, dict[str, t.Any]|None data=None, **t.Any kwargs)
SXNG_Response get(str url, **t.Any kwargs)
SXNG_Response options(str url, **t.Any kwargs)
set_timeout_for_thread(float timeout, float|None start_time=None)
"Network" get_context_network()
SXNG_Response delete(str url, **t.Any kwargs)
list[httpx.Response|Exception] multi_requests(list["Request"] request_list)
SXNG_Response post(str url, dict[str, t.Any]|None data=None, **t.Any kwargs)
SXNG_Response request(str method, str url, **t.Any kwargs)
_close_response_method(self)
float|None get_time_for_thread()