Coverage for src / bluetooth_sig / device / connected.py: 53%
160 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""Connection-related functionality for Device.
3Manages GATT connection operations for a BLE device using
4the composition pattern. This class is accessed via `device.connected`.
6Based on patterns from bleak (BLEDevice + BleakClient) and real-world
7implementations.
8"""
10from __future__ import annotations
12import logging
13from collections.abc import Callable
14from typing import Any
16import msgspec
18from bluetooth_sig.device.client import ClientManagerProtocol
19from bluetooth_sig.gatt.characteristics.base import BaseCharacteristic
20from bluetooth_sig.gatt.characteristics.registry import CharacteristicRegistry
21from bluetooth_sig.gatt.services.base import BaseGattService
22from bluetooth_sig.gatt.services.registry import GattServiceRegistry
23from bluetooth_sig.types.uuid import BluetoothUUID
25logger = logging.getLogger(__name__)
28class DeviceEncryption(msgspec.Struct, kw_only=True, frozen=False):
29 """Encryption state for connected device.
31 Attributes:
32 paired: Whether the device is paired.
33 bonded: Whether the device is bonded (persistent pairing).
34 encrypted: Whether the current connection is encrypted.
36 """
38 paired: bool = False
39 bonded: bool = False
40 encrypted: bool = False
43class DeviceService(msgspec.Struct, kw_only=True, frozen=False):
44 """Wrapper for a discovered GATT service.
46 Attributes:
47 uuid: Service UUID.
48 service_class: The GATT service class, or None if unknown.
49 characteristics: Discovered characteristics by UUID string.
51 """
53 uuid: BluetoothUUID
54 service_class: type[BaseGattService] | None = None
55 characteristics: dict[str, BaseCharacteristic[Any]] = msgspec.field(default_factory=dict)
58class DeviceConnected:
59 """Manages GATT connection operations for a device.
61 Accessed via `device.connected`.
63 Attributes:
64 services: Discovered GATT services by UUID string.
65 encryption: Current encryption state.
67 Example::
68 device = Device(mac_address="AA:BB:CC:DD:EE:FF", translator=translator)
70 # Connect and discover services
71 await device.connected.connect()
72 services = await device.connected.discover_services()
74 # Read a characteristic
75 battery = await device.connected.read(BluetoothUUID("00002a19-0000-1000-8000-00805f9b34fb"))
78 # Subscribe to notifications
79 async def on_heart_rate(value):
80 print(f"Heart rate: {value}")
83 await device.connected.subscribe(
84 BluetoothUUID("00002a37-0000-1000-8000-00805f9b34fb"),
85 on_heart_rate,
86 )
88 await device.connected.disconnect()
90 """
92 def __init__(
93 self,
94 mac_address: str,
95 connection_manager: ClientManagerProtocol | None = None,
96 ) -> None:
97 """Initialise connection subsystem.
99 Args:
100 mac_address: Device MAC address.
101 connection_manager: Optional connection manager (can be set later).
103 """
104 self._mac_address = mac_address
105 self._connection_manager = connection_manager
106 self.services: dict[str, DeviceService] = {}
107 self.encryption = DeviceEncryption()
108 self._is_connected = False
109 self._subscriptions: dict[str, list[Callable[[Any], None]]] = {}
111 @property
112 def mac_address(self) -> str:
113 """Device MAC address."""
114 return self._mac_address
116 @property
117 def connection_manager(self) -> ClientManagerProtocol | None:
118 """Current connection manager."""
119 return self._connection_manager
121 @connection_manager.setter
122 def connection_manager(self, value: ClientManagerProtocol | None) -> None:
123 """Set connection manager."""
124 self._connection_manager = value
126 @property
127 def is_connected(self) -> bool:
128 """Whether currently connected."""
129 return self._is_connected
131 async def connect(self, *, timeout: float = 10.0) -> None:
132 """Establish GATT connection.
134 Args:
135 timeout: Connection timeout in seconds.
137 Raises:
138 RuntimeError: If no connection manager is set.
140 """
141 if self._connection_manager is None:
142 raise RuntimeError("No connection manager set")
144 await self._connection_manager.connect(timeout=timeout)
145 self._is_connected = True
147 async def disconnect(self) -> None:
148 """Disconnect from device.
150 Raises:
151 RuntimeError: If no connection manager is set.
153 """
154 if self._connection_manager is None:
155 raise RuntimeError("No connection manager set")
157 await self._connection_manager.disconnect()
158 self._is_connected = False
159 self._subscriptions.clear()
161 async def discover_services(self) -> list[DeviceService]:
162 """Discover and cache GATT services.
164 Returns:
165 List of discovered services.
167 Raises:
168 RuntimeError: If no connection manager is set.
170 """
171 if self._connection_manager is None:
172 raise RuntimeError("No connection manager set")
174 # Get raw services from connection manager
175 raw_services = await self._connection_manager.get_services()
177 self.services.clear()
178 for raw_svc in raw_services:
179 uuid_str = str(raw_svc.service.uuid)
180 service_class = GattServiceRegistry.get_service_class_by_uuid(raw_svc.service.uuid)
182 device_service = DeviceService(
183 uuid=raw_svc.service.uuid,
184 service_class=service_class,
185 )
187 # Discover characteristics for this service
188 for char_uuid_str, char_instance in raw_svc.characteristics.items():
189 device_service.characteristics[char_uuid_str] = char_instance
191 self.services[uuid_str] = device_service
193 return list(self.services.values())
195 async def read(self, characteristic_uuid: BluetoothUUID | str) -> Any: # noqa: ANN401
196 """Read a characteristic value.
198 Args:
199 characteristic_uuid: UUID of the characteristic to read.
201 Returns:
202 Parsed characteristic value.
204 Raises:
205 RuntimeError: If no connection manager is set.
206 ValueError: If characteristic is unknown.
208 """
209 if self._connection_manager is None:
210 raise RuntimeError("No connection manager set")
212 if isinstance(characteristic_uuid, str):
213 characteristic_uuid = BluetoothUUID(characteristic_uuid)
215 raw_data = await self._connection_manager.read_gatt_char(characteristic_uuid)
217 # Try to parse with registered characteristic class
218 char_class = CharacteristicRegistry.get_characteristic_class_by_uuid(characteristic_uuid)
219 if char_class is not None:
220 char_instance = char_class()
221 return char_instance.parse_value(raw_data)
223 # Return raw bytes if no parser available
224 return raw_data
226 async def write(
227 self,
228 characteristic_uuid: BluetoothUUID | str,
229 value: Any, # noqa: ANN401
230 *,
231 response: bool = True,
232 ) -> None:
233 """Write a value to a characteristic.
235 Args:
236 characteristic_uuid: UUID of the characteristic to write.
237 value: Value to write (will be encoded if characteristic is known).
238 response: Whether to wait for write response.
240 Raises:
241 RuntimeError: If no connection manager is set.
243 """
244 if self._connection_manager is None:
245 raise RuntimeError("No connection manager set")
247 if isinstance(characteristic_uuid, str):
248 characteristic_uuid = BluetoothUUID(characteristic_uuid)
250 # Try to encode with registered characteristic class
251 data: bytes | bytearray
252 char_class = CharacteristicRegistry.get_characteristic_class_by_uuid(characteristic_uuid)
253 if char_class is not None and hasattr(char_class, "build_value"):
254 char_instance = char_class()
255 data = char_instance.build_value(value)
256 elif isinstance(value, (bytes, bytearray)):
257 data = value
258 else:
259 raise ValueError(f"Cannot write value of type {type(value).__name__} to unknown characteristic")
261 await self._connection_manager.write_gatt_char(
262 characteristic_uuid,
263 bytes(data),
264 response=response,
265 )
267 async def subscribe(
268 self,
269 characteristic_uuid: BluetoothUUID | str,
270 callback: Callable[[Any], None],
271 ) -> None:
272 """Subscribe to characteristic notifications.
274 Args:
275 characteristic_uuid: UUID of the characteristic to subscribe to.
276 callback: Function called with parsed value on each notification.
278 Raises:
279 RuntimeError: If no connection manager is set.
281 """
282 if self._connection_manager is None:
283 raise RuntimeError("No connection manager set")
285 if isinstance(characteristic_uuid, str):
286 characteristic_uuid = BluetoothUUID(characteristic_uuid)
288 uuid_str = str(characteristic_uuid)
290 # Get characteristic class for parsing
291 char_class = CharacteristicRegistry.get_characteristic_class_by_uuid(characteristic_uuid)
293 def notification_handler(_sender: str, data: bytes) -> None:
294 """Parse notification data and dispatch to callbacks."""
295 parsed_value: Any
296 if char_class is not None:
297 char_instance = char_class()
298 parsed_value = char_instance.parse_value(data)
299 else:
300 parsed_value = data
302 for cb in self._subscriptions.get(uuid_str, []):
303 cb(parsed_value)
305 # Start notifications via connection manager
306 await self._connection_manager.start_notify(characteristic_uuid, notification_handler)
308 # Track subscription
309 if uuid_str not in self._subscriptions:
310 self._subscriptions[uuid_str] = []
311 self._subscriptions[uuid_str].append(callback)
313 async def unsubscribe(self, characteristic_uuid: BluetoothUUID | str) -> None:
314 """Unsubscribe from characteristic notifications.
316 Args:
317 characteristic_uuid: UUID of the characteristic to unsubscribe from.
319 Raises:
320 RuntimeError: If no connection manager is set.
322 """
323 if self._connection_manager is None:
324 raise RuntimeError("No connection manager set")
326 if isinstance(characteristic_uuid, str):
327 characteristic_uuid = BluetoothUUID(characteristic_uuid)
329 uuid_str = str(characteristic_uuid)
331 await self._connection_manager.stop_notify(characteristic_uuid)
332 self._subscriptions.pop(uuid_str, None)
334 async def read_descriptor(self, descriptor_uuid: BluetoothUUID | str) -> bytes:
335 """Read a descriptor value.
337 Args:
338 descriptor_uuid: UUID of the descriptor to read.
340 Returns:
341 Raw descriptor bytes.
343 Raises:
344 RuntimeError: If no connection manager is set.
346 """
347 if self._connection_manager is None:
348 raise RuntimeError("No connection manager set")
350 if isinstance(descriptor_uuid, str):
351 descriptor_uuid = BluetoothUUID(descriptor_uuid)
353 return await self._connection_manager.read_gatt_descriptor(descriptor_uuid)
355 async def write_descriptor(self, descriptor_uuid: BluetoothUUID | str, data: bytes) -> None:
356 """Write data to a descriptor.
358 Args:
359 descriptor_uuid: UUID of the descriptor to write.
360 data: Raw bytes to write.
362 Raises:
363 RuntimeError: If no connection manager is set.
365 """
366 if self._connection_manager is None:
367 raise RuntimeError("No connection manager set")
369 if isinstance(descriptor_uuid, str):
370 descriptor_uuid = BluetoothUUID(descriptor_uuid)
372 await self._connection_manager.write_gatt_descriptor(descriptor_uuid, data)
374 async def pair(self) -> None:
375 """Pair with the device.
377 Raises:
378 RuntimeError: If no connection manager is set.
380 """
381 if self._connection_manager is None:
382 raise RuntimeError("No connection manager set")
384 await self._connection_manager.pair()
386 async def unpair(self) -> None:
387 """Unpair from the device.
389 Raises:
390 RuntimeError: If no connection manager is set.
392 """
393 if self._connection_manager is None:
394 raise RuntimeError("No connection manager set")
396 await self._connection_manager.unpair()
398 async def read_rssi(self) -> int:
399 """Read the RSSI (signal strength) of the connection.
401 Returns:
402 RSSI value in dBm.
404 Raises:
405 RuntimeError: If no connection manager is set.
407 """
408 if self._connection_manager is None:
409 raise RuntimeError("No connection manager set")
411 return await self._connection_manager.read_rssi()
413 @property
414 def mtu_size(self) -> int:
415 """Get the MTU size of the connection.
417 Returns:
418 MTU size in bytes.
420 Raises:
421 RuntimeError: If no connection manager is set.
423 """
424 if self._connection_manager is None:
425 raise RuntimeError("No connection manager set")
427 return self._connection_manager.mtu_size
429 def set_disconnected_callback(self, callback: Callable[[], None]) -> None:
430 """Set a callback to be invoked when the device disconnects.
432 Args:
433 callback: Function to call when disconnection occurs.
435 Raises:
436 RuntimeError: If no connection manager is set.
438 """
439 if self._connection_manager is None:
440 raise RuntimeError("No connection manager set")
442 self._connection_manager.set_disconnected_callback(callback)
444 def get_cached_characteristic(self, char_uuid: BluetoothUUID) -> BaseCharacteristic[Any] | None:
445 """Get cached characteristic instance from services.
447 Args:
448 char_uuid: UUID of the characteristic to find.
450 Returns:
451 BaseCharacteristic instance if found, None otherwise.
453 """
454 char_uuid_str = str(char_uuid)
455 for service in self.services.values():
456 if char_uuid_str in service.characteristics:
457 return service.characteristics[char_uuid_str]
458 return None
460 def cache_characteristic(self, char_uuid: BluetoothUUID, char_instance: BaseCharacteristic[Any]) -> None:
461 """Store characteristic instance in services cache.
463 Args:
464 char_uuid: UUID of the characteristic.
465 char_instance: BaseCharacteristic instance to cache.
467 """
468 char_uuid_str = str(char_uuid)
469 for service in self.services.values():
470 if char_uuid_str in service.characteristics:
471 service.characteristics[char_uuid_str] = char_instance
472 return
473 logger.warning(
474 "Cannot cache characteristic %s - not found in any discovered service. Run discover_services() first.",
475 char_uuid_str,
476 )