.oO SearXNG Developer Documentation Oo.
Loading...
Searching...
No Matches
link_token.py
Go to the documentation of this file.
1# SPDX-License-Identifier: AGPL-3.0-or-later
2"""
3Method ``link_token``
4---------------------
5
6The ``link_token`` method evaluates a request as :py:obj:`suspicious
7<is_suspicious>` if the URL ``/client<token>.css`` is not requested by the
8client. By adding a random component (the token) in the URL, a bot can not send
9a ping by request a static URL.
10
11.. note::
12
13 This method requires a valkey DB and needs a HTTP X-Forwarded-For_ header.
14
15To get in use of this method a flask URL route needs to be added:
16
17.. code:: python
18
19 @app.route('/client<token>.css', methods=['GET', 'POST'])
20 def client_token(token=None):
21 link_token.ping(request, token)
22 return Response('', mimetype='text/css')
23
24And in the HTML template from flask a stylesheet link is needed (the value of
25``link_token`` comes from :py:obj:`get_token`):
26
27.. code:: html
28
29 <link rel="stylesheet"
30 href="{{ url_for('client_token', token=link_token) }}"
31 type="text/css" >
32
33.. _X-Forwarded-For:
34 https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For
35
36"""
37from __future__ import annotations
38from ipaddress import (
39 IPv4Network,
40 IPv6Network,
41 ip_address,
42)
43
44import string
45import random
46import flask
47
48from searx.valkeylib import secret_hash
49
50from ._helpers import (
51 get_network,
52 logger,
53)
54
55from . import config
56from . import valkeydb
57
58TOKEN_LIVE_TIME = 600
59"""Lifetime (sec) of limiter's CSS token."""
60
61PING_LIVE_TIME = 3600
62"""Lifetime (sec) of the ping-key from a client (request)"""
63
64PING_KEY = 'SearXNG_limiter.ping'
65"""Prefix of all ping-keys generated by :py:obj:`get_ping_key`"""
66
67TOKEN_KEY = 'SearXNG_limiter.token'
68"""Key for which the current token is stored in the DB"""
69
70logger = logger.getChild('botdetection.link_token')
71
72
73def is_suspicious(network: IPv4Network | IPv6Network, request: flask.Request, renew: bool = False):
74 """Checks whether a valid ping is exists for this (client) network, if not
75 this request is rated as *suspicious*. If a valid ping exists and argument
76 ``renew`` is ``True`` the expire time of this ping is reset to
77 :py:obj:`PING_LIVE_TIME`.
78
79 """
80 valkey_client = valkeydb.get_valkey_client()
81 ping_key = get_ping_key(network, request)
82 if not valkey_client.get(ping_key):
83 logger.info("missing ping (IP: %s) / request: %s", network.compressed, ping_key)
84 return True
85
86 if renew:
87 valkey_client.set(ping_key, 1, ex=PING_LIVE_TIME)
88
89 logger.debug("found ping for (client) network %s -> %s", network.compressed, ping_key)
90 return False
91
92
93def ping(request: flask.Request, token: str):
94 """This function is called by a request to URL ``/client<token>.css``. If
95 ``token`` is valid a :py:obj:`PING_KEY` for the client is stored in the DB.
96 The expire time of this ping-key is :py:obj:`PING_LIVE_TIME`.
97
98 """
99 valkey_client = valkeydb.get_valkey_client()
100 cfg = config.get_global_cfg()
101
102 if not token_is_valid(token):
103 return
104
105 real_ip = ip_address(request.remote_addr) # type: ignore
106 network = get_network(real_ip, cfg)
107
108 ping_key = get_ping_key(network, request)
109 logger.debug(
110 "store ping_key for (client) network %s (IP %s) -> %s", network.compressed, real_ip.compressed, ping_key
111 )
112 valkey_client.set(ping_key, 1, ex=PING_LIVE_TIME)
113
114
115def get_ping_key(network: IPv4Network | IPv6Network, request: flask.Request) -> str:
116 """Generates a hashed key that fits (more or less) to a *WEB-browser
117 session* in a network."""
118 return (
119 PING_KEY
120 + "["
121 + secret_hash(
122 network.compressed + request.headers.get('Accept-Language', '') + request.headers.get('User-Agent', '')
123 )
124 + "]"
125 )
126
127
128def token_is_valid(token) -> bool:
129 valid = token == get_token()
130 logger.debug("token is valid --> %s", valid)
131 return valid
132
133
134def get_token() -> str:
135 """Returns current token. If there is no currently active token a new token
136 is generated randomly and stored in the Valkey DB. Without without a
137 database connection, string "12345678" is returned.
138
139 - :py:obj:`TOKEN_LIVE_TIME`
140 - :py:obj:`TOKEN_KEY`
141
142 """
143 try:
144 valkey_client = valkeydb.get_valkey_client()
145 except ValueError:
146 # This function is also called when limiter is inactive / no valkey DB
147 # (see render function in webapp.py)
148 return '12345678'
149
150 token = valkey_client.get(TOKEN_KEY)
151 if token:
152 token = token.decode('UTF-8') # type: ignore
153 else:
154 token = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(16))
155 valkey_client.set(TOKEN_KEY, token, ex=TOKEN_LIVE_TIME)
156 return token