.oO SearXNG Developer Documentation Oo.
results.py
# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=missing-module-docstring, missing-class-docstring

import typing as t

import warnings
from collections import defaultdict
from threading import RLock

from searx import logger as log
import searx.engines
from searx.metrics import histogram_observe, counter_add
from searx.result_types import Result, LegacyResult, MainResult
from searx.result_types.answer import AnswerSet, BaseAnswer


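# The score of a merged result is derived from the weights of the engines that
# returned it and from the positions at which it appeared: the base weight is
# the product of the engine weights times the number of positions; each
# position then adds weight / position to the score ('high' priority adds the
# full weight, 'low' priority adds nothing).  For example, a result found by
# two engines of weight 1.0 at positions 1 and 2 gets weight 1.0 * 2 = 2.0 and
# a default-priority score of 2.0 / 1 + 2.0 / 2 = 3.0.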
def calculate_score(
    result: MainResult | LegacyResult,
    priority: MainResult.PriorityType,
) -> float:
    weight = 1.0

    for result_engine in result['engines']:
        if hasattr(searx.engines.engines.get(result_engine), 'weight'):
            weight *= float(searx.engines.engines[result_engine].weight)

    weight *= len(result['positions'])
    score = 0

    for position in result['positions']:
        if priority == 'low':
            continue
        if priority == 'high':
            score += weight
        else:
            score += weight / position

    return score


class Timing(t.NamedTuple):
    engine: str
    total: float
    load: float


class UnresponsiveEngine(t.NamedTuple):
    engine: str
    error_type: str
    suspended: bool


class ResultContainer:
    """In the result container, the results are collected, sorted and duplicates
    will be merged."""

    # pylint: disable=too-many-statements

    main_results_map: dict[int, MainResult | LegacyResult]
    infoboxes: list[LegacyResult]
    suggestions: set[str]
    answers: AnswerSet
    corrections: set[str]

    def __init__(self):
        self.main_results_map = {}
        self.infoboxes = []
        self.suggestions = set()
        self.answers = AnswerSet()
        self.corrections = set()

        self._number_of_results: list[int] = []
        self.engine_data: dict[str, dict[str, str]] = defaultdict(dict)
        self._closed: bool = False
        self.paging: bool = False
        self.unresponsive_engines: set[UnresponsiveEngine] = set()
        self.timings: list[Timing] = []
        self.redirect_url: str | None = None
        self.on_result: t.Callable[[Result | LegacyResult], bool] = lambda _: True
        self._lock: RLock = RLock()
        self._main_results_sorted: list[MainResult | LegacyResult] = None  # type: ignore

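    # extend() is the entry point through which engine results reach the
    # container.  Typed Result objects are dispatched by their class, plain
    # dicts are wrapped in LegacyResult for backward compatibility; in both
    # flows the on_result callback may reject a result before it is stored.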
    def extend(
        self, engine_name: str | None, results: list[Result | LegacyResult]
    ):  # pylint: disable=too-many-branches
        if self._closed:
            log.debug("container is closed, ignoring results: %s", results)
            return
        main_count = 0

        for result in list(results):

            if isinstance(result, Result):
                result.engine = result.engine or engine_name
                result.normalize_result_fields()
                if not self.on_result(result):
                    continue

                if isinstance(result, BaseAnswer):
                    self.answers.add(result)
                elif isinstance(result, MainResult):
                    main_count += 1
                    self._merge_main_result(result, main_count)
                else:
                    # more types need to be implemented in the future ..
                    raise NotImplementedError(f"no handler implemented to process the result of type {result}")

            else:
                result["engine"] = result.get("engine") or engine_name or ""
                result = LegacyResult(result)  # for backward compatibility, will be removed one day
                result.normalize_result_fields()

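                # The legacy dict is dispatched by its well-known keys below;
                # a dict that carries none of these keys falls through and is
                # merged as a regular main result.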
                if "suggestion" in result:
                    if self.on_result(result):
                        self.suggestions.add(result["suggestion"])
                    continue

                if "answer" in result:
                    if self.on_result(result):
                        warnings.warn(
                            f"answer results from engine {result.engine}"
                            " are without typification / migrate to Answer class.",
                            DeprecationWarning,
                        )
                        self.answers.add(result)  # type: ignore
                    continue

                if "correction" in result:
                    if self.on_result(result):
                        self.corrections.add(result["correction"])
                    continue

                if "infobox" in result:
                    if self.on_result(result):
                        self._merge_infobox(result)
                    continue

                if "number_of_results" in result:
                    if self.on_result(result):
                        self._number_of_results.append(result["number_of_results"])
                    continue

                if "engine_data" in result:
                    if self.on_result(result):
                        if result.engine:
                            self.engine_data[result.engine][result["key"]] = result["engine_data"]
                    continue

                if self.on_result(result):
                    main_count += 1
                    self._merge_main_result(result, main_count)
                    continue

        if engine_name in searx.engines.engines:
            eng = searx.engines.engines[engine_name]
            histogram_observe(main_count, "engine", eng.name, "result", "count")
            if not self.paging and eng.paging:
                self.paging = True

    def _merge_infobox(self, new_infobox: LegacyResult):
        add_infobox = True

        new_id = getattr(new_infobox, "id", None)
        if new_id is not None:
            with self._lock:
                for existing_infobox in self.infoboxes:
                    if new_id == getattr(existing_infobox, "id", None):
                        merge_two_infoboxes(existing_infobox, new_infobox)
                        add_infobox = False
        if add_infobox:
            self.infoboxes.append(new_infobox)

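    # Main results are deduplicated via hash(result): the first occurrence is
    # stored in main_results_map, later duplicates are merged into it and only
    # contribute an additional position.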
    def _merge_main_result(self, result: MainResult | LegacyResult, position: int):
        result_hash = hash(result)

        with self._lock:

            merged = self.main_results_map.get(result_hash)
            if not merged:
                # if there is no duplicate in the merged results, append result
                result.positions = [position]
                self.main_results_map[result_hash] = result
                return

            merge_two_main_results(merged, result)
            # add the new position
            merged.positions.append(position)

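    # close() freezes the container, computes the final score of every merged
    # main result and feeds the per-engine score counters; get_ordered_results()
    # calls it on demand.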
    def close(self):
        self._closed = True

        for result in self.main_results_map.values():
            result.score = calculate_score(result, result.priority)
            for eng_name in result.engines:
                counter_add(result.score, 'engine', eng_name, 'score')

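    # Ordering happens in two passes: results are sorted by score (descending)
    # and are then regrouped so that results sharing the same category and
    # template sit next to each other.  A group accepts at most max_count (8)
    # additional results, and only while its anchor index is less than
    # max_distance (20) positions away from the end of the list.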
    def get_ordered_results(self) -> list[MainResult | LegacyResult]:
        """Returns a sorted list of results to be displayed in the main result
        area (:ref:`result types`)."""

        if not self._closed:
            self.close()

        if self._main_results_sorted:
            return self._main_results_sorted

        # first pass, sort results by "score" (descending)
        results = sorted(self.main_results_map.values(), key=lambda x: x.score, reverse=True)

        # pass 2 : group results by category and template
        gresults: list[MainResult | LegacyResult] = []
        categoryPositions: dict[str, t.Any] = {}
        max_count = 8
        max_distance = 20

        for res in results:
            # do we need to handle more than one category per engine?
            engine = searx.engines.engines.get(res.engine or "")
            if engine:
                res.category = engine.categories[0] if len(engine.categories) > 0 else ""

            # do we need to handle more than one category per engine?
            category = f"{res.category}:{res.template}:{'img_src' if (res.thumbnail or res.img_src) else ''}"
            grp = categoryPositions.get(category)

            # group with previous results using the same category, if the group
            # can accept more results and is not too far from the current
            # position

            if (grp is not None) and (grp["count"] > 0) and (len(gresults) - grp["index"] < max_distance):
                # group with the previous results using the same category with
                # this one
                index = grp["index"]
                gresults.insert(index, res)

                # update every index after the current one (including the
                # current one)
                for item in categoryPositions.values():
                    v = item["index"]
                    if v >= index:
                        item["index"] = v + 1

                # update this category
                grp["count"] -= 1

            else:
                gresults.append(res)
                # update categoryIndex
                categoryPositions[category] = {"index": len(gresults), "count": max_count}
                continue

        self._main_results_sorted = gresults
        return self._main_results_sorted

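    # Example: if three engines report 1000, 2000 and 3000 results, the
    # property below returns int(6000 / 3) = 2000; if that average were
    # smaller than the number of results actually collected, it returns 0.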
    @property
    def number_of_results(self) -> int:
        """Returns the average of the result counts reported by the engines, or
        zero if that average is smaller than the number of results actually
        collected."""

        if not self._closed:
            log.error("call to ResultContainer.number_of_results before ResultContainer.close")
            return 0

        with self._lock:
            resultnum_sum = sum(self._number_of_results)
            if not resultnum_sum or not self._number_of_results:
                return 0

            average = int(resultnum_sum / len(self._number_of_results))
            if average < len(self.get_ordered_results()):
                average = 0
            return average

    def add_unresponsive_engine(self, engine_name: str, error_type: str, suspended: bool = False):
        with self._lock:
            if self._closed:
                log.error("call to ResultContainer.add_unresponsive_engine after ResultContainer.close")
                return
            if searx.engines.engines[engine_name].display_error_messages:
                self.unresponsive_engines.add(UnresponsiveEngine(engine_name, error_type, suspended))

    def add_timing(self, engine_name: str, engine_time: float, page_load_time: float):
        with self._lock:
            if self._closed:
                log.error("call to ResultContainer.add_timing after ResultContainer.close")
                return
            self.timings.append(Timing(engine_name, total=engine_time, load=page_load_time))

    def get_timings(self) -> list[Timing]:
        with self._lock:
            if not self._closed:
                log.error("call to ResultContainer.get_timings before ResultContainer.close")
                return []
            return self.timings


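# Infoboxes that share the same id are merged field by field: the engine with
# the higher weight wins the engine name and the image, URLs and attributes
# are deduplicated by their "url"/"entity" and "label"/"entity" keys, and the
# longer content string is kept.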
def merge_two_infoboxes(origin: LegacyResult, other: LegacyResult):
    """Merges the values from ``other`` into ``origin``."""
    # pylint: disable=too-many-branches
    weight1 = getattr(searx.engines.engines[origin.engine], "weight", 1)
    weight2 = getattr(searx.engines.engines[other.engine], "weight", 1)

    if weight2 > weight1:
        origin.engine = other.engine

    origin.engines |= other.engines

    if other.urls:
        url_items = origin.get("urls", [])

        for url2 in other.urls:
            unique_url = True
            entity_url2 = url2.get("entity")

            for url1 in origin.get("urls", []):
                if (entity_url2 is not None and entity_url2 == url1.get("entity")) or (
                    url1.get("url") == url2.get("url")
                ):
                    unique_url = False
                    break
            if unique_url:
                url_items.append(url2)

        origin.urls = url_items

    if other.img_src:
        if not origin.img_src:
            origin.img_src = other.img_src
        elif weight2 > weight1:
            origin.img_src = other.img_src

    if other.attributes:
        if not origin.attributes:
            origin.attributes = other.attributes
        else:
            attr_names_1: set[str] = set()
            for attr in origin.attributes:
                label = attr.get("label")
                if label:
                    attr_names_1.add(label)

                entity = attr.get("entity")
                if entity:
                    attr_names_1.add(entity)

            for attr in other.attributes:
                if attr.get("label") not in attr_names_1 and attr.get('entity') not in attr_names_1:
                    origin.attributes.append(attr)

    if other.content:
        if not origin.content:
            origin.content = other.content
        elif len(other.content) > len(origin.content):
            origin.content = other.content


def merge_two_main_results(origin: MainResult | LegacyResult, other: MainResult | LegacyResult):
    """Merges the values from ``other`` into ``origin``."""

    if len(other.content) > len(origin.content):
        # use content with more text
        origin.content = other.content

    # use title with more text
    if len(other.title) > len(origin.title):
        origin.title = other.title

    # merge all result's parameters not found in origin
    if isinstance(other, MainResult) and isinstance(origin, MainResult):
        origin.defaults_from(other)
    elif isinstance(other, LegacyResult) and isinstance(origin, LegacyResult):
        origin.defaults_from(other)

    # add engine to list of result-engines
    origin.engines.add(other.engine or "")

    # use https, ftps, .. if possible
    if origin.parsed_url and not origin.parsed_url.scheme.endswith("s"):
        if other.parsed_url and other.parsed_url.scheme.endswith("s"):
            origin.parsed_url = origin.parsed_url._replace(scheme=other.parsed_url.scheme)
            origin.url = origin.parsed_url.geturl()
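
# Usage sketch: a ResultContainer is filled by the search flow with the results
# of each engine, closed, and then read for rendering.  The MainResult keyword
# arguments below are assumptions for illustration only.
#
#   container = ResultContainer()
#   container.extend("example", [MainResult(url="https://example.org", title="Example")])
#   container.close()
#   for res in container.get_ordered_results():
#       print(res.score, res.url)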