Coverage for src/bluetooth_sig/advertising/registry.py: 73%
119 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 01:26 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 01:26 +0000
1"""Registry for advertising data interpreter routing."""
3from __future__ import annotations
5import logging
6import threading
7from typing import Any
9import msgspec
11from bluetooth_sig.advertising.base import (
12 AdvertisingData,
13 DataSource,
14 PayloadInterpreter,
15)
16from bluetooth_sig.advertising.state import DeviceAdvertisingState
17from bluetooth_sig.types.company import ManufacturerData
18from bluetooth_sig.types.uuid import BluetoothUUID
21class PayloadContext(msgspec.Struct, kw_only=True, frozen=True):
22 """Context information for payload interpretation."""
24 mac_address: str
25 rssi: int | None = None
26 timestamp: float | None = None
29logger = logging.getLogger(__name__)
32class PayloadInterpreterRegistry:
33 """Routes advertisements to PayloadInterpreter classes.
35 Does NOT manage interpreter instances or state - caller owns those.
36 Only handles class registration and lookup.
38 Attributes:
39 _by_service_uuid: Interpreters indexed by service UUID.
40 _by_company_id: Interpreters indexed by company ID.
41 _fallback: Interpreters that match by custom logic only.
43 """
45 _instance: PayloadInterpreterRegistry | None = None
46 _instance_lock = threading.RLock()
48 @classmethod
49 def get_instance(cls) -> PayloadInterpreterRegistry:
50 """Return the process-wide PayloadInterpreterRegistry singleton instance."""
51 if cls._instance is None:
52 with cls._instance_lock:
53 if cls._instance is None:
54 cls._instance = cls()
55 return cls._instance
57 def __init__(self) -> None:
58 """Initialise empty registry."""
59 self._by_service_uuid: dict[str, list[type[PayloadInterpreter[Any]]]] = {}
60 self._by_company_id: dict[int, list[type[PayloadInterpreter[Any]]]] = {}
61 self._fallback: list[type[PayloadInterpreter[Any]]] = []
63 def register(self, interpreter_class: type[PayloadInterpreter[Any]]) -> None:
64 """Register an interpreter class.
66 Called automatically by PayloadInterpreter.__init_subclass__.
68 Args:
69 interpreter_class: The interpreter class to register.
71 """
72 if not hasattr(interpreter_class, "_info"):
73 logger.warning("Interpreter %s has no _info, skipping", interpreter_class.__name__)
74 return
76 info = interpreter_class._info # pylint: disable=protected-access
78 if info.data_source == DataSource.MANUFACTURER and info.company_id is not None:
79 if info.company_id not in self._by_company_id:
80 self._by_company_id[info.company_id] = []
81 self._by_company_id[info.company_id].append(interpreter_class)
82 logger.debug("Registered %s for company 0x%04X", interpreter_class.__name__, info.company_id)
84 elif info.data_source == DataSource.SERVICE and info.service_uuid is not None:
85 uuid_key = str(info.service_uuid).upper()
86 if uuid_key not in self._by_service_uuid:
87 self._by_service_uuid[uuid_key] = []
88 self._by_service_uuid[uuid_key].append(interpreter_class)
89 logger.debug("Registered %s for UUID %s", interpreter_class.__name__, uuid_key)
91 else:
92 self._fallback.append(interpreter_class)
93 logger.debug("Registered fallback interpreter %s", interpreter_class.__name__)
95 def unregister(self, interpreter_class: type[PayloadInterpreter[Any]]) -> None:
96 """Unregister an interpreter class.
98 Args:
99 interpreter_class: The interpreter class to unregister.
101 """
102 if not hasattr(interpreter_class, "_info"):
103 return
105 info = interpreter_class._info # pylint: disable=protected-access
107 if info.data_source == DataSource.MANUFACTURER and info.company_id is not None:
108 if info.company_id in self._by_company_id:
109 self._by_company_id[info.company_id] = [
110 p for p in self._by_company_id[info.company_id] if p is not interpreter_class
111 ]
113 elif info.data_source == DataSource.SERVICE and info.service_uuid is not None:
114 uuid_key = str(info.service_uuid).upper()
115 if uuid_key in self._by_service_uuid:
116 self._by_service_uuid[uuid_key] = [
117 p for p in self._by_service_uuid[uuid_key] if p is not interpreter_class
118 ]
120 if interpreter_class in self._fallback:
121 self._fallback.remove(interpreter_class)
123 def find_interpreter_class(self, advertising_data: AdvertisingData) -> type[PayloadInterpreter[Any]] | None:
124 """Find first interpreter class that handles this advertisement.
126 Args:
127 advertising_data: Complete advertising data from BLE packet.
129 Returns:
130 First matching interpreter class, or None if no match.
132 """
133 candidates: list[type[PayloadInterpreter[Any]]] = []
135 for company_id in advertising_data.manufacturer_data:
136 if company_id in self._by_company_id:
137 candidates.extend(self._by_company_id[company_id])
139 for service_uuid in advertising_data.service_data:
140 uuid_key = str(service_uuid).upper()
141 if uuid_key in self._by_service_uuid:
142 candidates.extend(self._by_service_uuid[uuid_key])
144 candidates.extend(self._fallback)
146 for interpreter_class in candidates:
147 if interpreter_class.supports(advertising_data):
148 return interpreter_class
150 return None
152 def find_all_interpreter_classes(self, advertising_data: AdvertisingData) -> list[type[PayloadInterpreter[Any]]]:
153 """Find all interpreter classes that handle this advertisement.
155 Useful when multiple protocols coexist in one advertisement
156 (e.g., BTHome + Xiaomi UUIDs).
158 Args:
159 advertising_data: Complete advertising data from BLE packet.
161 Returns:
162 List of all matching interpreter classes.
164 """
165 candidates: list[type[PayloadInterpreter[Any]]] = []
167 for company_id in advertising_data.manufacturer_data:
168 if company_id in self._by_company_id:
169 candidates.extend(self._by_company_id[company_id])
171 for service_uuid in advertising_data.service_data:
172 uuid_key = str(service_uuid).upper()
173 if uuid_key in self._by_service_uuid:
174 candidates.extend(self._by_service_uuid[uuid_key])
176 candidates.extend(self._fallback)
178 return [ic for ic in candidates if ic.supports(advertising_data)]
180 def get_registered_interpreters(self) -> list[type[PayloadInterpreter[Any]]]:
181 """Get all registered interpreter classes.
183 Returns:
184 List of all registered interpreter classes (deduplicated).
186 """
187 all_interpreters: list[type[PayloadInterpreter[Any]]] = []
188 for interpreters in self._by_company_id.values():
189 all_interpreters.extend(interpreters)
190 for interpreters in self._by_service_uuid.values():
191 all_interpreters.extend(interpreters)
192 all_interpreters.extend(self._fallback)
193 return list(set(all_interpreters))
195 def clear(self) -> None:
196 """Clear all registered interpreters."""
197 self._by_company_id.clear()
198 self._by_service_uuid.clear()
199 self._fallback.clear()
202def get_payload_interpreter_registry() -> PayloadInterpreterRegistry:
203 """Return the process-wide payload interpreter registry singleton instance."""
204 return PayloadInterpreterRegistry.get_instance()
207def parse_advertising_payloads(
208 manufacturer_data: dict[int, bytes],
209 service_data: dict[BluetoothUUID, bytes],
210 context: PayloadContext,
211 state: DeviceAdvertisingState | None = None,
212 *,
213 registry: PayloadInterpreterRegistry | None = None,
214) -> list[Any]:
215 """Auto-discover and parse all payloads in an advertisement.
217 This is the high-level "just parse everything" API.
218 Finds all matching interpreters and parses their payloads.
220 Args:
221 manufacturer_data: Company ID → payload bytes mapping.
222 service_data: Service UUID → payload bytes mapping.
223 context: Advertisement context (MAC address, RSSI, timestamp).
224 state: Current device advertising state (optional, created if None).
225 registry: Interpreter registry to use (defaults to process-wide registry).
227 Returns:
228 List of parsed data from all matching interpreters.
229 Failed interpretations are silently skipped (exceptions logged).
231 Example::
232 from bluetooth_sig.advertising import parse_advertising_payloads, PayloadContext
234 context = PayloadContext(mac_address="AA:BB:CC:DD:EE:FF", rssi=-60)
235 results = parse_advertising_payloads(
236 manufacturer_data={0x038F: xiaomi_bytes},
237 service_data={BTHOME_UUID: bthome_bytes},
238 context=context,
239 )
241 for data in results:
242 print(f"Parsed {data}")
244 """
245 results: list[Any] = []
247 # Use process-wide registry if none provided
248 if registry is None:
249 registry = get_payload_interpreter_registry()
251 # Create state if not provided
252 if state is None:
253 state = DeviceAdvertisingState(address=context.mac_address)
255 mfr_data_dict: dict[int, ManufacturerData] = {}
256 for company_id, payload in manufacturer_data.items():
257 mfr_data_dict[company_id] = ManufacturerData.from_id_and_payload(company_id, payload)
259 # Build AdvertisingData struct
260 ad_data = AdvertisingData(
261 manufacturer_data=mfr_data_dict,
262 service_data=service_data,
263 local_name=None,
264 rssi=context.rssi,
265 timestamp=context.timestamp,
266 )
268 # Find all matching interpreters
269 interpreter_classes = registry.find_all_interpreter_classes(ad_data)
271 if not interpreter_classes:
272 # No interpreters found - return empty list
273 return results
275 # Run each interpreter
276 for interpreter_class in interpreter_classes:
277 try:
278 interpreter = interpreter_class(context.mac_address)
279 data = interpreter.interpret(ad_data, state)
280 results.append(data)
281 except Exception: # pylint: disable=broad-exception-caught # Catch all interpreter errors
282 # Log and continue to next interpreter
283 logger.debug("Interpreter %s failed", interpreter_class.__name__, exc_info=True)
285 return results