Coverage for src / bluetooth_sig / device / client.py: 92%

72 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Client manager protocol for BLE transport adapters. 

2 

3Defines an async abstract base class that client adapter implementations (Bleak, 

4SimplePyBLE, etc.) must inherit from so the `Device` class can operate 

5independently of the underlying BLE library. 

6 

7Adapters must provide async implementations of all abstract methods below. 

8For sync-only libraries an adapter can run sync calls in a thread and 

9expose an async interface. 

10""" 

11# pylint: disable=duplicate-code # Pattern repetition is expected for protocol definitions 

12# pylint: disable=too-many-public-methods # BLE connection protocol requires complete interface 

13 

14from __future__ import annotations 

15 

16from abc import ABC, abstractmethod 

17from collections.abc import AsyncIterator, Callable 

18from typing import ClassVar 

19 

20from bluetooth_sig.types.advertising.result import AdvertisementData 

21from bluetooth_sig.types.device_types import ( 

22 DeviceService, 

23 ScanDetectionCallback, 

24 ScanFilter, 

25 ScannedDevice, 

26 ScanningMode, 

27) 

28from bluetooth_sig.types.uuid import BluetoothUUID 

29 

30 

31class ClientManagerProtocol(ABC): 

32 """Abstract base class describing the transport operations Device expects. 

33 

34 All methods are async so adapters can integrate naturally with async 

35 libraries like Bleak. Synchronous libraries must be wrapped by adapters 

36 to provide async interfaces. 

37 

38 Subclasses MUST implement all abstract methods and properties. 

39 """ 

40 

41 # Class-level flag to indicate if this backend supports scanning 

42 supports_scanning: ClassVar[bool] = False 

43 

44 def __init__(self, address: str) -> None: 

45 """Initialize the connection manager. 

46 

47 Args: 

48 address: The Bluetooth device address (MAC address) 

49 

50 """ 

51 self._address = address 

52 

53 @property 

54 def address(self) -> str: 

55 """Get the device address. 

56 

57 Returns: 

58 Bluetooth device address (MAC address) 

59 

60 Note: 

61 Subclasses may override this to provide address from underlying library. 

62 

63 """ 

64 return self._address 

65 

66 @abstractmethod 

67 async def connect(self, *, timeout: float = 10.0) -> None: 

68 """Open a connection to the device. 

69 

70 Args: 

71 timeout: Connection timeout in seconds. 

72 

73 """ 

74 

75 @abstractmethod 

76 async def disconnect(self) -> None: 

77 """Close the connection to the device.""" 

78 

79 @abstractmethod 

80 async def read_gatt_char(self, char_uuid: BluetoothUUID) -> bytes: 

81 """Read the raw bytes of a characteristic identified by `char_uuid`.""" 

82 

83 @abstractmethod 

84 async def write_gatt_char(self, char_uuid: BluetoothUUID, data: bytes, response: bool = True) -> None: 

85 """Write raw bytes to a characteristic identified by `char_uuid`. 

86 

87 Args: 

88 char_uuid: The UUID of the characteristic to write to 

89 data: The raw bytes to write 

90 response: If True, use write-with-response (wait for acknowledgment). 

91 If False, use write-without-response (faster but no confirmation). 

92 Default is True for reliability. 

93 

94 """ 

95 

96 @abstractmethod 

97 async def read_gatt_descriptor(self, desc_uuid: BluetoothUUID) -> bytes: 

98 """Read the raw bytes of a descriptor identified by `desc_uuid`. 

99 

100 Args: 

101 desc_uuid: The UUID of the descriptor to read 

102 

103 Returns: 

104 The raw descriptor data as bytes 

105 

106 """ 

107 

108 @abstractmethod 

109 async def write_gatt_descriptor(self, desc_uuid: BluetoothUUID, data: bytes) -> None: 

110 """Write raw bytes to a descriptor identified by `desc_uuid`. 

111 

112 Args: 

113 desc_uuid: The UUID of the descriptor to write to 

114 data: The raw bytes to write 

115 

116 """ 

117 

118 @abstractmethod 

119 async def get_services(self) -> list[DeviceService]: 

120 """Return a structure describing services/characteristics from the adapter. 

121 

122 The concrete return type depends on the adapter; `Device` uses 

123 this only for enumeration in examples. Adapters should provide 

124 iterable objects with `.characteristics` elements that have 

125 `.uuid` and `.properties` attributes, or the adapter can return 

126 a mapping. 

127 """ 

128 

129 @abstractmethod 

130 async def start_notify(self, char_uuid: BluetoothUUID, callback: Callable[[str, bytes], None]) -> None: 

131 """Start notifications for `char_uuid` and invoke `callback(uuid, data)` on updates.""" 

132 

133 @abstractmethod 

134 async def stop_notify(self, char_uuid: BluetoothUUID) -> None: 

135 """Stop notifications for `char_uuid`.""" 

136 

137 @abstractmethod 

138 async def pair(self) -> None: 

139 """Pair with the device. 

140 

141 Raises an exception if pairing fails. 

142 

143 Note: 

144 On macOS, pairing is automatic when accessing authenticated characteristics. 

145 This method may not be needed on that platform. 

146 

147 """ 

148 

149 @abstractmethod 

150 async def unpair(self) -> None: 

151 """Unpair from the device. 

152 

153 Raises an exception if unpairing fails. 

154 

155 """ 

156 

157 @abstractmethod 

158 async def read_rssi(self) -> int: 

159 """Read the RSSI (signal strength) during an active connection. 

160 

161 This reads RSSI from the active BLE connection. Not all backends 

162 support this - some only provide RSSI from advertisement data. 

163 

164 Returns: 

165 RSSI value in dBm (typically negative, e.g., -60) 

166 

167 Raises: 

168 NotImplementedError: If this backend doesn't support connection RSSI 

169 RuntimeError: If not connected 

170 

171 """ 

172 

173 @abstractmethod 

174 async def get_advertisement_rssi(self, refresh: bool = False) -> int | None: 

175 """Get the RSSI from advertisement data. 

176 

177 This returns the RSSI from advertisement data. Does not require 

178 an active connection. 

179 

180 Args: 

181 refresh: If True, perform an active scan to get fresh RSSI. 

182 If False, return the cached RSSI from last advertisement. 

183 

184 Returns: 

185 RSSI value in dBm, or None if no advertisement has been received 

186 

187 """ 

188 

189 @abstractmethod 

190 def set_disconnected_callback(self, callback: Callable[[], None]) -> None: 

191 """Set a callback to be invoked when the device disconnects. 

192 

193 Args: 

194 callback: Function to call when disconnection occurs 

195 

196 """ 

197 

198 def register_advertisement_callback( 

199 self, 

200 callback: Callable[[AdvertisementData], None], 

201 ) -> None: 

202 """Register a callback for advertisement updates. 

203 

204 This enables continuous monitoring of a device's advertisements even while 

205 connected, useful for devices that update sensor data in advertisements. 

206 The callback is invoked whenever new advertisement data is received. 

207 

208 Args: 

209 callback: Function called with AdvertisementData when advertisements are 

210 received. Can be sync or async. 

211 

212 Raises: 

213 NotImplementedError: If this backend doesn't support advertisement monitoring 

214 

215 Example:: 

216 

217 def on_advertisement(ad: AdvertisementData) -> None: 

218 if ad.interpreted_data: 

219 print(f"Sensor update: {ad.interpreted_data}") 

220 

221 

222 # Register callback 

223 manager.register_advertisement_callback(on_advertisement) 

224 

225 # Later, stop monitoring 

226 manager.unregister_advertisement_callback(on_advertisement) 

227 

228 """ 

229 raise NotImplementedError( 

230 f"{self.__class__.__name__} does not support advertisement monitoring. " 

231 "Use get_latest_advertisement() for polling instead." 

232 ) 

233 

234 def unregister_advertisement_callback( 

235 self, 

236 callback: Callable[[AdvertisementData], None], 

237 ) -> None: 

238 """Unregister a callback for advertisement updates. 

239 

240 Args: 

241 callback: The callback function to remove 

242 

243 Raises: 

244 NotImplementedError: If this backend doesn't support advertisement monitoring 

245 

246 """ 

247 raise NotImplementedError( 

248 f"{self.__class__.__name__} does not support advertisement monitoring. " 

249 "Use get_latest_advertisement() for polling instead." 

250 ) 

251 

252 @property 

253 @abstractmethod 

254 def is_connected(self) -> bool: 

255 """Check if the connection is currently active. 

256 

257 Returns: 

258 True if connected to the device, False otherwise 

259 

260 """ 

261 

262 @property 

263 @abstractmethod 

264 def mtu_size(self) -> int: 

265 """Get the negotiated MTU size in bytes. 

266 

267 Returns: 

268 The MTU size negotiated for this connection (typically 23-512 bytes) 

269 

270 """ 

271 

272 @property 

273 @abstractmethod 

274 def name(self) -> str: 

275 """Get the name of the device. 

276 

277 Returns: 

278 The name of the device as a string 

279 

280 """ 

281 

282 @classmethod 

283 async def scan( # pylint: disable=too-many-arguments 

284 cls, 

285 timeout: float = 5.0, 

286 *, 

287 filters: ScanFilter | None = None, 

288 scanning_mode: ScanningMode = "active", 

289 adapter: str | None = None, 

290 callback: ScanDetectionCallback | None = None, 

291 ) -> list[ScannedDevice]: 

292 """Scan for nearby BLE devices. 

293 

294 This is a class method that doesn't require an instance. Not all backends 

295 support scanning - check the `supports_scanning` class attribute. 

296 

297 Args: 

298 timeout: Scan duration in seconds (default: 5.0) 

299 filters: Optional filter criteria. Devices not matching filters are excluded. 

300 Filtering may happen at OS level (more efficient) or post-scan depending 

301 on backend capabilities. 

302 scanning_mode: 'active' (default) sends scan requests for scan response data, 

303 'passive' only listens to advertisements (saves power, faster). 

304 Note: Passive scanning is NOT supported on macOS. 

305 adapter: Backend-specific adapter identifier (e.g., "hci0" for BlueZ). 

306 None uses the default adapter. 

307 callback: Optional async or sync function called with each ScannedDevice 

308 as it's discovered. Enables real-time UI updates and early processing. 

309 If async, it's awaited before continuing. 

310 

311 Returns: 

312 List of discovered devices matching the filters 

313 

314 Raises: 

315 NotImplementedError: If this backend doesn't support scanning 

316 

317 Example:: 

318 

319 # Basic scan 

320 devices = await MyConnectionManager.scan(timeout=5.0) 

321 

322 # Filtered scan for Heart Rate monitors 

323 from bluetooth_sig.types.device_types import ScanFilter 

324 

325 filters = ScanFilter(service_uuids=["180d"], rssi_threshold=-70) 

326 devices = await MyConnectionManager.scan(timeout=10.0, filters=filters) 

327 

328 

329 # Scan with real-time callback 

330 async def on_device(device: ScannedDevice) -> None: 

331 print(f"Found: {device.name or device.address}") 

332 

333 

334 devices = await MyConnectionManager.scan(timeout=10.0, callback=on_device) 

335 

336 """ 

337 raise NotImplementedError(f"{cls.__name__} does not support scanning") 

338 

339 @classmethod 

340 async def find_device( 

341 cls, 

342 filters: ScanFilter, 

343 timeout: float = 10.0, 

344 *, 

345 scanning_mode: ScanningMode = "active", 

346 adapter: str | None = None, 

347 ) -> ScannedDevice | None: 

348 """Find the first device matching the filter criteria. 

349 

350 This is more efficient than a full scan when looking for a specific device. 

351 Use ScanFilter to match by address, name, service UUIDs, or custom function. 

352 

353 Args: 

354 filters: Filter criteria. Use ScanFilter(addresses=[...]) for address, 

355 ScanFilter(names=[...]) for name, or ScanFilter(filter_func=...) for 

356 custom matching logic. 

357 timeout: Maximum time to scan in seconds (default: 10.0) 

358 scanning_mode: 'active' or 'passive' scanning mode. 

359 adapter: Backend-specific adapter identifier. None uses default. 

360 

361 Returns: 

362 The first matching device, or None if not found within timeout 

363 

364 Raises: 

365 NotImplementedError: If this backend doesn't support scanning 

366 

367 Example:: 

368 

369 # Find by address 

370 device = await MyConnectionManager.find_device( 

371 ScanFilter(addresses=["AA:BB:CC:DD:EE:FF"]), 

372 timeout=15.0, 

373 ) 

374 

375 # Find by name 

376 device = await MyConnectionManager.find_device( 

377 ScanFilter(names=["Polar H10"]), 

378 timeout=15.0, 

379 ) 

380 

381 

382 # Find with custom filter 

383 def has_apple_data(device: ScannedDevice) -> bool: 

384 if device.advertisement_data is None: 

385 return False 

386 mfr = device.advertisement_data.ad_structures.core.manufacturer_data 

387 return 0x004C in mfr 

388 

389 

390 device = await MyConnectionManager.find_device( 

391 ScanFilter(filter_func=has_apple_data), 

392 timeout=10.0, 

393 ) 

394 

395 """ 

396 raise NotImplementedError(f"{cls.__name__} does not support find_device") 

397 

398 @classmethod 

399 def scan_stream( 

400 cls, 

401 timeout: float | None = 5.0, 

402 *, 

403 filters: ScanFilter | None = None, 

404 scanning_mode: ScanningMode = "active", 

405 adapter: str | None = None, 

406 ) -> AsyncIterator[ScannedDevice]: 

407 """Stream discovered devices as an async iterator. 

408 

409 This provides the most Pythonic way to process devices as they're 

410 discovered, with full async/await support and easy early termination. 

411 

412 Args: 

413 timeout: Scan duration in seconds. None for indefinite. 

414 filters: Optional filter criteria. 

415 scanning_mode: 'active' or 'passive'. 

416 adapter: Backend-specific adapter identifier. 

417 

418 Yields: 

419 ScannedDevice objects as they are discovered 

420 

421 Raises: 

422 NotImplementedError: If this backend doesn't support streaming 

423 

424 Example:: 

425 

426 async for device in MyConnectionManager.scan_stream(timeout=10.0): 

427 print(f"Found: {device.name}") 

428 if device.name == "My Target Device": 

429 break # Stop scanning early 

430 

431 """ 

432 raise NotImplementedError(f"{cls.__name__} does not support scan_stream") 

433 

434 @abstractmethod 

435 async def get_latest_advertisement(self, refresh: bool = False) -> AdvertisementData | None: 

436 """Return the most recently received advertisement data. 

437 

438 Args: 

439 refresh: If True, perform an active scan to get fresh data. 

440 If False, return the last cached advertisement. 

441 

442 Returns: 

443 Latest AdvertisementData, or None if none received yet 

444 

445 """ 

446 

447 def on_advertisement_received(self, advertisement: AdvertisementData) -> None: 

448 """Handle new advertisement data received from the OS. 

449 

450 Backends should call this method from their scan/detection callbacks when new 

451 advertisement data arrives. This enables caching and callback notifications. 

452 

453 Args: 

454 advertisement: New AdvertisementData received from OS scan callbacks 

455 

456 Raises: 

457 NotImplementedError: If this backend doesn't support advertisement monitoring 

458 

459 Note: 

460 This is for backend implementations that support advertisement monitoring. 

461 Application code should use get_latest_advertisement() to retrieve cached 

462 data or register_advertisement_callback() for push notifications. 

463 

464 """ 

465 raise NotImplementedError(f"{self.__class__.__name__} does not support advertisement monitoring") 

466 

467 @classmethod 

468 @abstractmethod 

469 def convert_advertisement(cls, advertisement: object) -> AdvertisementData: 

470 """Convert framework-specific advertisement data to AdvertisementData. 

471 

472 This method bridges the gap between BLE framework-specific advertisement 

473 representations (Bleak's AdvertisementData, SimpleBLE's Peripheral, etc.) 

474 and our unified AdvertisementData type. 

475 

476 Each connection manager implementation knows how to extract manufacturer_data, 

477 service_data, local_name, RSSI, etc. from its framework's format. 

478 

479 Args: 

480 advertisement: Framework-specific advertisement object 

481 (e.g., bleak.backends.scanner.AdvertisementData) 

482 

483 Returns: 

484 Unified AdvertisementData with ad_structures populated 

485 

486 """ 

487 

488 

489__all__ = ["ClientManagerProtocol"]