.oO SearXNG Developer Documentation Oo.
Loading...
Searching...
No Matches
network.py
Go to the documentation of this file.
1# SPDX-License-Identifier: AGPL-3.0-or-later
2# pylint: disable=global-statement
3# pylint: disable=missing-module-docstring, missing-class-docstring
4
5__all__ = ["get_network"]
6
7import typing as t
8from collections.abc import Generator
9
10
11import atexit
12import asyncio
13import ipaddress
14from itertools import cycle
15
16import httpx
17
18from searx import logger, sxng_debug
19from searx.extended_types import SXNG_Response
20from .client import new_client, get_loop, AsyncHTTPTransportNoHttp
21from .raise_for_httperror import raise_for_httperror
22
23
24logger = logger.getChild('network')
25DEFAULT_NAME = '__DEFAULT__'
26NETWORKS: dict[str, "Network"] = {}
27# requests compatibility when reading proxy settings from settings.yml
28PROXY_PATTERN_MAPPING = {
29 'http': 'http://',
30 'https': 'https://',
31 'socks4': 'socks4://',
32 'socks5': 'socks5://',
33 'socks5h': 'socks5h://',
34 'http:': 'http://',
35 'https:': 'https://',
36 'socks4:': 'socks4://',
37 'socks5:': 'socks5://',
38 'socks5h:': 'socks5h://',
39}
40
41ADDRESS_MAPPING = {'ipv4': '0.0.0.0', 'ipv6': '::'}
42
43
44@t.final
45class Network:
46
    # Fixed attribute set: one Network is created per engine, so dropping the
    # per-instance __dict__ saves memory and catches typo'd attribute writes.
    __slots__ = (
        'enable_http',
        'verify',
        'enable_http2',
        'max_connections',
        'max_keepalive_connections',
        'keepalive_expiry',
        'local_addresses',
        'proxies',
        'using_tor_proxy',
        'max_redirects',
        'retries',
        'retry_on_http_error',
        '_local_addresses_cycle',
        '_proxies_cycle',
        '_clients',
        '_logger',
    )

    # Class-level cache shared by all instances: maps a proxy configuration
    # (the tuple key used in check_tor_proxy) to the boolean check result, so
    # the Tor check endpoint is queried only once per configuration.
    _TOR_CHECK_RESULT = {}
67
69 # pylint: disable=too-many-arguments
70 self,
71 enable_http: bool = True,
72 verify: bool = True,
73 enable_http2: bool = False,
74 max_connections: int = None, # pyright: ignore[reportArgumentType]
75 max_keepalive_connections: int = None, # pyright: ignore[reportArgumentType]
76 keepalive_expiry: float = None, # pyright: ignore[reportArgumentType]
77 proxies: str | dict[str, str] | None = None,
78 using_tor_proxy: bool = False,
79 local_addresses: str | list[str] | None = None,
80 retries: int = 0,
81 retry_on_http_error: bool = False,
82 max_redirects: int = 30,
83 logger_name: str = None, # pyright: ignore[reportArgumentType]
84 ):
85
86 self.enable_http = enable_http
87 self.verify = verify
88 self.enable_http2 = enable_http2
89 self.max_connections = max_connections
90 self.max_keepalive_connections = max_keepalive_connections
91 self.keepalive_expiry = keepalive_expiry
92 self.proxies = proxies
93 self.using_tor_proxy = using_tor_proxy
94 self.local_addresses = local_addresses
95 self.retries = retries
96 self.retry_on_http_error = retry_on_http_error
97 self.max_redirects = max_redirects
100 self._clients = {}
101 self._logger = logger.getChild(logger_name) if logger_name else logger
102 self.check_parameters()
103
105 for address in self.iter_ipaddresses():
106 if '/' in address:
107 ipaddress.ip_network(address, False)
108 else:
109 ipaddress.ip_address(address)
110
111 if self.proxies is not None and not isinstance(self.proxies, (str, dict)):
112 raise ValueError('proxies type has to be str, dict or None')
113
114 def iter_ipaddresses(self) -> Generator[str]:
115 local_addresses = self.local_addresses
116 if not local_addresses:
117 return
118 if isinstance(local_addresses, str):
119 local_addresses = [local_addresses]
120 yield from local_addresses
121
123 while True:
124 count = 0
125 for address in self.iter_ipaddresses():
126 if '/' in address:
127 for a in ipaddress.ip_network(address, False).hosts():
128 yield str(a)
129 count += 1
130 else:
131 a = ipaddress.ip_address(address)
132 yield str(a)
133 count += 1
134 if count == 0:
135 yield None
136
137 def iter_proxies(self) -> Generator[tuple[str, list[str]]]:
138 if not self.proxies:
139 return
140 # https://www.python-httpx.org/compatibility/#proxy-keys
141 if isinstance(self.proxies, str):
142 yield 'all://', [self.proxies]
143 else:
144 for pattern, proxy_url in self.proxies.items():
145 pattern: str = PROXY_PATTERN_MAPPING.get(pattern, pattern)
146 if isinstance(proxy_url, str):
147 proxy_url = [proxy_url]
148 yield pattern, proxy_url
149
    def get_proxy_cycles(self) -> Generator[tuple[tuple[str, str], ...], str, str]:  # not sure type is correct
        """Endless generator: each step yields one hashable snapshot of the
        proxy configuration — a tuple of ``(pattern, proxy_url)`` pairs, where
        each pattern's URL advances round-robin through its configured list.
        The tuple form lets the snapshot serve as part of a dict key in
        :py:meth:`get_client`.
        """
        proxy_settings: dict[str, t.Any] = {}
        for pattern, proxy_urls in self.iter_proxies():
            # one independent itertools.cycle per mount pattern
            proxy_settings[pattern] = cycle(proxy_urls)
        while True:
            # pylint: disable=stop-iteration-return
            yield tuple((pattern, next(proxy_url_cycle)) for pattern, proxy_url_cycle in proxy_settings.items())
157
158 async def log_response(self, response: httpx.Response):
159 request = response.request
160 status = f"{response.status_code} {response.reason_phrase}"
161 response_line = f"{response.http_version} {status}"
162 content_type = response.headers.get("Content-Type")
163 content_type = f' ({content_type})' if content_type else ''
164 self._logger.debug(f'HTTP Request: {request.method} {request.url} "{response_line}"{content_type}')
165
    @staticmethod
    async def check_tor_proxy(client: httpx.AsyncClient, proxies) -> bool:
        """Return True when *client* really routes its traffic through Tor.

        The result is cached per *proxies* key in ``Network._TOR_CHECK_RESULT``
        so the remote check endpoint is contacted only once per configuration.
        """
        if proxies in Network._TOR_CHECK_RESULT:
            return Network._TOR_CHECK_RESULT[proxies]

        result = True
        # ignore client._transport because it is not used with all://
        for transport in client._mounts.values():  # pylint: disable=protected-access
            if isinstance(transport, AsyncHTTPTransportNoHttp):
                continue
            # presumably _rdns=True means DNS is resolved by the (socks5h)
            # proxy, which is required for Tor — relies on httpx internals,
            # TODO confirm against the installed httpx version
            if getattr(transport, "_pool") and getattr(
                # pylint: disable=protected-access
                transport._pool,  # type: ignore
                "_rdns",
                False,
            ):
                continue
            # at least one mount does not use remote DNS: not a Tor setup
            return False
        # ask the Tor project whether the egress IP is a Tor exit node
        response = await client.get("https://check.torproject.org/api/ip", timeout=60)
        if not response.json()["IsTor"]:
            result = False
        Network._TOR_CHECK_RESULT[proxies] = result
        return result
189
190 async def get_client(self, verify: bool | None = None, max_redirects: int | None = None) -> httpx.AsyncClient:
191 verify = self.verify if verify is None else verify
192 max_redirects = self.max_redirects if max_redirects is None else max_redirects
193 local_address = next(self._local_addresses_cycle)
194 proxies = next(self._proxies_cycle) # is a tuple so it can be part of the key
195 key = (verify, max_redirects, local_address, proxies)
196 hook_log_response = self.log_response if sxng_debug else None
197 if key not in self._clients or self._clients[key].is_closed:
198 client = new_client(
199 self.enable_http,
200 verify,
201 self.enable_http2,
202 self.max_connections,
204 self.keepalive_expiry,
205 dict(proxies),
206 local_address,
207 0,
208 max_redirects,
209 hook_log_response,
210 )
211 if self.using_tor_proxy and not await self.check_tor_proxy(client, proxies):
212 await client.aclose()
213 raise httpx.ProxyError('Network configuration problem: not using Tor')
214 self._clients[key] = client
215 return self._clients[key]
216
217 async def aclose(self):
218 async def close_client(client):
219 try:
220 await client.aclose()
221 except httpx.HTTPError:
222 pass
223
224 await asyncio.gather(*[close_client(client) for client in self._clients.values()], return_exceptions=False)
225
226 @staticmethod
227 def extract_kwargs_clients(kwargs: dict[str, t.Any]) -> dict[str, t.Any]:
228 kwargs_clients: dict[str, t.Any] = {}
229 if 'verify' in kwargs:
230 kwargs_clients['verify'] = kwargs.pop('verify')
231 if 'max_redirects' in kwargs:
232 kwargs_clients['max_redirects'] = kwargs.pop('max_redirects')
233 if 'allow_redirects' in kwargs:
234 # see https://github.com/encode/httpx/pull/1808
235 kwargs['follow_redirects'] = kwargs.pop('allow_redirects')
236 return kwargs_clients
237
238 @staticmethod
239 def extract_do_raise_for_httperror(kwargs: dict[str, t.Any]):
240 do_raise_for_httperror = True
241 if 'raise_for_httperror' in kwargs:
242 do_raise_for_httperror = kwargs['raise_for_httperror']
243 del kwargs['raise_for_httperror']
244 return do_raise_for_httperror
245
246 def patch_response(self, response: httpx.Response, do_raise_for_httperror: bool) -> SXNG_Response:
247 if isinstance(response, httpx.Response):
248 response = t.cast(SXNG_Response, response)
249 # requests compatibility (response is not streamed)
250 # see also https://www.python-httpx.org/compatibility/#checking-for-4xx5xx-responses
251 response.ok = not response.is_error
252
253 # raise an exception
254 if do_raise_for_httperror:
255 try:
256 raise_for_httperror(response)
257 except:
258 self._logger.warning(f"HTTP Request failed: {response.request.method} {response.request.url}")
259 raise
260 return response
261
262 def is_valid_response(self, response: httpx.Response):
263 # pylint: disable=too-many-boolean-expressions
264 if (
265 (self.retry_on_http_error is True and 400 <= response.status_code <= 599)
266 or (isinstance(self.retry_on_http_error, list) and response.status_code in self.retry_on_http_error)
267 or (isinstance(self.retry_on_http_error, int) and response.status_code == self.retry_on_http_error)
268 ):
269 return False
270 return True
271
    async def call_client(self, stream: bool, method: str, url: str, **kwargs: t.Any) -> SXNG_Response:
        """Send one request through a client from the pool, retrying up to
        ``self.retries`` times on request errors or on responses rejected by
        :py:meth:`is_valid_response`.

        A single server disconnect (``httpx.RemoteProtocolError``) is retried
        once for free — without consuming a retry — with a fresh client.
        When *stream* is set, the un-awaited result of ``client.stream(...)``
        is returned (a context manager; the caller drives the transfer).
        """
        retries = self.retries
        was_disconnected = False
        # both helpers mutate kwargs: they pop their options out of it
        do_raise_for_httperror = Network.extract_do_raise_for_httperror(kwargs)
        kwargs_clients = Network.extract_kwargs_clients(kwargs)
        while retries >= 0:  # pragma: no cover
            client = await self.get_client(**kwargs_clients)
            cookies = kwargs.pop("cookies", None)
            # replace (not merge) the client's cookie jar for this call
            client.cookies = httpx.Cookies(cookies)
            try:
                if stream:
                    return client.stream(method, url, **kwargs)

                response = await client.request(method, url, **kwargs)
                if self.is_valid_response(response) or retries <= 0:
                    return self.patch_response(response, do_raise_for_httperror)
            except httpx.RemoteProtocolError as e:
                if not was_disconnected:
                    # the server has closed the connection:
                    # try again without decreasing the retries variable & with a new HTTP client
                    was_disconnected = True
                    await client.aclose()
                    self._logger.warning('httpx.RemoteProtocolError: the server has disconnected, retrying')
                    continue
                if retries <= 0:
                    raise e
            except (httpx.RequestError, httpx.HTTPStatusError) as e:
                if retries <= 0:
                    raise e
            retries -= 1
302
    async def request(self, method: str, url: str, **kwargs: t.Any) -> SXNG_Response:
        """Send a non-streaming request (requests-like API) via call_client."""
        return await self.call_client(False, method, url, **kwargs)
305
    async def stream(self, method: str, url: str, **kwargs):
        """Open a streaming request; returns the (un-entered) httpx stream
        context manager produced by call_client."""
        return await self.call_client(True, method, url, **kwargs)
308
309 @classmethod
310 async def aclose_all(cls):
311 await asyncio.gather(*[network.aclose() for network in NETWORKS.values()], return_exceptions=False)
312
313
def get_network(name: str | None = None) -> "Network":
    """Return the network registered under *name*; the default network when
    *name* is None/empty.  (May return None for an unknown name, hence the
    pyright suppression.)"""
    key = name if name else DEFAULT_NAME
    return NETWORKS.get(key)  # pyright: ignore[reportReturnType]
316
317
def check_network_configuration():
    """Eagerly initialize a client for every Tor-enabled network on the
    networking event loop; raise ``RuntimeError`` when any of them fails
    (e.g. the proxy does not actually route through Tor).
    """

    async def check():
        exception_count = 0
        for network in NETWORKS.values():
            if network.using_tor_proxy:
                try:
                    # triggers the Tor check inside get_client()
                    await network.get_client()
                except Exception:  # pylint: disable=broad-except
                    network._logger.exception('Error')  # pylint: disable=protected-access
                    exception_count += 1
        return exception_count

    # run on the dedicated networking loop and block until done
    future = asyncio.run_coroutine_threadsafe(check(), get_loop())
    exception_count = future.result()
    if exception_count > 0:
        raise RuntimeError("Invalid network configuration")
334
335
def initialize(
    settings_engines: list[dict[str, t.Any]] = None,  # pyright: ignore[reportArgumentType]
    settings_outgoing: dict[str, t.Any] = None,  # pyright: ignore[reportArgumentType]
) -> None:
    """(Re-)build the global ``NETWORKS`` registry from the settings:
    the default / ipv4 / ipv6 networks, one network per ``outgoing.networks``
    entry, one per engine (plain dicts are merged over the defaults, strings
    are references to existing networks) and the dedicated ``image_proxy``
    network.  Existing networks are closed first via :py:func:`done`.
    """
    # pylint: disable=import-outside-toplevel)
    from searx.engines import engines
    from searx import settings

    # pylint: enable=import-outside-toplevel)

    settings_engines = settings_engines or settings['engines']
    settings_outgoing = settings_outgoing or settings['outgoing']

    # default parameters for AsyncHTTPTransport
    # see https://github.com/encode/httpx/blob/e05a5372eb6172287458b37447c30f650047e1b8/httpx/_transports/default.py#L108-L121  # pylint: disable=line-too-long
    default_params: dict[str, t.Any] = {
        'enable_http': False,
        'verify': settings_outgoing['verify'],
        'enable_http2': settings_outgoing['enable_http2'],
        'max_connections': settings_outgoing['pool_connections'],
        'max_keepalive_connections': settings_outgoing['pool_maxsize'],
        'keepalive_expiry': settings_outgoing['keepalive_expiry'],
        'local_addresses': settings_outgoing['source_ips'],
        'using_tor_proxy': settings_outgoing['using_tor_proxy'],
        'proxies': settings_outgoing['proxies'],
        'max_redirects': settings_outgoing['max_redirects'],
        'retries': settings_outgoing['retries'],
        'retry_on_http_error': False,
    }

    def new_network(params: dict[str, t.Any], logger_name: str | None = None):
        # merge engine/network specific params over the global defaults
        nonlocal default_params
        result = {}
        result.update(default_params)  # pyright: ignore[reportUnknownMemberType]
        result.update(params)  # pyright: ignore[reportUnknownMemberType]
        if logger_name:
            result['logger_name'] = logger_name
        return Network(**result)  # type: ignore

    def iter_networks():
        # yield (name, engine, network-spec) for every loaded engine
        nonlocal settings_engines
        for engine_spec in settings_engines:
            engine_name = engine_spec['name']
            engine = engines.get(engine_name)
            if engine is None:
                continue
            network = getattr(engine, 'network', None)
            yield engine_name, engine, network

    if NETWORKS:
        done()
    NETWORKS.clear()
    NETWORKS[DEFAULT_NAME] = new_network({}, logger_name='default')
    NETWORKS['ipv4'] = new_network({'local_addresses': '0.0.0.0'}, logger_name='ipv4')
    NETWORKS['ipv6'] = new_network({'local_addresses': '::'}, logger_name='ipv6')

    # define networks from outgoing.networks
    for network_name, network in settings_outgoing['networks'].items():
        NETWORKS[network_name] = new_network(network, logger_name=network_name)

    # define networks from engines.[i].network (except references)
    for engine_name, engine, network in iter_networks():
        if network is None:
            network = {}
            # legacy style: network parameters set directly on the engine
            for attribute_name, attribute_value in default_params.items():
                if hasattr(engine, attribute_name):
                    network[attribute_name] = getattr(engine, attribute_name)
                else:
                    network[attribute_name] = attribute_value
            NETWORKS[engine_name] = new_network(network, logger_name=engine_name)
        elif isinstance(network, dict):
            NETWORKS[engine_name] = new_network(network, logger_name=engine_name)

    # define networks from engines.[i].network (references)
    for engine_name, engine, network in iter_networks():
        if isinstance(network, str):
            NETWORKS[engine_name] = NETWORKS[network]

    # the /image_proxy endpoint has a dedicated network.
    # same parameters than the default network, but HTTP/2 is disabled.
    # It decreases the CPU load average, and the total time is more or less the same
    if 'image_proxy' not in NETWORKS:
        image_proxy_params = default_params.copy()
        image_proxy_params['enable_http2'] = False
        NETWORKS['image_proxy'] = new_network(image_proxy_params, logger_name='image_proxy')
421
422
@atexit.register
def done():
    """Close all HTTP client

    Avoid a warning at exit
    See https://github.com/encode/httpx/pull/2026

    Note: since Network.aclose has to be async, it is not possible to call this method on Network.__del__
    So Network.aclose is called here using atexit.register
    """
    try:
        loop = get_loop()
        if loop:
            # wait 3 seconds to close the HTTP clients
            closing = asyncio.run_coroutine_threadsafe(Network.aclose_all(), loop)
            closing.result(3)
    finally:
        NETWORKS.clear()
441
442
# register a default network at import time so get_network() works even
# before initialize() has been called
NETWORKS[DEFAULT_NAME] = Network()
Generator[tuple[tuple[str, str],...], str, str] get_proxy_cycles(self)
Definition network.py:150
httpx.AsyncClient get_client(self, bool|None verify=None, int|None max_redirects=None)
Definition network.py:190
SXNG_Response request(self, str method, str url, **t.Any kwargs)
Definition network.py:303
log_response(self, httpx.Response response)
Definition network.py:158
extract_do_raise_for_httperror(dict[str, t.Any] kwargs)
Definition network.py:239
is_valid_response(self, httpx.Response response)
Definition network.py:262
stream(self, str method, str url, **kwargs)
Definition network.py:306
__init__(self, bool enable_http=True, bool verify=True, bool enable_http2=False, int max_connections=None, int max_keepalive_connections=None, float keepalive_expiry=None, str|dict[str, str]|None proxies=None, bool using_tor_proxy=False, str|list[str]|None local_addresses=None, int retries=0, bool retry_on_http_error=False, int max_redirects=30, str logger_name=None)
Definition network.py:84
Generator[str] iter_ipaddresses(self)
Definition network.py:114
dict[str, t.Any] extract_kwargs_clients(dict[str, t.Any] kwargs)
Definition network.py:227
SXNG_Response call_client(self, bool stream, str method, str url, **t.Any kwargs)
Definition network.py:272
bool check_tor_proxy(httpx.AsyncClient client, proxies)
Definition network.py:167
SXNG_Response patch_response(self, httpx.Response response, bool do_raise_for_httperror)
Definition network.py:246
Generator[tuple[str, list[str]]] iter_proxies(self)
Definition network.py:137
Generator[tuple[tuple[str, str],...], str, str] _proxies_cycle
Definition network.py:99
::1337x
Definition 1337x.py:1
None initialize(list[dict[str, t.Any]] settings_engines=None, dict[str, t.Any] settings_outgoing=None)
Definition network.py:339
"Network" get_network(str|None name=None)
Definition network.py:314