Coverage for src / bluetooth_sig / advertising / registry.py: 70%

108 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Registry for advertising data interpreter routing.""" 

2 

3from __future__ import annotations 

4 

5import logging 

6from typing import Any 

7 

8import msgspec 

9 

10from bluetooth_sig.advertising.base import ( 

11 AdvertisingData, 

12 DataSource, 

13 PayloadInterpreter, 

14) 

15from bluetooth_sig.advertising.state import DeviceAdvertisingState 

16from bluetooth_sig.types.company import ManufacturerData 

17from bluetooth_sig.types.uuid import BluetoothUUID 

18 

19 

class PayloadContext(msgspec.Struct, kw_only=True, frozen=True):
    """Immutable, keyword-only context accompanying an advertisement.

    Attributes:
        mac_address: Address of the advertising device (required).
        rssi: RSSI value reported with the advertisement, or None if unknown.
        timestamp: Timestamp associated with the advertisement, or None if unknown.

    """

    mac_address: str
    rssi: int | None = None
    timestamp: float | None = None

26 

27 

# Module-level logger; named after the module so log output can be filtered per module.
logger = logging.getLogger(__name__)

29 

30 

class PayloadInterpreterRegistry:
    """Routes advertisements to PayloadInterpreter classes.

    Does NOT manage interpreter instances or state - caller owns those.
    Only handles class registration and lookup.

    Registration is idempotent: registering the same class twice leaves a
    single entry, so lookups never return (and callers never run) the same
    interpreter class more than once per advertisement.

    Attributes:
        _by_service_uuid: Interpreters indexed by upper-cased service UUID string.
        _by_company_id: Interpreters indexed by company ID.
        _fallback: Interpreters that match by custom logic only.

    """

    def __init__(self) -> None:
        """Initialise empty registry."""
        self._by_service_uuid: dict[str, list[type[PayloadInterpreter[Any]]]] = {}
        self._by_company_id: dict[int, list[type[PayloadInterpreter[Any]]]] = {}
        self._fallback: list[type[PayloadInterpreter[Any]]] = []

    @staticmethod
    def _uuid_key(service_uuid: object) -> str:
        """Return the normalised (upper-cased string) lookup key for a service UUID."""
        return str(service_uuid).upper()

    @staticmethod
    def _add_once(
        bucket: list[type[PayloadInterpreter[Any]]],
        interpreter_class: type[PayloadInterpreter[Any]],
    ) -> bool:
        """Append ``interpreter_class`` to ``bucket`` unless it is already present.

        Returns:
            True if the class was appended, False if it was already in the bucket.

        """
        if any(existing is interpreter_class for existing in bucket):
            return False
        bucket.append(interpreter_class)
        return True

    def _candidates(self, advertising_data: AdvertisingData) -> list[type[PayloadInterpreter[Any]]]:
        """Collect interpreter classes worth asking ``supports()`` for this advertisement.

        Order is: company-ID matches, then service-UUID matches, then fallbacks.
        Duplicates are dropped while keeping the first occurrence.
        """
        found: list[type[PayloadInterpreter[Any]]] = []

        for company_id in advertising_data.manufacturer_data:
            found.extend(self._by_company_id.get(company_id, ()))

        for service_uuid in advertising_data.service_data:
            found.extend(self._by_service_uuid.get(self._uuid_key(service_uuid), ()))

        found.extend(self._fallback)

        # dict preserves insertion order, so this de-duplicates deterministically.
        return list(dict.fromkeys(found))

    def register(self, interpreter_class: type[PayloadInterpreter[Any]]) -> None:
        """Register an interpreter class.

        Called automatically by PayloadInterpreter.__init_subclass__.
        Classes without an ``_info`` attribute are skipped with a warning.
        Registering an already-registered class is a no-op.

        Args:
            interpreter_class: The interpreter class to register.

        """
        if not hasattr(interpreter_class, "_info"):
            logger.warning("Interpreter %s has no _info, skipping", interpreter_class.__name__)
            return

        info = interpreter_class._info  # pylint: disable=protected-access

        if info.data_source == DataSource.MANUFACTURER and info.company_id is not None:
            bucket = self._by_company_id.setdefault(info.company_id, [])
            if self._add_once(bucket, interpreter_class):
                logger.debug("Registered %s for company 0x%04X", interpreter_class.__name__, info.company_id)
            return

        if info.data_source == DataSource.SERVICE and info.service_uuid is not None:
            uuid_key = self._uuid_key(info.service_uuid)
            bucket = self._by_service_uuid.setdefault(uuid_key, [])
            if self._add_once(bucket, interpreter_class):
                logger.debug("Registered %s for UUID %s", interpreter_class.__name__, uuid_key)
            return

        # No usable company ID / service UUID: the class is matched via supports() only.
        if self._add_once(self._fallback, interpreter_class):
            logger.debug("Registered fallback interpreter %s", interpreter_class.__name__)

    def unregister(self, interpreter_class: type[PayloadInterpreter[Any]]) -> None:
        """Unregister an interpreter class.

        Classes without an ``_info`` attribute are ignored. Every occurrence of
        the class is removed from the bucket it was routed to and from the
        fallback list.

        Args:
            interpreter_class: The interpreter class to unregister.

        """
        if not hasattr(interpreter_class, "_info"):
            return

        info = interpreter_class._info  # pylint: disable=protected-access

        if info.data_source == DataSource.MANUFACTURER and info.company_id is not None:
            if info.company_id in self._by_company_id:
                self._by_company_id[info.company_id] = [
                    p for p in self._by_company_id[info.company_id] if p is not interpreter_class
                ]

        elif info.data_source == DataSource.SERVICE and info.service_uuid is not None:
            uuid_key = self._uuid_key(info.service_uuid)
            if uuid_key in self._by_service_uuid:
                self._by_service_uuid[uuid_key] = [
                    p for p in self._by_service_uuid[uuid_key] if p is not interpreter_class
                ]

        # Always checked, regardless of data source; removes all occurrences so the
        # fallback list behaves like the keyed buckets above.
        self._fallback[:] = [p for p in self._fallback if p is not interpreter_class]

    def find_interpreter_class(self, advertising_data: AdvertisingData) -> type[PayloadInterpreter[Any]] | None:
        """Find first interpreter class that handles this advertisement.

        Args:
            advertising_data: Complete advertising data from BLE packet.

        Returns:
            First matching interpreter class, or None if no match.

        """
        for interpreter_class in self._candidates(advertising_data):
            if interpreter_class.supports(advertising_data):
                return interpreter_class

        return None

    def find_all_interpreter_classes(self, advertising_data: AdvertisingData) -> list[type[PayloadInterpreter[Any]]]:
        """Find all interpreter classes that handle this advertisement.

        Useful when multiple protocols coexist in one advertisement
        (e.g., BTHome + Xiaomi UUIDs).

        Args:
            advertising_data: Complete advertising data from BLE packet.

        Returns:
            List of all matching interpreter classes, each at most once.

        """
        return [ic for ic in self._candidates(advertising_data) if ic.supports(advertising_data)]

    def get_registered_interpreters(self) -> list[type[PayloadInterpreter[Any]]]:
        """Get all registered interpreter classes.

        Returns:
            List of all registered interpreter classes (deduplicated), ordered
            company-ID buckets first, then service-UUID buckets, then fallbacks.

        """
        all_interpreters: list[type[PayloadInterpreter[Any]]] = []
        for interpreters in self._by_company_id.values():
            all_interpreters.extend(interpreters)
        for interpreters in self._by_service_uuid.values():
            all_interpreters.extend(interpreters)
        all_interpreters.extend(self._fallback)
        # dict.fromkeys keeps first-seen order; a set would make the order arbitrary.
        return list(dict.fromkeys(all_interpreters))

    def clear(self) -> None:
        """Clear all registered interpreters."""
        self._by_company_id.clear()
        self._by_service_uuid.clear()
        self._fallback.clear()

187 

188 

# Process-wide default registry for PayloadInterpreter classes; used by
# parse_advertising_payloads() whenever no explicit registry is passed.
payload_interpreter_registry = PayloadInterpreterRegistry()

191 

192 

def parse_advertising_payloads(
    manufacturer_data: dict[int, bytes],
    service_data: dict[BluetoothUUID, bytes],
    context: PayloadContext,
    state: DeviceAdvertisingState | None = None,
    *,
    registry: PayloadInterpreterRegistry | None = None,
) -> list[Any]:
    """Parse every payload in an advertisement with all matching interpreters.

    High-level convenience entry point: wraps the raw payloads in an
    ``AdvertisingData`` struct, asks the registry for every interpreter class
    that supports it, and runs each one in turn.

    Args:
        manufacturer_data: Company ID → payload bytes mapping.
        service_data: Service UUID → payload bytes mapping.
        context: Advertisement context (MAC address, RSSI, timestamp).
        state: Current device advertising state; a fresh one keyed on
            ``context.mac_address`` is created when None.
        registry: Interpreter registry to query; the module-level
            ``payload_interpreter_registry`` is used when None.

    Returns:
        Results from each interpreter that succeeded, in registry lookup
        order. An interpreter that raises is logged at debug level and
        skipped; an empty list is returned when nothing matches.

    Example::
        from bluetooth_sig.advertising import parse_advertising_payloads, PayloadContext

        context = PayloadContext(mac_address="AA:BB:CC:DD:EE:FF", rssi=-60)
        results = parse_advertising_payloads(
            manufacturer_data={0x038F: xiaomi_bytes},
            service_data={BTHOME_UUID: bthome_bytes},
            context=context,
        )

        for data in results:
            print(f"Parsed {data}")

    """
    # Resolve the optional collaborators first.
    active_registry = payload_interpreter_registry if registry is None else registry
    device_state = DeviceAdvertisingState(address=context.mac_address) if state is None else state

    # Wrap raw manufacturer payloads and assemble the struct interpreters consume.
    advertisement = AdvertisingData(
        manufacturer_data={
            company_id: ManufacturerData.from_id_and_payload(company_id, payload)
            for company_id, payload in manufacturer_data.items()
        },
        service_data=service_data,
        local_name=None,
        rssi=context.rssi,
        timestamp=context.timestamp,
    )

    parsed: list[Any] = []
    # An empty match list simply skips the loop and yields an empty result.
    for interpreter_class in active_registry.find_all_interpreter_classes(advertisement):
        try:
            instance = interpreter_class(context.mac_address)
            parsed.append(instance.interpret(advertisement, device_state))
        except Exception:  # pylint: disable=broad-exception-caught  # Catch all interpreter errors
            # Best effort: one failing interpreter must not block the others.
            logger.debug("Interpreter %s failed", interpreter_class.__name__, exc_info=True)

    return parsed