4__all__ = [
"initialize",
"check_network_configuration",
"raise_for_httperror"]
10import concurrent.futures
11from queue
import SimpleQueue
12from types
import MethodType
13from timeit
import default_timer
14from collections.abc
import Iterable
15from contextlib
import contextmanager
21from .network
import get_network, initialize, check_network_configuration
22from .client
import get_loop
23from .raise_for_httperror
import raise_for_httperror
# Per-thread storage shared by the helpers in this module.  The attributes
# assigned to it in this file are ``total_time``, ``timeout``, ``start_time``
# and ``network``; each is set by a separate helper, so any of them may be
# absent on a given thread and readers must tolerate that.
THREADLOCAL = threading.local()
"""Thread-local data is data for thread specific values."""
31 THREADLOCAL.total_time = 0
35 """returns thread's total time or None"""
36 return THREADLOCAL.__dict__.get(
'total_time')
40 THREADLOCAL.timeout = timeout
41 THREADLOCAL.start_time = start_time
45 THREADLOCAL.network = get_network(network_name)
49 """If set return thread's network.
51 If unset, return value from :py:obj:`get_network`.
53 return THREADLOCAL.__dict__.get(
'network')
or get_network()
59 time_before_request = default_timer()
60 start_time = getattr(THREADLOCAL,
'start_time', time_before_request)
66 if hasattr(THREADLOCAL,
'total_time'):
67 time_after_request = default_timer()
68 THREADLOCAL.total_time += time_after_request - time_before_request
76 if 'timeout' in kwargs:
77 timeout = kwargs[
'timeout']
79 timeout = getattr(THREADLOCAL,
'timeout',
None)
80 if timeout
is not None:
81 kwargs[
'timeout'] = timeout
84 timeout = timeout
or 120
89 timeout -= default_timer() - start_time
94def request(method, url, **kwargs) -> SXNG_Response:
95 """same as requests/requests/api.py request(...)"""
99 future = asyncio.run_coroutine_threadsafe(
100 network.request(method, url, **kwargs),
104 return future.result(timeout)
105 except concurrent.futures.TimeoutError
as e:
106 raise httpx.TimeoutException(
'Timeout', request=
None)
from e
109def multi_requests(request_list: list[
"Request"]) -> list[httpx.Response | Exception]:
110 """send multiple HTTP requests in parallel. Wait for all requests to finish."""
116 for request_desc
in request_list:
118 future = asyncio.run_coroutine_threadsafe(
119 network.request(request_desc.method, request_desc.url, **request_desc.kwargs), loop
121 future_list.append((future, timeout))
125 for future, timeout
in future_list:
127 responses.append(future.result(timeout))
128 except concurrent.futures.TimeoutError:
129 responses.append(httpx.TimeoutException(
'Timeout', request=
None))
130 except Exception
as e:
136 """Request description for the multi_requests function"""
140 kwargs: dict[str, str] = {}
143 def get(url: str, **kwargs: t.Any):
144 return Request(
'GET', url, kwargs)
148 return Request(
'OPTIONS', url, kwargs)
151 def head(url: str, **kwargs: t.Any):
152 return Request(
'HEAD', url, kwargs)
155 def post(url: str, **kwargs: t.Any):
156 return Request(
'POST', url, kwargs)
159 def put(url: str, **kwargs: t.Any):
160 return Request(
'PUT', url, kwargs)
163 def patch(url: str, **kwargs: t.Any):
164 return Request(
'PATCH', url, kwargs)
168 return Request(
'DELETE', url, kwargs)
def get(url: str, **kwargs: t.Any) -> SXNG_Response:
    """Issue a GET request for ``url`` by delegating to :py:func:`request`.

    ``allow_redirects`` defaults to ``True`` when the caller does not supply
    it; all other keyword arguments are forwarded unchanged.
    """
    # Only fill in the default; an explicit caller value always wins.
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = True
    return request('get', url, **kwargs)
def options(url: str, **kwargs: t.Any) -> SXNG_Response:
    """Issue an OPTIONS request for ``url`` by delegating to :py:func:`request`.

    ``allow_redirects`` defaults to ``True`` when the caller does not supply
    it; all other keyword arguments are forwarded unchanged.
    """
    # Only fill in the default; an explicit caller value always wins.
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = True
    return request('options', url, **kwargs)
def head(url: str, **kwargs: t.Any) -> SXNG_Response:
    """Issue a HEAD request for ``url`` by delegating to :py:func:`request`.

    Unlike :py:func:`get` and :py:func:`options`, ``allow_redirects``
    defaults to ``False`` here when the caller does not supply it; all other
    keyword arguments are forwarded unchanged.
    """
    # Only fill in the default; an explicit caller value always wins.
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = False
    return request('head', url, **kwargs)
def post(url: str, data=None, **kwargs: t.Any) -> SXNG_Response:
    """Issue a POST request for ``url`` by delegating to :py:func:`request`.

    ``data`` is handed to :py:func:`request` as the ``data`` keyword
    argument (``None`` when omitted); remaining keyword arguments are
    forwarded unchanged.
    """
    response = request('post', url, data=data, **kwargs)
    return response
def put(url: str, data=None, **kwargs: t.Any) -> SXNG_Response:
    """Issue a PUT request for ``url`` by delegating to :py:func:`request`.

    ``data`` is handed to :py:func:`request` as the ``data`` keyword
    argument (``None`` when omitted); remaining keyword arguments are
    forwarded unchanged.
    """
    response = request('put', url, data=data, **kwargs)
    return response
def patch(url: str, data=None, **kwargs: t.Any) -> SXNG_Response:
    """Issue a PATCH request for ``url`` by delegating to :py:func:`request`.

    ``data`` is handed to :py:func:`request` as the ``data`` keyword
    argument (``None`` when omitted); remaining keyword arguments are
    forwarded unchanged.
    """
    response = request('patch', url, data=data, **kwargs)
    return response
def delete(url: str, **kwargs: t.Any) -> SXNG_Response:
    """Issue a DELETE request for ``url`` by delegating to :py:func:`request`.

    No defaults are injected; every keyword argument is forwarded unchanged.
    """
    response = request('delete', url, **kwargs)
    return response
204 async with await network.stream(method, url, **kwargs)
as response:
208 async for chunk
in response.aiter_raw(65536):
211 except (httpx.StreamClosed, anyio.ClosedResourceError):
217 except Exception
as e:
229 queue = SimpleQueue()
231 future = asyncio.run_coroutine_threadsafe(
stream_chunk_to_queue(network, queue, method, url, **kwargs), get_loop())
234 obj_or_exception = queue.get()
235 while obj_or_exception
is not None:
236 if isinstance(obj_or_exception, Exception):
237 raise obj_or_exception
238 yield obj_or_exception
239 obj_or_exception = queue.get()
244 asyncio.run_coroutine_threadsafe(self.aclose(), get_loop())
249 for _
in self._generator:
253def stream(method: str, url: str, **kwargs: t.Any) -> tuple[httpx.Response, Iterable[bytes]]:
254 """Replace httpx.stream.
257 response, stream = poolrequests.stream(...)
    httpx.Client.stream requires writing the httpx.HTTPTransport version of
    the httpx.AsyncHTTPTransport declared above.
267 response = next(generator)
268 if isinstance(response, Exception):
271 response._generator = generator
272 response.close = MethodType(_close_response_method, response)
274 return response, generator
delete(str url, **t.Any kwargs)
post(str url, **t.Any kwargs)
head(str url, **t.Any kwargs)
put(str url, **t.Any kwargs)
patch(str url, **t.Any kwargs)
get(str url, **t.Any kwargs)
options(str url, **t.Any kwargs)
set_context_network_name(str network_name)
_get_timeout(float start_time, kwargs)
_stream_generator(str method, str url, **t.Any kwargs)
stream_chunk_to_queue(network, queue, str method, str url, **t.Any kwargs)
SXNG_Response head(str url, **t.Any kwargs)
SXNG_Response request(method, url, **kwargs)
SXNG_Response get(str url, **t.Any kwargs)
SXNG_Response options(str url, **t.Any kwargs)
set_timeout_for_thread(float timeout, float|None start_time=None)
SXNG_Response delete(str url, **t.Any kwargs)
tuple[httpx.Response, Iterable[bytes]] stream(str method, str url, **t.Any kwargs)
list[httpx.Response|Exception] multi_requests(list["Request"] request_list)
SXNG_Response patch(str url, data=None, **t.Any kwargs)
SXNG_Response put(str url, data=None, **t.Any kwargs)
_close_response_method(self)
SXNG_Response post(str url, data=None, **t.Any kwargs)