6import concurrent.futures
7from queue
import SimpleQueue
8from types
import MethodType
9from timeit
import default_timer
10from typing
import Iterable, NamedTuple, Tuple, List, Dict, Union
11from contextlib
import contextmanager
17from .network
import get_network, initialize, check_network_configuration
18from .client
import get_loop
19from .raise_for_httperror
import raise_for_httperror
22THREADLOCAL = threading.local()
23"""Thread-local data is data for thread specific values."""
27 THREADLOCAL.total_time = 0
31 """returns thread's total time or None"""
32 return THREADLOCAL.__dict__.get(
'total_time')
36 THREADLOCAL.timeout = timeout
37 THREADLOCAL.start_time = start_time
41 THREADLOCAL.network = get_network(network_name)
45 """If set return thread's network.
47 If unset, return value from :py:obj:`get_network`.
49 return THREADLOCAL.__dict__.get(
'network')
or get_network()
55 time_before_request = default_timer()
56 start_time = getattr(THREADLOCAL,
'start_time', time_before_request)
62 if hasattr(THREADLOCAL,
'total_time'):
63 time_after_request = default_timer()
64 THREADLOCAL.total_time += time_after_request - time_before_request
71 if 'timeout' in kwargs:
72 timeout = kwargs[
'timeout']
74 timeout = getattr(THREADLOCAL,
'timeout',
None)
75 if timeout
is not None:
76 kwargs[
'timeout'] = timeout
79 timeout = timeout
or 120
84 timeout -= default_timer() - start_time
89def request(method, url, **kwargs) -> SXNG_Response:
90 """same as requests/requests/api.py request(...)"""
94 future = asyncio.run_coroutine_threadsafe(network.request(method, url, **kwargs), get_loop())
96 return future.result(timeout)
97 except concurrent.futures.TimeoutError
as e:
98 raise httpx.TimeoutException(
'Timeout', request=
None)
from e
101def multi_requests(request_list: List[
"Request"]) -> List[Union[httpx.Response, Exception]]:
102 """send multiple HTTP requests in parallel. Wait for all requests to finish."""
108 for request_desc
in request_list:
110 future = asyncio.run_coroutine_threadsafe(
111 network.request(request_desc.method, request_desc.url, **request_desc.kwargs), loop
113 future_list.append((future, timeout))
117 for future, timeout
in future_list:
119 responses.append(future.result(timeout))
120 except concurrent.futures.TimeoutError:
121 responses.append(httpx.TimeoutException(
'Timeout', request=
None))
122 except Exception
as e:
128 """Request description for the multi_requests function"""
132 kwargs: Dict[str, str] = {}
136 return Request(
'GET', url, kwargs)
140 return Request(
'OPTIONS', url, kwargs)
144 return Request(
'HEAD', url, kwargs)
148 return Request(
'POST', url, kwargs)
152 return Request(
'PUT', url, kwargs)
156 return Request(
'PATCH', url, kwargs)
160 return Request(
'DELETE', url, kwargs)
def get(url, **kwargs) -> SXNG_Response:
    """Send a GET request by delegating to :py:func:`request`.

    ``allow_redirects`` is set to ``True`` when the caller did not supply
    it; every other keyword argument is forwarded unchanged.
    """
    # Only fill in the default; an explicit caller value always wins.
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = True
    response = request('get', url, **kwargs)
    return response
169 kwargs.setdefault(
'allow_redirects',
True)
170 return request(
'options', url, **kwargs)
def head(url, **kwargs) -> SXNG_Response:
    """Send a HEAD request by delegating to :py:func:`request`.

    ``allow_redirects`` is set to ``False`` when the caller did not supply
    it; every other keyword argument is forwarded unchanged.
    """
    # Only fill in the default; an explicit caller value always wins.
    if 'allow_redirects' not in kwargs:
        kwargs['allow_redirects'] = False
    response = request('head', url, **kwargs)
    return response
def post(url, data=None, **kwargs) -> SXNG_Response:
    """Send a POST request by delegating to :py:func:`request`.

    *data* is forwarded as the ``data`` keyword argument (``None`` when the
    caller gives none); remaining keyword arguments pass through unchanged.
    """
    response = request('post', url, data=data, **kwargs)
    return response
def put(url, data=None, **kwargs) -> SXNG_Response:
    """Send a PUT request by delegating to :py:func:`request`.

    *data* is forwarded as the ``data`` keyword argument (``None`` when the
    caller gives none); remaining keyword arguments pass through unchanged.
    """
    response = request('put', url, data=data, **kwargs)
    return response
def patch(url, data=None, **kwargs) -> SXNG_Response:
    """Send a PATCH request by delegating to :py:func:`request`.

    *data* is forwarded as the ``data`` keyword argument (``None`` when the
    caller gives none); remaining keyword arguments pass through unchanged.
    """
    response = request('patch', url, data=data, **kwargs)
    return response
def delete(url, **kwargs) -> SXNG_Response:
    """Send a DELETE request by delegating to :py:func:`request`.

    All keyword arguments are forwarded unchanged.
    """
    response = request('delete', url, **kwargs)
    return response
196 async with await network.stream(method, url, **kwargs)
as response:
200 async for chunk
in response.aiter_raw(65536):
203 except (httpx.StreamClosed, anyio.ClosedResourceError):
209 except Exception
as e:
221 queue = SimpleQueue()
223 future = asyncio.run_coroutine_threadsafe(
stream_chunk_to_queue(network, queue, method, url, **kwargs), get_loop())
226 obj_or_exception = queue.get()
227 while obj_or_exception
is not None:
228 if isinstance(obj_or_exception, Exception):
229 raise obj_or_exception
230 yield obj_or_exception
231 obj_or_exception = queue.get()
236 asyncio.run_coroutine_threadsafe(self.aclose(), get_loop())
241 for _
in self._generator:
245def stream(method, url, **kwargs) -> Tuple[httpx.Response, Iterable[bytes]]:
246 """Replace httpx.stream.
249 response, stream = poolrequests.stream(...)
253 httpx.Client.stream requires writing the httpx.HTTPTransport version of
254 the httpx.AsyncHTTPTransport declared above.
259 response = next(generator)
260 if isinstance(response, Exception):
263 response._generator = generator
264 response.close = MethodType(_close_response_method, response)
266 return response, generator
Tuple[httpx.Response, Iterable[bytes]] stream(method, url, **kwargs)
SXNG_Response delete(url, **kwargs)
List[Union[httpx.Response, Exception]] multi_requests(List["Request"] request_list)
set_context_network_name(network_name)
stream_chunk_to_queue(network, queue, method, url, **kwargs)
SXNG_Response patch(url, data=None, **kwargs)
SXNG_Response request(method, url, **kwargs)
SXNG_Response options(url, **kwargs)
set_timeout_for_thread(timeout, start_time=None)
SXNG_Response put(url, data=None, **kwargs)
SXNG_Response post(url, data=None, **kwargs)
_stream_generator(method, url, **kwargs)
SXNG_Response head(url, **kwargs)
_close_response_method(self)
SXNG_Response get(url, **kwargs)
_get_timeout(start_time, kwargs)