Coverage for src/bluetooth_sig/advertising/registry.py: 73%

119 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-28 01:26 +0000

1"""Registry for advertising data interpreter routing.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6import threading 

7from typing import Any 

8 

9import msgspec 

10 

11from bluetooth_sig.advertising.base import ( 

12 AdvertisingData, 

13 DataSource, 

14 PayloadInterpreter, 

15) 

16from bluetooth_sig.advertising.state import DeviceAdvertisingState 

17from bluetooth_sig.types.company import ManufacturerData 

18from bluetooth_sig.types.uuid import BluetoothUUID 

19 

20 

class PayloadContext(msgspec.Struct, kw_only=True, frozen=True):
    """Per-advertisement metadata handed to payload parsing.

    Declared keyword-only and frozen. Only ``mac_address`` is required;
    the remaining fields default to ``None``.
    """

    # Device address; also used as the address of a freshly created
    # DeviceAdvertisingState and as the interpreter constructor argument.
    mac_address: str
    # Signal strength forwarded into AdvertisingData (units not shown here
    # - presumably dBm, TODO confirm).
    rssi: int | None = None
    # Timestamp forwarded into AdvertisingData (epoch/clock not shown here
    # - TODO confirm).
    timestamp: float | None = None

27 

28 

# Module-level logger used by the registry and by parse_advertising_payloads.
logger = logging.getLogger(__name__)

30 

31 

class PayloadInterpreterRegistry:
    """Routes advertisements to PayloadInterpreter classes.

    Does NOT manage interpreter instances or state - caller owns those.
    Only handles class registration and lookup.

    Attributes:
        _by_service_uuid: Interpreters indexed by upper-cased service UUID string.
        _by_company_id: Interpreters indexed by company ID.
        _fallback: Interpreters that match by custom logic only.

    """

    _instance: PayloadInterpreterRegistry | None = None
    _instance_lock = threading.RLock()

    @classmethod
    def get_instance(cls) -> PayloadInterpreterRegistry:
        """Return the process-wide PayloadInterpreterRegistry singleton instance."""
        if cls._instance is None:
            with cls._instance_lock:
                # Re-check under the lock: another thread may have created
                # the instance between the first check and acquiring the lock.
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def __init__(self) -> None:
        """Initialise empty registry."""
        self._by_service_uuid: dict[str, list[type[PayloadInterpreter[Any]]]] = {}
        self._by_company_id: dict[int, list[type[PayloadInterpreter[Any]]]] = {}
        self._fallback: list[type[PayloadInterpreter[Any]]] = []

    @staticmethod
    def _add_unique(
        bucket: list[type[PayloadInterpreter[Any]]],
        interpreter_class: type[PayloadInterpreter[Any]],
    ) -> bool:
        """Append ``interpreter_class`` to ``bucket`` unless already present.

        Returns:
            True if the class was appended, False if it was already there.

        """
        if any(existing is interpreter_class for existing in bucket):
            return False
        bucket.append(interpreter_class)
        return True

    @staticmethod
    def _discard(
        index: dict[Any, list[type[PayloadInterpreter[Any]]]],
        key: Any,
        interpreter_class: type[PayloadInterpreter[Any]],
    ) -> None:
        """Remove ``interpreter_class`` from ``index[key]``, dropping the key if it empties."""
        bucket = index.get(key)
        if bucket is None:
            return
        remaining = [candidate for candidate in bucket if candidate is not interpreter_class]
        if remaining:
            index[key] = remaining
        else:
            # Do not keep empty buckets around after the last class is removed.
            del index[key]

    def _collect_candidates(self, advertising_data: AdvertisingData) -> list[type[PayloadInterpreter[Any]]]:
        """Gather classes whose routing key appears in the advertisement.

        Order is: company-ID matches, then service-UUID matches, then all
        fallback interpreters. ``supports()`` is NOT consulted here.
        """
        candidates: list[type[PayloadInterpreter[Any]]] = []

        for company_id in advertising_data.manufacturer_data:
            candidates.extend(self._by_company_id.get(company_id, ()))

        for service_uuid in advertising_data.service_data:
            # Keys are stored upper-cased, so normalise the lookup the same way.
            candidates.extend(self._by_service_uuid.get(str(service_uuid).upper(), ()))

        candidates.extend(self._fallback)
        return candidates

    def register(self, interpreter_class: type[PayloadInterpreter[Any]]) -> None:
        """Register an interpreter class.

        Called automatically by PayloadInterpreter.__init_subclass__.

        Classes without an ``_info`` attribute are skipped with a warning.
        Registering a class that is already present in its bucket is a
        no-op, so a class is never matched (and run) twice.

        Args:
            interpreter_class: The interpreter class to register.

        """
        if not hasattr(interpreter_class, "_info"):
            logger.warning("Interpreter %s has no _info, skipping", interpreter_class.__name__)
            return

        info = interpreter_class._info  # pylint: disable=protected-access

        if info.data_source == DataSource.MANUFACTURER and info.company_id is not None:
            bucket = self._by_company_id.setdefault(info.company_id, [])
            if self._add_unique(bucket, interpreter_class):
                logger.debug("Registered %s for company 0x%04X", interpreter_class.__name__, info.company_id)

        elif info.data_source == DataSource.SERVICE and info.service_uuid is not None:
            uuid_key = str(info.service_uuid).upper()
            bucket = self._by_service_uuid.setdefault(uuid_key, [])
            if self._add_unique(bucket, interpreter_class):
                logger.debug("Registered %s for UUID %s", interpreter_class.__name__, uuid_key)

        elif self._add_unique(self._fallback, interpreter_class):
            logger.debug("Registered fallback interpreter %s", interpreter_class.__name__)

    def unregister(self, interpreter_class: type[PayloadInterpreter[Any]]) -> None:
        """Unregister an interpreter class.

        Classes without an ``_info`` attribute are ignored. Every occurrence
        of the class is removed, from its keyed bucket and from the fallback
        list alike.

        Args:
            interpreter_class: The interpreter class to unregister.

        """
        if not hasattr(interpreter_class, "_info"):
            return

        info = interpreter_class._info  # pylint: disable=protected-access

        if info.data_source == DataSource.MANUFACTURER and info.company_id is not None:
            self._discard(self._by_company_id, info.company_id, interpreter_class)

        elif info.data_source == DataSource.SERVICE and info.service_uuid is not None:
            self._discard(self._by_service_uuid, str(info.service_uuid).upper(), interpreter_class)

        # Always scrub the fallback list as well, whichever branch ran above.
        self._fallback[:] = [candidate for candidate in self._fallback if candidate is not interpreter_class]

    def find_interpreter_class(self, advertising_data: AdvertisingData) -> type[PayloadInterpreter[Any]] | None:
        """Find first interpreter class that handles this advertisement.

        Args:
            advertising_data: Complete advertising data from BLE packet.

        Returns:
            First candidate whose ``supports()`` accepts the advertisement,
            or None if no match.

        """
        for interpreter_class in self._collect_candidates(advertising_data):
            if interpreter_class.supports(advertising_data):
                return interpreter_class

        return None

    def find_all_interpreter_classes(self, advertising_data: AdvertisingData) -> list[type[PayloadInterpreter[Any]]]:
        """Find all interpreter classes that handle this advertisement.

        Useful when multiple protocols coexist in one advertisement
        (e.g., BTHome + Xiaomi UUIDs).

        Args:
            advertising_data: Complete advertising data from BLE packet.

        Returns:
            List of all matching interpreter classes, in candidate order
            (company-ID matches, service-UUID matches, then fallbacks).

        """
        return [
            interpreter_class
            for interpreter_class in self._collect_candidates(advertising_data)
            if interpreter_class.supports(advertising_data)
        ]

    def get_registered_interpreters(self) -> list[type[PayloadInterpreter[Any]]]:
        """Get all registered interpreter classes.

        Returns:
            List of all registered interpreter classes (deduplicated), in a
            deterministic order: company-ID buckets, service-UUID buckets,
            then fallbacks, each in registration order.

        """
        all_interpreters: list[type[PayloadInterpreter[Any]]] = []
        for interpreters in self._by_company_id.values():
            all_interpreters.extend(interpreters)
        for interpreters in self._by_service_uuid.values():
            all_interpreters.extend(interpreters)
        all_interpreters.extend(self._fallback)
        # dict.fromkeys deduplicates while keeping first-seen order; a plain
        # set() would return the classes in an arbitrary order.
        return list(dict.fromkeys(all_interpreters))

    def clear(self) -> None:
        """Clear all registered interpreters."""
        self._by_company_id.clear()
        self._by_service_uuid.clear()
        self._fallback.clear()

200 

201 

def get_payload_interpreter_registry() -> PayloadInterpreterRegistry:
    """Fetch the shared registry via ``PayloadInterpreterRegistry.get_instance()``.

    Returns:
        The singleton PayloadInterpreterRegistry for this process.

    """
    shared_registry = PayloadInterpreterRegistry.get_instance()
    return shared_registry

205 

206 

def parse_advertising_payloads(
    manufacturer_data: dict[int, bytes],
    service_data: dict[BluetoothUUID, bytes],
    context: PayloadContext,
    state: DeviceAdvertisingState | None = None,
    *,
    registry: PayloadInterpreterRegistry | None = None,
) -> list[Any]:
    """Parse every payload in an advertisement with all matching interpreters.

    High-level convenience entry point: wraps the raw inputs in an
    AdvertisingData, asks the registry for every interpreter class that
    supports it, instantiates each with the device MAC address and collects
    whatever ``interpret()`` returns.

    Args:
        manufacturer_data: Company ID → payload bytes mapping.
        service_data: Service UUID → payload bytes mapping.
        context: Advertisement context (MAC address, RSSI, timestamp).
        state: Current device advertising state; a new one keyed on the
            context's MAC address is created when None.
        registry: Interpreter registry to use; the process-wide registry
            is used when None.

    Returns:
        One entry per interpreter that completed without raising, in the
        order the registry returned the classes. Empty when nothing matches.
        An interpreter that raises is skipped and the exception is logged
        at debug level.

    Example::
        from bluetooth_sig.advertising import parse_advertising_payloads, PayloadContext

        context = PayloadContext(mac_address="AA:BB:CC:DD:EE:FF", rssi=-60)
        results = parse_advertising_payloads(
            manufacturer_data={0x038F: xiaomi_bytes},
            service_data={BTHOME_UUID: bthome_bytes},
            context=context,
        )

        for data in results:
            print(f"Parsed {data}")

    """
    # Resolve the optional collaborators first (registry, then state).
    active_registry = get_payload_interpreter_registry() if registry is None else registry
    device_state = DeviceAdvertisingState(address=context.mac_address) if state is None else state

    # Wrap each raw manufacturer payload in a ManufacturerData value.
    wrapped_mfr: dict[int, ManufacturerData] = {
        cid: ManufacturerData.from_id_and_payload(cid, raw) for cid, raw in manufacturer_data.items()
    }

    advertisement = AdvertisingData(
        manufacturer_data=wrapped_mfr,
        service_data=service_data,
        local_name=None,
        rssi=context.rssi,
        timestamp=context.timestamp,
    )

    parsed: list[Any] = []
    # An empty match list simply skips the loop and yields an empty result.
    for interpreter_class in active_registry.find_all_interpreter_classes(advertisement):
        try:
            instance = interpreter_class(context.mac_address)
            parsed.append(instance.interpret(advertisement, device_state))
        except Exception:  # pylint: disable=broad-exception-caught  # Catch all interpreter errors
            # Best effort: one failing interpreter must not block the others.
            logger.debug("Interpreter %s failed", interpreter_class.__name__, exc_info=True)

    return parsed