Coverage for src/bluetooth_sig/device/device.py: 52%

240 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 00:10 +0000

1"""Device class for grouping BLE device services, characteristics, encryption, and advertiser data. 

2 

3This module provides a high-level Device abstraction that groups all 

4services, characteristics, encryption requirements, and advertiser data 

5for a BLE device. It integrates with the BluetoothSIGTranslator for 

6parsing while providing a unified view of device state. 

7""" 

8 

9from __future__ import annotations 

10 

11import logging 

12import re 

13from abc import abstractmethod 

14from typing import Any, Callable, Protocol, cast 

15 

16from ..gatt.characteristics import CharacteristicName 

17from ..gatt.context import CharacteristicContext, DeviceInfo 

18from ..gatt.descriptors.registry import DescriptorRegistry 

19from ..gatt.services import GattServiceRegistry, ServiceName 

20from ..gatt.services.base import BaseGattService, UnknownService 

21from ..types import ( 

22 CharacteristicDataProtocol, 

23 DeviceAdvertiserData, 

24) 

25from ..types.data_types import CharacteristicData 

26from ..types.device_types import DeviceEncryption, DeviceService 

27from ..types.gatt_enums import GattProperty 

28from ..types.uuid import BluetoothUUID 

29from .advertising_parser import AdvertisingParser 

30from .connection import ConnectionManagerProtocol 

31 

32__all__ = [ 

33 "Device", 

34 "SIGTranslatorProtocol", 

35 "UnknownService", 

36] 

37 

38 

39class SIGTranslatorProtocol(Protocol): # pylint: disable=too-few-public-methods 

40 """Protocol for SIG translator interface.""" 

41 

42 @abstractmethod 

43 def parse_characteristics( 

44 self, 

45 char_data: dict[str, bytes], 

46 descriptor_data: dict[str, dict[str, bytes]] | None = None, 

47 ctx: CharacteristicContext | None = None, 

48 ) -> dict[str, CharacteristicData]: 

49 """Parse multiple characteristics at once.""" 

50 

51 @abstractmethod 

52 def parse_characteristic( 

53 self, 

54 uuid: str, 

55 raw_data: bytes, 

56 ctx: CharacteristicContext | None = None, 

57 descriptor_data: dict[str, bytes] | None = None, 

58 ) -> CharacteristicData: 

59 """Parse a single characteristic's raw bytes.""" 

60 

61 @abstractmethod 

62 def get_characteristic_uuid_by_name(self, name: CharacteristicName) -> BluetoothUUID | None: 

63 """Get the UUID for a characteristic name enum (enum-only API).""" 

64 

65 @abstractmethod 

66 def get_service_uuid_by_name(self, name: str | ServiceName) -> BluetoothUUID | None: 

67 """Get the UUID for a service name or enum.""" 

68 

69 def get_characteristic_info_by_name(self, name: CharacteristicName) -> Any | None: # noqa: ANN401 # Adapter-specific characteristic info 

70 """Get characteristic info by enum name (optional method).""" 

71 

72 

73def _is_uuid_like(value: str) -> bool: 

74 """Check if a string looks like a Bluetooth UUID.""" 

75 # Remove dashes and check if it's a valid hex string of UUID length 

76 clean = value.replace("-", "") 

77 return bool(re.match(r"^[0-9A-Fa-f]+$", clean)) and len(clean) in [4, 8, 32] 

78 

79 

80class Device: # pylint: disable=too-many-instance-attributes,too-many-public-methods 

81 r"""High-level BLE device abstraction. 

82 

83 This class groups all services, characteristics, encryption requirements, and 

84 advertiser data for a BLE device. It integrates with 

85 [BluetoothSIGTranslator][bluetooth_sig.BluetoothSIGTranslator] 

86 for parsing while providing a unified view of device state. 

87 

88 Key features: 

89 - Parse advertiser data from BLE scan results 

90 - Add and manage GATT services with their characteristics 

91 - Access parsed characteristic data by UUID 

92 - Handle device encryption requirements 

93 - Cache device information for performance 

94 

95 Example: 

96 Create and configure a device: 

97 

98 ```python 

99 from bluetooth_sig import BluetoothSIGTranslator, Device 

100 

101 translator = BluetoothSIGTranslator() 

102 device = Device("AA:BB:CC:DD:EE:FF", translator) 

103 

104 # Add a service 

105 device.add_service("180F", {"2A19": b"\\x64"}) # Battery service 

106 

107 # Get parsed data 

108 battery = device.get_characteristic_data("2A19") 

109 print(f"Battery: {battery.value}%") 

110 ``` 

111 

112 """ 

113 

114 def __init__(self, address: str, translator: SIGTranslatorProtocol) -> None: 

115 """Initialise Device instance with address and translator. 

116 

117 Args: 

118 address: BLE device address 

119 translator: SIGTranslatorProtocol instance 

120 

121 """ 

122 self.address = address 

123 self.translator = translator 

124 # Optional connection manager implementing ConnectionManagerProtocol 

125 self.connection_manager: ConnectionManagerProtocol | None = None 

126 self._name: str = "" 

127 self.services: dict[str, DeviceService] = {} 

128 self.encryption = DeviceEncryption() 

129 self.advertiser_data = DeviceAdvertiserData(raw_data=b"") 

130 

131 # Advertising parser for handling advertising data 

132 self.advertising_parser = AdvertisingParser() 

133 

134 # Cache for device_info property 

135 self._device_info_cache: DeviceInfo | None = None 

136 

137 def __str__(self) -> str: 

138 """Return string representation of Device. 

139 

140 Returns: 

141 str: String representation of Device. 

142 

143 """ 

144 service_count = len(self.services) 

145 char_count = sum(len(service.characteristics) for service in self.services.values()) 

146 return f"Device({self.address}, name={self.name}, {service_count} services, {char_count} characteristics)" 

147 

148 def add_service( 

149 self, 

150 service_name: str | ServiceName, 

151 characteristics: dict[str, bytes], 

152 descriptors: dict[str, dict[str, bytes]] | None = None, 

153 ) -> None: 

154 """Add a service to the device with its characteristics and descriptors. 

155 

156 Args: 

157 service_name: Name or enum of the service to add 

158 characteristics: Dictionary mapping characteristic UUIDs to raw data 

159 descriptors: Optional nested dict mapping char_uuid -> desc_uuid -> raw data 

160 

161 """ 

162 # Resolve service UUID: accept UUID-like strings directly, else ask translator 

163 # service_uuid can be a BluetoothUUID or None (translator may return None) 

164 service_uuid: BluetoothUUID | None 

165 if isinstance(service_name, str) and _is_uuid_like(service_name): 

166 service_uuid = BluetoothUUID(service_name) 

167 else: 

168 service_uuid = self.translator.get_service_uuid_by_name(service_name) 

169 

170 if not service_uuid: 

171 # No UUID found - this is an error condition 

172 service_name_str = service_name if isinstance(service_name, str) else service_name.value 

173 raise ValueError( 

174 f"Cannot resolve service UUID for '{service_name_str}'. " 

175 "Service name not found in registry and not a valid UUID format." 

176 ) 

177 

178 service_class = GattServiceRegistry.get_service_class(service_uuid) 

179 service: BaseGattService 

180 if not service_class: 

181 service = UnknownService(uuid=service_uuid) 

182 else: 

183 service = service_class() 

184 

185 device_info = DeviceInfo( 

186 address=self.address, 

187 name=self.name, 

188 manufacturer_data=self.advertiser_data.manufacturer_data, 

189 service_uuids=self.advertiser_data.service_uuids, 

190 ) 

191 

192 base_ctx = CharacteristicContext(device_info=device_info) 

193 

194 parsed_characteristics = self.translator.parse_characteristics(characteristics, descriptors, ctx=base_ctx) 

195 

196 for char_data in parsed_characteristics.values(): 

197 self.update_encryption_requirements(char_data) 

198 

199 # Process descriptors if provided 

200 if descriptors: 

201 self._process_descriptors(descriptors, parsed_characteristics) 

202 

203 characteristics_cast = cast(dict[str, CharacteristicDataProtocol], parsed_characteristics) 

204 device_service = DeviceService(service=service, characteristics=characteristics_cast) 

205 

206 service_key = service_name if isinstance(service_name, str) else service_name.value 

207 self.services[service_key] = device_service 

208 

209 def _process_descriptors( 

210 self, descriptors: dict[str, dict[str, bytes]], parsed_characteristics: dict[str, Any] 

211 ) -> None: 

212 """Process and store descriptor data for characteristics. 

213 

214 Args: 

215 descriptors: Nested dict mapping char_uuid -> desc_uuid -> raw data 

216 parsed_characteristics: Already parsed characteristic data 

217 """ 

218 for char_uuid, char_descriptors in descriptors.items(): 

219 if char_uuid not in parsed_characteristics: 

220 continue # Skip descriptors for unknown characteristics 

221 

222 char_data = parsed_characteristics[char_uuid] 

223 if not hasattr(char_data, "add_descriptor"): 

224 continue # Characteristic doesn't support descriptors 

225 

226 for desc_uuid, _desc_data in char_descriptors.items(): 

227 descriptor = DescriptorRegistry.create_descriptor(desc_uuid) 

228 if descriptor: 

229 try: 

230 char_data.add_descriptor(descriptor) 

231 except Exception: # pylint: disable=broad-exception-caught 

232 # Skip malformed descriptors 

233 continue 

234 

235 def attach_connection_manager(self, manager: ConnectionManagerProtocol) -> None: 

236 """Attach a connection manager to handle BLE connections. 

237 

238 Args: 

239 manager: Connection manager implementing the ConnectionManagerProtocol 

240 

241 """ 

242 self.connection_manager = manager 

243 

244 async def detach_connection_manager(self) -> None: 

245 """Detach the current connection manager and disconnect if connected. 

246 

247 Disconnects if a connection manager is present, then removes it. 

248 """ 

249 if self.connection_manager: 

250 await self.disconnect() 

251 self.connection_manager = None 

252 

253 async def connect(self) -> None: 

254 """Connect to the BLE device. 

255 

256 Raises: 

257 RuntimeError: If no connection manager is attached 

258 

259 """ 

260 if not self.connection_manager: 

261 raise RuntimeError("No connection manager attached to Device") 

262 await self.connection_manager.connect() 

263 

264 async def disconnect(self) -> None: 

265 """Disconnect from the BLE device. 

266 

267 Raises: 

268 RuntimeError: If no connection manager is attached 

269 

270 """ 

271 if not self.connection_manager: 

272 raise RuntimeError("No connection manager attached to Device") 

273 await self.connection_manager.disconnect() 

274 

275 async def read(self, char_name: str | CharacteristicName) -> Any | None: # noqa: ANN401 # Returns characteristic-specific types 

276 """Read a characteristic value from the device. 

277 

278 Args: 

279 char_name: Name or enum of the characteristic to read 

280 

281 Returns: 

282 Parsed characteristic value or None if read fails 

283 

284 Raises: 

285 RuntimeError: If no connection manager is attached 

286 

287 """ 

288 if not self.connection_manager: 

289 raise RuntimeError("No connection manager attached to Device") 

290 

291 resolved_uuid = self._resolve_characteristic_name(char_name) 

292 raw = await self.connection_manager.read_gatt_char(resolved_uuid) 

293 parsed = self.translator.parse_characteristic(str(resolved_uuid), raw, descriptor_data=None) 

294 return parsed 

295 

296 async def write(self, char_name: str | CharacteristicName, data: bytes) -> None: 

297 """Write data to a characteristic on the device. 

298 

299 Args: 

300 char_name: Name or enum of the characteristic to write to 

301 data: Raw bytes to write 

302 

303 Raises: 

304 RuntimeError: If no connection manager is attached 

305 

306 """ 

307 if not self.connection_manager: 

308 raise RuntimeError("No connection manager attached to Device") 

309 

310 resolved_uuid = self._resolve_characteristic_name(char_name) 

311 await self.connection_manager.write_gatt_char(resolved_uuid, data) 

312 

313 async def start_notify(self, char_name: str | CharacteristicName, callback: Callable[[Any], None]) -> None: 

314 """Start notifications for a characteristic. 

315 

316 Args: 

317 char_name: Name or enum of the characteristic to monitor 

318 callback: Function to call when notifications are received 

319 

320 Raises: 

321 RuntimeError: If no connection manager is attached 

322 

323 """ 

324 if not self.connection_manager: 

325 raise RuntimeError("No connection manager attached to Device") 

326 

327 resolved_uuid = self._resolve_characteristic_name(char_name) 

328 

329 def _internal_cb(sender: str, data: bytes) -> None: 

330 parsed = self.translator.parse_characteristic(sender, data, descriptor_data=None) 

331 try: 

332 callback(parsed) 

333 except Exception as exc: # pylint: disable=broad-exception-caught 

334 logging.exception("Notification callback raised an exception: %s", exc) 

335 

336 await self.connection_manager.start_notify(resolved_uuid, _internal_cb) 

337 

338 def _resolve_characteristic_name(self, identifier: str | CharacteristicName) -> BluetoothUUID: 

339 """Resolve a characteristic name or enum to its UUID. 

340 

341 Args: 

342 identifier: Characteristic name string or enum 

343 

344 Returns: 

345 Characteristic UUID string 

346 

347 Raises: 

348 ValueError: If the characteristic name cannot be resolved 

349 

350 """ 

351 if isinstance(identifier, CharacteristicName): 

352 # For enum inputs, ask the translator for the UUID 

353 uuid = self.translator.get_characteristic_uuid_by_name(identifier) 

354 if uuid: 

355 return uuid 

356 norm = identifier.value.strip() 

357 else: 

358 norm = identifier 

359 stripped = norm.replace("-", "") 

360 if len(stripped) in (4, 8, 32) and all(c in "0123456789abcdefABCDEF" for c in stripped): 

361 return BluetoothUUID(norm) 

362 

363 raise ValueError(f"Unknown characteristic name: '{identifier}'") 

364 

365 async def stop_notify(self, char_name: str | CharacteristicName) -> None: 

366 """Stop notifications for a characteristic. 

367 

368 Args: 

369 char_name: Characteristic name or UUID 

370 

371 """ 

372 if not self.connection_manager: 

373 raise RuntimeError("No connection manager attached") 

374 

375 resolved_uuid = self._resolve_characteristic_name(char_name) 

376 await self.connection_manager.stop_notify(resolved_uuid) 

377 

378 def parse_advertiser_data(self, raw_data: bytes) -> None: 

379 """Parse raw advertising data and update device information. 

380 

381 Args: 

382 raw_data: Raw bytes from BLE advertising packet 

383 

384 """ 

385 parsed_data = self.advertising_parser.parse_advertising_data(raw_data) 

386 self.advertiser_data = parsed_data 

387 

388 # Update device name if not set 

389 if parsed_data.local_name and not self.name: 

390 self.name = parsed_data.local_name 

391 

392 def get_characteristic_data( 

393 self, service_name: str | ServiceName, char_uuid: str 

394 ) -> CharacteristicDataProtocol | None: 

395 """Get parsed characteristic data for a specific service and characteristic. 

396 

397 Args: 

398 service_name: Name or enum of the service 

399 char_uuid: UUID of the characteristic 

400 

401 Returns: 

402 Parsed characteristic data or None if not found. 

403 

404 """ 

405 service_key = service_name if isinstance(service_name, str) else service_name.value 

406 service = self.services.get(service_key) 

407 if service: 

408 return service.characteristics.get(char_uuid) 

409 return None 

410 

411 def update_encryption_requirements(self, char_data: CharacteristicData) -> None: 

412 """Update device encryption requirements based on characteristic properties. 

413 

414 Args: 

415 char_data: The parsed characteristic data with properties 

416 

417 """ 

418 properties = char_data.properties 

419 

420 # Check for encryption requirements 

421 encrypt_props = [GattProperty.ENCRYPT_READ, GattProperty.ENCRYPT_WRITE, GattProperty.ENCRYPT_NOTIFY] 

422 if any(prop in properties for prop in encrypt_props): 

423 self.encryption.requires_encryption = True 

424 

425 # Check for authentication requirements 

426 auth_props = [GattProperty.AUTH_READ, GattProperty.AUTH_WRITE, GattProperty.AUTH_NOTIFY] 

427 if any(prop in properties for prop in auth_props): 

428 self.encryption.requires_authentication = True 

429 

430 async def discover_services(self) -> dict[str, Any]: 

431 """Discover all services and characteristics from the device. 

432 

433 Returns: 

434 Dictionary mapping service UUIDs to service information 

435 

436 Raises: 

437 RuntimeError: If no connection manager is attached 

438 

439 """ 

440 if not self.connection_manager: 

441 raise RuntimeError("No connection manager attached to Device") 

442 

443 services_data = await self.connection_manager.get_services() 

444 

445 # Store discovered services in our internal structure 

446 for service_info in services_data: 

447 service_uuid = service_info.uuid 

448 if service_uuid not in self.services: 

449 # Create a service instance - we'll use UnknownService for undiscovered services 

450 service_instance = UnknownService(uuid=BluetoothUUID(service_uuid)) 

451 device_service = DeviceService(service=service_instance, characteristics={}) 

452 self.services[service_uuid] = device_service 

453 

454 # Add characteristics to the service 

455 for char_info in service_info.characteristics: 

456 char_uuid = char_info.uuid 

457 self.services[service_uuid].characteristics[char_uuid] = char_info 

458 

459 return dict(self.services) 

460 

461 async def get_characteristic_info(self, char_uuid: str) -> Any | None: # noqa: ANN401 # Adapter-specific characteristic metadata 

462 """Get information about a characteristic from the connection manager. 

463 

464 Args: 

465 char_uuid: UUID of the characteristic 

466 

467 Returns: 

468 Characteristic information or None if not found 

469 

470 Raises: 

471 RuntimeError: If no connection manager is attached 

472 

473 """ 

474 if not self.connection_manager: 

475 raise RuntimeError("No connection manager attached to Device") 

476 

477 services_data = await self.connection_manager.get_services() 

478 for service_info in services_data: 

479 for char_info in service_info.characteristics: 

480 if char_info.uuid == char_uuid: 

481 return char_info 

482 return None 

483 

484 async def read_multiple(self, char_names: list[str | CharacteristicName]) -> dict[str, Any | None]: 

485 """Read multiple characteristics in batch. 

486 

487 Args: 

488 char_names: List of characteristic names or enums to read 

489 

490 Returns: 

491 Dictionary mapping characteristic UUIDs to parsed values 

492 

493 Raises: 

494 RuntimeError: If no connection manager is attached 

495 

496 """ 

497 if not self.connection_manager: 

498 raise RuntimeError("No connection manager attached to Device") 

499 

500 results: dict[str, Any | None] = {} 

501 for char_name in char_names: 

502 try: 

503 value = await self.read(char_name) 

504 resolved_uuid = self._resolve_characteristic_name(char_name) 

505 results[str(resolved_uuid)] = value 

506 except Exception as exc: # pylint: disable=broad-exception-caught 

507 resolved_uuid = self._resolve_characteristic_name(char_name) 

508 results[str(resolved_uuid)] = None 

509 logging.warning("Failed to read characteristic %s: %s", char_name, exc) 

510 

511 return results 

512 

513 async def write_multiple(self, data_map: dict[str | CharacteristicName, bytes]) -> dict[str, bool]: 

514 """Write to multiple characteristics in batch. 

515 

516 Args: 

517 data_map: Dictionary mapping characteristic names/enums to data bytes 

518 

519 Returns: 

520 Dictionary mapping characteristic UUIDs to success status 

521 

522 Raises: 

523 RuntimeError: If no connection manager is attached 

524 

525 """ 

526 if not self.connection_manager: 

527 raise RuntimeError("No connection manager attached to Device") 

528 

529 results: dict[str, bool] = {} 

530 for char_name, data in data_map.items(): 

531 try: 

532 await self.write(char_name, data) 

533 resolved_uuid = self._resolve_characteristic_name(char_name) 

534 results[str(resolved_uuid)] = True 

535 except Exception as exc: # pylint: disable=broad-exception-caught 

536 resolved_uuid = self._resolve_characteristic_name(char_name) 

537 results[str(resolved_uuid)] = False 

538 logging.warning("Failed to write characteristic %s: %s", char_name, exc) 

539 

540 return results 

541 

542 @property 

543 def device_info(self) -> DeviceInfo: 

544 """Get cached device info object. 

545 

546 Returns: 

547 DeviceInfo with current device metadata 

548 

549 """ 

550 if self._device_info_cache is None: 

551 self._device_info_cache = DeviceInfo( 

552 address=self.address, 

553 name=self.name, 

554 manufacturer_data=self.advertiser_data.manufacturer_data, 

555 service_uuids=self.advertiser_data.service_uuids, 

556 ) 

557 else: 

558 # Update existing cache object with current data 

559 self._device_info_cache.name = self.name 

560 self._device_info_cache.manufacturer_data = self.advertiser_data.manufacturer_data 

561 self._device_info_cache.service_uuids = self.advertiser_data.service_uuids 

562 return self._device_info_cache 

563 

564 @property 

565 def name(self) -> str: 

566 """Get the device name.""" 

567 return self._name 

568 

569 @name.setter 

570 def name(self, value: str) -> None: 

571 """Set the device name and update cached device_info.""" 

572 self._name = value 

573 # Update existing cache object if it exists 

574 if self._device_info_cache is not None: 

575 self._device_info_cache.name = value 

576 

577 @property 

578 def is_connected(self) -> bool: 

579 """Check if the device is currently connected. 

580 

581 Returns: 

582 True if connected, False otherwise 

583 

584 """ 

585 if self.connection_manager is None: 

586 return False 

587 # Check if the connection manager has an is_connected property 

588 return getattr(self.connection_manager, "is_connected", False) 

589 

590 def get_service_by_uuid(self, service_uuid: str) -> DeviceService | None: 

591 """Get a service by its UUID. 

592 

593 Args: 

594 service_uuid: UUID of the service 

595 

596 Returns: 

597 DeviceService instance or None if not found 

598 

599 """ 

600 return self.services.get(service_uuid) 

601 

602 def get_services_by_name(self, service_name: str | ServiceName) -> list[DeviceService]: 

603 """Get services by name. 

604 

605 Args: 

606 service_name: Name or enum of the service 

607 

608 Returns: 

609 List of matching DeviceService instances 

610 

611 """ 

612 service_uuid = self.translator.get_service_uuid_by_name( 

613 service_name if isinstance(service_name, str) else service_name.value 

614 ) 

615 if service_uuid and str(service_uuid) in self.services: 

616 return [self.services[str(service_uuid)]] 

617 return [] 

618 

619 def list_characteristics(self, service_uuid: str | None = None) -> dict[str, list[str]]: 

620 """List all characteristics, optionally filtered by service. 

621 

622 Args: 

623 service_uuid: Optional service UUID to filter by 

624 

625 Returns: 

626 Dictionary mapping service UUIDs to lists of characteristic UUIDs 

627 

628 """ 

629 if service_uuid: 

630 service = self.services.get(service_uuid) 

631 if service: 

632 return {service_uuid: list(service.characteristics.keys())} 

633 return {} 

634 

635 return {svc_uuid: list(service.characteristics.keys()) for svc_uuid, service in self.services.items()}