Coverage for src / bluetooth_sig / device / device.py: 60%

364 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 20:14 +0000

1"""Device class for grouping BLE device services, characteristics, encryption, and advertiser data. 

2 

3This module provides a high-level Device abstraction that groups all 

4services, characteristics, encryption requirements, and advertiser data 

5for a BLE device. It integrates with the BluetoothSIGTranslator for 

6parsing while providing a unified view of device state. 

7""" 

8# pylint: disable=too-many-lines # Device abstraction is a cohesive module with related classes 

9# TODO split into multiple files 

10 

11from __future__ import annotations 

12 

13import logging 

14from abc import abstractmethod 

15from enum import Enum 

16from typing import Any, Callable, Protocol, TypeVar, cast, overload 

17 

18from ..advertising import ( 

19 AdvertisingDataInterpreter, 

20 AdvertisingPDUParser, 

21 advertising_interpreter_registry, 

22) 

23from ..gatt.characteristics import CharacteristicName 

24from ..gatt.characteristics.base import BaseCharacteristic 

25from ..gatt.characteristics.registry import CharacteristicRegistry 

26from ..gatt.characteristics.unknown import UnknownCharacteristic 

27from ..gatt.context import CharacteristicContext, DeviceInfo 

28from ..gatt.descriptors.base import BaseDescriptor 

29from ..gatt.descriptors.registry import DescriptorRegistry 

30from ..gatt.services import ServiceName 

31from ..types import ( 

32 AdvertisementData, 

33 AdvertisingData, 

34 CharacteristicInfo, 

35 DescriptorData, 

36 DescriptorInfo, 

37) 

38from ..types.device_types import DeviceEncryption, DeviceService, ScannedDevice 

39from ..types.uuid import BluetoothUUID 

40from .connection import ConnectionManagerProtocol 

41 

42# Type variable for generic characteristic return types 

43T = TypeVar("T") 

44 

45__all__ = [ 

46 "Device", 

47 "DependencyResolutionMode", 

48 "SIGTranslatorProtocol", 

49] 

50 

51 

52class DependencyResolutionMode(Enum): 

53 """Mode for automatic dependency resolution during characteristic reads. 

54 

55 Attributes: 

56 NORMAL: Auto-resolve dependencies, use cache when available 

57 SKIP_DEPENDENCIES: Skip dependency resolution and validation 

58 FORCE_REFRESH: Re-read dependencies from device, ignoring cache 

59 """ 

60 

61 NORMAL = "normal" 

62 SKIP_DEPENDENCIES = "skip_dependencies" 

63 FORCE_REFRESH = "force_refresh" 

64 

65 

66class SIGTranslatorProtocol(Protocol): # pylint: disable=too-few-public-methods 

67 """Protocol for SIG translator interface.""" 

68 

69 @abstractmethod 

70 def parse_characteristics( 

71 self, 

72 char_data: dict[str, bytes], 

73 ctx: CharacteristicContext | None = None, 

74 ) -> dict[str, Any]: 

75 """Parse multiple characteristics at once.""" 

76 

77 @abstractmethod 

78 def parse_characteristic( 

79 self, 

80 uuid: str, 

81 raw_data: bytes, 

82 ctx: CharacteristicContext | None = None, 

83 ) -> Any: # noqa: ANN401 # Runtime UUID dispatch cannot be type-safe 

84 """Parse a single characteristic's raw bytes.""" 

85 

86 @abstractmethod 

87 def get_characteristic_uuid_by_name(self, name: CharacteristicName) -> BluetoothUUID | None: 

88 """Get the UUID for a characteristic name enum (enum-only API).""" 

89 

90 @abstractmethod 

91 def get_service_uuid_by_name(self, name: str | ServiceName) -> BluetoothUUID | None: 

92 """Get the UUID for a service name or enum.""" 

93 

94 def get_characteristic_info_by_name(self, name: CharacteristicName) -> Any | None: # noqa: ANN401 # Adapter-specific characteristic info 

95 """Get characteristic info by enum name (optional method).""" 

96 

97 

98class Device: # pylint: disable=too-many-instance-attributes,too-many-public-methods 

99 r"""High-level BLE device abstraction. 

100 

101 This class groups all services, characteristics, encryption requirements, and 

102 advertiser data for a BLE device. It integrates with 

103 [BluetoothSIGTranslator][bluetooth_sig.BluetoothSIGTranslator] 

104 for parsing while providing a unified view of device state. 

105 

106 Key features: 

107 - Parse advertiser data from BLE scan results 

108 - Discover GATT services and characteristics via connection manager 

109 - Access parsed characteristic data by UUID 

110 - Handle device encryption requirements 

111 - Cache device information for performance 

112 

113 Example: 

114 Create and configure a device:: 

115 

116 from bluetooth_sig import BluetoothSIGTranslator 

117 from bluetooth_sig.device import Device 

118 

119 translator = BluetoothSIGTranslator() 

120 device = Device("AA:BB:CC:DD:EE:FF", translator) 

121 

122 # Attach connection manager and discover services 

123 device.attach_connection_manager(manager) 

124 await device.connect() 

125 await device.discover_services() 

126 

127 # Read characteristic 

128 battery = await device.read("battery_level") 

129 print(f"Battery: {battery.value}%") 

130 

131 """ 

132 

133 def __init__(self, connection_manager: ConnectionManagerProtocol, translator: SIGTranslatorProtocol) -> None: 

134 """Initialise Device instance with connection manager and translator. 

135 

136 Args: 

137 connection_manager: Connection manager implementing ConnectionManagerProtocol 

138 translator: SIGTranslatorProtocol instance 

139 

140 """ 

141 self.connection_manager = connection_manager 

142 self.translator = translator 

143 self._name: str = "" 

144 self.services: dict[str, DeviceService] = {} 

145 self.encryption = DeviceEncryption() 

146 self.advertiser_data = AdvertisingData(raw_data=b"") 

147 

148 # Advertising PDU parser for handling raw advertising data 

149 self._pdu_parser = AdvertisingPDUParser() 

150 

151 # Cache of vendor-specific advertising interpreters (keyed by class name) 

152 self._advertising_interpreters: dict[str, AdvertisingDataInterpreter[Any]] = {} 

153 

154 # Optional bindkey for encrypted advertisements 

155 self._advertising_bindkey: bytes | None = None 

156 

157 # Cache for device_info property 

158 self._device_info_cache: DeviceInfo | None = None 

159 

160 # Last interpreted advertisement (with vendor-specific data) 

161 self._last_interpreted_advertisement: AdvertisementData | None = None 

162 

163 def __str__(self) -> str: 

164 """Return string representation of Device. 

165 

166 Returns: 

167 str: String representation of Device. 

168 

169 """ 

170 service_count = len(self.services) 

171 char_count = sum(len(service.characteristics) for service in self.services.values()) 

172 return f"Device({self.address}, name={self.name}, {service_count} services, {char_count} characteristics)" 

173 

174 @property 

175 def address(self) -> str: 

176 """Get the device address from the connection manager. 

177 

178 Returns: 

179 BLE device address 

180 

181 """ 

182 return self.connection_manager.address 

183 

184 @staticmethod 

185 async def scan(manager_class: type[ConnectionManagerProtocol], timeout: float = 5.0) -> list[ScannedDevice]: 

186 """Scan for nearby BLE devices using a specific connection manager. 

187 

188 This is a static method that doesn't require a Device instance. 

189 Use it to discover devices before creating Device instances. 

190 

191 Args: 

192 manager_class: The connection manager class to use for scanning 

193 (e.g., BleakRetryConnectionManager) 

194 timeout: Scan duration in seconds (default: 5.0) 

195 

196 Returns: 

197 List of discovered devices 

198 

199 Raises: 

200 NotImplementedError: If the connection manager doesn't support scanning 

201 

202 Example:: 

203 

204 from bluetooth_sig.device import Device 

205 from connection_managers.bleak_retry import BleakRetryConnectionManager 

206 

207 # Scan for devices 

208 devices = await Device.scan(BleakRetryConnectionManager, timeout=10.0) 

209 

210 # Create Device instance for first discovered device 

211 if devices: 

212 translator = BluetoothSIGTranslator() 

213 device = Device(devices[0].address, translator) 

214 

215 """ 

216 return await manager_class.scan(timeout) 

217 

218 async def connect(self) -> None: 

219 """Connect to the BLE device. 

220 

221 Raises: 

222 RuntimeError: If no connection manager is attached 

223 

224 """ 

225 if not self.connection_manager: 

226 raise RuntimeError("No connection manager attached to Device") 

227 await self.connection_manager.connect() 

228 

229 async def disconnect(self) -> None: 

230 """Disconnect from the BLE device. 

231 

232 Raises: 

233 RuntimeError: If no connection manager is attached 

234 

235 """ 

236 if not self.connection_manager: 

237 raise RuntimeError("No connection manager attached to Device") 

238 await self.connection_manager.disconnect() 

239 

240 def _get_cached_characteristic(self, char_uuid: BluetoothUUID) -> BaseCharacteristic[Any] | None: 

241 """Get cached characteristic instance from services. 

242 

243 Single source of truth for characteristics - searches across all services. 

244 Access parsed data via characteristic.last_parsed property. 

245 

246 Args: 

247 char_uuid: UUID of the characteristic to find 

248 

249 Returns: BaseCharacteristic[Any] instance if found, None otherwise 

250 

251 """ 

252 char_uuid_str = str(char_uuid) 

253 for service in self.services.values(): 

254 if char_uuid_str in service.characteristics: 

255 return service.characteristics[char_uuid_str] 

256 return None 

257 

258 def _cache_characteristic(self, char_uuid: BluetoothUUID, char_instance: BaseCharacteristic[Any]) -> None: 

259 """Store characteristic instance in services cache. 

260 

261 Only updates existing characteristic entries - does not create new services. 

262 Characteristics must belong to a discovered service. 

263 

264 Args: 

265 char_uuid: UUID of the characteristic 

266 char_instance: BaseCharacteristic[Any] instance to cache 

267 

268 """ 

269 char_uuid_str = str(char_uuid) 

270 # Find existing service that should contain this characteristic 

271 for service in self.services.values(): 

272 if char_uuid_str in service.characteristics: 

273 service.characteristics[char_uuid_str] = char_instance 

274 return 

275 # Characteristic not in any discovered service - warn about missing service 

276 logging.warning( 

277 "Cannot cache characteristic %s - not found in any discovered service. Run discover_services() first.", 

278 char_uuid_str, 

279 ) 

280 

281 def _create_unknown_characteristic(self, dep_uuid: BluetoothUUID) -> BaseCharacteristic[Any]: 

282 """Create an unknown characteristic instance for a UUID not in registry. 

283 

284 Args: 

285 dep_uuid: UUID of the unknown characteristic 

286 

287 Returns: 

288 UnknownCharacteristic instance 

289 

290 """ 

291 dep_uuid_str = str(dep_uuid) 

292 char_info = CharacteristicInfo(uuid=dep_uuid, name=f"Unknown-{dep_uuid_str}") 

293 return UnknownCharacteristic(info=char_info) 

294 

295 async def _resolve_single_dependency( 

296 self, 

297 dep_uuid: BluetoothUUID, 

298 is_required: bool, 

299 dep_class: type[BaseCharacteristic[Any]], 

300 ) -> Any | None: # noqa: ANN401 # Dependency can be any characteristic type 

301 """Resolve a single dependency by reading and parsing it. 

302 

303 Args: 

304 dep_uuid: UUID of the dependency characteristic 

305 is_required: Whether this is a required dependency 

306 dep_class: The dependency characteristic class 

307 

308 Returns: 

309 Parsed characteristic data, or None if optional and failed 

310 

311 Raises: 

312 ValueError: If required dependency fails to read 

313 

314 """ 

315 if not self.connection_manager: 

316 raise RuntimeError("No connection manager attached") 

317 

318 dep_uuid_str = str(dep_uuid) 

319 

320 try: 

321 raw_data = await self.connection_manager.read_gatt_char(dep_uuid) 

322 

323 # Get or create characteristic instance 

324 char_instance = self._get_cached_characteristic(dep_uuid) 

325 if char_instance is None: 

326 # Create a new characteristic instance using registry 

327 char_class_or_none = CharacteristicRegistry.get_characteristic_class_by_uuid(dep_uuid) 

328 if char_class_or_none: 

329 char_instance = char_class_or_none() 

330 else: 

331 char_instance = self._create_unknown_characteristic(dep_uuid) 

332 

333 # Cache the instance 

334 self._cache_characteristic(dep_uuid, char_instance) 

335 

336 # Parse using the characteristic instance 

337 return char_instance.parse_value(raw_data) 

338 

339 except Exception as e: # pylint: disable=broad-exception-caught 

340 if is_required: 

341 raise ValueError( 

342 f"Failed to read required dependency {dep_class.__name__} ({dep_uuid_str}): {e}" 

343 ) from e 

344 # Optional dependency failed, log and continue 

345 logging.warning("Failed to read optional dependency %s: %s", dep_class.__name__, e) 

346 return None 

347 

348 async def _ensure_dependencies_resolved( 

349 self, 

350 char_class: type[BaseCharacteristic[Any]], 

351 resolution_mode: DependencyResolutionMode, 

352 ) -> CharacteristicContext: 

353 """Ensure all dependencies for a characteristic are resolved. 

354 

355 This method automatically reads feature characteristics needed for validation 

356 of measurement characteristics. Feature characteristics are cached after first read. 

357 

358 Args: 

359 char_class: The characteristic class to resolve dependencies for 

360 resolution_mode: How to handle dependency resolution 

361 

362 Returns: 

363 CharacteristicContext with resolved dependencies 

364 

365 Raises: 

366 RuntimeError: If no connection manager is attached 

367 

368 """ 

369 if not self.connection_manager: 

370 raise RuntimeError("No connection manager attached to Device") 

371 

372 # Get dependency declarations from characteristic class 

373 optional_deps = getattr(char_class, "_optional_dependencies", []) 

374 required_deps = getattr(char_class, "_required_dependencies", []) 

375 

376 # Build context with resolved dependencies 

377 context_chars: dict[str, Any] = {} 

378 

379 for dep_class in required_deps + optional_deps: 

380 is_required = dep_class in required_deps 

381 

382 # Get UUID for dependency characteristic 

383 dep_uuid = dep_class.get_class_uuid() 

384 if not dep_uuid: 

385 if is_required: 

386 raise ValueError(f"Required dependency {dep_class.__name__} has no UUID") 

387 continue 

388 

389 dep_uuid_str = str(dep_uuid) 

390 

391 # Check resolution mode 

392 if resolution_mode == DependencyResolutionMode.SKIP_DEPENDENCIES: 

393 continue # Skip all dependency resolution 

394 

395 # Check cache (unless force refresh) 

396 if resolution_mode != DependencyResolutionMode.FORCE_REFRESH: 

397 cached_char = self._get_cached_characteristic(dep_uuid) 

398 if cached_char is not None and cached_char.last_parsed is not None: 

399 # Use the last_parsed data from the cached characteristic 

400 context_chars[dep_uuid_str] = cached_char.last_parsed 

401 continue 

402 

403 # Read and parse dependency from device 

404 parsed_data = await self._resolve_single_dependency(dep_uuid, is_required, dep_class) 

405 if parsed_data is not None: 

406 context_chars[dep_uuid_str] = parsed_data 

407 

408 # Create context with device info and resolved dependencies 

409 device_info = DeviceInfo( 

410 address=self.address, 

411 name=self.name, 

412 manufacturer_data=self.advertiser_data.ad_structures.core.manufacturer_data, 

413 service_uuids=self.advertiser_data.ad_structures.core.service_uuids, 

414 ) 

415 

416 return CharacteristicContext( 

417 device_info=device_info, 

418 other_characteristics=context_chars, 

419 ) 

420 

421 @overload 

422 async def read( 

423 self, 

424 char: type[BaseCharacteristic[T]], 

425 resolution_mode: DependencyResolutionMode = ..., 

426 ) -> T | None: ... 

427 

428 @overload 

429 async def read( 

430 self, 

431 char: str | CharacteristicName, 

432 resolution_mode: DependencyResolutionMode = ..., 

433 ) -> Any | None: ... # noqa: ANN401 # Runtime UUID dispatch cannot be type-safe 

434 

435 async def read( 

436 self, 

437 char: str | CharacteristicName | type[BaseCharacteristic[T]], 

438 resolution_mode: DependencyResolutionMode = DependencyResolutionMode.NORMAL, 

439 ) -> T | Any | None: # noqa: ANN401 # Runtime UUID dispatch cannot be type-safe 

440 """Read a characteristic value from the device. 

441 

442 Args: 

443 char: Name, enum, or characteristic class to read. 

444 Passing the class enables type-safe return values. 

445 resolution_mode: How to handle automatic dependency resolution: 

446 - NORMAL: Auto-resolve dependencies, use cache when available (default) 

447 - SKIP_DEPENDENCIES: Skip dependency resolution and validation 

448 - FORCE_REFRESH: Re-read dependencies from device, ignoring cache 

449 

450 Returns: 

451 Parsed characteristic value or None if read fails. 

452 Return type is inferred from characteristic class when provided. 

453 

454 Raises: 

455 RuntimeError: If no connection manager is attached 

456 ValueError: If required dependencies cannot be resolved 

457 

458 Example:: 

459 

460 # Type-safe: pass characteristic class, return type is inferred 

461 from bluetooth_sig.gatt.characteristics import BatteryLevelCharacteristic 

462 

463 level: int | None = await device.read(BatteryLevelCharacteristic) 

464 

465 # Not type-safe: pass string/enum, returns Any 

466 level = await device.read(CharacteristicName.BATTERY_LEVEL) 

467 level = await device.read("2A19") 

468 

469 """ 

470 if not self.connection_manager: 

471 raise RuntimeError("No connection manager attached to Device") 

472 

473 # Handle characteristic class input (type-safe path) 

474 if isinstance(char, type) and issubclass(char, BaseCharacteristic): 

475 char_class: type[BaseCharacteristic[Any]] = char 

476 char_instance = char_class() 

477 resolved_uuid = char_instance.uuid 

478 

479 ctx: CharacteristicContext | None = None 

480 if resolution_mode != DependencyResolutionMode.SKIP_DEPENDENCIES: 

481 ctx = await self._ensure_dependencies_resolved(char_class, resolution_mode) 

482 

483 raw = await self.connection_manager.read_gatt_char(resolved_uuid) 

484 return char_instance.parse_value(raw, ctx=ctx) 

485 

486 # Handle string/enum input (not type-safe path) 

487 resolved_uuid = self._resolve_characteristic_name(char) 

488 

489 char_class_lookup = CharacteristicRegistry.get_characteristic_class_by_uuid(resolved_uuid) 

490 

491 # Resolve dependencies if characteristic class is known 

492 ctx = None 

493 if char_class_lookup and resolution_mode != DependencyResolutionMode.SKIP_DEPENDENCIES: 

494 ctx = await self._ensure_dependencies_resolved(char_class_lookup, resolution_mode) 

495 

496 # Read the characteristic 

497 raw = await self.connection_manager.read_gatt_char(resolved_uuid) 

498 parsed = self.translator.parse_characteristic(str(resolved_uuid), raw, ctx=ctx) 

499 

500 return parsed 

501 

502 @overload 

503 async def write( 

504 self, 

505 char: type[BaseCharacteristic[T]], 

506 data: T, 

507 response: bool = ..., 

508 ) -> None: ... 

509 

510 @overload 

511 async def write( 

512 self, 

513 char: str | CharacteristicName, 

514 data: bytes, 

515 response: bool = ..., 

516 ) -> None: ... 

517 

518 async def write( 

519 self, 

520 char: str | CharacteristicName | type[BaseCharacteristic[T]], 

521 data: bytes | T, 

522 response: bool = True, 

523 ) -> None: 

524 r"""Write data to a characteristic on the device. 

525 

526 Args: 

527 char: Name, enum, or characteristic class to write to. 

528 Passing the class enables type-safe value encoding. 

529 data: Raw bytes (for string/enum) or typed value (for characteristic class). 

530 When using characteristic class, the value is encoded using build_value(). 

531 response: If True, use write-with-response (wait for acknowledgment). 

532 If False, use write-without-response (faster but no confirmation). 

533 Default is True for reliability. 

534 

535 Raises: 

536 RuntimeError: If no connection manager is attached 

537 CharacteristicEncodeError: If encoding fails (when using characteristic class) 

538 

539 Example:: 

540 

541 # Type-safe: pass characteristic class and typed value 

542 from bluetooth_sig.gatt.characteristics import AlertLevelCharacteristic 

543 

544 await device.write(AlertLevelCharacteristic, AlertLevel.HIGH) 

545 

546 # Not type-safe: pass raw bytes 

547 await device.write("2A06", b"\x02") 

548 await device.write(CharacteristicName.ALERT_LEVEL, b"\x02") 

549 

550 """ 

551 if not self.connection_manager: 

552 raise RuntimeError("No connection manager attached to Device") 

553 

554 # Handle characteristic class input (type-safe path) 

555 if isinstance(char, type) and issubclass(char, BaseCharacteristic): 

556 char_instance = char() 

557 resolved_uuid = char_instance.uuid 

558 # data is typed value T, encode it 

559 encoded = char_instance.build_value(data) # type: ignore[arg-type] 

560 await self.connection_manager.write_gatt_char(resolved_uuid, bytes(encoded), response=response) 

561 return 

562 

563 # Handle string/enum input (not type-safe path) 

564 # data must be bytes in this path 

565 if not isinstance(data, (bytes, bytearray)): 

566 raise TypeError(f"When using string/enum char_name, data must be bytes, got {type(data).__name__}") 

567 

568 resolved_uuid = self._resolve_characteristic_name(char) 

569 # cast is safe: isinstance check above ensures data is bytes/bytearray 

570 await self.connection_manager.write_gatt_char(resolved_uuid, cast(bytes, data), response=response) 

571 

572 @overload 

573 async def start_notify( 

574 self, 

575 char: type[BaseCharacteristic[T]], 

576 callback: Callable[[T], None], 

577 ) -> None: ... 

578 

579 @overload 

580 async def start_notify( 

581 self, 

582 char: str | CharacteristicName, 

583 callback: Callable[[Any], None], 

584 ) -> None: ... 

585 

586 async def start_notify( 

587 self, 

588 char: str | CharacteristicName | type[BaseCharacteristic[T]], 

589 callback: Callable[[T], None] | Callable[[Any], None], 

590 ) -> None: 

591 """Start notifications for a characteristic. 

592 

593 Args: 

594 char: Name, enum, or characteristic class to monitor. 

595 Passing the class enables type-safe callbacks. 

596 callback: Function to call when notifications are received. 

597 Callback parameter type is inferred from characteristic class. 

598 

599 Raises: 

600 RuntimeError: If no connection manager is attached 

601 

602 Example:: 

603 

604 # Type-safe: callback receives typed value 

605 from bluetooth_sig.gatt.characteristics import HeartRateMeasurementCharacteristic 

606 

607 

608 def on_heart_rate(value: HeartRateMeasurementData) -> None: 

609 print(f"Heart rate: {value.heart_rate}") 

610 

611 

612 await device.start_notify(HeartRateMeasurementCharacteristic, on_heart_rate) 

613 

614 # Not type-safe: callback receives Any 

615 await device.start_notify("2A37", lambda v: print(v)) 

616 

617 """ 

618 if not self.connection_manager: 

619 raise RuntimeError("No connection manager attached to Device") 

620 

621 # Handle characteristic class input (type-safe path) 

622 if isinstance(char, type) and issubclass(char, BaseCharacteristic): 

623 char_instance = char() 

624 resolved_uuid = char_instance.uuid 

625 

626 def _typed_cb(sender: str, data: bytes) -> None: 

627 parsed = char_instance.parse_value(data) 

628 try: 

629 callback(parsed) 

630 except Exception as exc: # pylint: disable=broad-exception-caught 

631 logging.exception("Notification callback raised an exception: %s", exc) 

632 

633 await self.connection_manager.start_notify(resolved_uuid, _typed_cb) 

634 return 

635 

636 # Handle string/enum input (not type-safe path) 

637 resolved_uuid = self._resolve_characteristic_name(char) 

638 

639 def _internal_cb(sender: str, data: bytes) -> None: 

640 parsed = self.translator.parse_characteristic(sender, data) 

641 try: 

642 callback(parsed) 

643 except Exception as exc: # pylint: disable=broad-exception-caught 

644 logging.exception("Notification callback raised an exception: %s", exc) 

645 

646 await self.connection_manager.start_notify(resolved_uuid, _internal_cb) 

647 

648 def _resolve_characteristic_name(self, identifier: str | CharacteristicName) -> BluetoothUUID: 

649 """Resolve a characteristic name or enum to its UUID. 

650 

651 Args: 

652 identifier: Characteristic name string or enum 

653 

654 Returns: 

655 Characteristic UUID string 

656 

657 Raises: 

658 ValueError: If the characteristic name cannot be resolved 

659 

660 """ 

661 if isinstance(identifier, CharacteristicName): 

662 # For enum inputs, ask the translator for the UUID 

663 uuid = self.translator.get_characteristic_uuid_by_name(identifier) 

664 if uuid: 

665 return uuid 

666 norm = identifier.value.strip() 

667 else: 

668 norm = identifier 

669 stripped = norm.replace("-", "") 

670 if len(stripped) in (4, 8, 32) and all(c in "0123456789abcdefABCDEF" for c in stripped): 

671 return BluetoothUUID(norm) 

672 

673 raise ValueError(f"Unknown characteristic name: '{identifier}'") 

674 

675 async def stop_notify(self, char_name: str | CharacteristicName) -> None: 

676 """Stop notifications for a characteristic. 

677 

678 Args: 

679 char_name: Characteristic name or UUID 

680 

681 """ 

682 if not self.connection_manager: 

683 raise RuntimeError("No connection manager attached") 

684 

685 resolved_uuid = self._resolve_characteristic_name(char_name) 

686 await self.connection_manager.stop_notify(resolved_uuid) 

687 

688 async def read_descriptor(self, desc_uuid: BluetoothUUID | BaseDescriptor) -> DescriptorData: 

689 """Read a descriptor value from the device. 

690 

691 Args: 

692 desc_uuid: UUID of the descriptor to read or BaseDescriptor instance 

693 

694 Returns: 

695 Parsed descriptor data with metadata 

696 

697 Raises: 

698 RuntimeError: If no connection manager is attached 

699 

700 """ 

701 if not self.connection_manager: 

702 raise RuntimeError("No connection manager attached to Device") 

703 

704 # Extract UUID from BaseDescriptor if needed 

705 if isinstance(desc_uuid, BaseDescriptor): 

706 uuid = desc_uuid.uuid 

707 else: 

708 uuid = desc_uuid 

709 

710 raw_data = await self.connection_manager.read_gatt_descriptor(uuid) 

711 

712 # Try to create a descriptor instance and parse the data 

713 descriptor = DescriptorRegistry.create_descriptor(str(uuid)) 

714 if descriptor: 

715 return descriptor.parse_value(raw_data) 

716 

717 # If no registered descriptor found, return unparsed DescriptorData 

718 return DescriptorData( 

719 info=DescriptorInfo(uuid=uuid, name="Unknown Descriptor"), 

720 value=raw_data, 

721 raw_data=raw_data, 

722 parse_success=False, 

723 error_message="Unknown descriptor UUID - no parser available", 

724 ) 

725 

726 async def write_descriptor(self, desc_uuid: BluetoothUUID | BaseDescriptor, data: bytes | DescriptorData) -> None: 

727 """Write data to a descriptor on the device. 

728 

729 Args: 

730 desc_uuid: UUID of the descriptor to write to or BaseDescriptor instance 

731 data: Either raw bytes to write, or a DescriptorData object. 

732 If DescriptorData is provided, its raw_data will be written. 

733 

734 Raises: 

735 RuntimeError: If no connection manager is attached 

736 

737 """ 

738 if not self.connection_manager: 

739 raise RuntimeError("No connection manager attached to Device") 

740 

741 # Extract UUID from BaseDescriptor if needed 

742 if isinstance(desc_uuid, BaseDescriptor): 

743 uuid = desc_uuid.uuid 

744 else: 

745 uuid = desc_uuid 

746 

747 # Extract raw bytes from DescriptorData if needed 

748 raw_data: bytes 

749 if isinstance(data, DescriptorData): 

750 raw_data = data.raw_data 

751 else: 

752 raw_data = data 

753 

754 await self.connection_manager.write_gatt_descriptor(uuid, raw_data) 

755 

756 async def pair(self) -> None: 

757 """Pair with the device. 

758 

759 Raises an exception if pairing fails. 

760 

761 Raises: 

762 RuntimeError: If no connection manager is attached 

763 

764 """ 

765 if not self.connection_manager: 

766 raise RuntimeError("No connection manager attached to Device") 

767 

768 await self.connection_manager.pair() 

769 

770 async def unpair(self) -> None: 

771 """Unpair from the device. 

772 

773 Raises an exception if unpairing fails. 

774 

775 Raises: 

776 RuntimeError: If no connection manager is attached 

777 

778 """ 

779 if not self.connection_manager: 

780 raise RuntimeError("No connection manager attached to Device") 

781 

782 await self.connection_manager.unpair() 

783 

784 async def read_rssi(self) -> int: 

785 """Read the RSSI (signal strength) of the connection. 

786 

787 Returns: 

788 RSSI value in dBm (typically negative, e.g., -60) 

789 

790 Raises: 

791 RuntimeError: If no connection manager is attached 

792 

793 """ 

794 if not self.connection_manager: 

795 raise RuntimeError("No connection manager attached to Device") 

796 

797 return await self.connection_manager.read_rssi() 

798 

799 def set_disconnected_callback(self, callback: Callable[[], None]) -> None: 

800 """Set a callback to be invoked when the device disconnects. 

801 

802 Args: 

803 callback: Function to call when disconnection occurs 

804 

805 Raises: 

806 RuntimeError: If no connection manager is attached 

807 

808 """ 

809 if not self.connection_manager: 

810 raise RuntimeError("No connection manager attached to Device") 

811 

812 self.connection_manager.set_disconnected_callback(callback) 

813 

814 @property 

815 def mtu_size(self) -> int: 

816 """Get the negotiated MTU size in bytes. 

817 

818 Returns: 

819 The MTU size negotiated for this connection (typically 23-512 bytes) 

820 

821 Raises: 

822 RuntimeError: If no connection manager is attached 

823 

824 """ 

825 if not self.connection_manager: 

826 raise RuntimeError("No connection manager attached to Device") 

827 

828 return self.connection_manager.mtu_size 

829 

830 def set_advertising_bindkey(self, bindkey: bytes | None) -> None: 

831 """Set encryption key for encrypted advertising data. 

832 

833 Args: 

834 bindkey: Encryption key bytes, or None to clear 

835 

836 """ 

837 self._advertising_bindkey = bindkey 

838 # Update any existing interpreters 

839 for interpreter in self._advertising_interpreters.values(): 

840 interpreter.bindkey = bindkey 

841 

842 async def refresh_advertisement(self, refresh: bool = False) -> AdvertisementData | None: 

843 """Get advertisement data from the connection manager. 

844 

845 Args: 

846 refresh: If True, perform an active scan to get fresh advertisement 

847 data from the device. If False, return the last cached value. 

848 

849 Returns: 

850 Interpreted AdvertisementData if available, None if no advertisement 

851 has been received by the connection manager yet. 

852 

853 Raises: 

854 RuntimeError: If no connection manager is attached 

855 

856 Example:: 

857 

858 device.attach_connection_manager(manager) 

859 

860 # Get cached advertisement (fast, no BLE activity) 

861 ad = await device.refresh_advertisement() 

862 

863 # Force fresh scan (slower, active BLE scan) 

864 ad = await device.refresh_advertisement(refresh=True) 

865 

866 if ad and ad.interpreted_data: 

867 print(f"Sensor: {ad.interpreted_data}") 

868 

869 """ 

870 if not self.connection_manager: 

871 raise RuntimeError("No connection manager attached to Device") 

872 

873 advertisement = await self.connection_manager.get_latest_advertisement(refresh=refresh) 

874 if advertisement is None: 

875 return None 

876 

877 # Process through the interpretation pipeline 

878 self._process_advertisement(advertisement) 

879 return self._last_interpreted_advertisement 

880 

881 def _process_advertisement(self, advertisement: AdvertisementData) -> None: 

882 """Process advertisement data and store results. 

883 

884 Internal method that stores advertisement data and routes to 

885 vendor interpreters for typed sensor data extraction. 

886 

887 Args: 

888 advertisement: AdvertisementData from connection manager 

889 

890 """ 

891 # Store the advertisement data 

892 self.advertiser_data = AdvertisingData( 

893 raw_data=b"", # Raw data not available from converted advertisements 

894 ad_structures=advertisement.ad_structures, 

895 rssi=advertisement.rssi, 

896 ) 

897 

898 # Update device name if not set 

899 if advertisement.ad_structures.core.local_name and not self.name: 

900 self.name = advertisement.ad_structures.core.local_name 

901 

902 # Apply vendor interpretation and store result 

903 interpreted = self._interpret_advertisement(advertisement) 

904 # Store the interpreted result for later access 

905 self._last_interpreted_advertisement = interpreted 

906 

907 def _interpret_advertisement(self, advertisement: AdvertisementData) -> AdvertisementData: 

908 """Apply vendor interpretation to advertisement data. 

909 

910 Internal method that routes advertisement data to registered vendor 

911 interpreters for typed sensor data extraction. 

912 

913 Args: 

914 advertisement: AdvertisementData with ad_structures populated 

915 

916 Returns: 

917 AdvertisementData with interpreted_data populated if a matching 

918 vendor interpreter was found 

919 

920 """ 

921 # Route to vendor interpreter 

922 interpreted_data: Any = None 

923 interpreter_name: str | None = None 

924 

925 interpreter_class = advertising_interpreter_registry.find_interpreter_class( 

926 advertisement.ad_structures.core.manufacturer_data, 

927 advertisement.ad_structures.core.service_data, 

928 advertisement.ad_structures.core.local_name or None, 

929 ) 

930 

931 if interpreter_class is not None: 

932 # Get or create stateful interpreter for this device + interpreter type 

933 class_name = interpreter_class.__name__ 

934 if class_name not in self._advertising_interpreters: 

935 self._advertising_interpreters[class_name] = interpreter_class( 

936 self.address, 

937 bindkey=self._advertising_bindkey, 

938 ) 

939 

940 interpreter = self._advertising_interpreters[class_name] 

941 interpreted_data = interpreter.interpret( 

942 advertisement.ad_structures.core.manufacturer_data, 

943 advertisement.ad_structures.core.service_data, 

944 advertisement.ad_structures.core.local_name or None, 

945 advertisement.rssi or 0, 

946 ) 

947 interpreter_name = interpreter_class._info.name # pylint: disable=protected-access 

948 

949 return AdvertisementData( 

950 ad_structures=advertisement.ad_structures, 

951 interpreted_data=interpreted_data, 

952 interpreter_name=interpreter_name, 

953 rssi=advertisement.rssi, 

954 ) 

955 

956 def parse_raw_advertisement(self, raw_data: bytes, rssi: int = 0) -> AdvertisementData: 

957 """Parse raw advertising PDU bytes directly. 

958 

959 Use this method when you have raw BLE advertising PDU bytes (e.g., from 

960 a custom BLE stack or packet capture). For framework-integrated scanning, 

961 use the connection manager's convert_advertisement() followed by 

962 update_advertisement() instead. 

963 

964 Args: 

965 raw_data: Raw BLE advertising PDU bytes 

966 rssi: Received signal strength in dBm 

967 

968 Returns: 

969 AdvertisementData with parsed AD structures and vendor interpretation 

970 

971 Example: 

972 # Parse raw PDU bytes directly 

973 result = device.parse_raw_advertisement(pdu_bytes, rssi=-65) 

974 print(result.manufacturer_data) 

975 

976 """ 

977 # Parse raw PDU bytes 

978 pdu_result = self._pdu_parser.parse_advertising_data(raw_data) 

979 

980 # Store the parsed AdvertisingData (with raw_data) directly 

981 self.advertiser_data = AdvertisingData( 

982 raw_data=raw_data, 

983 ad_structures=pdu_result.ad_structures, 

984 rssi=rssi, 

985 ) 

986 

987 # Update device name if present and not already set 

988 if pdu_result.ad_structures.core.local_name and not self.name: 

989 self.name = pdu_result.ad_structures.core.local_name 

990 

991 # Create AdvertisementData for interpretation 

992 advertisement = AdvertisementData( 

993 ad_structures=pdu_result.ad_structures, 

994 rssi=rssi, 

995 ) 

996 

997 # Route to vendor interpreter and return result 

998 return self._interpret_advertisement(advertisement) 

999 

1000 def get_characteristic_data(self, char_uuid: BluetoothUUID) -> Any | None: # noqa: ANN401 # Heterogeneous cache 

1001 """Get parsed characteristic data - single source of truth via characteristic.last_parsed. 

1002 

1003 Searches across all services to find the characteristic by UUID. 

1004 

1005 Args: 

1006 char_uuid: UUID of the characteristic 

1007 

1008 Returns: 

1009 Parsed characteristic value if found, None otherwise. 

1010 

1011 Example:: 

1012 

1013 # Search for characteristic across all services 

1014 battery_data = device.get_characteristic_data(BluetoothUUID("2A19")) 

1015 if battery_data is not None: 

1016 print(f"Battery: {battery_data}%") 

1017 

1018 """ 

1019 char_instance = self._get_cached_characteristic(char_uuid) 

1020 if char_instance is not None: 

1021 return char_instance.last_parsed 

1022 return None 

1023 

1024 async def discover_services(self) -> dict[str, Any]: 

1025 """Discover services and characteristics from the connected BLE device. 

1026 

1027 This method performs BLE service discovery using the attached connection 

1028 manager, retrieving the device's service structure with characteristics 

1029 and their runtime properties (READ, WRITE, NOTIFY, etc.). 

1030 

1031 The discovered services are stored in `self.services` as DeviceService 

1032 objects with properly instantiated characteristic classes from the registry. 

1033 

1034 This implements the standard BLE workflow: 

1035 1. await device.connect() 

1036 2. await device.discover_services() # This method 

1037 3. value = await device.read("battery_level") 

1038 

1039 Note: 

1040 - This method discovers the SERVICE STRUCTURE (what services/characteristics 

1041 exist and their properties), but does NOT read characteristic VALUES. 

1042 - Use `read()` to retrieve actual characteristic values after discovery. 

1043 - Services are cached in `self.services` keyed by service UUID string. 

1044 

1045 Returns: 

1046 Dictionary mapping service UUIDs to DeviceService objects 

1047 

1048 Raises: 

1049 RuntimeError: If no connection manager is attached 

1050 

1051 Example:: 

1052 

1053 device = Device(address, translator) 

1054 device.attach_connection_manager(manager) 

1055 

1056 await device.connect() 

1057 services = await device.discover_services() # Discover structure 

1058 

1059 # Now services are available 

1060 for service_uuid, device_service in services.items(): 

1061 print(f"Service: {service_uuid}") 

1062 for char_uuid, char_instance in device_service.characteristics.items(): 

1063 print(f" Characteristic: {char_uuid}") 

1064 

1065 # Read characteristic values 

1066 battery = await device.read("battery_level") 

1067 

1068 """ 

1069 if not self.connection_manager: 

1070 raise RuntimeError("No connection manager attached to Device") 

1071 

1072 services_data = await self.connection_manager.get_services() 

1073 

1074 # Store discovered services in our internal structure 

1075 for service_info in services_data: 

1076 service_uuid = str(service_info.service.uuid) 

1077 if service_uuid not in self.services: 

1078 # Store the service directly from connection manager 

1079 self.services[service_uuid] = service_info 

1080 

1081 return dict(self.services) 

1082 

1083 async def get_characteristic_info(self, char_uuid: str) -> Any | None: # noqa: ANN401 # Adapter-specific characteristic metadata 

1084 """Get information about a characteristic from the connection manager. 

1085 

1086 Args: 

1087 char_uuid: UUID of the characteristic 

1088 

1089 Returns: 

1090 Characteristic information or None if not found 

1091 

1092 Raises: 

1093 RuntimeError: If no connection manager is attached 

1094 

1095 """ 

1096 if not self.connection_manager: 

1097 raise RuntimeError("No connection manager attached to Device") 

1098 

1099 services_data = await self.connection_manager.get_services() 

1100 for service_info in services_data: 

1101 for char_uuid_key, char_info in service_info.characteristics.items(): 

1102 if char_uuid_key == char_uuid: 

1103 return char_info 

1104 return None 

1105 

1106 async def read_multiple(self, char_names: list[str | CharacteristicName]) -> dict[str, Any | None]: 

1107 """Read multiple characteristics in batch. 

1108 

1109 Args: 

1110 char_names: List of characteristic names or enums to read 

1111 

1112 Returns: 

1113 Dictionary mapping characteristic UUIDs to parsed values 

1114 

1115 Raises: 

1116 RuntimeError: If no connection manager is attached 

1117 

1118 """ 

1119 if not self.connection_manager: 

1120 raise RuntimeError("No connection manager attached to Device") 

1121 

1122 results: dict[str, Any | None] = {} 

1123 for char_name in char_names: 

1124 try: 

1125 value = await self.read(char_name) 

1126 resolved_uuid = self._resolve_characteristic_name(char_name) 

1127 results[str(resolved_uuid)] = value 

1128 except Exception as exc: # pylint: disable=broad-exception-caught 

1129 resolved_uuid = self._resolve_characteristic_name(char_name) 

1130 results[str(resolved_uuid)] = None 

1131 logging.warning("Failed to read characteristic %s: %s", char_name, exc) 

1132 

1133 return results 

1134 

1135 async def write_multiple( 

1136 self, data_map: dict[str | CharacteristicName, bytes], response: bool = True 

1137 ) -> dict[str, bool]: 

1138 """Write to multiple characteristics in batch. 

1139 

1140 Args: 

1141 data_map: Dictionary mapping characteristic names/enums to data bytes 

1142 response: If True, use write-with-response for all writes. 

1143 If False, use write-without-response for all writes. 

1144 

1145 Returns: 

1146 Dictionary mapping characteristic UUIDs to success status 

1147 

1148 Raises: 

1149 RuntimeError: If no connection manager is attached 

1150 

1151 """ 

1152 if not self.connection_manager: 

1153 raise RuntimeError("No connection manager attached to Device") 

1154 

1155 results: dict[str, bool] = {} 

1156 for char_name, data in data_map.items(): 

1157 try: 

1158 await self.write(char_name, data, response=response) 

1159 resolved_uuid = self._resolve_characteristic_name(char_name) 

1160 results[str(resolved_uuid)] = True 

1161 except Exception as exc: # pylint: disable=broad-exception-caught 

1162 resolved_uuid = self._resolve_characteristic_name(char_name) 

1163 results[str(resolved_uuid)] = False 

1164 logging.warning("Failed to write characteristic %s: %s", char_name, exc) 

1165 

1166 return results 

1167 

1168 @property 

1169 def device_info(self) -> DeviceInfo: 

1170 """Get cached device info object. 

1171 

1172 Returns: 

1173 DeviceInfo with current device metadata 

1174 

1175 """ 

1176 if self._device_info_cache is None: 

1177 self._device_info_cache = DeviceInfo( 

1178 address=self.address, 

1179 name=self.name, 

1180 manufacturer_data=self.advertiser_data.ad_structures.core.manufacturer_data, 

1181 service_uuids=self.advertiser_data.ad_structures.core.service_uuids, 

1182 ) 

1183 else: 

1184 # Update existing cache object with current data 

1185 self._device_info_cache.name = self.name 

1186 self._device_info_cache.manufacturer_data = self.advertiser_data.ad_structures.core.manufacturer_data 

1187 self._device_info_cache.service_uuids = self.advertiser_data.ad_structures.core.service_uuids 

1188 return self._device_info_cache 

1189 

1190 @property 

1191 def name(self) -> str: 

1192 """Get the device name.""" 

1193 return self._name 

1194 

1195 @name.setter 

1196 def name(self, value: str) -> None: 

1197 """Set the device name and update cached device_info.""" 

1198 self._name = value 

1199 # Update existing cache object if it exists 

1200 if self._device_info_cache is not None: 

1201 self._device_info_cache.name = value 

1202 

1203 @property 

1204 def is_connected(self) -> bool: 

1205 """Check if the device is currently connected. 

1206 

1207 Returns: 

1208 True if connected, False otherwise 

1209 

1210 """ 

1211 return self.connection_manager.is_connected 

1212 

1213 @property 

1214 def interpreted_advertisement(self) -> AdvertisementData | None: 

1215 """Get the last interpreted advertisement data. 

1216 

1217 This is automatically populated when a connection manager pushes 

1218 advertisement data to the device. The data includes vendor-specific 

1219 interpretations if registered interpreters match. 

1220 

1221 Returns: 

1222 AdvertisementData with interpreted fields, or None if no 

1223 advertisement has been received yet. 

1224 

1225 """ 

1226 return self._last_interpreted_advertisement 

1227 

1228 def get_service_by_uuid(self, service_uuid: str) -> DeviceService | None: 

1229 """Get a service by its UUID. 

1230 

1231 Args: 

1232 service_uuid: UUID of the service 

1233 

1234 Returns: 

1235 DeviceService instance or None if not found 

1236 

1237 """ 

1238 return self.services.get(service_uuid) 

1239 

1240 def get_services_by_name(self, service_name: str | ServiceName) -> list[DeviceService]: 

1241 """Get services by name. 

1242 

1243 Args: 

1244 service_name: Name or enum of the service 

1245 

1246 Returns: 

1247 List of matching DeviceService instances 

1248 

1249 """ 

1250 service_uuid = self.translator.get_service_uuid_by_name( 

1251 service_name if isinstance(service_name, str) else service_name.value 

1252 ) 

1253 if service_uuid and str(service_uuid) in self.services: 

1254 return [self.services[str(service_uuid)]] 

1255 return [] 

1256 

1257 def list_characteristics(self, service_uuid: str | None = None) -> dict[str, list[str]]: 

1258 """List all characteristics, optionally filtered by service. 

1259 

1260 Args: 

1261 service_uuid: Optional service UUID to filter by 

1262 

1263 Returns: 

1264 Dictionary mapping service UUIDs to lists of characteristic UUIDs 

1265 

1266 """ 

1267 if service_uuid: 

1268 service = self.services.get(service_uuid) 

1269 if service: 

1270 return {service_uuid: list(service.characteristics.keys())} 

1271 return {} 

1272 

1273 return {svc_uuid: list(service.characteristics.keys()) for svc_uuid, service in self.services.items()}