.oO SearXNG Developer Documentation Oo.
Loading...
Searching...
No Matches
__init__.py
Go to the documentation of this file.
1# SPDX-License-Identifier: AGPL-3.0-or-later
2"""Implement request processors used by engine-types."""
3
4__all__ = [
5 'EngineProcessor',
6 'OfflineProcessor',
7 'OnlineProcessor',
8 'OnlineDictionaryProcessor',
9 'OnlineCurrencyProcessor',
10 'OnlineUrlSearchProcessor',
11 'PROCESSORS',
12]
13
14import typing as t
15
16import threading
17
18from searx import logger
19from searx import engines
20
21from .online import OnlineProcessor
22from .offline import OfflineProcessor
23from .online_dictionary import OnlineDictionaryProcessor
24from .online_currency import OnlineCurrencyProcessor
25from .online_url_search import OnlineUrlSearchProcessor
26from .abstract import EngineProcessor
27
28if t.TYPE_CHECKING:
29 from searx.enginelib import Engine
30
31logger = logger.getChild('search.processors')
32PROCESSORS: dict[str, EngineProcessor] = {}
33"""Cache request processors, stored by *engine-name* (:py:func:`initialize`)
34
35:meta hide-value:
36"""
37
38
39def get_processor_class(engine_type: str) -> type[EngineProcessor] | None:
40 """Return processor class according to the ``engine_type``"""
41 for c in [
42 OnlineProcessor,
43 OfflineProcessor,
44 OnlineDictionaryProcessor,
45 OnlineCurrencyProcessor,
46 OnlineUrlSearchProcessor,
47 ]:
48 if c.engine_type == engine_type:
49 return c
50 return None
51
52
53def get_processor(engine: "Engine | ModuleType", engine_name: str) -> EngineProcessor | None:
54 """Return processor instance that fits to ``engine.engine.type``"""
55 engine_type = getattr(engine, 'engine_type', 'online')
56 processor_class = get_processor_class(engine_type)
57 if processor_class is not None:
58 return processor_class(engine, engine_name)
59 return None
60
61
62def initialize_processor(processor: EngineProcessor):
63 """Initialize one processor
64
65 Call the init function of the engine
66 """
67 if processor.has_initialize_function:
68 _t = threading.Thread(target=processor.initialize, daemon=True)
69 _t.start()
70
71
72def initialize(engine_list: list[dict[str, t.Any]]):
73 """Initialize all engines and store a processor for each engine in
74 :py:obj:`PROCESSORS`."""
75 for engine_data in engine_list:
76 engine_name: str = engine_data['name']
77 engine = engines.engines.get(engine_name)
78 if engine:
79 processor = get_processor(engine, engine_name)
80 if processor is None:
81 engine.logger.error('Error get processor for engine %s', engine_name)
82 else:
83 initialize_processor(processor)
84 PROCESSORS[engine_name] = processor
EngineProcessor|None get_processor("Engine | ModuleType" engine, str engine_name)
Definition __init__.py:53
type[EngineProcessor]|None get_processor_class(str engine_type)
Definition __init__.py:39
initialize(list[dict[str, t.Any]] engine_list)
Definition __init__.py:72
initialize_processor(EngineProcessor processor)
Definition __init__.py:62