Coverage for src/bluetooth_sig/device/device.py: 52%
240 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
1"""Device class for grouping BLE device services, characteristics, encryption, and advertiser data.
3This module provides a high-level Device abstraction that groups all
4services, characteristics, encryption requirements, and advertiser data
5for a BLE device. It integrates with the BluetoothSIGTranslator for
6parsing while providing a unified view of device state.
7"""
9from __future__ import annotations
11import logging
12import re
13from abc import abstractmethod
14from typing import Any, Callable, Protocol, cast
16from ..gatt.characteristics import CharacteristicName
17from ..gatt.context import CharacteristicContext, DeviceInfo
18from ..gatt.descriptors.registry import DescriptorRegistry
19from ..gatt.services import GattServiceRegistry, ServiceName
20from ..gatt.services.base import BaseGattService, UnknownService
21from ..types import (
22 CharacteristicDataProtocol,
23 DeviceAdvertiserData,
24)
25from ..types.data_types import CharacteristicData
26from ..types.device_types import DeviceEncryption, DeviceService
27from ..types.gatt_enums import GattProperty
28from ..types.uuid import BluetoothUUID
29from .advertising_parser import AdvertisingParser
30from .connection import ConnectionManagerProtocol
32__all__ = [
33 "Device",
34 "SIGTranslatorProtocol",
35 "UnknownService",
36]
39class SIGTranslatorProtocol(Protocol): # pylint: disable=too-few-public-methods
40 """Protocol for SIG translator interface."""
42 @abstractmethod
43 def parse_characteristics(
44 self,
45 char_data: dict[str, bytes],
46 descriptor_data: dict[str, dict[str, bytes]] | None = None,
47 ctx: CharacteristicContext | None = None,
48 ) -> dict[str, CharacteristicData]:
49 """Parse multiple characteristics at once."""
51 @abstractmethod
52 def parse_characteristic(
53 self,
54 uuid: str,
55 raw_data: bytes,
56 ctx: CharacteristicContext | None = None,
57 descriptor_data: dict[str, bytes] | None = None,
58 ) -> CharacteristicData:
59 """Parse a single characteristic's raw bytes."""
61 @abstractmethod
62 def get_characteristic_uuid_by_name(self, name: CharacteristicName) -> BluetoothUUID | None:
63 """Get the UUID for a characteristic name enum (enum-only API)."""
65 @abstractmethod
66 def get_service_uuid_by_name(self, name: str | ServiceName) -> BluetoothUUID | None:
67 """Get the UUID for a service name or enum."""
69 def get_characteristic_info_by_name(self, name: CharacteristicName) -> Any | None: # noqa: ANN401 # Adapter-specific characteristic info
70 """Get characteristic info by enum name (optional method)."""
73def _is_uuid_like(value: str) -> bool:
74 """Check if a string looks like a Bluetooth UUID."""
75 # Remove dashes and check if it's a valid hex string of UUID length
76 clean = value.replace("-", "")
77 return bool(re.match(r"^[0-9A-Fa-f]+$", clean)) and len(clean) in [4, 8, 32]
80class Device: # pylint: disable=too-many-instance-attributes,too-many-public-methods
81 r"""High-level BLE device abstraction.
83 This class groups all services, characteristics, encryption requirements, and
84 advertiser data for a BLE device. It integrates with
85 [BluetoothSIGTranslator][bluetooth_sig.BluetoothSIGTranslator]
86 for parsing while providing a unified view of device state.
88 Key features:
89 - Parse advertiser data from BLE scan results
90 - Add and manage GATT services with their characteristics
91 - Access parsed characteristic data by UUID
92 - Handle device encryption requirements
93 - Cache device information for performance
95 Example:
96 Create and configure a device:
98 ```python
99 from bluetooth_sig import BluetoothSIGTranslator, Device
101 translator = BluetoothSIGTranslator()
102 device = Device("AA:BB:CC:DD:EE:FF", translator)
104 # Add a service
105 device.add_service("180F", {"2A19": b"\\x64"}) # Battery service
107 # Get parsed data
108 battery = device.get_characteristic_data("2A19")
109 print(f"Battery: {battery.value}%")
110 ```
112 """
114 def __init__(self, address: str, translator: SIGTranslatorProtocol) -> None:
115 """Initialise Device instance with address and translator.
117 Args:
118 address: BLE device address
119 translator: SIGTranslatorProtocol instance
121 """
122 self.address = address
123 self.translator = translator
124 # Optional connection manager implementing ConnectionManagerProtocol
125 self.connection_manager: ConnectionManagerProtocol | None = None
126 self._name: str = ""
127 self.services: dict[str, DeviceService] = {}
128 self.encryption = DeviceEncryption()
129 self.advertiser_data = DeviceAdvertiserData(raw_data=b"")
131 # Advertising parser for handling advertising data
132 self.advertising_parser = AdvertisingParser()
134 # Cache for device_info property
135 self._device_info_cache: DeviceInfo | None = None
137 def __str__(self) -> str:
138 """Return string representation of Device.
140 Returns:
141 str: String representation of Device.
143 """
144 service_count = len(self.services)
145 char_count = sum(len(service.characteristics) for service in self.services.values())
146 return f"Device({self.address}, name={self.name}, {service_count} services, {char_count} characteristics)"
148 def add_service(
149 self,
150 service_name: str | ServiceName,
151 characteristics: dict[str, bytes],
152 descriptors: dict[str, dict[str, bytes]] | None = None,
153 ) -> None:
154 """Add a service to the device with its characteristics and descriptors.
156 Args:
157 service_name: Name or enum of the service to add
158 characteristics: Dictionary mapping characteristic UUIDs to raw data
159 descriptors: Optional nested dict mapping char_uuid -> desc_uuid -> raw data
161 """
162 # Resolve service UUID: accept UUID-like strings directly, else ask translator
163 # service_uuid can be a BluetoothUUID or None (translator may return None)
164 service_uuid: BluetoothUUID | None
165 if isinstance(service_name, str) and _is_uuid_like(service_name):
166 service_uuid = BluetoothUUID(service_name)
167 else:
168 service_uuid = self.translator.get_service_uuid_by_name(service_name)
170 if not service_uuid:
171 # No UUID found - this is an error condition
172 service_name_str = service_name if isinstance(service_name, str) else service_name.value
173 raise ValueError(
174 f"Cannot resolve service UUID for '{service_name_str}'. "
175 "Service name not found in registry and not a valid UUID format."
176 )
178 service_class = GattServiceRegistry.get_service_class(service_uuid)
179 service: BaseGattService
180 if not service_class:
181 service = UnknownService(uuid=service_uuid)
182 else:
183 service = service_class()
185 device_info = DeviceInfo(
186 address=self.address,
187 name=self.name,
188 manufacturer_data=self.advertiser_data.manufacturer_data,
189 service_uuids=self.advertiser_data.service_uuids,
190 )
192 base_ctx = CharacteristicContext(device_info=device_info)
194 parsed_characteristics = self.translator.parse_characteristics(characteristics, descriptors, ctx=base_ctx)
196 for char_data in parsed_characteristics.values():
197 self.update_encryption_requirements(char_data)
199 # Process descriptors if provided
200 if descriptors:
201 self._process_descriptors(descriptors, parsed_characteristics)
203 characteristics_cast = cast(dict[str, CharacteristicDataProtocol], parsed_characteristics)
204 device_service = DeviceService(service=service, characteristics=characteristics_cast)
206 service_key = service_name if isinstance(service_name, str) else service_name.value
207 self.services[service_key] = device_service
209 def _process_descriptors(
210 self, descriptors: dict[str, dict[str, bytes]], parsed_characteristics: dict[str, Any]
211 ) -> None:
212 """Process and store descriptor data for characteristics.
214 Args:
215 descriptors: Nested dict mapping char_uuid -> desc_uuid -> raw data
216 parsed_characteristics: Already parsed characteristic data
217 """
218 for char_uuid, char_descriptors in descriptors.items():
219 if char_uuid not in parsed_characteristics:
220 continue # Skip descriptors for unknown characteristics
222 char_data = parsed_characteristics[char_uuid]
223 if not hasattr(char_data, "add_descriptor"):
224 continue # Characteristic doesn't support descriptors
226 for desc_uuid, _desc_data in char_descriptors.items():
227 descriptor = DescriptorRegistry.create_descriptor(desc_uuid)
228 if descriptor:
229 try:
230 char_data.add_descriptor(descriptor)
231 except Exception: # pylint: disable=broad-exception-caught
232 # Skip malformed descriptors
233 continue
235 def attach_connection_manager(self, manager: ConnectionManagerProtocol) -> None:
236 """Attach a connection manager to handle BLE connections.
238 Args:
239 manager: Connection manager implementing the ConnectionManagerProtocol
241 """
242 self.connection_manager = manager
244 async def detach_connection_manager(self) -> None:
245 """Detach the current connection manager and disconnect if connected.
247 Disconnects if a connection manager is present, then removes it.
248 """
249 if self.connection_manager:
250 await self.disconnect()
251 self.connection_manager = None
253 async def connect(self) -> None:
254 """Connect to the BLE device.
256 Raises:
257 RuntimeError: If no connection manager is attached
259 """
260 if not self.connection_manager:
261 raise RuntimeError("No connection manager attached to Device")
262 await self.connection_manager.connect()
264 async def disconnect(self) -> None:
265 """Disconnect from the BLE device.
267 Raises:
268 RuntimeError: If no connection manager is attached
270 """
271 if not self.connection_manager:
272 raise RuntimeError("No connection manager attached to Device")
273 await self.connection_manager.disconnect()
275 async def read(self, char_name: str | CharacteristicName) -> Any | None: # noqa: ANN401 # Returns characteristic-specific types
276 """Read a characteristic value from the device.
278 Args:
279 char_name: Name or enum of the characteristic to read
281 Returns:
282 Parsed characteristic value or None if read fails
284 Raises:
285 RuntimeError: If no connection manager is attached
287 """
288 if not self.connection_manager:
289 raise RuntimeError("No connection manager attached to Device")
291 resolved_uuid = self._resolve_characteristic_name(char_name)
292 raw = await self.connection_manager.read_gatt_char(resolved_uuid)
293 parsed = self.translator.parse_characteristic(str(resolved_uuid), raw, descriptor_data=None)
294 return parsed
296 async def write(self, char_name: str | CharacteristicName, data: bytes) -> None:
297 """Write data to a characteristic on the device.
299 Args:
300 char_name: Name or enum of the characteristic to write to
301 data: Raw bytes to write
303 Raises:
304 RuntimeError: If no connection manager is attached
306 """
307 if not self.connection_manager:
308 raise RuntimeError("No connection manager attached to Device")
310 resolved_uuid = self._resolve_characteristic_name(char_name)
311 await self.connection_manager.write_gatt_char(resolved_uuid, data)
313 async def start_notify(self, char_name: str | CharacteristicName, callback: Callable[[Any], None]) -> None:
314 """Start notifications for a characteristic.
316 Args:
317 char_name: Name or enum of the characteristic to monitor
318 callback: Function to call when notifications are received
320 Raises:
321 RuntimeError: If no connection manager is attached
323 """
324 if not self.connection_manager:
325 raise RuntimeError("No connection manager attached to Device")
327 resolved_uuid = self._resolve_characteristic_name(char_name)
329 def _internal_cb(sender: str, data: bytes) -> None:
330 parsed = self.translator.parse_characteristic(sender, data, descriptor_data=None)
331 try:
332 callback(parsed)
333 except Exception as exc: # pylint: disable=broad-exception-caught
334 logging.exception("Notification callback raised an exception: %s", exc)
336 await self.connection_manager.start_notify(resolved_uuid, _internal_cb)
338 def _resolve_characteristic_name(self, identifier: str | CharacteristicName) -> BluetoothUUID:
339 """Resolve a characteristic name or enum to its UUID.
341 Args:
342 identifier: Characteristic name string or enum
344 Returns:
345 Characteristic UUID string
347 Raises:
348 ValueError: If the characteristic name cannot be resolved
350 """
351 if isinstance(identifier, CharacteristicName):
352 # For enum inputs, ask the translator for the UUID
353 uuid = self.translator.get_characteristic_uuid_by_name(identifier)
354 if uuid:
355 return uuid
356 norm = identifier.value.strip()
357 else:
358 norm = identifier
359 stripped = norm.replace("-", "")
360 if len(stripped) in (4, 8, 32) and all(c in "0123456789abcdefABCDEF" for c in stripped):
361 return BluetoothUUID(norm)
363 raise ValueError(f"Unknown characteristic name: '{identifier}'")
365 async def stop_notify(self, char_name: str | CharacteristicName) -> None:
366 """Stop notifications for a characteristic.
368 Args:
369 char_name: Characteristic name or UUID
371 """
372 if not self.connection_manager:
373 raise RuntimeError("No connection manager attached")
375 resolved_uuid = self._resolve_characteristic_name(char_name)
376 await self.connection_manager.stop_notify(resolved_uuid)
378 def parse_advertiser_data(self, raw_data: bytes) -> None:
379 """Parse raw advertising data and update device information.
381 Args:
382 raw_data: Raw bytes from BLE advertising packet
384 """
385 parsed_data = self.advertising_parser.parse_advertising_data(raw_data)
386 self.advertiser_data = parsed_data
388 # Update device name if not set
389 if parsed_data.local_name and not self.name:
390 self.name = parsed_data.local_name
392 def get_characteristic_data(
393 self, service_name: str | ServiceName, char_uuid: str
394 ) -> CharacteristicDataProtocol | None:
395 """Get parsed characteristic data for a specific service and characteristic.
397 Args:
398 service_name: Name or enum of the service
399 char_uuid: UUID of the characteristic
401 Returns:
402 Parsed characteristic data or None if not found.
404 """
405 service_key = service_name if isinstance(service_name, str) else service_name.value
406 service = self.services.get(service_key)
407 if service:
408 return service.characteristics.get(char_uuid)
409 return None
411 def update_encryption_requirements(self, char_data: CharacteristicData) -> None:
412 """Update device encryption requirements based on characteristic properties.
414 Args:
415 char_data: The parsed characteristic data with properties
417 """
418 properties = char_data.properties
420 # Check for encryption requirements
421 encrypt_props = [GattProperty.ENCRYPT_READ, GattProperty.ENCRYPT_WRITE, GattProperty.ENCRYPT_NOTIFY]
422 if any(prop in properties for prop in encrypt_props):
423 self.encryption.requires_encryption = True
425 # Check for authentication requirements
426 auth_props = [GattProperty.AUTH_READ, GattProperty.AUTH_WRITE, GattProperty.AUTH_NOTIFY]
427 if any(prop in properties for prop in auth_props):
428 self.encryption.requires_authentication = True
430 async def discover_services(self) -> dict[str, Any]:
431 """Discover all services and characteristics from the device.
433 Returns:
434 Dictionary mapping service UUIDs to service information
436 Raises:
437 RuntimeError: If no connection manager is attached
439 """
440 if not self.connection_manager:
441 raise RuntimeError("No connection manager attached to Device")
443 services_data = await self.connection_manager.get_services()
445 # Store discovered services in our internal structure
446 for service_info in services_data:
447 service_uuid = service_info.uuid
448 if service_uuid not in self.services:
449 # Create a service instance - we'll use UnknownService for undiscovered services
450 service_instance = UnknownService(uuid=BluetoothUUID(service_uuid))
451 device_service = DeviceService(service=service_instance, characteristics={})
452 self.services[service_uuid] = device_service
454 # Add characteristics to the service
455 for char_info in service_info.characteristics:
456 char_uuid = char_info.uuid
457 self.services[service_uuid].characteristics[char_uuid] = char_info
459 return dict(self.services)
461 async def get_characteristic_info(self, char_uuid: str) -> Any | None: # noqa: ANN401 # Adapter-specific characteristic metadata
462 """Get information about a characteristic from the connection manager.
464 Args:
465 char_uuid: UUID of the characteristic
467 Returns:
468 Characteristic information or None if not found
470 Raises:
471 RuntimeError: If no connection manager is attached
473 """
474 if not self.connection_manager:
475 raise RuntimeError("No connection manager attached to Device")
477 services_data = await self.connection_manager.get_services()
478 for service_info in services_data:
479 for char_info in service_info.characteristics:
480 if char_info.uuid == char_uuid:
481 return char_info
482 return None
484 async def read_multiple(self, char_names: list[str | CharacteristicName]) -> dict[str, Any | None]:
485 """Read multiple characteristics in batch.
487 Args:
488 char_names: List of characteristic names or enums to read
490 Returns:
491 Dictionary mapping characteristic UUIDs to parsed values
493 Raises:
494 RuntimeError: If no connection manager is attached
496 """
497 if not self.connection_manager:
498 raise RuntimeError("No connection manager attached to Device")
500 results: dict[str, Any | None] = {}
501 for char_name in char_names:
502 try:
503 value = await self.read(char_name)
504 resolved_uuid = self._resolve_characteristic_name(char_name)
505 results[str(resolved_uuid)] = value
506 except Exception as exc: # pylint: disable=broad-exception-caught
507 resolved_uuid = self._resolve_characteristic_name(char_name)
508 results[str(resolved_uuid)] = None
509 logging.warning("Failed to read characteristic %s: %s", char_name, exc)
511 return results
513 async def write_multiple(self, data_map: dict[str | CharacteristicName, bytes]) -> dict[str, bool]:
514 """Write to multiple characteristics in batch.
516 Args:
517 data_map: Dictionary mapping characteristic names/enums to data bytes
519 Returns:
520 Dictionary mapping characteristic UUIDs to success status
522 Raises:
523 RuntimeError: If no connection manager is attached
525 """
526 if not self.connection_manager:
527 raise RuntimeError("No connection manager attached to Device")
529 results: dict[str, bool] = {}
530 for char_name, data in data_map.items():
531 try:
532 await self.write(char_name, data)
533 resolved_uuid = self._resolve_characteristic_name(char_name)
534 results[str(resolved_uuid)] = True
535 except Exception as exc: # pylint: disable=broad-exception-caught
536 resolved_uuid = self._resolve_characteristic_name(char_name)
537 results[str(resolved_uuid)] = False
538 logging.warning("Failed to write characteristic %s: %s", char_name, exc)
540 return results
542 @property
543 def device_info(self) -> DeviceInfo:
544 """Get cached device info object.
546 Returns:
547 DeviceInfo with current device metadata
549 """
550 if self._device_info_cache is None:
551 self._device_info_cache = DeviceInfo(
552 address=self.address,
553 name=self.name,
554 manufacturer_data=self.advertiser_data.manufacturer_data,
555 service_uuids=self.advertiser_data.service_uuids,
556 )
557 else:
558 # Update existing cache object with current data
559 self._device_info_cache.name = self.name
560 self._device_info_cache.manufacturer_data = self.advertiser_data.manufacturer_data
561 self._device_info_cache.service_uuids = self.advertiser_data.service_uuids
562 return self._device_info_cache
564 @property
565 def name(self) -> str:
566 """Get the device name."""
567 return self._name
569 @name.setter
570 def name(self, value: str) -> None:
571 """Set the device name and update cached device_info."""
572 self._name = value
573 # Update existing cache object if it exists
574 if self._device_info_cache is not None:
575 self._device_info_cache.name = value
577 @property
578 def is_connected(self) -> bool:
579 """Check if the device is currently connected.
581 Returns:
582 True if connected, False otherwise
584 """
585 if self.connection_manager is None:
586 return False
587 # Check if the connection manager has an is_connected property
588 return getattr(self.connection_manager, "is_connected", False)
590 def get_service_by_uuid(self, service_uuid: str) -> DeviceService | None:
591 """Get a service by its UUID.
593 Args:
594 service_uuid: UUID of the service
596 Returns:
597 DeviceService instance or None if not found
599 """
600 return self.services.get(service_uuid)
602 def get_services_by_name(self, service_name: str | ServiceName) -> list[DeviceService]:
603 """Get services by name.
605 Args:
606 service_name: Name or enum of the service
608 Returns:
609 List of matching DeviceService instances
611 """
612 service_uuid = self.translator.get_service_uuid_by_name(
613 service_name if isinstance(service_name, str) else service_name.value
614 )
615 if service_uuid and str(service_uuid) in self.services:
616 return [self.services[str(service_uuid)]]
617 return []
619 def list_characteristics(self, service_uuid: str | None = None) -> dict[str, list[str]]:
620 """List all characteristics, optionally filtered by service.
622 Args:
623 service_uuid: Optional service UUID to filter by
625 Returns:
626 Dictionary mapping service UUIDs to lists of characteristic UUIDs
628 """
629 if service_uuid:
630 service = self.services.get(service_uuid)
631 if service:
632 return {service_uuid: list(service.characteristics.keys())}
633 return {}
635 return {svc_uuid: list(service.characteristics.keys()) for svc_uuid, service in self.services.items()}