5__all__ = [
"get_network"]
8from collections.abc
import Generator
14from itertools
import cycle
18from searx
import logger, sxng_debug
20from .client
import new_client, get_loop, AsyncHTTPTransportNoHttp
21from .raise_for_httperror
import raise_for_httperror
# Rebind the logger imported from ``searx`` to a child logger named
# 'network'; the rest of this module logs through this child.
logger = logger.getChild('network')
# Registry key used when no explicit network name is given
# (see the ``name or DEFAULT_NAME`` lookup on NETWORKS).
DEFAULT_NAME = '__DEFAULT__'
# Module-wide registry mapping a network name to its Network instance.
NETWORKS: dict[str, "Network"] = {}
28PROXY_PATTERN_MAPPING = {
31 'socks4':
'socks4://',
32 'socks5':
'socks5://',
33 'socks5h':
'socks5h://',
36 'socks4:':
'socks4://',
37 'socks5:':
'socks5://',
38 'socks5h:':
'socks5h://',
# Maps an IP family label to the unspecified ("any") address literal of
# that family.
ADDRESS_MAPPING = {
    'ipv4': '0.0.0.0',
    'ipv6': '::',
}
52 'max_keepalive_connections',
59 'retry_on_http_error',
60 '_local_addresses_cycle',
66 _TOR_CHECK_RESULT = {}
71 enable_http: bool =
True,
73 enable_http2: bool =
False,
74 max_connections: int =
None,
75 max_keepalive_connections: int =
None,
76 keepalive_expiry: float =
None,
77 proxies: str | dict[str, str] |
None =
None,
78 using_tor_proxy: bool =
False,
79 local_addresses: str | list[str] |
None =
None,
81 retry_on_http_error: bool =
False,
82 max_redirects: int = 30,
83 logger_name: str =
None,
101 self.
_logger = logger.getChild(logger_name)
if logger_name
else logger
107 ipaddress.ip_network(address,
False)
109 ipaddress.ip_address(address)
111 if self.
proxies is not None and not isinstance(self.
proxies, (str, dict)):
112 raise ValueError(
'proxies type has to be str, dict or None')
116 if not local_addresses:
118 if isinstance(local_addresses, str):
119 local_addresses = [local_addresses]
120 yield from local_addresses
127 for a
in ipaddress.ip_network(address,
False).hosts():
131 a = ipaddress.ip_address(address)
141 if isinstance(self.
proxies, str):
144 for pattern, proxy_url
in self.
proxies.items():
145 pattern: str = PROXY_PATTERN_MAPPING.get(pattern, pattern)
146 if isinstance(proxy_url, str):
147 proxy_url = [proxy_url]
148 yield pattern, proxy_url
151 proxy_settings: dict[str, t.Any] = {}
153 proxy_settings[pattern] = cycle(proxy_urls)
156 yield tuple((pattern, next(proxy_url_cycle))
for pattern, proxy_url_cycle
in proxy_settings.items())
159 request = response.request
160 status = f
"{response.status_code} {response.reason_phrase}"
161 response_line = f
"{response.http_version} {status}"
162 content_type = response.headers.get(
"Content-Type")
163 content_type = f
' ({content_type})' if content_type
else ''
164 self.
_logger.debug(f
'HTTP Request: {request.method} {request.url} "{response_line}"{content_type}')
168 if proxies
in Network._TOR_CHECK_RESULT:
169 return Network._TOR_CHECK_RESULT[proxies]
173 for transport
in client._mounts.values():
174 if isinstance(transport, AsyncHTTPTransportNoHttp):
176 if getattr(transport,
"_pool")
and getattr(
184 response = await client.get(
"https://check.torproject.org/api/ip", timeout=60)
185 if not response.json()[
"IsTor"]:
187 Network._TOR_CHECK_RESULT[proxies] = result
190 async def get_client(self, verify: bool |
None =
None, max_redirects: int |
None =
None) -> httpx.AsyncClient:
191 verify = self.
verify if verify
is None else verify
192 max_redirects = self.
max_redirects if max_redirects
is None else max_redirects
195 key = (verify, max_redirects, local_address, proxies)
196 hook_log_response = self.
log_response if sxng_debug
else None
212 await client.aclose()
213 raise httpx.ProxyError(
'Network configuration problem: not using Tor')
218 async def close_client(client):
220 await client.aclose()
221 except httpx.HTTPError:
224 await asyncio.gather(*[close_client(client)
for client
in self.
_clients.values()], return_exceptions=
False)
228 kwargs_clients: dict[str, t.Any] = {}
229 if 'verify' in kwargs:
230 kwargs_clients[
'verify'] = kwargs.pop(
'verify')
231 if 'max_redirects' in kwargs:
232 kwargs_clients[
'max_redirects'] = kwargs.pop(
'max_redirects')
233 if 'allow_redirects' in kwargs:
235 kwargs[
'follow_redirects'] = kwargs.pop(
'allow_redirects')
236 return kwargs_clients
240 do_raise_for_httperror =
True
241 if 'raise_for_httperror' in kwargs:
242 do_raise_for_httperror = kwargs[
'raise_for_httperror']
243 del kwargs[
'raise_for_httperror']
244 return do_raise_for_httperror
246 def patch_response(self, response: httpx.Response, do_raise_for_httperror: bool) -> SXNG_Response:
247 if isinstance(response, httpx.Response):
248 response = t.cast(SXNG_Response, response)
251 response.ok =
not response.is_error
254 if do_raise_for_httperror:
256 raise_for_httperror(response)
258 self.
_logger.warning(f
"HTTP Request failed: {response.request.method} {response.request.url}")
272 async def call_client(self, stream: bool, method: str, url: str, **kwargs: t.Any) -> SXNG_Response:
274 was_disconnected =
False
275 do_raise_for_httperror = Network.extract_do_raise_for_httperror(kwargs)
276 kwargs_clients = Network.extract_kwargs_clients(kwargs)
278 client = await self.
get_client(**kwargs_clients)
279 cookies = kwargs.pop(
"cookies",
None)
280 client.cookies = httpx.Cookies(cookies)
283 return client.stream(method, url, **kwargs)
285 response = await client.request(method, url, **kwargs)
288 except httpx.RemoteProtocolError
as e:
289 if not was_disconnected:
292 was_disconnected =
True
293 await client.aclose()
294 self.
_logger.warning(
'httpx.RemoteProtocolError: the server has disconnected, retrying')
298 except (httpx.RequestError, httpx.HTTPStatusError)
as e:
async def request(self, method: str, url: str, **kwargs: t.Any) -> SXNG_Response:
    """Send a non-streamed HTTP request through this network.

    Thin wrapper that delegates to ``call_client`` with the ``stream``
    flag set to ``False``.

    :param method: HTTP method, forwarded unchanged.
    :param url: target URL, forwarded unchanged.
    :param kwargs: any further keyword arguments, forwarded unchanged to
        ``call_client``.
    :return: the value produced by ``call_client``.
    """
    response = await self.call_client(False, method, url, **kwargs)
    return response
async def stream(self, method: str, url: str, **kwargs):
    """Issue a streamed HTTP request through this network.

    Thin wrapper that delegates to ``call_client`` with the ``stream``
    flag set to ``True``.

    :param method: HTTP method, forwarded unchanged.
    :param url: target URL, forwarded unchanged.
    :param kwargs: any further keyword arguments, forwarded unchanged to
        ``call_client``.
    :return: the value produced by ``call_client``.
    """
    streamed = await self.call_client(True, method, url, **kwargs)
    return streamed
311 await asyncio.gather(*[network.aclose()
for network
in NETWORKS.values()], return_exceptions=
False)
315 return NETWORKS.get(name
or DEFAULT_NAME)
321 for network
in NETWORKS.values():
322 if network.using_tor_proxy:
324 await network.get_client()
326 network._logger.exception(
'Error')
328 return exception_count
330 future = asyncio.run_coroutine_threadsafe(check(), get_loop())
331 exception_count = future.result()
332 if exception_count > 0:
333 raise RuntimeError(
"Invalid network configuration")
337 settings_engines: list[dict[str, t.Any]] =
None,
338 settings_outgoing: dict[str, t.Any] =
None,
342 from searx
import settings
346 settings_engines = settings_engines
or settings[
'engines']
347 settings_outgoing = settings_outgoing
or settings[
'outgoing']
351 default_params: dict[str, t.Any] = {
352 'enable_http':
False,
353 'verify': settings_outgoing[
'verify'],
354 'enable_http2': settings_outgoing[
'enable_http2'],
355 'max_connections': settings_outgoing[
'pool_connections'],
356 'max_keepalive_connections': settings_outgoing[
'pool_maxsize'],
357 'keepalive_expiry': settings_outgoing[
'keepalive_expiry'],
358 'local_addresses': settings_outgoing[
'source_ips'],
359 'using_tor_proxy': settings_outgoing[
'using_tor_proxy'],
360 'proxies': settings_outgoing[
'proxies'],
361 'max_redirects': settings_outgoing[
'max_redirects'],
362 'retries': settings_outgoing[
'retries'],
363 'retry_on_http_error':
False,
366 def new_network(params: dict[str, t.Any], logger_name: str |
None =
None):
367 nonlocal default_params
369 result.update(default_params)
370 result.update(params)
372 result[
'logger_name'] = logger_name
376 nonlocal settings_engines
377 for engine_spec
in settings_engines:
378 engine_name = engine_spec[
'name']
379 engine = engines.get(engine_name)
382 network = getattr(engine,
'network',
None)
383 yield engine_name, engine, network
388 NETWORKS[DEFAULT_NAME] = new_network({}, logger_name=
'default')
389 NETWORKS[
'ipv4'] = new_network({
'local_addresses':
'0.0.0.0'}, logger_name=
'ipv4')
390 NETWORKS[
'ipv6'] = new_network({
'local_addresses':
'::'}, logger_name=
'ipv6')
393 for network_name, network
in settings_outgoing[
'networks'].items():
394 NETWORKS[network_name] = new_network(network, logger_name=network_name)
397 for engine_name, engine, network
in iter_networks():
400 for attribute_name, attribute_value
in default_params.items():
401 if hasattr(engine, attribute_name):
402 network[attribute_name] = getattr(engine, attribute_name)
404 network[attribute_name] = attribute_value
405 NETWORKS[engine_name] = new_network(network, logger_name=engine_name)
406 elif isinstance(network, dict):
407 NETWORKS[engine_name] = new_network(network, logger_name=engine_name)
410 for engine_name, engine, network
in iter_networks():
411 if isinstance(network, str):
412 NETWORKS[engine_name] = NETWORKS[network]
417 if 'image_proxy' not in NETWORKS:
418 image_proxy_params = default_params.copy()
419 image_proxy_params[
'enable_http2'] =
False
420 NETWORKS[
'image_proxy'] = new_network(image_proxy_params, logger_name=
'image_proxy')
425 """Close all HTTP client
427 Avoid a warning at exit
428 See https://github.com/encode/httpx/pull/2026
430 Note: since Network.aclose has to be async, it is not possible to call this method on Network.__del__
431 So Network.aclose is called here using atexit.register
436 future = asyncio.run_coroutine_threadsafe(Network.aclose_all(), loop)
443NETWORKS[DEFAULT_NAME] =
Network()
Generator[tuple[tuple[str, str],...], str, str] get_proxy_cycles(self)
httpx.AsyncClient get_client(self, bool|None verify=None, int|None max_redirects=None)
get_ipaddress_cycle(self)
SXNG_Response request(self, str method, str url, **t.Any kwargs)
log_response(self, httpx.Response response)
extract_do_raise_for_httperror(dict[str, t.Any] kwargs)
is_valid_response(self, httpx.Response response)
stream(self, str method, str url, **kwargs)
__init__(self, bool enable_http=True, bool verify=True, bool enable_http2=False, int max_connections=None, int max_keepalive_connections=None, float keepalive_expiry=None, str|dict[str, str]|None proxies=None, bool using_tor_proxy=False, str|list[str]|None local_addresses=None, int retries=0, bool retry_on_http_error=False, int max_redirects=30, str logger_name=None)
Generator[str] iter_ipaddresses(self)
dict[str, t.Any] extract_kwargs_clients(dict[str, t.Any] kwargs)
SXNG_Response call_client(self, bool stream, str method, str url, **t.Any kwargs)
max_keepalive_connections
bool check_tor_proxy(httpx.AsyncClient client, proxies)
SXNG_Response patch_response(self, httpx.Response response, bool do_raise_for_httperror)
Generator[tuple[str, list[str]]] iter_proxies(self)
Generator[tuple[tuple[str, str],...], str, str] _proxies_cycle
None initialize(list[dict[str, t.Any]] settings_engines=None, dict[str, t.Any] settings_outgoing=None)
check_network_configuration()
"Network" get_network(str|None name=None)