6import concurrent.futures
7from queue
import SimpleQueue
8from types
import MethodType
9from timeit
import default_timer
10from typing
import Iterable, NamedTuple, Tuple, List, Dict, Union
11from contextlib
import contextmanager
16from .network
import get_network, initialize, check_network_configuration
17from .client
import get_loop
18from .raise_for_httperror
import raise_for_httperror
21THREADLOCAL = threading.local()
22"""Thread-local data is data for thread specific values."""
26 THREADLOCAL.total_time = 0
30 """returns thread's total time or None"""
31 return THREADLOCAL.__dict__.get(
'total_time')
35 THREADLOCAL.timeout = timeout
36 THREADLOCAL.start_time = start_time
40 THREADLOCAL.network = get_network(network_name)
44 """If set return thread's network.
46 If unset, return value from :py:obj:`get_network`.
48 return THREADLOCAL.__dict__.get(
'network')
or get_network()
54 time_before_request = default_timer()
55 start_time = getattr(THREADLOCAL,
'start_time', time_before_request)
61 if hasattr(THREADLOCAL,
'total_time'):
62 time_after_request = default_timer()
63 THREADLOCAL.total_time += time_after_request - time_before_request
70 if 'timeout' in kwargs:
71 timeout = kwargs[
'timeout']
73 timeout = getattr(THREADLOCAL,
'timeout',
None)
74 if timeout
is not None:
75 kwargs[
'timeout'] = timeout
78 timeout = timeout
or 120
83 timeout -= default_timer() - start_time
89 """same as requests/requests/api.py request(...)"""
93 future = asyncio.run_coroutine_threadsafe(network.request(method, url, **kwargs), get_loop())
95 return future.result(timeout)
96 except concurrent.futures.TimeoutError
as e:
97 raise httpx.TimeoutException(
'Timeout', request=
None)
from e
100def multi_requests(request_list: List[
"Request"]) -> List[Union[httpx.Response, Exception]]:
101 """send multiple HTTP requests in parallel. Wait for all requests to finish."""
107 for request_desc
in request_list:
109 future = asyncio.run_coroutine_threadsafe(
110 network.request(request_desc.method, request_desc.url, **request_desc.kwargs), loop
112 future_list.append((future, timeout))
116 for future, timeout
in future_list:
118 responses.append(future.result(timeout))
119 except concurrent.futures.TimeoutError:
120 responses.append(httpx.TimeoutException(
'Timeout', request=
None))
121 except Exception
as e:
127 """Request description for the multi_requests function"""
131 kwargs: Dict[str, str] = {}
135 return Request(
'GET', url, kwargs)
139 return Request(
'OPTIONS', url, kwargs)
143 return Request(
'HEAD', url, kwargs)
147 return Request(
'POST', url, kwargs)
151 return Request(
'PUT', url, kwargs)
155 return Request(
'PATCH', url, kwargs)
159 return Request(
'DELETE', url, kwargs)
163 kwargs.setdefault(
'allow_redirects',
True)
164 return request(
'get', url, **kwargs)
168 kwargs.setdefault(
'allow_redirects',
True)
169 return request(
'options', url, **kwargs)
173 kwargs.setdefault(
'allow_redirects',
False)
174 return request(
'head', url, **kwargs)
177def post(url, data=None, **kwargs):
178 return request(
'post', url, data=data, **kwargs)
181def put(url, data=None, **kwargs):
182 return request(
'put', url, data=data, **kwargs)
186 return request(
'patch', url, data=data, **kwargs)
190 return request(
'delete', url, **kwargs)
195 async with await network.stream(method, url, **kwargs)
as response:
199 async for chunk
in response.aiter_raw(65536):
202 except (httpx.StreamClosed, anyio.ClosedResourceError):
208 except Exception
as e:
220 queue = SimpleQueue()
222 future = asyncio.run_coroutine_threadsafe(
stream_chunk_to_queue(network, queue, method, url, **kwargs), get_loop())
225 obj_or_exception = queue.get()
226 while obj_or_exception
is not None:
227 if isinstance(obj_or_exception, Exception):
228 raise obj_or_exception
229 yield obj_or_exception
230 obj_or_exception = queue.get()
235 asyncio.run_coroutine_threadsafe(self.aclose(), get_loop())
240 for _
in self._generator:
244def stream(method, url, **kwargs) -> Tuple[httpx.Response, Iterable[bytes]]:
245 """Replace httpx.stream.
248 response, stream = poolrequests.stream(...)
252 httpx.Client.stream requires to write the httpx.HTTPTransport version of the
253 the httpx.AsyncHTTPTransport declared above.
258 response = next(generator)
259 if isinstance(response, Exception):
262 response._generator = generator
263 response.close = MethodType(_close_response_method, response)
265 return response, generator
Tuple[httpx.Response, Iterable[bytes]] stream(method, url, **kwargs)
List[Union[httpx.Response, Exception]] multi_requests(List["Request"] request_list)
put(url, data=None, **kwargs)
set_context_network_name(network_name)
stream_chunk_to_queue(network, queue, method, url, **kwargs)
request(method, url, **kwargs)
patch(url, data=None, **kwargs)
set_timeout_for_thread(timeout, start_time=None)
post(url, data=None, **kwargs)
_stream_generator(method, url, **kwargs)
_close_response_method(self)
_get_timeout(start_time, kwargs)