Coverage for src / bluetooth_sig / device / advertising.py: 57%

128 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Advertising-related functionality for Device. 

2 

3Manages advertising packet interpretation for a BLE device using 

4the composition pattern. This class is accessed via `device.advertising`. 

5 

6Based on patterns from bleak (BLEDevice + BleakClient) and real-world 

7implementations (BTHome-BLE, Xiaomi-BLE). 

8 

9Error Handling: 

10 Methods raise exceptions instead of returning status codes. 

11 This is consistent with GATT characteristic parsing and Pythonic patterns. 

12 

13 Exceptions: 

14 EncryptionRequiredError: Payload encrypted, no bindkey available 

15 DecryptionFailedError: Decryption failed (wrong key or corrupt data) 

16 AdvertisingParseError: General parse failure (includes "no interpreter found") 

17""" 

18 

19from __future__ import annotations 

20 

21import logging 

22from collections.abc import Callable 

23from typing import TypeVar, overload 

24 

25from bluetooth_sig.advertising import AdvertisingPDUParser 

26from bluetooth_sig.advertising.base import AdvertisingData, PayloadInterpreter 

27from bluetooth_sig.advertising.exceptions import ( 

28 AdvertisingParseError, 

29 DecryptionFailedError, 

30 EncryptionRequiredError, 

31) 

32from bluetooth_sig.advertising.registry import PayloadInterpreterRegistry 

33from bluetooth_sig.advertising.state import DeviceAdvertisingState 

34from bluetooth_sig.device.client import ClientManagerProtocol 

35from bluetooth_sig.types.advertising.result import AdvertisementData 

36 

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)

# Type variable for generic interpreter return types
T = TypeVar("T")

41 

42 

class DeviceAdvertising:  # pylint: disable=too-many-instance-attributes
    """Manages advertising packet interpretation for a device.

    Accessed via `device.advertising`.

    Attributes:
        state: Current advertising state (caller-owned, mutable).
        mac_address: Device MAC address.

    Example::
        device = Device(mac_address="AA:BB:CC:DD:EE:FF", translator=translator)

        # Set bindkey for encrypted advertisements
        device.advertising.set_bindkey(bytes.fromhex("0102030405060708090a0b0c0d0e0f10"))


        # Subscribe to continuous advertisement updates
        def on_advertisement(ad_data: AdvertisementData, data: object) -> None:
            if data is not None:
                print(f"Sensor data: {data}")


        device.advertising.subscribe(on_advertisement)

        # Later, unsubscribe
        device.advertising.unsubscribe(on_advertisement)

        # Or process single advertisement manually
        ad_data = AdvertisingData(
            manufacturer_data={},
            service_data={BluetoothUUID("0000fcd2-..."): payload},
            rssi=-60,
        )
        try:
            data = device.advertising.process(ad_data)
            print(f"Sensor data: {data}")
        except EncryptionRequiredError:
            print("Need bindkey")
        except AdvertisingParseError as e:
            print(f"Parse error: {e}")

    """

    def __init__(self, mac_address: str, connection_manager: ClientManagerProtocol) -> None:
        """Initialise advertising subsystem.

        Args:
            mac_address: Device MAC address.
            connection_manager: Connection manager for backend monitoring.

        """
        self._mac_address = mac_address
        self._connection_manager = connection_manager
        self.state = DeviceAdvertisingState(address=mac_address)
        # Interpreter instances keyed by name; dict order is the lookup order
        # used by auto-detection in process().
        self._interpreters: dict[str, PayloadInterpreter[object]] = {}
        self._registry: PayloadInterpreterRegistry | None = None
        self._pdu_parser = AdvertisingPDUParser()
        self._callbacks: list[Callable[[AdvertisementData, object], None]] = []
        self._backend_monitoring_enabled = False

    @property
    def mac_address(self) -> str:
        """Device MAC address."""
        return self._mac_address

    def set_registry(self, registry: PayloadInterpreterRegistry) -> None:
        """Set the interpreter registry for auto-detection.

        Args:
            registry: PayloadInterpreterRegistry to use for interpreter lookup.

        """
        self._registry = registry

    def register_interpreter(
        self,
        name: str,
        interpreter: PayloadInterpreter[object],
    ) -> None:
        """Register a named interpreter.

        An existing interpreter registered under the same name is replaced.

        Args:
            name: Unique name for the interpreter.
            interpreter: PayloadInterpreter instance.

        """
        self._interpreters[name] = interpreter

    def get_interpreter(self, name: str) -> PayloadInterpreter[object] | None:
        """Get an interpreter by name.

        Args:
            name: Interpreter name.

        Returns:
            PayloadInterpreter instance if found, None otherwise.

        """
        return self._interpreters.get(name)

    def set_bindkey(self, bindkey: bytes) -> None:
        """Set the encryption bindkey for decryption.

        The key is stored on ``self.state.encryption`` and is not validated here.

        Args:
            bindkey: 16-byte AES-CCM key.

        """
        self.state.encryption.bindkey = bindkey

    @overload
    def process(
        self,
        advertising_data: AdvertisingData,
        *,
        interpreter: type[PayloadInterpreter[T]],
    ) -> T: ...

    @overload
    def process(
        self,
        advertising_data: AdvertisingData,
    ) -> object: ...

    def process(
        self,
        advertising_data: AdvertisingData,
        *,
        interpreter: type[PayloadInterpreter[T]] | None = None,
    ) -> T | object:
        """Process an advertising payload.

        Type-safe path: Pass an interpreter class to get typed return.

        Without an interpreter class, the already-cached interpreters are
        tried in insertion order (first one whose ``supports()`` is truthy
        wins), then the registry, if one has been set.

        Args:
            advertising_data: Complete advertising data from BLE packet.
            interpreter: Interpreter class for type-safe parsing (recommended).

        Returns:
            Parsed data from the interpreter. Return type is inferred when
            passing interpreter class, otherwise returns object.

        Raises:
            AdvertisingParseError: No interpreter found or parse failure.
            EncryptionRequiredError: Payload encrypted, no bindkey available.
            DecryptionFailedError: Decryption failed (wrong key or corrupt data).

        Example::
            # Type-safe: IDE knows return type is BTHomeData
            data = device.advertising.process(ad_data, interpreter=BTHomeInterpreter)

            # Auto-detect: returns object
            data = device.advertising.process(ad_data)

        """
        # Type-safe path: use interpreter class directly
        if interpreter is not None:
            interp_instance = interpreter(self._mac_address)
            cached_name = interp_instance.info.name or interpreter.__name__
            # Cache for future auto-detection (variance: T is subtype of object)
            self._interpreters[cached_name] = interp_instance  # type: ignore[assignment] # Covariant T stored in dict; safe because interpreters are read-only after caching
            return self._run_interpreter(interp_instance, advertising_data)

        # Try registered interpreters first (returns object for dynamic dispatch)
        for registered_interp in self._interpreters.values():
            if registered_interp.supports(advertising_data):
                return self._run_interpreter(registered_interp, advertising_data)

        # Try auto-detection via registry
        if self._registry is not None:
            detected_class = self._registry.find_interpreter_class(advertising_data)
            if detected_class is not None:
                # Create instance and cache it so later calls hit the loop above
                detected_instance = detected_class(self._mac_address)
                cached_name = detected_instance.info.name or detected_class.__name__
                self._interpreters[cached_name] = detected_instance
                result: object = self._run_interpreter(detected_instance, advertising_data)
                return result

        raise AdvertisingParseError(message="No interpreter found for advertisement")

    def _run_interpreter(
        self,
        interpreter: PayloadInterpreter[T],
        advertising_data: AdvertisingData,
    ) -> T:
        """Run a single interpreter against this device's advertising state.

        Args:
            interpreter: The interpreter to run.
            advertising_data: Complete advertising data from BLE packet.

        Returns:
            Parsed data from the interpreter.

        Raises:
            EncryptionRequiredError: Payload encrypted, no bindkey available.
            DecryptionFailedError: Decryption failed.
            AdvertisingParseError: General parse failure.

        """
        # Interpreter raises exceptions on error, returns data on success
        return interpreter.interpret(advertising_data, self.state)

    def _interpret_safely(
        self,
        advertising_data: AdvertisingData,
        interpreter: type[PayloadInterpreter[T]] | None,
        context: str,
    ) -> tuple[T | object | None, str | None]:
        """Run process() and turn parse/decrypt failures into ``None`` data.

        Shared by process_from_connection_manager() and parse_raw_pdu().

        Args:
            advertising_data: Data to hand to process().
            interpreter: Optional interpreter class forwarded to process().
            context: Label inserted into the warning log message on failure.

        Returns:
            Tuple of (interpreted data or None, interpreter name or None).
            With an explicit interpreter class the name is the class
            ``__name__``; otherwise it is the key of the first cached
            interpreter whose ``supports()`` accepts the data.

        """
        interpreted_data: T | object | None = None
        interpreter_name: str | None = None

        try:
            if interpreter is not None:
                interpreted_data = self.process(advertising_data, interpreter=interpreter)
                interpreter_name = interpreter.__name__
            else:
                interpreted_data = self.process(advertising_data)
                # Find which interpreter was used. This stays inside the try
                # so a supports() failure is logged but the data is kept.
                interpreter_name = next(
                    (name for name, interp in self._interpreters.items() if interp.supports(advertising_data)),
                    None,
                )
        except (AdvertisingParseError, EncryptionRequiredError, DecryptionFailedError) as exc:
            logger.warning("Advertising parse/decrypt failed for %s: %s", context, exc)

        return interpreted_data, interpreter_name

    @overload
    def process_from_connection_manager(
        self,
        advertisement: AdvertisementData,
        *,
        interpreter: type[PayloadInterpreter[T]],
    ) -> tuple[AdvertisementData, T | None]: ...

    @overload
    def process_from_connection_manager(
        self,
        advertisement: AdvertisementData,
    ) -> tuple[AdvertisementData, object | None]: ...

    def process_from_connection_manager(
        self,
        advertisement: AdvertisementData,
        *,
        interpreter: type[PayloadInterpreter[T]] | None = None,
    ) -> tuple[AdvertisementData, T | object | None]:
        """Process advertisement from connection manager.

        Parse and decryption errors are logged as warnings rather than
        raised; in that case the interpreted data is None.

        Args:
            advertisement: AdvertisementData from connection manager
            interpreter: Interpreter class for type-safe parsing (optional).

        Returns:
            Tuple of (processed AdvertisementData, interpreted data or None)

        """
        core = advertisement.ad_structures.core

        # Convert AdvertisementData to BaseAdvertisingData for processing
        base_advertising_data = AdvertisingData(
            manufacturer_data=core.manufacturer_data,
            service_data=core.service_data,
            local_name=core.local_name,
            rssi=advertisement.rssi or 0,  # falsy/missing RSSI is passed on as 0
        )

        # Log label kept as "process_advertisement" so existing log output is unchanged
        interpreted_data, interpreter_name = self._interpret_safely(
            base_advertising_data,
            interpreter,
            "process_advertisement",
        )

        # Create enriched AdvertisementData with interpretation
        processed_advertisement = AdvertisementData(
            ad_structures=advertisement.ad_structures,
            rssi=advertisement.rssi,
            interpreted_data=interpreted_data,
            interpreter_name=interpreter_name,
        )

        return processed_advertisement, interpreted_data

    @overload
    def parse_raw_pdu(
        self,
        raw_data: bytes,
        rssi: int = ...,
        *,
        interpreter: type[PayloadInterpreter[T]],
    ) -> tuple[AdvertisementData, T | None]: ...

    @overload
    def parse_raw_pdu(
        self,
        raw_data: bytes,
        rssi: int = ...,
    ) -> tuple[AdvertisementData, object | None]: ...

    def parse_raw_pdu(
        self,
        raw_data: bytes,
        rssi: int = 0,
        *,
        interpreter: type[PayloadInterpreter[T]] | None = None,
    ) -> tuple[AdvertisementData, T | object | None]:
        """Parse raw advertising PDU bytes directly.

        Parse and decryption errors raised while interpreting are logged as
        warnings rather than raised; in that case the interpreted data is None.

        Args:
            raw_data: Raw BLE advertising PDU bytes
            rssi: Received signal strength in dBm
            interpreter: Interpreter class for type-safe parsing (optional).

        Returns:
            Tuple of (parsed AdvertisementData, interpreted data or None)

        """
        # Parse raw PDU bytes
        pdu_result = self._pdu_parser.parse_advertising_data(raw_data)
        core = pdu_result.ad_structures.core

        # Process through advertising subsystem (convert to BaseAdvertisingData)
        base_advertising_data = AdvertisingData(
            manufacturer_data=core.manufacturer_data,
            service_data=core.service_data,
            local_name=core.local_name,
            rssi=rssi,
        )

        interpreted_data, interpreter_name = self._interpret_safely(
            base_advertising_data,
            interpreter,
            "parse_raw_pdu",
        )

        # Create AdvertisementData with interpretation
        advertisement = AdvertisementData(
            ad_structures=pdu_result.ad_structures,
            rssi=rssi,
            interpreted_data=interpreted_data,
            interpreter_name=interpreter_name,
        )

        return advertisement, interpreted_data

    def subscribe(
        self,
        callback: Callable[[AdvertisementData, object], None],
    ) -> None:
        """Subscribe to continuous advertisement updates.

        Registers a callback that will be invoked whenever new advertisements
        are received. Automatically enables backend monitoring when the first
        callback is registered.

        Args:
            callback: Function called with (AdvertisementData, interpreted_data)
                when advertisements are received. interpreted_data is None
                if parsing failed.
        """
        self._callbacks.append(callback)

        # Automatically enable backend monitoring for first subscriber
        if len(self._callbacks) == 1:
            self._enable_backend_monitoring()

    def unsubscribe(self, callback: Callable[[AdvertisementData, object], None] | None = None) -> None:
        """Unsubscribe from advertisement updates.

        If callback is provided, removes only that specific callback.
        If no callback is provided, removes all callbacks.
        Automatically disables backend monitoring when no callbacks remain.

        Args:
            callback: Specific callback to remove, or None to remove all

        """
        if callback is None:
            self._callbacks.clear()
        else:
            try:
                self._callbacks.remove(callback)
            except ValueError:
                logger.warning("Callback not found in subscriptions")

        # Automatically disable backend monitoring when no callbacks remain
        if not self._callbacks and self._backend_monitoring_enabled:
            self._disable_backend_monitoring()

    def _dispatch_to_callbacks(
        self,
        advertisement: AdvertisementData,
        interpreted_data: object,
    ) -> None:
        """Dispatch advertisement to all registered callbacks.

        Exceptions raised by a callback are logged and do not stop delivery
        to the remaining callbacks.

        Args:
            advertisement: Enriched AdvertisementData with interpreted_data
            interpreted_data: Parsed data from interpreter, or None if parsing failed

        """
        # Iterate over a snapshot: a callback may subscribe/unsubscribe while
        # we are dispatching, and mutating the list mid-iteration would make
        # the loop skip subscribers.
        for callback in tuple(self._callbacks):
            try:
                callback(advertisement, interpreted_data)
            except Exception:  # pylint: disable=broad-exception-caught # User callbacks may raise anything
                logger.exception("Advertisement callback raised exception")

    def _enable_backend_monitoring(self) -> None:
        """Enable backend monitoring (internal method).

        No-op when monitoring is already enabled. A backend that raises
        NotImplementedError is tolerated: a warning is logged and monitoring
        stays disabled.
        """
        if self._backend_monitoring_enabled:
            return

        try:
            self._connection_manager.register_advertisement_callback(self._backend_callback_handler)
            self._backend_monitoring_enabled = True
        except NotImplementedError:
            backend_name = self._connection_manager.__class__.__name__
            logger.warning(
                "Backend %s does not support advertisement monitoring. "
                "Callbacks will not receive automatic updates. "
                "Use polling methods like refresh_advertisement() instead.",
                backend_name,
            )

    def _disable_backend_monitoring(self) -> None:
        """Disable backend monitoring (internal method).

        No-op when monitoring is not currently enabled.
        """
        if self._backend_monitoring_enabled:
            self._connection_manager.unregister_advertisement_callback(self._backend_callback_handler)
            self._backend_monitoring_enabled = False

    def _backend_callback_handler(self, raw_advertisement: AdvertisementData) -> None:
        """Process raw advertisement and dispatch to subscribers.

        Registered with the connection manager as its advertisement callback.
        Any exception is logged instead of propagating into the backend.
        """
        try:
            processed_ad, interpreted_data = self.process_from_connection_manager(raw_advertisement)
            if self._callbacks:
                self._dispatch_to_callbacks(processed_ad, interpreted_data)
        except Exception:  # pylint: disable=broad-exception-caught # Catch-all to prevent backend crashes
            logger.exception("Error processing backend advertisement")