Coverage for src / bluetooth_sig / advertising / registry.py: 70%
108 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""Registry for advertising data interpreter routing."""
3from __future__ import annotations
5import logging
6from typing import Any
8import msgspec
10from bluetooth_sig.advertising.base import (
11 AdvertisingData,
12 DataSource,
13 PayloadInterpreter,
14)
15from bluetooth_sig.advertising.state import DeviceAdvertisingState
16from bluetooth_sig.types.company import ManufacturerData
17from bluetooth_sig.types.uuid import BluetoothUUID
class PayloadContext(msgspec.Struct, kw_only=True, frozen=True):
    """Context information for payload interpretation.

    Declared with ``kw_only=True`` and ``frozen=True``, so instances must be
    built with keyword arguments and cannot be mutated afterwards.
    """

    # Address of the advertising device (required). In this module it is used
    # to create a DeviceAdvertisingState and to instantiate each interpreter.
    mac_address: str
    # Optional signal strength, forwarded unchanged to AdvertisingData.
    # NOTE(review): presumably dBm - confirm against callers.
    rssi: int | None = None
    # Optional time of the advertisement, forwarded unchanged to AdvertisingData.
    # NOTE(review): unit/epoch is not established in this file - confirm.
    timestamp: float | None = None
# Module-level logger named after this module; used for registration
# diagnostics and for reporting interpreter failures.
logger = logging.getLogger(__name__)
class PayloadInterpreterRegistry:
    """Routes advertisements to PayloadInterpreter classes.

    Does NOT manage interpreter instances or state - caller owns those.
    Only handles class registration and lookup.

    Registration is idempotent: registering the same class more than once
    keeps a single entry, so lookups never yield the same interpreter twice
    for one advertisement. Lookup results preserve registration order
    (company-ID matches first, then service-UUID matches, then fallbacks).

    Attributes:
        _by_service_uuid: Interpreters indexed by upper-cased service UUID string.
        _by_company_id: Interpreters indexed by company ID.
        _fallback: Interpreters that match by custom logic only.

    """

    def __init__(self) -> None:
        """Initialise empty registry."""
        self._by_service_uuid: dict[str, list[type[PayloadInterpreter[Any]]]] = {}
        self._by_company_id: dict[int, list[type[PayloadInterpreter[Any]]]] = {}
        self._fallback: list[type[PayloadInterpreter[Any]]] = []

    @staticmethod
    def _uuid_key(service_uuid: Any) -> str:
        """Return the normalised (upper-cased string) key for a service UUID."""
        return str(service_uuid).upper()

    @staticmethod
    def _add_once(
        bucket: list[type[PayloadInterpreter[Any]]],
        interpreter_class: type[PayloadInterpreter[Any]],
    ) -> bool:
        """Append ``interpreter_class`` to ``bucket`` unless it is already there.

        Returns:
            True if the class was appended, False if it was already present.

        """
        # Identity comparison mirrors the ``is not`` filter used by unregister().
        if any(existing is interpreter_class for existing in bucket):
            return False
        bucket.append(interpreter_class)
        return True

    @staticmethod
    def _drop_from_index(
        index: dict[Any, list[type[PayloadInterpreter[Any]]]],
        key: Any,
        interpreter_class: type[PayloadInterpreter[Any]],
    ) -> None:
        """Remove every occurrence of ``interpreter_class`` from ``index[key]``, if present."""
        bucket = index.get(key)
        if bucket is not None:
            index[key] = [p for p in bucket if p is not interpreter_class]

    def _candidates(self, advertising_data: AdvertisingData) -> list[type[PayloadInterpreter[Any]]]:
        """Collect de-duplicated candidate classes for an advertisement.

        Candidates are gathered by company ID, then by service UUID, then from
        the fallback list. ``supports()`` is NOT consulted here.
        """
        gathered: list[type[PayloadInterpreter[Any]]] = []

        for company_id in advertising_data.manufacturer_data:
            gathered.extend(self._by_company_id.get(company_id, ()))

        for service_uuid in advertising_data.service_data:
            gathered.extend(self._by_service_uuid.get(self._uuid_key(service_uuid), ()))

        gathered.extend(self._fallback)

        # dict.fromkeys keeps first-seen order while dropping repeats.
        return list(dict.fromkeys(gathered))

    def register(self, interpreter_class: type[PayloadInterpreter[Any]]) -> None:
        """Register an interpreter class.

        Called automatically by PayloadInterpreter.__init_subclass__.
        Registering a class that is already registered is a no-op.

        Args:
            interpreter_class: The interpreter class to register.

        """
        if not hasattr(interpreter_class, "_info"):
            logger.warning("Interpreter %s has no _info, skipping", interpreter_class.__name__)
            return

        info = interpreter_class._info  # pylint: disable=protected-access

        if info.data_source == DataSource.MANUFACTURER and info.company_id is not None:
            bucket = self._by_company_id.setdefault(info.company_id, [])
            if self._add_once(bucket, interpreter_class):
                logger.debug("Registered %s for company 0x%04X", interpreter_class.__name__, info.company_id)

        elif info.data_source == DataSource.SERVICE and info.service_uuid is not None:
            uuid_key = self._uuid_key(info.service_uuid)
            bucket = self._by_service_uuid.setdefault(uuid_key, [])
            if self._add_once(bucket, interpreter_class):
                logger.debug("Registered %s for UUID %s", interpreter_class.__name__, uuid_key)

        else:
            # No usable routing key: the class is consulted for every advertisement.
            if self._add_once(self._fallback, interpreter_class):
                logger.debug("Registered fallback interpreter %s", interpreter_class.__name__)

    def unregister(self, interpreter_class: type[PayloadInterpreter[Any]]) -> None:
        """Unregister an interpreter class.

        Classes without an ``_info`` attribute are ignored. Unregistering a
        class that is not registered is a no-op.

        Args:
            interpreter_class: The interpreter class to unregister.

        """
        if not hasattr(interpreter_class, "_info"):
            return

        info = interpreter_class._info  # pylint: disable=protected-access

        if info.data_source == DataSource.MANUFACTURER and info.company_id is not None:
            self._drop_from_index(self._by_company_id, info.company_id, interpreter_class)

        elif info.data_source == DataSource.SERVICE and info.service_uuid is not None:
            self._drop_from_index(self._by_service_uuid, self._uuid_key(info.service_uuid), interpreter_class)

        # The fallback list is always swept, and every occurrence is removed
        # (in place), matching how the keyed buckets are filtered.
        self._fallback[:] = [p for p in self._fallback if p is not interpreter_class]

    def find_interpreter_class(self, advertising_data: AdvertisingData) -> type[PayloadInterpreter[Any]] | None:
        """Find first interpreter class that handles this advertisement.

        Args:
            advertising_data: Complete advertising data from BLE packet.

        Returns:
            First matching interpreter class, or None if no match.

        """
        return next(
            (
                candidate
                for candidate in self._candidates(advertising_data)
                if candidate.supports(advertising_data)
            ),
            None,
        )

    def find_all_interpreter_classes(self, advertising_data: AdvertisingData) -> list[type[PayloadInterpreter[Any]]]:
        """Find all interpreter classes that handle this advertisement.

        Useful when multiple protocols coexist in one advertisement
        (e.g., BTHome + Xiaomi UUIDs).

        Args:
            advertising_data: Complete advertising data from BLE packet.

        Returns:
            List of all matching interpreter classes, without duplicates.

        """
        return [
            candidate
            for candidate in self._candidates(advertising_data)
            if candidate.supports(advertising_data)
        ]

    def get_registered_interpreters(self) -> list[type[PayloadInterpreter[Any]]]:
        """Get all registered interpreter classes.

        Returns:
            List of all registered interpreter classes (deduplicated), ordered
            company-ID entries first, then service-UUID entries, then fallbacks.

        """
        all_interpreters: list[type[PayloadInterpreter[Any]]] = []
        for interpreters in self._by_company_id.values():
            all_interpreters.extend(interpreters)
        for interpreters in self._by_service_uuid.values():
            all_interpreters.extend(interpreters)
        all_interpreters.extend(self._fallback)
        # Order-preserving de-duplication (a plain set() would scramble the order).
        return list(dict.fromkeys(all_interpreters))

    def clear(self) -> None:
        """Clear all registered interpreters."""
        self._by_company_id.clear()
        self._by_service_uuid.clear()
        self._fallback.clear()
# Global singleton for PayloadInterpreter registration.
# parse_advertising_payloads() falls back to this instance when it is called
# without an explicit ``registry`` argument.
payload_interpreter_registry = PayloadInterpreterRegistry()
def parse_advertising_payloads(
    manufacturer_data: dict[int, bytes],
    service_data: dict[BluetoothUUID, bytes],
    context: PayloadContext,
    state: DeviceAdvertisingState | None = None,
    *,
    registry: PayloadInterpreterRegistry | None = None,
) -> list[Any]:
    """Parse every recognised payload carried by one advertisement.

    High-level convenience entry point: wraps the raw inputs in an
    ``AdvertisingData`` struct, asks the registry for every interpreter class
    that supports it, and runs each one in turn.

    Args:
        manufacturer_data: Company ID → payload bytes mapping.
        service_data: Service UUID → payload bytes mapping.
        context: Advertisement context (MAC address, RSSI, timestamp).
        state: Current device advertising state (optional, created if None).
        registry: Interpreter registry to use (defaults to global registry).

    Returns:
        List of parsed data from all matching interpreters (empty when none
        match). An interpreter that raises is skipped; the exception is
        logged at DEBUG level with its traceback.

    Example::
        from bluetooth_sig.advertising import parse_advertising_payloads, PayloadContext

        context = PayloadContext(mac_address="AA:BB:CC:DD:EE:FF", rssi=-60)
        results = parse_advertising_payloads(
            manufacturer_data={0x038F: xiaomi_bytes},
            service_data={BTHOME_UUID: bthome_bytes},
            context=context,
        )

        for data in results:
            print(f"Parsed {data}")

    """
    # Resolve optional collaborators: global registry / fresh per-device state.
    active_registry = payload_interpreter_registry if registry is None else registry
    device_state = DeviceAdvertisingState(address=context.mac_address) if state is None else state

    # Wrap each raw manufacturer payload in its ManufacturerData struct.
    wrapped_manufacturer_data: dict[int, ManufacturerData] = {
        company_id: ManufacturerData.from_id_and_payload(company_id, raw_payload)
        for company_id, raw_payload in manufacturer_data.items()
    }

    advertisement = AdvertisingData(
        manufacturer_data=wrapped_manufacturer_data,
        service_data=service_data,
        local_name=None,
        rssi=context.rssi,
        timestamp=context.timestamp,
    )

    parsed: list[Any] = []
    # An empty match list simply skips the loop and yields an empty result.
    for interpreter_class in active_registry.find_all_interpreter_classes(advertisement):
        try:
            parsed.append(interpreter_class(context.mac_address).interpret(advertisement, device_state))
        except Exception:  # pylint: disable=broad-exception-caught  # Catch all interpreter errors
            # Best effort: one failing interpreter must not block the others.
            logger.debug("Interpreter %s failed", interpreter_class.__name__, exc_info=True)

    return parsed