Coverage for src / bluetooth_sig / device / device.py: 79%

160 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

"""Device class for grouping BLE device services, characteristics, encryption, and advertiser data.

Provides a high-level Device abstraction that groups all services,
characteristics, encryption requirements, and advertiser data for a BLE
device.
"""

7 

8from __future__ import annotations 

9 

10from collections.abc import Callable 

11from typing import Any, TypeVar, overload 

12 

13from ..advertising.registry import PayloadInterpreterRegistry 

14from ..gatt.characteristics import CharacteristicName 

15from ..gatt.characteristics.base import BaseCharacteristic 

16from ..gatt.context import DeviceInfo 

17from ..gatt.descriptors.base import BaseDescriptor 

18from ..gatt.descriptors.registry import DescriptorRegistry 

19from ..gatt.services import ServiceName 

20from ..types import ( 

21 DescriptorData, 

22 DescriptorInfo, 

23) 

24from ..types.advertising.result import AdvertisementData 

25from ..types.device_types import ScannedDevice 

26from ..types.uuid import BluetoothUUID 

27from .advertising import DeviceAdvertising 

28from .characteristic_io import CharacteristicIO 

29from .client import ClientManagerProtocol 

30from .connected import DeviceConnected, DeviceEncryption, DeviceService 

31from .dependency_resolver import DependencyResolutionMode, DependencyResolver 

32from .protocols import SIGTranslatorProtocol 

33 

# Type variable for generic characteristic return types: it ties the value
# type of a ``BaseCharacteristic[T]`` class to the typed overloads of
# ``Device.read``, ``Device.write`` and ``Device.start_notify``.
T = TypeVar("T")

36 

# Public API of this module: the Device facade plus the collaborating types
# re-exported here for callers' convenience.
__all__ = [
    "DependencyResolutionMode",
    "Device",
    "DeviceAdvertising",
    "DeviceConnected",
    "SIGTranslatorProtocol",
]

44 

45 

class Device:  # pylint: disable=too-many-instance-attributes,too-many-public-methods
    r"""High-level BLE device abstraction using composition pattern.

    Coordinates between connected GATT operations and advertising packet
    interpretation through two subsystems:

    - ``device.connected`` — GATT connection, services, characteristics
    - ``device.advertising`` — Vendor-specific advertising interpretation

    Convenience methods delegate to the appropriate subsystem, so callers
    can use ``await device.read("battery_level")`` without knowing which
    subsystem handles it.
    """

    def __init__(self, connection_manager: ClientManagerProtocol, translator: SIGTranslatorProtocol) -> None:
        """Initialise Device instance with connection manager and translator.

        Args:
            connection_manager: Connection manager implementing ClientManagerProtocol
            translator: SIGTranslatorProtocol instance

        """
        # ``connection_manager`` must be assigned first: the ``address``
        # property used below reads it.
        self.connection_manager = connection_manager
        self.translator = translator
        self._name: str = ""

        # Cache for device_info property and last advertisement. Initialised
        # before the delegates so the ``device_info`` provider handed to
        # CharacteristicIO is usable as soon as it exists.
        self._device_info_cache: DeviceInfo | None = None
        self._last_advertisement: AdvertisementData | None = None

        # Connected subsystem (composition pattern)
        self.connected = DeviceConnected(
            mac_address=self.address,
            connection_manager=connection_manager,
        )

        # Advertising subsystem (composition pattern)
        self.advertising = DeviceAdvertising(self.address, connection_manager)
        # Set up registry for auto-detection
        self.advertising.set_registry(PayloadInterpreterRegistry())

        # Dependency resolution delegate
        self._dep_resolver = DependencyResolver(connection_manager, self.connected)

        # Characteristic I/O delegate; device_info is passed as a callable so
        # it is looked up on use rather than captured once here.
        self._char_io = CharacteristicIO(connection_manager, translator, self._dep_resolver, lambda: self.device_info)

    def __str__(self) -> str:
        """Return string representation of Device.

        Returns:
            str: Address, name, and the number of known services and
            characteristics.

        """
        service_count = len(self.connected.services)
        char_count = sum(len(service.characteristics) for service in self.connected.services.values())
        return f"Device({self.address}, name={self.name}, {service_count} services, {char_count} characteristics)"

    @property
    def services(self) -> dict[str, DeviceService]:
        """GATT services discovered on device.

        Delegates to device.connected.services.

        Returns:
            Dictionary of service UUID → DeviceService

        """
        return self.connected.services

    @property
    def encryption(self) -> DeviceEncryption:
        """Encryption state for connected device.

        Delegates to device.connected.encryption.

        Returns:
            DeviceEncryption instance

        """
        return self.connected.encryption

    @property
    def address(self) -> str:
        """Get the device address from the connection manager.

        Returns:
            BLE device address

        """
        return self.connection_manager.address

    @staticmethod
    async def scan(manager_class: type[ClientManagerProtocol], timeout: float = 5.0) -> list[ScannedDevice]:
        """Scan for nearby BLE devices using a specific connection manager.

        This is a static method that doesn't require a Device instance.
        Use it to discover devices before creating Device instances.

        Args:
            manager_class: The connection manager class to use for scanning
                (e.g., BleakRetryClientManager)
            timeout: Scan duration in seconds (default: 5.0)

        Returns:
            List of discovered devices

        Raises:
            NotImplementedError: If the connection manager doesn't support scanning

        Example::

            from bluetooth_sig.device import Device
            from connection_managers.bleak_retry import BleakRetryClientManager

            # Scan for devices
            devices = await Device.scan(BleakRetryClientManager, timeout=10.0)

            # Create Device instance for first discovered device
            if devices:
                translator = BluetoothSIGTranslator()
                # Device takes a connection manager, not a bare address.
                # The manager's constructor arguments are implementation
                # specific; adjust this line to the manager you use.
                manager = BleakRetryClientManager(devices[0].address)
                device = Device(manager, translator)

        """
        return await manager_class.scan(timeout)

    async def connect(self) -> None:
        """Connect to the BLE device.

        Convenience method that delegates to device.connected.connect().

        Raises:
            RuntimeError: If no connection manager is attached

        """
        await self.connected.connect()

    async def disconnect(self) -> None:
        """Disconnect from the BLE device.

        Convenience method that delegates to device.connected.disconnect().

        Raises:
            RuntimeError: If no connection manager is attached

        """
        await self.connected.disconnect()

    # ------------------------------------------------------------------
    # Characteristic I/O (delegated to CharacteristicIO)
    # ------------------------------------------------------------------

    @overload
    async def read(
        self,
        char: type[BaseCharacteristic[T]],
        resolution_mode: DependencyResolutionMode = ...,
    ) -> T | None: ...

    @overload
    async def read(
        self,
        char: str | CharacteristicName,
        resolution_mode: DependencyResolutionMode = ...,
    ) -> Any | None: ...  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe

    async def read(
        self,
        char: str | CharacteristicName | type[BaseCharacteristic[T]],
        resolution_mode: DependencyResolutionMode = DependencyResolutionMode.NORMAL,
    ) -> T | Any | None:  # Runtime UUID dispatch cannot be type-safe
        """Read a characteristic value from the device.

        Delegates to :class:`CharacteristicIO`.

        Args:
            char: Name, enum, or characteristic class to read.
            resolution_mode: How to handle automatic dependency resolution.

        Returns:
            Parsed characteristic value or None if read fails.

        Raises:
            RuntimeError: If no connection manager is attached
            ValueError: If required dependencies cannot be resolved

        """
        return await self._char_io.read(char, resolution_mode)

    @overload
    async def write(
        self,
        char: type[BaseCharacteristic[T]],
        data: T,
        response: bool = ...,
    ) -> None: ...

    @overload
    async def write(
        self,
        char: str | CharacteristicName,
        data: bytes,
        response: bool = ...,
    ) -> None: ...

    async def write(
        self,
        char: str | CharacteristicName | type[BaseCharacteristic[T]],
        data: bytes | T,
        response: bool = True,
    ) -> None:
        r"""Write data to a characteristic on the device.

        Delegates to :class:`CharacteristicIO`.

        Args:
            char: Name, enum, or characteristic class to write to.
            data: Raw bytes (for string/enum) or typed value (for characteristic class).
            response: If True, use write-with-response. Default is True.

        Raises:
            RuntimeError: If no connection manager is attached
            CharacteristicEncodeError: If encoding fails (when using characteristic class)

        """
        await self._char_io.write(char, data, response=response)  # type: ignore[arg-type, misc]  # Union narrowing handled by overloads; mypy can't infer across delegation

    @overload
    async def start_notify(
        self,
        char: type[BaseCharacteristic[T]],
        callback: Callable[[T], None],
    ) -> None: ...

    @overload
    async def start_notify(
        self,
        char: str | CharacteristicName,
        callback: Callable[[Any], None],
    ) -> None: ...

    async def start_notify(
        self,
        char: str | CharacteristicName | type[BaseCharacteristic[T]],
        callback: Callable[[T], None] | Callable[[Any], None],
    ) -> None:
        """Start notifications for a characteristic.

        Delegates to :class:`CharacteristicIO`.

        Args:
            char: Name, enum, or characteristic class to monitor.
            callback: Function to call when notifications are received.

        Raises:
            RuntimeError: If no connection manager is attached

        """
        await self._char_io.start_notify(char, callback)

    async def stop_notify(self, char_name: str | CharacteristicName) -> None:
        """Stop notifications for a characteristic.

        Delegates to :class:`CharacteristicIO`.

        Args:
            char_name: Characteristic name or UUID

        """
        await self._char_io.stop_notify(char_name)

    async def read_descriptor(self, desc_uuid: BluetoothUUID | BaseDescriptor) -> DescriptorData:
        """Read a descriptor value from the device.

        The raw bytes are read through the connected subsystem and parsed by
        the descriptor registered for the UUID. When no descriptor is
        registered, an unparsed :class:`DescriptorData` is returned with
        ``parse_success=False``.

        Args:
            desc_uuid: UUID of the descriptor to read or BaseDescriptor instance

        Returns:
            Parsed descriptor data with metadata

        Raises:
            RuntimeError: If no connection manager is attached

        """
        # Extract UUID from BaseDescriptor if needed
        uuid = desc_uuid.uuid if isinstance(desc_uuid, BaseDescriptor) else desc_uuid

        raw_data = await self.connected.read_descriptor(uuid)

        # Try to create a descriptor instance and parse the data
        descriptor = DescriptorRegistry.create_descriptor(str(uuid))
        if descriptor:
            return descriptor.parse_value(raw_data)

        # If no registered descriptor found, return unparsed DescriptorData
        return DescriptorData(
            info=DescriptorInfo(uuid=uuid, name="Unknown Descriptor"),
            value=raw_data,
            raw_data=raw_data,
            parse_success=False,
            error_message="Unknown descriptor UUID - no parser available",
        )

    async def write_descriptor(self, desc_uuid: BluetoothUUID | BaseDescriptor, data: bytes | DescriptorData) -> None:
        """Write data to a descriptor on the device.

        Args:
            desc_uuid: UUID of the descriptor to write to or BaseDescriptor instance
            data: Either raw bytes to write, or a DescriptorData object.
                If DescriptorData is provided, its raw_data will be written.

        Raises:
            RuntimeError: If no connection manager is attached

        """
        # Extract UUID from BaseDescriptor if needed
        uuid = desc_uuid.uuid if isinstance(desc_uuid, BaseDescriptor) else desc_uuid

        # Extract raw bytes from DescriptorData if needed
        raw_data: bytes = data.raw_data if isinstance(data, DescriptorData) else data

        await self.connected.write_descriptor(uuid, raw_data)

    async def pair(self) -> None:
        """Pair with the device.

        Convenience method that delegates to device.connected.pair().

        Raises:
            RuntimeError: If no connection manager is attached

        """
        await self.connected.pair()

    async def unpair(self) -> None:
        """Unpair from the device.

        Convenience method that delegates to device.connected.unpair().

        Raises:
            RuntimeError: If no connection manager is attached

        """
        await self.connected.unpair()

    async def read_rssi(self) -> int:
        """Read the RSSI (signal strength) of the connection.

        Convenience method that delegates to device.connected.read_rssi().

        Returns:
            RSSI value in dBm (typically negative, e.g., -60)

        Raises:
            RuntimeError: If no connection manager is attached

        """
        return await self.connected.read_rssi()

    def set_disconnected_callback(self, callback: Callable[[], None]) -> None:
        """Set a callback to be invoked when the device disconnects.

        Convenience method that delegates to device.connected.set_disconnected_callback().

        Args:
            callback: Function to call when disconnection occurs

        Raises:
            RuntimeError: If no connection manager is attached

        """
        self.connected.set_disconnected_callback(callback)

    @property
    def mtu_size(self) -> int:
        """Get the negotiated MTU size in bytes.

        Delegates to device.connected.mtu_size.

        Returns:
            The MTU size negotiated for this connection (typically 23-512 bytes)

        Raises:
            RuntimeError: If no connection manager is attached

        """
        return self.connected.mtu_size

    def _adopt_advertised_name(self, local_name: str | None) -> None:
        """Use an advertised local name as the device name if none is set yet."""
        if local_name and not self.name:
            self.name = local_name

    async def refresh_advertisement(self, refresh: bool = False) -> AdvertisementData | None:
        """Get advertisement data from the connection manager.

        The advertisement is processed by the advertising subsystem, stored
        as the last advertisement, and its local name is adopted as the
        device name when no name has been set yet.

        Args:
            refresh: If ``True``, perform an active scan for fresh data.

        Returns:
            Interpreted :class:`AdvertisementData`, or ``None`` if unavailable.

        Raises:
            RuntimeError: If no connection manager is attached.

        """
        if not self.connection_manager:
            raise RuntimeError("No connection manager attached to Device")

        advertisement = await self.connection_manager.get_latest_advertisement(refresh=refresh)
        if advertisement is None:
            return None

        # Process and cache the advertisement
        processed_ad, _result = self.advertising.process_from_connection_manager(advertisement)
        self._last_advertisement = processed_ad

        # Update device name if not set (read from the advertisement as
        # delivered by the connection manager, not the processed copy)
        self._adopt_advertised_name(advertisement.ad_structures.core.local_name)

        return processed_ad

    def parse_raw_advertisement(self, raw_data: bytes, rssi: int = 0) -> AdvertisementData:
        """Parse raw advertising PDU bytes directly.

        The parsed advertisement is stored as the last advertisement, and its
        local name is adopted as the device name when no name has been set.

        Args:
            raw_data: Raw BLE advertising PDU bytes.
            rssi: Received signal strength in dBm.

        Returns:
            AdvertisementData with parsed AD structures and vendor interpretation.

        """
        # Delegate to advertising subsystem
        advertisement, _result = self.advertising.parse_raw_pdu(raw_data, rssi)
        self._last_advertisement = advertisement

        # Update device name if present and not already set
        self._adopt_advertised_name(advertisement.ad_structures.core.local_name)

        return advertisement

    def get_characteristic_data(self, char_uuid: BluetoothUUID) -> Any | None:  # noqa: ANN401  # Heterogeneous cache
        """Get parsed characteristic data via ``characteristic.last_parsed``.

        Args:
            char_uuid: UUID of the characteristic.

        Returns:
            Parsed characteristic value if found, ``None`` otherwise.

        """
        char_instance = self.connected.get_cached_characteristic(char_uuid)
        if char_instance is None:
            return None
        return char_instance.last_parsed

    async def discover_services(self) -> dict[str, Any]:
        """Discover services and characteristics from the connected BLE device.

        Performs BLE service discovery via the connection manager. The
        discovered :class:`DeviceService` objects (with characteristic
        instances and runtime properties) are stored in ``self.services``.

        Returns:
            Dictionary mapping service UUIDs to DeviceService objects.

        Raises:
            RuntimeError: If no connection manager is attached.

        """
        # Delegate to connected subsystem
        services_list = await self.connected.discover_services()

        # Invalidate device_info cache since services changed
        self._device_info_cache = None

        # Return as dict for backward compatibility
        return {str(svc.uuid): svc for svc in services_list}

    async def get_characteristic_info(self, char_uuid: str) -> Any | None:  # noqa: ANN401  # Adapter-specific characteristic metadata
        """Get information about a characteristic from the connection manager.

        Services are fetched from the connection manager on every call and
        searched in order; the first characteristic whose key equals
        ``char_uuid`` wins.

        Args:
            char_uuid: UUID of the characteristic

        Returns:
            Characteristic information or None if not found

        Raises:
            RuntimeError: If no connection manager is attached

        """
        if not self.connection_manager:
            raise RuntimeError("No connection manager attached to Device")

        services_data = await self.connection_manager.get_services()
        for service_info in services_data:
            # Compared with ``==`` rather than a dict lookup so that whatever
            # equality the key type defines is what decides a match.
            for char_uuid_key, char_info in service_info.characteristics.items():
                if char_uuid_key == char_uuid:
                    return char_info
        return None

    async def read_multiple(self, char_names: list[str | CharacteristicName]) -> dict[str, Any | None]:
        """Read multiple characteristics in batch.

        Delegates to :class:`CharacteristicIO`.

        Args:
            char_names: List of characteristic names or enums to read

        Returns:
            Dictionary mapping characteristic UUIDs to parsed values

        """
        return await self._char_io.read_multiple(char_names)

    async def write_multiple(
        self, data_map: dict[str | CharacteristicName, bytes], response: bool = True
    ) -> dict[str, bool]:
        """Write to multiple characteristics in batch.

        Delegates to :class:`CharacteristicIO`.

        Args:
            data_map: Dictionary mapping characteristic names/enums to data bytes
            response: If True, use write-with-response for all writes.

        Returns:
            Dictionary mapping characteristic UUIDs to success status

        """
        return await self._char_io.write_multiple(data_map, response=response)

    @property
    def device_info(self) -> DeviceInfo:
        """Get cached device info object.

        The object is created on first access and afterwards updated in
        place, so the same instance is returned until the cache is reset
        (``discover_services`` resets it).

        Returns:
            DeviceInfo with current device metadata

        """
        last_ad = self._last_advertisement
        if self._device_info_cache is None:
            self._device_info_cache = DeviceInfo(
                address=self.address,
                name=self.name,
                manufacturer_data=last_ad.ad_structures.core.manufacturer_data if last_ad else {},
                service_uuids=last_ad.ad_structures.core.service_uuids if last_ad else [],
            )
        else:
            # Update existing cache object with current data
            self._device_info_cache.name = self.name
            if last_ad:
                core = last_ad.ad_structures.core
                self._device_info_cache.manufacturer_data = core.manufacturer_data
                self._device_info_cache.service_uuids = core.service_uuids
        return self._device_info_cache

    @property
    def name(self) -> str:
        """Get the device name."""
        return self._name

    @name.setter
    def name(self, value: str) -> None:
        """Set the device name and update cached device_info."""
        self._name = value
        # Update existing cache object if it exists
        if self._device_info_cache is not None:
            self._device_info_cache.name = value

    @property
    def is_connected(self) -> bool:
        """Check if the device is currently connected.

        Delegates to device.connected.is_connected.

        Returns:
            True if connected, False otherwise

        """
        return self.connected.is_connected

    @property
    def last_advertisement(self) -> AdvertisementData | None:
        """Get the last received advertisement data.

        This is automatically updated when advertisements are processed via
        refresh_advertisement() or parse_raw_advertisement().

        Returns:
            AdvertisementData with AD structures and interpreted_data if available,
            None if no advertisement has been received yet.

        """
        return self._last_advertisement

    def get_service_by_uuid(self, service_uuid: str) -> DeviceService | None:
        """Get a service by its UUID.

        Args:
            service_uuid: UUID of the service

        Returns:
            DeviceService instance or None if not found

        """
        return self.services.get(service_uuid)

    def get_services_by_name(self, service_name: str | ServiceName) -> list[DeviceService]:
        """Get services by name.

        The name is resolved to a UUID through the translator and looked up
        in ``self.services``, so the result holds at most one service.

        Args:
            service_name: Name or enum of the service

        Returns:
            List of matching DeviceService instances

        """
        lookup_name = service_name if isinstance(service_name, str) else service_name.value
        service_uuid = self.translator.get_service_uuid_by_name(lookup_name)
        if not service_uuid:
            return []

        key = str(service_uuid)
        return [self.services[key]] if key in self.services else []

    def list_characteristics(self, service_uuid: str | None = None) -> dict[str, list[str]]:
        """List all characteristics, optionally filtered by service.

        Args:
            service_uuid: Optional service UUID to filter by

        Returns:
            Dictionary mapping service UUIDs to lists of characteristic UUIDs;
            empty if ``service_uuid`` is given but not found.

        """
        if service_uuid:
            service = self.services.get(service_uuid)
            if service:
                return {service_uuid: list(service.characteristics)}
            return {}

        return {svc_uuid: list(service.characteristics) for svc_uuid, service in self.services.items()}