.oO SearXNG Developer Documentation Oo.
Loading...
Searching...
No Matches
config.py
Go to the documentation of this file.
1# SPDX-License-Identifier: AGPL-3.0-or-later
2"""Configuration class :py:class:`Config` with deep-update, schema validation
3and deprecated names.
4
5The :py:class:`Config` class implements a configuration that is based on
6structured dictionaries. The configuration schema is defined in a dictionary
7structure and the configuration data is given in a dictionary structure.
8"""
9
10import typing
11
12import copy
13import logging
14import pathlib
15
16from ..compat import tomllib
17
18__all__ = ['Config', 'UNSET', 'SchemaIssue', 'set_global_cfg', 'get_global_cfg']
19
20log = logging.getLogger(__name__)
21
22CFG: "Config | None" = None
23"""Global config of the botdetection."""
24
25
26def set_global_cfg(cfg: "Config"):
27 global CFG # pylint: disable=global-statement
28 CFG = cfg
29
30
31def get_global_cfg() -> "Config":
32 if CFG is None:
33 raise ValueError("Botdetection's config is not yet initialized.")
34 return CFG
35
36
37@typing.final
38class FALSE:
39 """Class of ``False`` singleton"""
40
41 # pylint: disable=multiple-statements
42 def __init__(self, msg: str):
43 self.msg = msg
44
45 def __bool__(self):
46 return False
47
48 def __str__(self):
49 return self.msg
50
51 __repr__ = __str__
52
53
54UNSET = FALSE('<UNSET>')
55
56
57@typing.final
58class SchemaIssue(ValueError):
59 """Exception to store and/or raise a message from a schema issue."""
60
61 def __init__(self, level: typing.Literal['warn', 'invalid'], msg: str):
62 self.level = level
63 super().__init__(msg)
64
65 def __str__(self):
66 return f"[cfg schema {self.level}] {self.args[0]}"
67
68
69class Config:
70 """Base class used for configuration"""
71
72 UNSET: object = UNSET
73
74 @classmethod
75 def from_toml(cls, schema_file: pathlib.Path, cfg_file: pathlib.Path, deprecated: dict[str, str]) -> "Config":
76
77 # init schema
78
79 log.debug("load schema file: %s", schema_file)
80 cfg = cls(cfg_schema=toml_load(schema_file), deprecated=deprecated)
81 if not cfg_file.exists():
82 log.warning("missing config file: %s", cfg_file)
83 return cfg
84
85 # load configuration
86
87 log.debug("load config file: %s", cfg_file)
88 upd_cfg = toml_load(cfg_file)
89
90 is_valid, issue_list = cfg.validate(upd_cfg)
91 for msg in issue_list:
92 log.error(str(msg))
93 if not is_valid:
94 raise TypeError(f"schema of {cfg_file} is invalid!")
95 cfg.update(upd_cfg)
96 return cfg
97
98 def __init__(self, cfg_schema: dict[str, typing.Any], deprecated: dict[str, str]):
99 """Constructor of class Config.
100
101 :param cfg_schema: Schema of the configuration
102 :param deprecated: dictionary that maps deprecated configuration names to a messages
103
104 These values are needed for validation, see :py:obj:`validate`.
105
106 """
107 self.cfg_schema: dict[str, typing.Any] = cfg_schema
108 self.deprecated: dict[str, str] = deprecated
109 self.cfg: dict[str, typing.Any] = copy.deepcopy(cfg_schema)
110
111 def __getitem__(self, key: str) -> typing.Any:
112 return self.get(key)
113
114 def validate(self, cfg: dict[str, typing.Any]):
115 """Validation of dictionary ``cfg`` on :py:obj:`Config.SCHEMA`.
116 Validation is done by :py:obj:`validate`."""
117
118 return validate(self.cfg_schema, cfg, self.deprecated)
119
120 def update(self, upd_cfg: dict[str, typing.Any]):
121 """Update this configuration by ``upd_cfg``."""
122
123 dict_deepupdate(self.cfg, upd_cfg)
124
125 def default(self, name: str):
126 """Returns default value of field ``name`` in ``self.cfg_schema``."""
127 return value(name, self.cfg_schema)
128
129 def get(self, name: str, default: typing.Any = UNSET, replace: bool = True) -> typing.Any:
130 """Returns the value to which ``name`` points in the configuration.
131
132 If there is no such ``name`` in the config and the ``default`` is
133 :py:obj:`UNSET`, a :py:obj:`KeyError` is raised.
134 """
135
136 parent = self._get_parent_dict(name)
137 val = parent.get(name.split('.')[-1], UNSET)
138 if val is UNSET:
139 if default is UNSET:
140 raise KeyError(name)
141 val = default
142
143 if replace and isinstance(val, str):
144 val = val % self
145 return val
146
147 def set(self, name: str, val: typing.Any):
148 """Set the value to which ``name`` points in the configuration.
149
150 If there is no such ``name`` in the config, a :py:obj:`KeyError` is
151 raised.
152 """
153 parent = self._get_parent_dict(name)
154 parent[name.split('.')[-1]] = val
155
156 def _get_parent_dict(self, name: str) -> dict[str, typing.Any]:
157 parent_name = '.'.join(name.split('.')[:-1])
158 if parent_name:
159 parent: dict[str, typing.Any] = value(parent_name, self.cfg)
160 else:
161 parent = self.cfg
162 if (parent is UNSET) or (not isinstance(parent, dict)):
163 raise KeyError(parent_name)
164 return parent
165
166 def path(self, name: str, default: typing.Any = UNSET):
167 """Get a :py:class:`pathlib.Path` object from a config string."""
168
169 val = self.get(name, default)
170 if val is UNSET:
171 if default is UNSET:
172 raise KeyError(name)
173 return default
174 return pathlib.Path(str(val))
175
176 def pyobj(self, name: str, default: typing.Any = UNSET):
177 """Get python object referred by full qualiffied name (FQN) in the config
178 string."""
179
180 fqn = self.get(name, default)
181 if fqn is UNSET:
182 if default is UNSET:
183 raise KeyError(name)
184 return default
185 (modulename, name) = str(fqn).rsplit('.', 1)
186 m = __import__(modulename, {}, {}, [name], 0)
187 return getattr(m, name)
188
189
190def toml_load(file_name: str | pathlib.Path):
191 try:
192 with open(file_name, "rb") as f:
193 return tomllib.load(f)
194 except tomllib.TOMLDecodeError as exc:
195 msg = str(exc).replace('\t', '').replace('\n', ' ')
196 log.error("%s: %s", file_name, msg)
197 raise
198
199
200# working with dictionaries
201
202
203def value(name: str, data_dict: dict[str, typing.Any]):
204 """Returns the value to which ``name`` points in the ``dat_dict``.
205
206 .. code: python
207
208 >>> data_dict = {
209 "foo": {"bar": 1 },
210 "bar": {"foo": 2 },
211 "foobar": [1, 2, 3],
212 }
213 >>> value('foobar', data_dict)
214 [1, 2, 3]
215 >>> value('foo.bar', data_dict)
216 1
217 >>> value('foo.bar.xxx', data_dict)
218 <UNSET>
219
220 """
221
222 ret_val = data_dict
223 for part in name.split('.'):
224 if isinstance(ret_val, dict):
225 ret_val = ret_val.get(part, UNSET)
226 if ret_val is UNSET:
227 break
228 return ret_val
229
230
232 schema_dict: dict[str, typing.Any], data_dict: dict[str, typing.Any], deprecated: dict[str, str]
233) -> tuple[bool, list[SchemaIssue]]:
234 """Deep validation of dictionary in ``data_dict`` against dictionary in
235 ``schema_dict``. Argument deprecated is a dictionary that maps deprecated
236 configuration names to a messages::
237
238 deprecated = {
239 "foo.bar" : "config 'foo.bar' is deprecated, use 'bar.foo'",
240 "..." : "..."
241 }
242
243 The function returns a python tuple ``(is_valid, issue_list)``:
244
245 ``is_valid``:
246 A bool value indicating ``data_dict`` is valid or not.
247
248 ``issue_list``:
249 A list of messages (:py:obj:`SchemaIssue`) from the validation::
250
251 [schema warn] data_dict: deprecated 'fontlib.foo': <DEPRECATED['foo.bar']>
252 [schema invalid] data_dict: key unknown 'fontlib.foo'
253 [schema invalid] data_dict: type mismatch 'fontlib.foo': expected ..., is ...
254
255 If ``schema_dict`` or ``data_dict`` is not a dictionary type a
256 :py:obj:`SchemaIssue` is raised.
257
258 """
259 names: list[str] = []
260 is_valid: bool = True
261 issue_list: list[SchemaIssue] = []
262
263 if not isinstance(schema_dict, dict):
264 raise SchemaIssue('invalid', "schema_dict is not a dict type")
265 if not isinstance(data_dict, dict):
266 raise SchemaIssue('invalid', f"data_dict issue{'.'.join(names)} is not a dict type")
267
268 is_valid, issue_list = _validate(names, issue_list, schema_dict, data_dict, deprecated)
269 return is_valid, issue_list
270
271
273 names: list[str],
274 issue_list: list[SchemaIssue],
275 schema_dict: dict[str, typing.Any],
276 data_dict: dict[str, typing.Any],
277 deprecated: dict[str, str],
278) -> tuple[bool, list[SchemaIssue]]:
279
280 is_valid = True
281
282 data_value: dict[str, typing.Any]
283 for key, data_value in data_dict.items():
284
285 names.append(key)
286 name = '.'.join(names)
287
288 deprecated_msg = deprecated.get(name)
289 # print("XXX %s: key %s // data_value: %s" % (name, key, data_value))
290 if deprecated_msg:
291 issue_list.append(SchemaIssue('warn', f"data_dict '{name}': deprecated - {deprecated_msg}"))
292
293 schema_value = value(name, schema_dict)
294 # print("YYY %s: key %s // schema_value: %s" % (name, key, schema_value))
295 if schema_value is UNSET:
296 if not deprecated_msg:
297 issue_list.append(SchemaIssue('invalid', f"data_dict '{name}': key unknown in schema_dict"))
298 is_valid = False
299
300 elif type(schema_value) != type(data_value): # pylint: disable=unidiomatic-typecheck
301 issue_list.append(
303 'invalid',
304 (f"data_dict: type mismatch '{name}':" f" expected {type(schema_value)}, is: {type(data_value)}"),
305 )
306 )
307 is_valid = False
308
309 elif isinstance(data_value, dict):
310 _valid, _ = _validate(names, issue_list, schema_dict, data_value, deprecated)
311 is_valid = is_valid and _valid
312 names.pop()
313
314 return is_valid, issue_list
315
316
317def dict_deepupdate(base_dict: dict[str, typing.Any], upd_dict: dict[str, typing.Any], names: list[str] | None = None):
318 """Deep-update of dictionary in ``base_dict`` by dictionary in ``upd_dict``.
319
320 For each ``upd_key`` & ``upd_val`` pair in ``upd_dict``:
321
322 0. If types of ``base_dict[upd_key]`` and ``upd_val`` do not match raise a
323 :py:obj:`TypeError`.
324
325 1. If ``base_dict[upd_key]`` is a dict: recursively deep-update it by ``upd_val``.
326
327 2. If ``base_dict[upd_key]`` not exist: set ``base_dict[upd_key]`` from a
328 (deep-) copy of ``upd_val``.
329
330 3. If ``upd_val`` is a list, extend list in ``base_dict[upd_key]`` by the
331 list in ``upd_val``.
332
333 4. If ``upd_val`` is a set, update set in ``base_dict[upd_key]`` by set in
334 ``upd_val``.
335 """
336 # pylint: disable=too-many-branches
337 if not isinstance(base_dict, dict):
338 raise TypeError("argument 'base_dict' is not a dictionary type")
339 if not isinstance(upd_dict, dict):
340 raise TypeError("argument 'upd_dict' is not a dictionary type")
341
342 if names is None:
343 names = []
344
345 for upd_key, upd_val in upd_dict.items():
346 # For each upd_key & upd_val pair in upd_dict:
347
348 if isinstance(upd_val, dict):
349
350 if upd_key in base_dict:
351 # if base_dict[upd_key] exists, recursively deep-update it
352 if not isinstance(base_dict[upd_key], dict):
353 raise TypeError(f"type mismatch {'.'.join(names)}: is not a dict type in base_dict")
355 base_dict[upd_key],
356 upd_val, # pyright: ignore[reportUnknownArgumentType]
357 names
358 + [
359 upd_key,
360 ],
361 )
362
363 else:
364 # if base_dict[upd_key] not exist, set base_dict[upd_key] from deepcopy of upd_val
365 base_dict[upd_key] = copy.deepcopy(upd_val) # pyright: ignore[reportUnknownArgumentType]
366
367 elif isinstance(upd_val, list):
368
369 if upd_key in base_dict:
370 # if base_dict[upd_key] exists, base_dict[up_key] is extended by
371 # the list from upd_val
372 if not isinstance(base_dict[upd_key], list):
373 raise TypeError(f"type mismatch {'.'.join(names)}: is not a list type in base_dict")
374 base_dict[upd_key].extend(upd_val)
375
376 else:
377 # if base_dict[upd_key] doesn't exists, set base_dict[key] from a deepcopy of the
378 # list in upd_val.
379 base_dict[upd_key] = copy.deepcopy(upd_val) # pyright: ignore[reportUnknownArgumentType]
380
381 elif isinstance(upd_val, set):
382
383 if upd_key in base_dict:
384 # if base_dict[upd_key] exists, base_dict[up_key] is updated by the set in upd_val
385 if not isinstance(base_dict[upd_key], set):
386 raise TypeError(f"type mismatch {'.'.join(names)}: is not a set type in base_dict")
387 base_dict[upd_key].update(upd_val.copy())
388
389 else:
390 # if base_dict[upd_key] doesn't exists, set base_dict[upd_key] from a copy of the
391 # set in upd_val
392 base_dict[upd_key] = upd_val.copy()
393
394 else:
395 # for any other type of upd_val replace or add base_dict[upd_key] by a copy
396 # of upd_val
397 base_dict[upd_key] = copy.copy(upd_val)
pyobj(self, str name, typing.Any default=UNSET)
Definition config.py:176
"Config" from_toml(cls, pathlib.Path schema_file, pathlib.Path cfg_file, dict[str, str] deprecated)
Definition config.py:75
path(self, str name, typing.Any default=UNSET)
Definition config.py:166
dict[str, typing.Any] _get_parent_dict(self, str name)
Definition config.py:156
update(self, dict[str, typing.Any] upd_cfg)
Definition config.py:120
__init__(self, dict[str, typing.Any] cfg_schema, dict[str, str] deprecated)
Definition config.py:98
set(self, str name, typing.Any val)
Definition config.py:147
typing.Any __getitem__(self, str key)
Definition config.py:111
typing.Any get(self, str name, typing.Any default=UNSET, bool replace=True)
Definition config.py:129
validate(self, dict[str, typing.Any] cfg)
Definition config.py:114
dict[str, typing.Any] cfg_schema
Definition config.py:107
dict[str, typing.Any] cfg
Definition config.py:109
__init__(self, typing.Literal['warn', 'invalid'] level, str msg)
Definition config.py:61
set_global_cfg("Config" cfg)
Definition config.py:26
tuple[bool, list[SchemaIssue]] validate(dict[str, typing.Any] schema_dict, dict[str, typing.Any] data_dict, dict[str, str] deprecated)
Definition config.py:233
toml_load(str|pathlib.Path file_name)
Definition config.py:190
tuple[bool, list[SchemaIssue]] _validate(list[str] names, list[SchemaIssue] issue_list, dict[str, typing.Any] schema_dict, dict[str, typing.Any] data_dict, dict[str, str] deprecated)
Definition config.py:278
dict_deepupdate(dict[str, typing.Any] base_dict, dict[str, typing.Any] upd_dict, list[str]|None names=None)
Definition config.py:317
"Config" get_global_cfg()
Definition config.py:31
value(str name, dict[str, typing.Any] data_dict)
Definition config.py:203