.oO SearXNG Developer Documentation Oo.
Loading...
Searching...
No Matches
trusted_proxies.py
Go to the documentation of this file.
1# SPDX-License-Identifier: AGPL-3.0-or-later
2"""Implementation of a middleware to determine the real IP of an HTTP request
3(:py:obj:`flask.request.remote_addr`) behind a proxy chain."""
4# pylint: disable=too-many-branches
5
6
7import typing as t
8
9from collections import abc
10from ipaddress import IPv4Address, IPv6Address, ip_address, ip_network, IPv4Network, IPv6Network
11from werkzeug.http import parse_list_header
12
13from . import config
14from ._helpers import log_error_only_once, logger
15
16if t.TYPE_CHECKING:
17 from _typeshed.wsgi import StartResponse
18 from _typeshed.wsgi import WSGIApplication
19 from _typeshed.wsgi import WSGIEnvironment
20
21
22@t.final
24 """A middleware like the ProxyFix_ class, where the ``x_for`` argument is
25 replaced by a method that determines the number of trusted proxies via the
26 ``botdetection.trusted_proxies`` setting.
27
28 .. sidebar:: :py:obj:`flask.Request.remote_addr`
29
30 SearXNG uses Werkzeug's ProxyFix_ (with its default ``x_for=1``).
31
32 The remote IP (:py:obj:`flask.Request.remote_addr`) of the request is taken
33 from (first match):
34
35 - X-Forwarded-For_: If the header is set, the first untrusted IP that comes
36 before the IPs that are still part of the ``botdetection.trusted_proxies``
37 is used.
38
39 - `X-Real-IP <https://github.com/searxng/searxng/issues/1237#issuecomment-1147564516>`__:
40 If X-Forwarded-For_ is not set, `X-Real-IP` is used
41 (``botdetection.trusted_proxies`` is ignored).
42
43 If none of the headers is set, the REMOTE_ADDR_ from the WSGI layer is used.
44 If (for whatever reason) no IP can be determined, an error message is
45 logged and ``100::`` is used instead (:rfc:`6666`).
46
47 .. _ProxyFix:
48 https://werkzeug.palletsprojects.com/middleware/proxy_fix/
49
50 .. _X-Forwarded-For:
51 https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For
52
53 .. _REMOTE_ADDR:
54 https://wsgi.readthedocs.io/en/latest/proposals-2.0.html#making-some-keys-required
55
56 """
57
def __init__(self, wsgi_app: "WSGIApplication") -> None:
    """Wrap *wsgi_app* with this middleware.

    :param wsgi_app: The WSGI application that is called with the adjusted
        environment once the remote address has been determined.
    """
    # Only the reference to the wrapped application is stored.
    self.wsgi_app = wsgi_app
60
def trusted_proxies(self) -> list[IPv4Network | IPv6Network]:
    """Return the networks configured in ``botdetection.trusted_proxies``.

    The setting is read from the global configuration on every call; a
    missing setting yields an empty list.  Each entry is converted with
    ``ip_network(..., strict=False)``, so entries with host bits set are
    accepted.

    :raises ValueError: if an entry is not a valid IPv4/IPv6 network.
    """
    configured: list[str] = config.get_global_cfg().get("botdetection.trusted_proxies", default=[])
    return [ip_network(entry, strict=False) for entry in configured]
65
67 self,
68 x_forwarded_for: list[IPv4Address | IPv6Address],
69 trusted_proxies: list[IPv4Network | IPv6Network],
70 ) -> str:
71 # always rtl
72 for addr in reversed(x_forwarded_for):
73 trust: bool = False
74
75 for net in trusted_proxies:
76 if addr.version == net.version and addr in net:
77 logger.debug("trust proxy %s (member of %s)", addr, net)
78 trust = True
79 break
80
81 # client address
82 if not trust:
83 return addr.compressed
84
85 # fallback to first address
86 return x_forwarded_for[0].compressed
87
def __call__(self, environ: "WSGIEnvironment", start_response: "StartResponse") -> abc.Iterable[bytes]:
    """Set ``REMOTE_ADDR`` in *environ* and call the wrapped WSGI application.

    The remote address is taken from (first match):

    1. ``HTTP_X_FORWARDED_FOR``, evaluated against the configured trusted
       proxies (ignored when no trusted proxies are configured),
    2. ``HTTP_X_REAL_IP``,
    3. the ``REMOTE_ADDR`` originally found in the WSGI environment,
    4. the black-hole address ``100::``.

    Invalid ``HTTP_X_REAL_IP`` / ``HTTP_X_FORWARDED_FOR`` values are removed
    from *environ*.  IPv4-mapped IPv6 addresses are replaced by their IPv4
    form.  The (validated) original ``REMOTE_ADDR`` is stored under the
    ``botdetection.trusted_proxies.orig`` key of *environ*.

    :param environ: The WSGI environment; modified in place.
    :param start_response: The WSGI ``start_response`` callable, passed on
        unchanged.
    :returns: Whatever the wrapped WSGI application returns.
    """
    # pylint: disable=too-many-statements

    trusted_proxies = self.trusted_proxies()

    # We do not rely on the REMOTE_ADDR from the WSGI environment / the
    # variable is first removed from the WSGI environment and explicitly set
    # in this function!
    #
    # A default is passed to pop(): a missing REMOTE_ADDR must end up in the
    # fallback chain below instead of raising a KeyError.

    orig_remote_addr: str | None = environ.pop("REMOTE_ADDR", None)

    # Validate the IPs involved in this game and delete all invalid ones
    # from the WSGI environment.

    if orig_remote_addr:
        try:
            addr = ip_address(orig_remote_addr)
            if addr.version == 6 and addr.ipv4_mapped:
                addr = addr.ipv4_mapped
            orig_remote_addr = addr.compressed
        except ValueError as exc:
            logger.error("REMOTE_ADDR: %s / discard REMOTE_ADDR from WSGI environment", exc)
            orig_remote_addr = None

    x_real_ip: str | None = environ.get("HTTP_X_REAL_IP")
    if x_real_ip:
        try:
            addr = ip_address(x_real_ip)
            if addr.version == 6 and addr.ipv4_mapped:
                addr = addr.ipv4_mapped
            x_real_ip = addr.compressed
        except ValueError as exc:
            logger.error("X-Real-IP: %s / discard HTTP_X_REAL_IP from WSGI environment", exc)
            environ.pop("HTTP_X_REAL_IP")
            x_real_ip = None

    x_forwarded_for: list[IPv4Address | IPv6Address] = []
    if environ.get("HTTP_X_FORWARDED_FOR"):
        for x_for_ip in parse_list_header(str(environ.get("HTTP_X_FORWARDED_FOR"))):
            try:
                addr = ip_address(x_for_ip)
            except ValueError as exc:
                # A single invalid entry invalidates the whole header.
                logger.error("X-Forwarded-For: %s / discard HTTP_X_FORWARDED_FOR from WSGI environment", exc)
                environ.pop("HTTP_X_FORWARDED_FOR")
                x_forwarded_for = []
                break

            if addr.version == 6 and addr.ipv4_mapped:
                addr = addr.ipv4_mapped
            x_forwarded_for.append(addr)

    # log questionable WSGI environments

    if not x_forwarded_for and not x_real_ip:
        log_error_only_once("X-Forwarded-For nor X-Real-IP header is set!")

    if x_forwarded_for and not trusted_proxies:
        log_error_only_once("missing botdetection.trusted_proxies config")
        # without trusted_proxies, this variable is useless for determining
        # the real IP
        x_forwarded_for = []

    # securing the WSGI environment variables that are adjusted

    environ.update({"botdetection.trusted_proxies.orig": {"REMOTE_ADDR": orig_remote_addr}})

    # determine *the real IP*

    if x_forwarded_for:
        environ["REMOTE_ADDR"] = self.trusted_remote_addr(x_forwarded_for, trusted_proxies)

    elif x_real_ip:
        environ["REMOTE_ADDR"] = x_real_ip

    elif orig_remote_addr:
        environ["REMOTE_ADDR"] = orig_remote_addr

    else:
        logger.error("No remote IP could be determined, use black-hole address: 100::")
        environ["REMOTE_ADDR"] = "100::"

    # Final sanity check of the value that is handed to the application.
    try:
        _ = ip_address(environ["REMOTE_ADDR"])
    except ValueError as exc:
        logger.error("REMOTE_ADDR: %s, use black-hole address: 100::", exc)
        environ["REMOTE_ADDR"] = "100::"

    logger.debug("final REMOTE_ADDR is: %s", environ["REMOTE_ADDR"])
    return self.wsgi_app(environ, start_response)
str trusted_remote_addr(self, list[IPv4Address|IPv6Address] x_forwarded_for, list[IPv4Network|IPv6Network] trusted_proxies)
list[IPv4Network|IPv6Network] trusted_proxies(self)
abc.Iterable[bytes] __call__(self, "WSGIEnvironment" environ, "StartResponse" start_response)
None __init__(self, "WSGIApplication" wsgi_app)