Coverage for src / bluetooth_sig / device / device.py: 60%
364 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
1"""Device class for grouping BLE device services, characteristics, encryption, and advertiser data.
3This module provides a high-level Device abstraction that groups all
4services, characteristics, encryption requirements, and advertiser data
5for a BLE device. It integrates with the BluetoothSIGTranslator for
6parsing while providing a unified view of device state.
7"""
8# pylint: disable=too-many-lines # Device abstraction is a cohesive module with related classes
9# TODO split into multiple files
11from __future__ import annotations
13import logging
14from abc import abstractmethod
15from enum import Enum
16from typing import Any, Callable, Protocol, TypeVar, cast, overload
18from ..advertising import (
19 AdvertisingDataInterpreter,
20 AdvertisingPDUParser,
21 advertising_interpreter_registry,
22)
23from ..gatt.characteristics import CharacteristicName
24from ..gatt.characteristics.base import BaseCharacteristic
25from ..gatt.characteristics.registry import CharacteristicRegistry
26from ..gatt.characteristics.unknown import UnknownCharacteristic
27from ..gatt.context import CharacteristicContext, DeviceInfo
28from ..gatt.descriptors.base import BaseDescriptor
29from ..gatt.descriptors.registry import DescriptorRegistry
30from ..gatt.services import ServiceName
31from ..types import (
32 AdvertisementData,
33 AdvertisingData,
34 CharacteristicInfo,
35 DescriptorData,
36 DescriptorInfo,
37)
38from ..types.device_types import DeviceEncryption, DeviceService, ScannedDevice
39from ..types.uuid import BluetoothUUID
40from .connection import ConnectionManagerProtocol
42# Type variable for generic characteristic return types
43T = TypeVar("T")
45__all__ = [
46 "Device",
47 "DependencyResolutionMode",
48 "SIGTranslatorProtocol",
49]
class DependencyResolutionMode(Enum):
    """Strategy used to resolve a characteristic's dependencies while reading it.

    Attributes:
        NORMAL: Resolve dependencies automatically, preferring cached values.
        SKIP_DEPENDENCIES: Do not resolve or validate dependencies.
        FORCE_REFRESH: Always re-read dependencies from the device, bypassing the cache.
    """

    # Default behaviour: resolve, but reuse anything already cached.
    NORMAL = "normal"
    # Read the characteristic on its own, with no dependency handling.
    SKIP_DEPENDENCIES = "skip_dependencies"
    # Resolve every dependency with a fresh read from the device.
    FORCE_REFRESH = "force_refresh"
class SIGTranslatorProtocol(Protocol):  # pylint: disable=too-few-public-methods
    """Structural interface a SIG translator must satisfy to be used by Device."""

    @abstractmethod
    def parse_characteristics(
        self,
        char_data: dict[str, bytes],
        ctx: CharacteristicContext | None = None,
    ) -> dict[str, Any]:
        """Parse a batch of characteristics given as a mapping to raw bytes."""
        ...

    @abstractmethod
    def parse_characteristic(
        self,
        uuid: str,
        raw_data: bytes,
        ctx: CharacteristicContext | None = None,
    ) -> Any:  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe
        """Parse the raw bytes of one characteristic identified by its UUID string."""
        ...

    @abstractmethod
    def get_characteristic_uuid_by_name(self, name: CharacteristicName) -> BluetoothUUID | None:
        """Look up the UUID for a characteristic name enum (enum-only API)."""
        ...

    @abstractmethod
    def get_service_uuid_by_name(self, name: str | ServiceName) -> BluetoothUUID | None:
        """Look up the UUID for a service given its name string or enum."""
        ...

    def get_characteristic_info_by_name(
        self,
        name: CharacteristicName,
    ) -> Any | None:  # noqa: ANN401  # Adapter-specific characteristic info
        """Return characteristic info for an enum name (optional method).

        Not abstract: the default implementation returns None.
        """
        return None
98class Device: # pylint: disable=too-many-instance-attributes,too-many-public-methods
99 r"""High-level BLE device abstraction.
101 This class groups all services, characteristics, encryption requirements, and
102 advertiser data for a BLE device. It integrates with
103 [BluetoothSIGTranslator][bluetooth_sig.BluetoothSIGTranslator]
104 for parsing while providing a unified view of device state.
106 Key features:
107 - Parse advertiser data from BLE scan results
108 - Discover GATT services and characteristics via connection manager
109 - Access parsed characteristic data by UUID
110 - Handle device encryption requirements
111 - Cache device information for performance
113 Example:
114 Create and configure a device::
116 from bluetooth_sig import BluetoothSIGTranslator
117 from bluetooth_sig.device import Device
119 translator = BluetoothSIGTranslator()
120 device = Device(manager, translator)  # manager: an object implementing ConnectionManagerProtocol
122 # Attach connection manager and discover services
123 device.attach_connection_manager(manager)
124 await device.connect()
125 await device.discover_services()
127 # Read characteristic
128 battery = await device.read("battery_level")
129 print(f"Battery: {battery.value}%")
131 """
def __init__(self, connection_manager: ConnectionManagerProtocol, translator: SIGTranslatorProtocol) -> None:
    """Set up a Device around a connection manager and a translator.

    Args:
        connection_manager: Object implementing ConnectionManagerProtocol; used for all BLE I/O.
        translator: Object implementing SIGTranslatorProtocol; used to parse characteristic data.

    """
    # --- collaborators supplied by the caller ---
    self.connection_manager = connection_manager
    self.translator = translator

    # --- GATT / device state, empty until populated ---
    self._name: str = ""
    self.services: dict[str, DeviceService] = {}
    self.encryption = DeviceEncryption()
    self.advertiser_data = AdvertisingData(raw_data=b"")

    # --- advertising support ---
    # Parser for raw advertising PDU bytes.
    self._pdu_parser = AdvertisingPDUParser()
    # Vendor-specific interpreters created so far, keyed by interpreter class name.
    self._advertising_interpreters: dict[str, AdvertisingDataInterpreter[Any]] = {}
    # Bindkey handed to interpreters for encrypted advertisements (None when unset).
    self._advertising_bindkey: bytes | None = None

    # --- caches ---
    # Backing store for the device_info property.
    self._device_info_cache: DeviceInfo | None = None
    # Most recent advertisement after vendor interpretation.
    self._last_interpreted_advertisement: AdvertisementData | None = None
def __str__(self) -> str:
    """Summarise the device as address, name, service count and characteristic count.

    Returns:
        str: Human-readable one-line description of this Device.

    """
    service_total = len(self.services)
    characteristic_total = 0
    for svc in self.services.values():
        characteristic_total += len(svc.characteristics)
    return (
        f"Device({self.address}, name={self.name}, "
        f"{service_total} services, {characteristic_total} characteristics)"
    )
@property
def address(self) -> str:
    """Device address, as reported by the connection manager.

    Returns:
        BLE device address

    """
    manager = self.connection_manager
    return manager.address
@staticmethod
async def scan(manager_class: type[ConnectionManagerProtocol], timeout: float = 5.0) -> list[ScannedDevice]:
    """Scan for nearby BLE devices through a given connection manager class.

    No Device instance is needed; the call is delegated to ``manager_class.scan``.
    Use it to find devices before constructing Device objects.

    Args:
        manager_class: Connection manager class that performs the scan
            (e.g., BleakRetryConnectionManager)
        timeout: Scan duration in seconds (default: 5.0)

    Returns:
        List of discovered devices

    Raises:
        NotImplementedError: If the connection manager doesn't support scanning

    Example::

        from bluetooth_sig.device import Device
        from connection_managers.bleak_retry import BleakRetryConnectionManager

        # Scan for devices
        devices = await Device.scan(BleakRetryConnectionManager, timeout=10.0)

        # Create a Device for the first discovered device
        if devices:
            translator = BluetoothSIGTranslator()
            # NOTE(review): manager constructor arguments depend on the implementation - confirm
            manager = BleakRetryConnectionManager(devices[0].address)
            device = Device(manager, translator)

    """
    discovered = await manager_class.scan(timeout)
    return discovered
async def connect(self) -> None:
    """Open the connection to the BLE device via the connection manager.

    Raises:
        RuntimeError: If no connection manager is attached

    """
    manager = self.connection_manager
    if manager:
        await manager.connect()
        return
    raise RuntimeError("No connection manager attached to Device")
async def disconnect(self) -> None:
    """Close the connection to the BLE device via the connection manager.

    Raises:
        RuntimeError: If no connection manager is attached

    """
    manager = self.connection_manager
    if manager:
        await manager.disconnect()
        return
    raise RuntimeError("No connection manager attached to Device")
def _get_cached_characteristic(self, char_uuid: BluetoothUUID) -> BaseCharacteristic[Any] | None:
    """Look up a characteristic instance across all known services.

    The services mapping is the single store for characteristic instances;
    parsed data is available from the returned instance's ``last_parsed``.

    Args:
        char_uuid: UUID of the characteristic to find

    Returns:
        The first matching BaseCharacteristic[Any] instance, or None when no
        service holds the UUID.

    """
    key = str(char_uuid)
    # Lazily walk the services; the first one containing the key wins.
    hits = (svc.characteristics[key] for svc in self.services.values() if key in svc.characteristics)
    return next(hits, None)
def _cache_characteristic(self, char_uuid: BluetoothUUID, char_instance: BaseCharacteristic[Any]) -> None:
    """Replace the stored instance for a characteristic that a service already lists.

    No service or characteristic entry is ever created here: the UUID must already
    be present in one of the discovered services, otherwise a warning is logged.

    Args:
        char_uuid: UUID of the characteristic
        char_instance: BaseCharacteristic[Any] instance to cache

    """
    key = str(char_uuid)
    # First service that already lists this characteristic, if any.
    owner = next((svc for svc in self.services.values() if key in svc.characteristics), None)
    if owner is None:
        # Nothing to update - the characteristic's service has not been discovered.
        logging.warning(
            "Cannot cache characteristic %s - not found in any discovered service. Run discover_services() first.",
            key,
        )
        return
    owner.characteristics[key] = char_instance
def _create_unknown_characteristic(self, dep_uuid: BluetoothUUID) -> BaseCharacteristic[Any]:
    """Build a placeholder characteristic for a UUID the registry does not know.

    Args:
        dep_uuid: UUID of the unknown characteristic

    Returns:
        UnknownCharacteristic instance named ``Unknown-<uuid>``

    """
    uuid_text = str(dep_uuid)
    return UnknownCharacteristic(
        info=CharacteristicInfo(uuid=dep_uuid, name=f"Unknown-{uuid_text}"),
    )
async def _resolve_single_dependency(
    self,
    dep_uuid: BluetoothUUID,
    is_required: bool,
    dep_class: type[BaseCharacteristic[Any]],
) -> Any | None:  # noqa: ANN401  # Dependency can be any characteristic type
    """Read one dependency characteristic from the device and parse it.

    Args:
        dep_uuid: UUID of the dependency characteristic
        is_required: Whether this is a required dependency
        dep_class: The dependency characteristic class (used for error/log text)

    Returns:
        Parsed characteristic data, or None if an optional dependency failed

    Raises:
        RuntimeError: If no connection manager is attached
        ValueError: If a required dependency fails to read or parse

    """
    manager = self.connection_manager
    if not manager:
        raise RuntimeError("No connection manager attached")

    dep_uuid_str = str(dep_uuid)

    try:
        payload = await manager.read_gatt_char(dep_uuid)

        # Reuse the instance held by a discovered service when there is one.
        instance = self._get_cached_characteristic(dep_uuid)
        if instance is None:
            # Otherwise build one from the registry, falling back to an unknown placeholder.
            registered = CharacteristicRegistry.get_characteristic_class_by_uuid(dep_uuid)
            instance = registered() if registered else self._create_unknown_characteristic(dep_uuid)
            self._cache_characteristic(dep_uuid, instance)

        return instance.parse_value(payload)

    except Exception as e:  # pylint: disable=broad-exception-caught
        if not is_required:
            # Optional dependency failed, log and continue
            logging.warning("Failed to read optional dependency %s: %s", dep_class.__name__, e)
            return None
        raise ValueError(
            f"Failed to read required dependency {dep_class.__name__} ({dep_uuid_str}): {e}"
        ) from e
async def _ensure_dependencies_resolved(
    self,
    char_class: type[BaseCharacteristic[Any]],
    resolution_mode: DependencyResolutionMode,
) -> CharacteristicContext:
    """Resolve the declared dependencies of a characteristic and build its context.

    Dependencies are taken from the class attributes ``_required_dependencies`` and
    ``_optional_dependencies`` (missing attributes mean "none"). Each dependency is
    served from an already-parsed cached characteristic when allowed, otherwise it
    is read from the device via ``_resolve_single_dependency``.

    Args:
        char_class: The characteristic class to resolve dependencies for
        resolution_mode: How to handle dependency resolution. With
            SKIP_DEPENDENCIES nothing is resolved or validated and the returned
            context has no other characteristics.

    Returns:
        CharacteristicContext with device info and the resolved dependencies,
        keyed by dependency UUID string

    Raises:
        RuntimeError: If no connection manager is attached
        ValueError: If a required dependency has no UUID or cannot be read

    """
    if not self.connection_manager:
        raise RuntimeError("No connection manager attached to Device")

    # Parsed dependency values keyed by UUID string.
    context_chars: dict[str, Any] = {}

    # The mode does not change per dependency, so decide once instead of per iteration.
    # In skip mode no dependency is inspected at all (so no validation errors either).
    if resolution_mode != DependencyResolutionMode.SKIP_DEPENDENCIES:
        # Normalise to lists so list/tuple declarations (or None) can be combined safely.
        required_deps = list(getattr(char_class, "_required_dependencies", None) or ())
        optional_deps = list(getattr(char_class, "_optional_dependencies", None) or ())
        use_cache = resolution_mode != DependencyResolutionMode.FORCE_REFRESH

        for dep_class in [*required_deps, *optional_deps]:
            is_required = dep_class in required_deps

            dep_uuid = dep_class.get_class_uuid()
            if not dep_uuid:
                if is_required:
                    raise ValueError(f"Required dependency {dep_class.__name__} has no UUID")
                continue

            dep_uuid_str = str(dep_uuid)

            # Declared more than once (e.g. both required and optional): already resolved.
            if dep_uuid_str in context_chars:
                continue

            if use_cache:
                cached_char = self._get_cached_characteristic(dep_uuid)
                if cached_char is not None and cached_char.last_parsed is not None:
                    # Reuse the value parsed on an earlier read.
                    context_chars[dep_uuid_str] = cached_char.last_parsed
                    continue

            # Read and parse the dependency from the device.
            parsed_data = await self._resolve_single_dependency(dep_uuid, is_required, dep_class)
            if parsed_data is not None:
                context_chars[dep_uuid_str] = parsed_data

    # Device-level information comes from the latest advertising data.
    device_info = DeviceInfo(
        address=self.address,
        name=self.name,
        manufacturer_data=self.advertiser_data.ad_structures.core.manufacturer_data,
        service_uuids=self.advertiser_data.ad_structures.core.service_uuids,
    )

    return CharacteristicContext(
        device_info=device_info,
        other_characteristics=context_chars,
    )
@overload
async def read(
    self,
    char: type[BaseCharacteristic[T]],
    resolution_mode: DependencyResolutionMode = ...,
) -> T | None: ...


@overload
async def read(
    self,
    char: str | CharacteristicName,
    resolution_mode: DependencyResolutionMode = ...,
) -> Any | None: ...  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe


async def read(
    self,
    char: str | CharacteristicName | type[BaseCharacteristic[T]],
    resolution_mode: DependencyResolutionMode = DependencyResolutionMode.NORMAL,
) -> T | Any | None:  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe
    """Read one characteristic from the device and parse it.

    Args:
        char: UUID string, name enum, or characteristic class to read.
            Passing the class lets type checkers infer the return type.
        resolution_mode: How to handle automatic dependency resolution:
            - NORMAL: Auto-resolve dependencies, use cache when available (default)
            - SKIP_DEPENDENCIES: Skip dependency resolution and validation
            - FORCE_REFRESH: Re-read dependencies from device, ignoring cache

    Returns:
        The parsed value: produced by the characteristic class when a class is
        given, otherwise by the translator.

    Raises:
        RuntimeError: If no connection manager is attached
        ValueError: If the identifier cannot be resolved, or required
            dependencies cannot be resolved

    Example::

        # Type-safe: pass characteristic class, return type is inferred
        from bluetooth_sig.gatt.characteristics import BatteryLevelCharacteristic

        level: int | None = await device.read(BatteryLevelCharacteristic)

        # Not type-safe: pass string/enum, returns Any
        level = await device.read(CharacteristicName.BATTERY_LEVEL)
        level = await device.read("2A19")

    """
    manager = self.connection_manager
    if not manager:
        raise RuntimeError("No connection manager attached to Device")

    wants_dependencies = resolution_mode != DependencyResolutionMode.SKIP_DEPENDENCIES
    ctx: CharacteristicContext | None = None

    # Type-safe path: the caller handed us the characteristic class itself.
    if isinstance(char, type) and issubclass(char, BaseCharacteristic):
        typed_class: type[BaseCharacteristic[Any]] = char
        typed_char = typed_class()
        target_uuid = typed_char.uuid
        if wants_dependencies:
            ctx = await self._ensure_dependencies_resolved(typed_class, resolution_mode)
        payload = await manager.read_gatt_char(target_uuid)
        return typed_char.parse_value(payload, ctx=ctx)

    # Dynamic path: resolve the UUID, then let the translator dispatch on it.
    target_uuid = self._resolve_characteristic_name(char)
    registered_class = CharacteristicRegistry.get_characteristic_class_by_uuid(target_uuid)

    # Dependencies can only be resolved when the characteristic class is known.
    if registered_class and wants_dependencies:
        ctx = await self._ensure_dependencies_resolved(registered_class, resolution_mode)

    payload = await manager.read_gatt_char(target_uuid)
    return self.translator.parse_characteristic(str(target_uuid), payload, ctx=ctx)
@overload
async def write(
    self,
    char: type[BaseCharacteristic[T]],
    data: T,
    response: bool = ...,
) -> None: ...


@overload
async def write(
    self,
    char: str | CharacteristicName,
    data: bytes,
    response: bool = ...,
) -> None: ...


async def write(
    self,
    char: str | CharacteristicName | type[BaseCharacteristic[T]],
    data: bytes | T,
    response: bool = True,
) -> None:
    r"""Write data to a characteristic on the device.

    Args:
        char: UUID string, name enum, or characteristic class to write to.
            Passing the class enables type-safe value encoding.
        data: Raw bytes (for string/enum) or typed value (for characteristic class).
            When using characteristic class, the value is encoded using build_value().
        response: If True, use write-with-response (wait for acknowledgment).
            If False, use write-without-response (faster but no confirmation).
            Default is True for reliability.

    Raises:
        RuntimeError: If no connection manager is attached
        TypeError: If a string/enum identifier is combined with non-bytes data
        ValueError: If a string/enum identifier cannot be resolved to a UUID
        CharacteristicEncodeError: If encoding fails (when using characteristic class)

    Example::

        # Type-safe: pass characteristic class and typed value
        from bluetooth_sig.gatt.characteristics import AlertLevelCharacteristic

        await device.write(AlertLevelCharacteristic, AlertLevel.HIGH)

        # Not type-safe: pass raw bytes
        await device.write("2A06", b"\x02")
        await device.write(CharacteristicName.ALERT_LEVEL, b"\x02")

    """
    manager = self.connection_manager
    if not manager:
        raise RuntimeError("No connection manager attached to Device")

    # Type-safe path: encode the typed value with the characteristic class.
    if isinstance(char, type) and issubclass(char, BaseCharacteristic):
        char_instance = char()
        resolved_uuid = char_instance.uuid
        encoded = char_instance.build_value(data)  # type: ignore[arg-type]
        await manager.write_gatt_char(resolved_uuid, bytes(encoded), response=response)
        return

    # String/enum path: there is no encoder, so the payload must already be raw bytes.
    if not isinstance(data, (bytes, bytearray)):
        raise TypeError(f"When using string/enum char_name, data must be bytes, got {type(data).__name__}")

    resolved_uuid = self._resolve_characteristic_name(char)
    # Normalise to immutable bytes (a bytearray is accepted above) so both paths hand
    # the connection manager the same payload type.
    await manager.write_gatt_char(resolved_uuid, bytes(data), response=response)
@overload
async def start_notify(
    self,
    char: type[BaseCharacteristic[T]],
    callback: Callable[[T], None],
) -> None: ...


@overload
async def start_notify(
    self,
    char: str | CharacteristicName,
    callback: Callable[[Any], None],
) -> None: ...


async def start_notify(
    self,
    char: str | CharacteristicName | type[BaseCharacteristic[T]],
    callback: Callable[[T], None] | Callable[[Any], None],
) -> None:
    """Start notifications for a characteristic.

    Each notification payload is parsed (by the characteristic class when one is
    given, otherwise by the translator using the resolved UUID) and the parsed
    value is passed to ``callback``. Exceptions raised by ``callback`` are logged
    and suppressed; parsing errors are not caught here.

    Args:
        char: UUID string, name enum, or characteristic class to monitor.
            Passing the class enables type-safe callbacks.
        callback: Function to call when notifications are received.
            Callback parameter type is inferred from characteristic class.

    Raises:
        RuntimeError: If no connection manager is attached
        ValueError: If a string/enum identifier cannot be resolved to a UUID

    Example::

        # Type-safe: callback receives typed value
        from bluetooth_sig.gatt.characteristics import HeartRateMeasurementCharacteristic


        def on_heart_rate(value: HeartRateMeasurementData) -> None:
            print(f"Heart rate: {value.heart_rate}")


        await device.start_notify(HeartRateMeasurementCharacteristic, on_heart_rate)

        # Not type-safe: callback receives Any
        await device.start_notify("2A37", lambda v: print(v))

    """
    manager = self.connection_manager
    if not manager:
        raise RuntimeError("No connection manager attached to Device")

    # Pick the parser once; the notification wrapper below is shared by both paths.
    if isinstance(char, type) and issubclass(char, BaseCharacteristic):
        # Type-safe path: parse with an instance of the given characteristic class.
        char_instance = char()
        resolved_uuid = char_instance.uuid

        def _parse(data: bytes) -> Any:  # noqa: ANN401
            return char_instance.parse_value(data)

    else:
        # String/enum path: parse through the translator, keyed by the UUID we
        # subscribed to. The backend-supplied ``sender`` is not relied upon, since
        # nothing guarantees it is the characteristic UUID.
        resolved_uuid = self._resolve_characteristic_name(char)
        uuid_text = str(resolved_uuid)

        def _parse(data: bytes) -> Any:  # noqa: ANN401
            return self.translator.parse_characteristic(uuid_text, data)

    def _on_notification(sender: str, data: bytes) -> None:  # pylint: disable=unused-argument
        parsed = _parse(data)
        try:
            callback(parsed)
        except Exception as exc:  # pylint: disable=broad-exception-caught
            # A faulty user callback must not break notification delivery.
            logging.exception("Notification callback raised an exception: %s", exc)

    await manager.start_notify(resolved_uuid, _on_notification)
def _resolve_characteristic_name(self, identifier: str | CharacteristicName) -> BluetoothUUID:
    """Turn a characteristic name enum or UUID-like string into a BluetoothUUID.

    Enum inputs are first looked up through the translator; if that yields
    nothing, the enum's stripped value is treated like a string input. String
    inputs are accepted only when, ignoring dashes, they are 4, 8 or 32 hex digits.

    Args:
        identifier: Characteristic name enum or UUID string

    Returns:
        The resolved BluetoothUUID

    Raises:
        ValueError: If the characteristic name cannot be resolved

    """
    if not isinstance(identifier, CharacteristicName):
        candidate = identifier
    else:
        # For enum inputs, ask the translator for the UUID
        translated = self.translator.get_characteristic_uuid_by_name(identifier)
        if translated:
            return translated
        candidate = identifier.value.strip()

    hex_only = candidate.replace("-", "")
    has_uuid_length = len(hex_only) in (4, 8, 32)
    if not (has_uuid_length and all(ch in "0123456789abcdefABCDEF" for ch in hex_only)):
        raise ValueError(f"Unknown characteristic name: '{identifier}'")

    return BluetoothUUID(candidate)
async def stop_notify(self, char_name: str | CharacteristicName | type[BaseCharacteristic[Any]]) -> None:
    """Stop notifications for a characteristic.

    Accepts the same kinds of identifier as ``start_notify`` so a subscription
    started with a characteristic class can be stopped with that class.

    Args:
        char_name: UUID string, name enum, or characteristic class

    Raises:
        RuntimeError: If no connection manager is attached
        ValueError: If a string/enum identifier cannot be resolved to a UUID

    """
    if not self.connection_manager:
        raise RuntimeError("No connection manager attached")

    if isinstance(char_name, type) and issubclass(char_name, BaseCharacteristic):
        # Characteristic class: take the UUID from an instance, as start_notify does.
        resolved_uuid = char_name().uuid
    else:
        resolved_uuid = self._resolve_characteristic_name(char_name)

    await self.connection_manager.stop_notify(resolved_uuid)
async def read_descriptor(self, desc_uuid: BluetoothUUID | BaseDescriptor) -> DescriptorData:
    """Read a descriptor from the device and parse it when a parser is registered.

    Args:
        desc_uuid: UUID of the descriptor to read or BaseDescriptor instance

    Returns:
        Parsed descriptor data with metadata. For UUIDs without a registered
        descriptor class, a DescriptorData carrying the raw bytes with
        ``parse_success=False`` is returned.

    Raises:
        RuntimeError: If no connection manager is attached

    """
    manager = self.connection_manager
    if not manager:
        raise RuntimeError("No connection manager attached to Device")

    # A BaseDescriptor carries its own UUID; anything else is used as the UUID directly.
    uuid = desc_uuid.uuid if isinstance(desc_uuid, BaseDescriptor) else desc_uuid

    payload = await manager.read_gatt_descriptor(uuid)

    parser = DescriptorRegistry.create_descriptor(str(uuid))
    if not parser:
        # No registered descriptor: hand back the unparsed bytes, flagged as such.
        return DescriptorData(
            info=DescriptorInfo(uuid=uuid, name="Unknown Descriptor"),
            value=payload,
            raw_data=payload,
            parse_success=False,
            error_message="Unknown descriptor UUID - no parser available",
        )

    return parser.parse_value(payload)
async def write_descriptor(self, desc_uuid: BluetoothUUID | BaseDescriptor, data: bytes | DescriptorData) -> None:
    """Write a value to a descriptor on the device.

    Args:
        desc_uuid: UUID of the descriptor to write to or BaseDescriptor instance
        data: Either raw bytes to write, or a DescriptorData object.
            If DescriptorData is provided, its raw_data will be written.

    Raises:
        RuntimeError: If no connection manager is attached

    """
    manager = self.connection_manager
    if not manager:
        raise RuntimeError("No connection manager attached to Device")

    # A BaseDescriptor carries its own UUID; anything else is used as the UUID directly.
    uuid = desc_uuid.uuid if isinstance(desc_uuid, BaseDescriptor) else desc_uuid

    # Unwrap the bytes when a DescriptorData wrapper was supplied.
    payload: bytes = data.raw_data if isinstance(data, DescriptorData) else data

    await manager.write_gatt_descriptor(uuid, payload)
async def pair(self) -> None:
    """Pair with the device through the connection manager.

    Any exception raised by the connection manager's ``pair`` propagates.

    Raises:
        RuntimeError: If no connection manager is attached

    """
    manager = self.connection_manager
    if manager:
        await manager.pair()
        return
    raise RuntimeError("No connection manager attached to Device")
async def unpair(self) -> None:
    """Unpair from the device through the connection manager.

    Any exception raised by the connection manager's ``unpair`` propagates.

    Raises:
        RuntimeError: If no connection manager is attached

    """
    manager = self.connection_manager
    if manager:
        await manager.unpair()
        return
    raise RuntimeError("No connection manager attached to Device")
async def read_rssi(self) -> int:
    """Read the connection's RSSI (signal strength) from the connection manager.

    Returns:
        RSSI value in dBm (typically negative, e.g., -60)

    Raises:
        RuntimeError: If no connection manager is attached

    """
    manager = self.connection_manager
    if not manager:
        raise RuntimeError("No connection manager attached to Device")

    rssi = await manager.read_rssi()
    return rssi
def set_disconnected_callback(self, callback: Callable[[], None]) -> None:
    """Register a callback with the connection manager for device disconnection.

    Args:
        callback: Function to call when disconnection occurs

    Raises:
        RuntimeError: If no connection manager is attached

    """
    manager = self.connection_manager
    if manager:
        manager.set_disconnected_callback(callback)
        return
    raise RuntimeError("No connection manager attached to Device")
@property
def mtu_size(self) -> int:
    """MTU size in bytes, as reported by the connection manager.

    Returns:
        The MTU size negotiated for this connection (typically 23-512 bytes)

    Raises:
        RuntimeError: If no connection manager is attached

    """
    manager = self.connection_manager
    if manager:
        return manager.mtu_size
    raise RuntimeError("No connection manager attached to Device")
def set_advertising_bindkey(self, bindkey: bytes | None) -> None:
    """Store the key for encrypted advertising data and push it to cached interpreters.

    Args:
        bindkey: Encryption key bytes, or None to clear

    """
    self._advertising_bindkey = bindkey
    # Interpreters created earlier hold their own copy of the key; keep them in sync.
    cached = self._advertising_interpreters
    for class_name in cached:
        cached[class_name].bindkey = bindkey
async def refresh_advertisement(self, refresh: bool = False) -> AdvertisementData | None:
    """Fetch the latest advertisement from the connection manager and interpret it.

    Args:
        refresh: Forwarded to the connection manager's ``get_latest_advertisement``.
            If True, request fresh advertisement data from the device.
            If False, the last cached value is requested.

    Returns:
        Interpreted AdvertisementData if available, None if the connection
        manager has no advertisement to return.

    Raises:
        RuntimeError: If no connection manager is attached

    Example::

        # Get cached advertisement
        ad = await device.refresh_advertisement()

        # Force fresh scan
        ad = await device.refresh_advertisement(refresh=True)

        if ad and ad.interpreted_data:
            print(f"Sensor: {ad.interpreted_data}")

    """
    manager = self.connection_manager
    if not manager:
        raise RuntimeError("No connection manager attached to Device")

    latest = await manager.get_latest_advertisement(refresh=refresh)
    if latest is not None:
        # Run the interpretation pipeline, which stores its result on the instance.
        self._process_advertisement(latest)
        return self._last_interpreted_advertisement
    return None
def _process_advertisement(self, advertisement: AdvertisementData) -> None:
    """Record an advertisement on the device and store its interpreted form.

    Updates ``advertiser_data``, fills in the device name when it is still
    empty, and saves the vendor-interpreted result for later access.

    Args:
        advertisement: AdvertisementData from connection manager

    """
    structures = advertisement.ad_structures

    # Raw PDU bytes are not available for converted advertisements, hence b"".
    self.advertiser_data = AdvertisingData(
        raw_data=b"",
        ad_structures=structures,
        rssi=advertisement.rssi,
    )

    # Adopt the advertised local name only if no name is set yet.
    advertised_name = advertisement.ad_structures.core.local_name
    if advertised_name and not self.name:
        self.name = advertisement.ad_structures.core.local_name

    # Interpret with a vendor interpreter (if any matches) and keep the result.
    self._last_interpreted_advertisement = self._interpret_advertisement(advertisement)
def _interpret_advertisement(self, advertisement: AdvertisementData) -> AdvertisementData:
    """Run an advertisement through a matching vendor interpreter, if one exists.

    The interpreter class is chosen by the advertising interpreter registry from
    the manufacturer data, service data and local name. One interpreter instance
    per class name is created on first use and then reused for this device.

    Args:
        advertisement: AdvertisementData with ad_structures populated

    Returns:
        A new AdvertisementData with the same ad_structures and rssi;
        interpreted_data and interpreter_name are populated when a matching
        vendor interpreter was found, otherwise both are None.

    """
    core = advertisement.ad_structures.core

    interpreter_class = advertising_interpreter_registry.find_interpreter_class(
        core.manufacturer_data,
        core.service_data,
        core.local_name or None,
    )

    interpreted_data: Any = None
    interpreter_name: str | None = None

    if interpreter_class is not None:
        # Interpreters are cached per class name; create on first use.
        cache_key = interpreter_class.__name__
        if cache_key not in self._advertising_interpreters:
            self._advertising_interpreters[cache_key] = interpreter_class(
                self.address,
                bindkey=self._advertising_bindkey,
            )
        active = self._advertising_interpreters[cache_key]

        interpreted_data = active.interpret(
            advertisement.ad_structures.core.manufacturer_data,
            advertisement.ad_structures.core.service_data,
            advertisement.ad_structures.core.local_name or None,
            advertisement.rssi or 0,  # a missing RSSI is passed as 0
        )
        interpreter_name = interpreter_class._info.name  # pylint: disable=protected-access

    return AdvertisementData(
        ad_structures=advertisement.ad_structures,
        interpreted_data=interpreted_data,
        interpreter_name=interpreter_name,
        rssi=advertisement.rssi,
    )
def parse_raw_advertisement(self, raw_data: bytes, rssi: int = 0) -> AdvertisementData:
    """Parse raw advertising PDU bytes and interpret the result.

    Intended for callers that hold raw BLE advertising PDU bytes (e.g. from
    a custom BLE stack or packet capture). For framework-integrated
    scanning, use the connection manager's convert_advertisement() followed
    by update_advertisement() instead.

    Side effects: ``self.advertiser_data`` is replaced with the parsed data
    (including ``raw_data``), and ``self.name`` is set from the advertised
    local name when no name is set yet.

    Args:
        raw_data: Raw BLE advertising PDU bytes
        rssi: Received signal strength in dBm

    Returns:
        AdvertisementData with parsed AD structures and vendor interpretation

    Example::

        result = device.parse_raw_advertisement(pdu_bytes, rssi=-65)
        print(result.ad_structures.core.manufacturer_data)

    """
    structures = self._pdu_parser.parse_advertising_data(raw_data).ad_structures

    # Keep the parsed structures together with the raw bytes they came from.
    self.advertiser_data = AdvertisingData(
        raw_data=raw_data,
        ad_structures=structures,
        rssi=rssi,
    )

    # Only adopt the advertised name when the device has none yet.
    advertised_name = structures.core.local_name
    if advertised_name and not self.name:
        self.name = advertised_name

    return self._interpret_advertisement(
        AdvertisementData(
            ad_structures=structures,
            rssi=rssi,
        )
    )
def get_characteristic_data(self, char_uuid: BluetoothUUID) -> Any | None:  # noqa: ANN401  # Heterogeneous cache
    """Return the last parsed value of a cached characteristic.

    The value is read from the cached characteristic instance's
    ``last_parsed`` attribute, looked up through
    ``self._get_cached_characteristic``.

    Args:
        char_uuid: UUID of the characteristic

    Returns:
        Parsed characteristic value if a cached instance exists, None otherwise.

    Example::

        battery_data = device.get_characteristic_data(BluetoothUUID("2A19"))
        if battery_data is not None:
            print(f"Battery: {battery_data}%")

    """
    cached = self._get_cached_characteristic(char_uuid)
    return None if cached is None else cached.last_parsed
async def discover_services(self) -> dict[str, Any]:
    """Discover the device's services through the attached connection manager.

    Asks the connection manager for its services and records each one in
    ``self.services`` under the string form of its service UUID. Entries
    already present in ``self.services`` are left untouched.

    Typical workflow:
        1. await device.connect()
        2. await device.discover_services()  # This method
        3. value = await device.read("battery_level")

    Note:
        - This method stores the service structure reported by the
          connection manager; it does not call ``read()``.
        - Use ``read()`` to retrieve characteristic values after discovery.

    Returns:
        A shallow copy of ``self.services`` (service UUID string -> service object)

    Raises:
        RuntimeError: If no connection manager is attached

    Example::

        device = Device(address, translator)
        device.attach_connection_manager(manager)

        await device.connect()
        services = await device.discover_services()

        for service_uuid, device_service in services.items():
            print(f"Service: {service_uuid}")
            for char_uuid, char_instance in device_service.characteristics.items():
                print(f"  Characteristic: {char_uuid}")

        battery = await device.read("battery_level")

    """
    manager = self.connection_manager
    if not manager:
        raise RuntimeError("No connection manager attached to Device")

    for discovered in await manager.get_services():
        # First discovery wins: an existing entry for this UUID is kept.
        self.services.setdefault(str(discovered.service.uuid), discovered)

    return dict(self.services)
async def get_characteristic_info(self, char_uuid: str) -> Any | None:  # noqa: ANN401  # Adapter-specific characteristic metadata
    """Look up a characteristic's info object via the connection manager.

    Fetches the services from the connection manager and returns the first
    characteristic entry whose key compares equal to ``char_uuid``.

    Args:
        char_uuid: UUID of the characteristic

    Returns:
        Characteristic information or None if not found

    Raises:
        RuntimeError: If no connection manager is attached

    """
    manager = self.connection_manager
    if not manager:
        raise RuntimeError("No connection manager attached to Device")

    discovered = await manager.get_services()
    return next(
        (
            info
            for service in discovered
            for key, info in service.characteristics.items()
            if key == char_uuid
        ),
        None,
    )
async def read_multiple(self, char_names: list[str | CharacteristicName]) -> dict[str, Any | None]:
    """Read multiple characteristics in batch.

    Each characteristic is read independently through ``self.read``. A
    failure on one entry is logged and recorded as ``None``; it never aborts
    the remaining reads.

    Args:
        char_names: List of characteristic names or enums to read

    Returns:
        Dictionary mapping characteristic UUIDs to parsed values. An entry
        whose read failed maps to None. If the name itself could not be
        resolved to a UUID, the entry is keyed by the name as given (enum
        members by their ``value``).

    Raises:
        RuntimeError: If no connection manager is attached

    """
    if not self.connection_manager:
        raise RuntimeError("No connection manager attached to Device")

    results: dict[str, Any | None] = {}
    for char_name in char_names:
        key: str | None = None
        try:
            # Resolve once, inside the try: resolving again in the handler
            # would re-raise on an unresolvable name and abort the batch.
            key = str(self._resolve_characteristic_name(char_name))
            results[key] = await self.read(char_name)
        except Exception as exc:  # pylint: disable=broad-exception-caught
            if key is None:
                # Resolution itself failed; fall back to the requested name.
                key = char_name if isinstance(char_name, str) else str(char_name.value)
            results[key] = None
            logging.warning("Failed to read characteristic %s: %s", char_name, exc)

    return results
async def write_multiple(
    self, data_map: dict[str | CharacteristicName, bytes], response: bool = True
) -> dict[str, bool]:
    """Write to multiple characteristics in batch.

    Each entry is written independently through ``self.write``. A failure on
    one entry is logged and recorded as ``False``; it never aborts the
    remaining writes.

    Args:
        data_map: Dictionary mapping characteristic names/enums to data bytes
        response: If True, use write-with-response for all writes.
            If False, use write-without-response for all writes.

    Returns:
        Dictionary mapping characteristic UUIDs to success status. If a name
        could not be resolved to a UUID, the entry is keyed by the name as
        given (enum members by their ``value``) and marked False.

    Raises:
        RuntimeError: If no connection manager is attached

    """
    if not self.connection_manager:
        raise RuntimeError("No connection manager attached to Device")

    results: dict[str, bool] = {}
    for char_name, data in data_map.items():
        key: str | None = None
        try:
            # Resolve once, inside the try: resolving again in the handler
            # would re-raise on an unresolvable name and abort the batch.
            key = str(self._resolve_characteristic_name(char_name))
            await self.write(char_name, data, response=response)
            results[key] = True
        except Exception as exc:  # pylint: disable=broad-exception-caught
            if key is None:
                # Resolution itself failed; fall back to the requested name.
                key = char_name if isinstance(char_name, str) else str(char_name.value)
            results[key] = False
            logging.warning("Failed to write characteristic %s: %s", char_name, exc)

    return results
@property
def device_info(self) -> DeviceInfo:
    """Return the device info object, creating or refreshing it as needed.

    The same DeviceInfo instance is reused across calls: it is built on
    first access and, on later accesses, its name, manufacturer data and
    service UUIDs are overwritten from the current device state.

    Returns:
        DeviceInfo with current device metadata

    """
    core = self.advertiser_data.ad_structures.core
    cached = self._device_info_cache

    if cached is None:
        cached = DeviceInfo(
            address=self.address,
            name=self.name,
            manufacturer_data=core.manufacturer_data,
            service_uuids=core.service_uuids,
        )
        self._device_info_cache = cached
        return cached

    # Refresh in place so holders of the cached object see current data.
    cached.name = self.name
    cached.manufacturer_data = core.manufacturer_data
    cached.service_uuids = core.service_uuids
    return cached
@property
def name(self) -> str:
    """Return the stored device name."""
    return self._name

@name.setter
def name(self, value: str) -> None:
    """Store the device name and mirror it into the cached device info, if any."""
    self._name = value
    cached = self._device_info_cache
    if cached is None:
        # No device info built yet; nothing to keep in sync.
        return
    cached.name = value
@property
def is_connected(self) -> bool:
    """Check if the device is currently connected.

    Other methods of this class treat a missing connection manager as a
    valid state (they raise RuntimeError for it), so this property reports
    "not connected" in that state instead of failing with AttributeError.

    Returns:
        True if a connection manager is attached and reports a connection,
        False otherwise

    """
    manager = self.connection_manager
    if manager is None:
        return False
    return manager.is_connected
@property
def interpreted_advertisement(self) -> AdvertisementData | None:
    """Return the most recently stored interpreted advertisement.

    This exposes ``self._last_interpreted_advertisement`` unchanged. The
    stored data includes vendor-specific interpretation when a registered
    interpreter matched the advertisement.

    Returns:
        AdvertisementData with interpreted fields, or None if no
        advertisement has been stored yet.

    """
    latest = self._last_interpreted_advertisement
    return latest
def get_service_by_uuid(self, service_uuid: str) -> DeviceService | None:
    """Look up a stored service by its UUID key.

    Args:
        service_uuid: UUID of the service, as used for keys in ``self.services``

    Returns:
        DeviceService instance or None if not found

    """
    known_services = self.services
    match = known_services.get(service_uuid)
    return match
def get_services_by_name(self, service_name: str | ServiceName) -> list[DeviceService]:
    """Find stored services matching a service name.

    The name is translated to a service UUID through
    ``self.translator.get_service_uuid_by_name``; enum inputs are passed by
    their ``value``.

    Args:
        service_name: Name or enum of the service

    Returns:
        List of matching DeviceService instances (empty when the name does
        not resolve or the resolved service is not stored)

    """
    if isinstance(service_name, str):
        lookup_name = service_name
    else:
        lookup_name = service_name.value

    resolved = self.translator.get_service_uuid_by_name(lookup_name)
    if not resolved:
        return []

    key = str(resolved)
    if key not in self.services:
        return []
    return [self.services[key]]
def list_characteristics(self, service_uuid: str | None = None) -> dict[str, list[str]]:
    """List characteristic keys per service, optionally for one service only.

    Args:
        service_uuid: Optional service UUID to filter by

    Returns:
        Dictionary mapping service UUIDs to lists of characteristic UUIDs.
        When a filter is given and that service is not stored, the result
        is an empty dictionary.

    """
    if not service_uuid:
        # No filter: report every stored service.
        listing: dict[str, list[str]] = {}
        for svc_uuid, stored in self.services.items():
            listing[svc_uuid] = list(stored.characteristics.keys())
        return listing

    selected = self.services.get(service_uuid)
    if not selected:
        return {}
    return {service_uuid: list(selected.characteristics.keys())}