Coverage for src / bluetooth_sig / device / connected.py: 53%

160 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Connection-related functionality for Device. 

2 

3Manages GATT connection operations for a BLE device using 

4the composition pattern. This class is accessed via `device.connected`. 

5 

6Based on patterns from bleak (BLEDevice + BleakClient) and real-world 

7implementations. 

8""" 

9 

10from __future__ import annotations 

11 

12import logging 

13from collections.abc import Callable 

14from typing import Any 

15 

16import msgspec 

17 

18from bluetooth_sig.device.client import ClientManagerProtocol 

19from bluetooth_sig.gatt.characteristics.base import BaseCharacteristic 

20from bluetooth_sig.gatt.characteristics.registry import CharacteristicRegistry 

21from bluetooth_sig.gatt.services.base import BaseGattService 

22from bluetooth_sig.gatt.services.registry import GattServiceRegistry 

23from bluetooth_sig.types.uuid import BluetoothUUID 

24 

25logger = logging.getLogger(__name__) 

26 

27 

28class DeviceEncryption(msgspec.Struct, kw_only=True, frozen=False): 

29 """Encryption state for connected device. 

30 

31 Attributes: 

32 paired: Whether the device is paired. 

33 bonded: Whether the device is bonded (persistent pairing). 

34 encrypted: Whether the current connection is encrypted. 

35 

36 """ 

37 

38 paired: bool = False 

39 bonded: bool = False 

40 encrypted: bool = False 

41 

42 

43class DeviceService(msgspec.Struct, kw_only=True, frozen=False): 

44 """Wrapper for a discovered GATT service. 

45 

46 Attributes: 

47 uuid: Service UUID. 

48 service_class: The GATT service class, or None if unknown. 

49 characteristics: Discovered characteristics by UUID string. 

50 

51 """ 

52 

53 uuid: BluetoothUUID 

54 service_class: type[BaseGattService] | None = None 

55 characteristics: dict[str, BaseCharacteristic[Any]] = msgspec.field(default_factory=dict) 

56 

57 

58class DeviceConnected: 

59 """Manages GATT connection operations for a device. 

60 

61 Accessed via `device.connected`. 

62 

63 Attributes: 

64 services: Discovered GATT services by UUID string. 

65 encryption: Current encryption state. 

66 

67 Example:: 

68 device = Device(mac_address="AA:BB:CC:DD:EE:FF", translator=translator) 

69 

70 # Connect and discover services 

71 await device.connected.connect() 

72 services = await device.connected.discover_services() 

73 

74 # Read a characteristic 

75 battery = await device.connected.read(BluetoothUUID("00002a19-0000-1000-8000-00805f9b34fb")) 

76 

77 

78 # Subscribe to notifications 

79 async def on_heart_rate(value): 

80 print(f"Heart rate: {value}") 

81 

82 

83 await device.connected.subscribe( 

84 BluetoothUUID("00002a37-0000-1000-8000-00805f9b34fb"), 

85 on_heart_rate, 

86 ) 

87 

88 await device.connected.disconnect() 

89 

90 """ 

91 

92 def __init__( 

93 self, 

94 mac_address: str, 

95 connection_manager: ClientManagerProtocol | None = None, 

96 ) -> None: 

97 """Initialise connection subsystem. 

98 

99 Args: 

100 mac_address: Device MAC address. 

101 connection_manager: Optional connection manager (can be set later). 

102 

103 """ 

104 self._mac_address = mac_address 

105 self._connection_manager = connection_manager 

106 self.services: dict[str, DeviceService] = {} 

107 self.encryption = DeviceEncryption() 

108 self._is_connected = False 

109 self._subscriptions: dict[str, list[Callable[[Any], None]]] = {} 

110 

111 @property 

112 def mac_address(self) -> str: 

113 """Device MAC address.""" 

114 return self._mac_address 

115 

116 @property 

117 def connection_manager(self) -> ClientManagerProtocol | None: 

118 """Current connection manager.""" 

119 return self._connection_manager 

120 

121 @connection_manager.setter 

122 def connection_manager(self, value: ClientManagerProtocol | None) -> None: 

123 """Set connection manager.""" 

124 self._connection_manager = value 

125 

126 @property 

127 def is_connected(self) -> bool: 

128 """Whether currently connected.""" 

129 return self._is_connected 

130 

131 async def connect(self, *, timeout: float = 10.0) -> None: 

132 """Establish GATT connection. 

133 

134 Args: 

135 timeout: Connection timeout in seconds. 

136 

137 Raises: 

138 RuntimeError: If no connection manager is set. 

139 

140 """ 

141 if self._connection_manager is None: 

142 raise RuntimeError("No connection manager set") 

143 

144 await self._connection_manager.connect(timeout=timeout) 

145 self._is_connected = True 

146 

147 async def disconnect(self) -> None: 

148 """Disconnect from device. 

149 

150 Raises: 

151 RuntimeError: If no connection manager is set. 

152 

153 """ 

154 if self._connection_manager is None: 

155 raise RuntimeError("No connection manager set") 

156 

157 await self._connection_manager.disconnect() 

158 self._is_connected = False 

159 self._subscriptions.clear() 

160 

161 async def discover_services(self) -> list[DeviceService]: 

162 """Discover and cache GATT services. 

163 

164 Returns: 

165 List of discovered services. 

166 

167 Raises: 

168 RuntimeError: If no connection manager is set. 

169 

170 """ 

171 if self._connection_manager is None: 

172 raise RuntimeError("No connection manager set") 

173 

174 # Get raw services from connection manager 

175 raw_services = await self._connection_manager.get_services() 

176 

177 self.services.clear() 

178 for raw_svc in raw_services: 

179 uuid_str = str(raw_svc.service.uuid) 

180 service_class = GattServiceRegistry.get_service_class_by_uuid(raw_svc.service.uuid) 

181 

182 device_service = DeviceService( 

183 uuid=raw_svc.service.uuid, 

184 service_class=service_class, 

185 ) 

186 

187 # Discover characteristics for this service 

188 for char_uuid_str, char_instance in raw_svc.characteristics.items(): 

189 device_service.characteristics[char_uuid_str] = char_instance 

190 

191 self.services[uuid_str] = device_service 

192 

193 return list(self.services.values()) 

194 

195 async def read(self, characteristic_uuid: BluetoothUUID | str) -> Any: # noqa: ANN401 

196 """Read a characteristic value. 

197 

198 Args: 

199 characteristic_uuid: UUID of the characteristic to read. 

200 

201 Returns: 

202 Parsed characteristic value. 

203 

204 Raises: 

205 RuntimeError: If no connection manager is set. 

206 ValueError: If characteristic is unknown. 

207 

208 """ 

209 if self._connection_manager is None: 

210 raise RuntimeError("No connection manager set") 

211 

212 if isinstance(characteristic_uuid, str): 

213 characteristic_uuid = BluetoothUUID(characteristic_uuid) 

214 

215 raw_data = await self._connection_manager.read_gatt_char(characteristic_uuid) 

216 

217 # Try to parse with registered characteristic class 

218 char_class = CharacteristicRegistry.get_characteristic_class_by_uuid(characteristic_uuid) 

219 if char_class is not None: 

220 char_instance = char_class() 

221 return char_instance.parse_value(raw_data) 

222 

223 # Return raw bytes if no parser available 

224 return raw_data 

225 

226 async def write( 

227 self, 

228 characteristic_uuid: BluetoothUUID | str, 

229 value: Any, # noqa: ANN401 

230 *, 

231 response: bool = True, 

232 ) -> None: 

233 """Write a value to a characteristic. 

234 

235 Args: 

236 characteristic_uuid: UUID of the characteristic to write. 

237 value: Value to write (will be encoded if characteristic is known). 

238 response: Whether to wait for write response. 

239 

240 Raises: 

241 RuntimeError: If no connection manager is set. 

242 

243 """ 

244 if self._connection_manager is None: 

245 raise RuntimeError("No connection manager set") 

246 

247 if isinstance(characteristic_uuid, str): 

248 characteristic_uuid = BluetoothUUID(characteristic_uuid) 

249 

250 # Try to encode with registered characteristic class 

251 data: bytes | bytearray 

252 char_class = CharacteristicRegistry.get_characteristic_class_by_uuid(characteristic_uuid) 

253 if char_class is not None and hasattr(char_class, "build_value"): 

254 char_instance = char_class() 

255 data = char_instance.build_value(value) 

256 elif isinstance(value, (bytes, bytearray)): 

257 data = value 

258 else: 

259 raise ValueError(f"Cannot write value of type {type(value).__name__} to unknown characteristic") 

260 

261 await self._connection_manager.write_gatt_char( 

262 characteristic_uuid, 

263 bytes(data), 

264 response=response, 

265 ) 

266 

267 async def subscribe( 

268 self, 

269 characteristic_uuid: BluetoothUUID | str, 

270 callback: Callable[[Any], None], 

271 ) -> None: 

272 """Subscribe to characteristic notifications. 

273 

274 Args: 

275 characteristic_uuid: UUID of the characteristic to subscribe to. 

276 callback: Function called with parsed value on each notification. 

277 

278 Raises: 

279 RuntimeError: If no connection manager is set. 

280 

281 """ 

282 if self._connection_manager is None: 

283 raise RuntimeError("No connection manager set") 

284 

285 if isinstance(characteristic_uuid, str): 

286 characteristic_uuid = BluetoothUUID(characteristic_uuid) 

287 

288 uuid_str = str(characteristic_uuid) 

289 

290 # Get characteristic class for parsing 

291 char_class = CharacteristicRegistry.get_characteristic_class_by_uuid(characteristic_uuid) 

292 

293 def notification_handler(_sender: str, data: bytes) -> None: 

294 """Parse notification data and dispatch to callbacks.""" 

295 parsed_value: Any 

296 if char_class is not None: 

297 char_instance = char_class() 

298 parsed_value = char_instance.parse_value(data) 

299 else: 

300 parsed_value = data 

301 

302 for cb in self._subscriptions.get(uuid_str, []): 

303 cb(parsed_value) 

304 

305 # Start notifications via connection manager 

306 await self._connection_manager.start_notify(characteristic_uuid, notification_handler) 

307 

308 # Track subscription 

309 if uuid_str not in self._subscriptions: 

310 self._subscriptions[uuid_str] = [] 

311 self._subscriptions[uuid_str].append(callback) 

312 

313 async def unsubscribe(self, characteristic_uuid: BluetoothUUID | str) -> None: 

314 """Unsubscribe from characteristic notifications. 

315 

316 Args: 

317 characteristic_uuid: UUID of the characteristic to unsubscribe from. 

318 

319 Raises: 

320 RuntimeError: If no connection manager is set. 

321 

322 """ 

323 if self._connection_manager is None: 

324 raise RuntimeError("No connection manager set") 

325 

326 if isinstance(characteristic_uuid, str): 

327 characteristic_uuid = BluetoothUUID(characteristic_uuid) 

328 

329 uuid_str = str(characteristic_uuid) 

330 

331 await self._connection_manager.stop_notify(characteristic_uuid) 

332 self._subscriptions.pop(uuid_str, None) 

333 

334 async def read_descriptor(self, descriptor_uuid: BluetoothUUID | str) -> bytes: 

335 """Read a descriptor value. 

336 

337 Args: 

338 descriptor_uuid: UUID of the descriptor to read. 

339 

340 Returns: 

341 Raw descriptor bytes. 

342 

343 Raises: 

344 RuntimeError: If no connection manager is set. 

345 

346 """ 

347 if self._connection_manager is None: 

348 raise RuntimeError("No connection manager set") 

349 

350 if isinstance(descriptor_uuid, str): 

351 descriptor_uuid = BluetoothUUID(descriptor_uuid) 

352 

353 return await self._connection_manager.read_gatt_descriptor(descriptor_uuid) 

354 

355 async def write_descriptor(self, descriptor_uuid: BluetoothUUID | str, data: bytes) -> None: 

356 """Write data to a descriptor. 

357 

358 Args: 

359 descriptor_uuid: UUID of the descriptor to write. 

360 data: Raw bytes to write. 

361 

362 Raises: 

363 RuntimeError: If no connection manager is set. 

364 

365 """ 

366 if self._connection_manager is None: 

367 raise RuntimeError("No connection manager set") 

368 

369 if isinstance(descriptor_uuid, str): 

370 descriptor_uuid = BluetoothUUID(descriptor_uuid) 

371 

372 await self._connection_manager.write_gatt_descriptor(descriptor_uuid, data) 

373 

374 async def pair(self) -> None: 

375 """Pair with the device. 

376 

377 Raises: 

378 RuntimeError: If no connection manager is set. 

379 

380 """ 

381 if self._connection_manager is None: 

382 raise RuntimeError("No connection manager set") 

383 

384 await self._connection_manager.pair() 

385 

386 async def unpair(self) -> None: 

387 """Unpair from the device. 

388 

389 Raises: 

390 RuntimeError: If no connection manager is set. 

391 

392 """ 

393 if self._connection_manager is None: 

394 raise RuntimeError("No connection manager set") 

395 

396 await self._connection_manager.unpair() 

397 

398 async def read_rssi(self) -> int: 

399 """Read the RSSI (signal strength) of the connection. 

400 

401 Returns: 

402 RSSI value in dBm. 

403 

404 Raises: 

405 RuntimeError: If no connection manager is set. 

406 

407 """ 

408 if self._connection_manager is None: 

409 raise RuntimeError("No connection manager set") 

410 

411 return await self._connection_manager.read_rssi() 

412 

413 @property 

414 def mtu_size(self) -> int: 

415 """Get the MTU size of the connection. 

416 

417 Returns: 

418 MTU size in bytes. 

419 

420 Raises: 

421 RuntimeError: If no connection manager is set. 

422 

423 """ 

424 if self._connection_manager is None: 

425 raise RuntimeError("No connection manager set") 

426 

427 return self._connection_manager.mtu_size 

428 

429 def set_disconnected_callback(self, callback: Callable[[], None]) -> None: 

430 """Set a callback to be invoked when the device disconnects. 

431 

432 Args: 

433 callback: Function to call when disconnection occurs. 

434 

435 Raises: 

436 RuntimeError: If no connection manager is set. 

437 

438 """ 

439 if self._connection_manager is None: 

440 raise RuntimeError("No connection manager set") 

441 

442 self._connection_manager.set_disconnected_callback(callback) 

443 

444 def get_cached_characteristic(self, char_uuid: BluetoothUUID) -> BaseCharacteristic[Any] | None: 

445 """Get cached characteristic instance from services. 

446 

447 Args: 

448 char_uuid: UUID of the characteristic to find. 

449 

450 Returns: 

451 BaseCharacteristic instance if found, None otherwise. 

452 

453 """ 

454 char_uuid_str = str(char_uuid) 

455 for service in self.services.values(): 

456 if char_uuid_str in service.characteristics: 

457 return service.characteristics[char_uuid_str] 

458 return None 

459 

460 def cache_characteristic(self, char_uuid: BluetoothUUID, char_instance: BaseCharacteristic[Any]) -> None: 

461 """Store characteristic instance in services cache. 

462 

463 Args: 

464 char_uuid: UUID of the characteristic. 

465 char_instance: BaseCharacteristic instance to cache. 

466 

467 """ 

468 char_uuid_str = str(char_uuid) 

469 for service in self.services.values(): 

470 if char_uuid_str in service.characteristics: 

471 service.characteristics[char_uuid_str] = char_instance 

472 return 

473 logger.warning( 

474 "Cannot cache characteristic %s - not found in any discovered service. Run discover_services() first.", 

475 char_uuid_str, 

476 )