Coverage for src / bluetooth_sig / device / peripheral.py: 76%

112 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Peripheral manager protocol for BLE GATT server adapters. 

2 

3Defines an async abstract base class that peripheral adapter implementations 

4(bless, bluez_peripheral, etc.) must inherit from to create BLE GATT servers 

5that broadcast services and characteristics. 

6 

7This is the server-side counterpart to ClientManagerProtocol. Where clients 

8connect TO devices and READ/PARSE data, peripherals ARE devices that ENCODE 

9and BROADCAST data for others to read. 

10 

11Adapters must provide async implementations of all abstract methods below. 

12 

13TODO: PeripheralDevice exists in peripheral_device.py with core functionality. 

14 Remaining gaps to address (see ROADMAP.md Workstream F): 

15 - Subscription management (on_subscribe/on_unsubscribe, subscribed_clients tracking) 

16 - Client event callbacks (on_client_connected/on_client_disconnected) 

17 - Read/write request handling (typed on_read_request/on_write_request) 

18 - Descriptor hosting (CCCD, User Description, Presentation Format) 

19""" 

20 

21from __future__ import annotations 

22 

23from abc import ABC, abstractmethod 

24from collections.abc import Callable 

25from typing import ClassVar 

26 

27from typing_extensions import Self 

28 

29from bluetooth_sig.types.company import CompanyIdentifier, ManufacturerData 

30from bluetooth_sig.types.peripheral_types import CharacteristicDefinition, ServiceDefinition 

31from bluetooth_sig.types.uuid import BluetoothUUID 

32 

33 

class PeripheralManagerProtocol(ABC):
    """Abstract base class for BLE peripheral/GATT server implementations.

    This protocol defines the interface for creating BLE peripherals that
    broadcast services and characteristics. Implementations wrap backend
    libraries like bless, bluez_peripheral, etc.

    Uses a fluent builder pattern for advertisement configuration - call
    configuration methods before start() to customise advertising.

    The workflow is:
        1. Create peripheral manager with a device name
        2. Configure advertising (optional): with_manufacturer_data(), with_tx_power(), etc.
        3. Add services and characteristics (using CharacteristicDefinition)
        4. Start advertising
        5. Update characteristic values as needed
        6. Stop when done

    Characteristic definitions and callbacks are indexed internally by the
    upper-cased ``str()`` form of the characteristic UUID, so lookups are
    case-insensitive with respect to that string form.

    Example::
        >>> from bluetooth_sig.gatt.characteristics import BatteryLevelCharacteristic
        >>> from bluetooth_sig.gatt.services import BatteryService
        >>> from bluetooth_sig.types.company import ManufacturerData
        >>>
        >>> # Create peripheral with fluent configuration
        >>> peripheral = SomePeripheralManager("My Sensor")
        >>> peripheral.with_tx_power(-10).with_connectable(True)
        >>>
        >>> # Define a service with battery level
        >>> char = BatteryLevelCharacteristic()
        >>> char_def = CharacteristicDefinition.from_characteristic(char, 85)
        >>>
        >>> service = ServiceDefinition(
        ...     uuid=BatteryService.get_class_uuid(),
        ...     characteristics=[char_def],
        ... )
        >>>
        >>> await peripheral.add_service(service)
        >>> await peripheral.start()
        >>>
        >>> # Later, update the battery level
        >>> await peripheral.update_characteristic("2A19", char.build_value(75))

    """

    # Class-level flag indicating backend capabilities
    supports_advertising: ClassVar[bool] = True

    def __init__(self, name: str) -> None:
        """Initialize the peripheral manager.

        Args:
            name: The advertised device name visible to scanners

        """
        self._name = name

        # Service and characteristic tracking
        self._services: list[ServiceDefinition] = []
        self._char_definitions: dict[str, CharacteristicDefinition] = {}

        # Advertisement configuration (set via fluent methods)
        self._manufacturer_data: ManufacturerData | None = None
        self._service_data: dict[BluetoothUUID, bytes] = {}
        self._tx_power: int | None = None
        self._is_connectable = True
        self._is_discoverable = True

        # Callbacks for read/write handling, keyed like _char_definitions
        self._read_callbacks: dict[str, Callable[[], bytearray]] = {}
        self._write_callbacks: dict[str, Callable[[bytearray], None]] = {}

    # -------------------------------------------------------------------------
    # Internal helpers
    # -------------------------------------------------------------------------

    @staticmethod
    def _uuid_key(char_uuid: str | BluetoothUUID) -> str:
        """Return the dictionary key used to index a characteristic UUID.

        Every lookup table in this class must use the same normalisation,
        so it lives in exactly one place.
        """
        return str(char_uuid).upper()

    def _require_not_started(
        self,
        message: str = "Cannot configure after peripheral has started",
    ) -> None:
        """Raise RuntimeError with ``message`` if the peripheral is advertising."""
        if self.is_advertising:
            raise RuntimeError(message)

    # -------------------------------------------------------------------------
    # Read-only configuration accessors
    # -------------------------------------------------------------------------

    @property
    def name(self) -> str:
        """Get the advertised device name.

        Returns:
            The device name as it appears to BLE scanners

        """
        return self._name

    @property
    def services(self) -> list[ServiceDefinition]:
        """Get the list of registered services.

        Returns:
            List of ServiceDefinition objects added to this peripheral.
            This is the internal list itself, not a copy.

        """
        return self._services

    @property
    def manufacturer_data(self) -> ManufacturerData | None:
        """Get the configured manufacturer data.

        Returns:
            ManufacturerData if configured, None otherwise.

        """
        return self._manufacturer_data

    @property
    def service_data(self) -> dict[BluetoothUUID, bytes]:
        """Get the configured service data.

        Returns:
            Dictionary mapping service UUIDs to data bytes.
            This is the internal dictionary itself, not a copy.

        """
        return self._service_data

    @property
    def tx_power(self) -> int | None:
        """Get the configured TX power level.

        Returns:
            TX power in dBm if configured, None otherwise.

        """
        return self._tx_power

    @property
    def is_connectable_config(self) -> bool:
        """Get the connectable configuration.

        Returns:
            True if peripheral is configured to accept connections.

        """
        return self._is_connectable

    @property
    def is_discoverable_config(self) -> bool:
        """Get the discoverable configuration.

        Returns:
            True if peripheral is configured to be discoverable.

        """
        return self._is_discoverable

    # -------------------------------------------------------------------------
    # Fluent advertisement configuration
    # -------------------------------------------------------------------------

    def with_manufacturer_data(self, manufacturer_data: ManufacturerData) -> Self:
        r"""Set manufacturer-specific advertising data.

        Args:
            manufacturer_data: ManufacturerData instance from the types module.

        Returns:
            Self for method chaining.

        Raises:
            RuntimeError: If called after start().

        Example::
            >>> from bluetooth_sig.types.company import ManufacturerData
            >>> mfr = ManufacturerData.from_id_and_payload(0x004C, b"\x02\x15...")
            >>> peripheral.with_manufacturer_data(mfr)

        """
        self._require_not_started()
        self._manufacturer_data = manufacturer_data
        return self

    def with_manufacturer_id(
        self,
        company_id: int | CompanyIdentifier,
        payload: bytes,
    ) -> Self:
        r"""Set manufacturer data from company ID and payload.

        Replaces any manufacturer data configured earlier.

        Args:
            company_id: Bluetooth SIG company identifier (e.g., 0x004C for Apple)
                or CompanyIdentifier instance.
            payload: Manufacturer-specific payload bytes.

        Returns:
            Self for method chaining.

        Raises:
            RuntimeError: If called after start().

        Example::
            >>> peripheral.with_manufacturer_id(0x004C, b"\x02\x15...")

        """
        self._require_not_started()
        # Accept either a raw integer or a CompanyIdentifier wrapper.
        cid = company_id if isinstance(company_id, int) else company_id.id
        self._manufacturer_data = ManufacturerData.from_id_and_payload(cid, payload)
        return self

    def with_service_data(
        self,
        service_uuid: BluetoothUUID,
        data: bytes,
    ) -> Self:
        r"""Add service data to advertisement.

        Calling this again with the same ``service_uuid`` overwrites the
        previously stored bytes for that UUID.

        Args:
            service_uuid: BluetoothUUID of the service.
            data: Service-specific data bytes.

        Returns:
            Self for method chaining.

        Raises:
            RuntimeError: If called after start().

        Example::
            >>> from bluetooth_sig.gatt.services import BatteryService
            >>> peripheral.with_service_data(
            ...     BatteryService.get_class_uuid(),
            ...     b"\x50",  # 80% battery
            ... )

        """
        self._require_not_started()
        self._service_data[service_uuid] = data
        return self

    def with_tx_power(self, power_dbm: int) -> Self:
        """Set TX power level for advertising.

        The value is stored as given; no range validation is performed here.

        Args:
            power_dbm: Transmission power in dBm (-127 to +127).

        Returns:
            Self for method chaining.

        Raises:
            RuntimeError: If called after start().

        """
        self._require_not_started()
        self._tx_power = power_dbm
        return self

    def with_connectable(self, connectable: bool) -> Self:
        """Set whether the peripheral accepts connections.

        Args:
            connectable: True to accept connections (default), False for broadcast only.

        Returns:
            Self for method chaining.

        Raises:
            RuntimeError: If called after start().

        """
        self._require_not_started()
        self._is_connectable = connectable
        return self

    def with_discoverable(self, discoverable: bool) -> Self:
        """Set whether the peripheral is discoverable.

        Args:
            discoverable: True to be discoverable (default), False otherwise.

        Returns:
            Self for method chaining.

        Raises:
            RuntimeError: If called after start().

        """
        self._require_not_started()
        self._is_discoverable = discoverable
        return self

    # -------------------------------------------------------------------------
    # Service Management (Generic Implementation)
    # -------------------------------------------------------------------------

    async def add_service(self, service: ServiceDefinition) -> None:
        """Add a GATT service to the peripheral.

        Services must be added before calling start(). Each service contains
        one or more characteristics that clients can interact with.

        Every characteristic of the service is indexed by UUID for later
        lookup, and any ``on_read`` / ``on_write`` callbacks carried by the
        definition are registered. A characteristic UUID that was already
        registered is replaced by the new definition.

        Args:
            service: The service definition to add

        Raises:
            RuntimeError: If called after start()

        """
        self._require_not_started("Cannot add services after peripheral has started")

        self._services.append(service)

        # Track characteristic definitions for later lookup
        for char_def in service.characteristics:
            key = self._uuid_key(char_def.uuid)
            self._char_definitions[key] = char_def

            # Register any callbacks from the definition. Compare against
            # None rather than relying on truthiness so that a callable
            # object that happens to be falsy is still registered.
            if char_def.on_read is not None:
                self._read_callbacks[key] = char_def.on_read
            if char_def.on_write is not None:
                self._write_callbacks[key] = char_def.on_write

    def get_characteristic_definition(
        self,
        char_uuid: str | BluetoothUUID,
    ) -> CharacteristicDefinition | None:
        """Get the characteristic definition by UUID.

        Args:
            char_uuid: UUID of the characteristic.

        Returns:
            CharacteristicDefinition if found, None otherwise.

        """
        return self._char_definitions.get(self._uuid_key(char_uuid))

    def set_read_callback(
        self,
        char_uuid: str | BluetoothUUID,
        callback: Callable[[], bytearray],
    ) -> None:
        """Set a callback for dynamic read value generation.

        When a client reads the characteristic, this callback will be invoked
        to generate the current value instead of returning the stored value.
        Any callback previously registered for the UUID is replaced.

        Args:
            char_uuid: UUID of the characteristic
            callback: Function that returns the encoded value to serve

        Raises:
            KeyError: If characteristic UUID not found

        """
        uuid_str = self._uuid_key(char_uuid)
        if uuid_str not in self._char_definitions:
            raise KeyError(f"Characteristic {uuid_str} not found")
        self._read_callbacks[uuid_str] = callback

    def set_write_callback(
        self,
        char_uuid: str | BluetoothUUID,
        callback: Callable[[bytearray], None],
    ) -> None:
        """Set a callback for handling client writes.

        When a client writes to the characteristic, this callback will be
        invoked with the written data. Any callback previously registered
        for the UUID is replaced.

        Args:
            char_uuid: UUID of the characteristic
            callback: Function called with the written data

        Raises:
            KeyError: If characteristic UUID not found

        """
        uuid_str = self._uuid_key(char_uuid)
        if uuid_str not in self._char_definitions:
            raise KeyError(f"Characteristic {uuid_str} not found")
        self._write_callbacks[uuid_str] = callback

    # -------------------------------------------------------------------------
    # Abstract Methods (Backend-Specific Implementation Required)
    # -------------------------------------------------------------------------

    @abstractmethod
    async def start(self) -> None:
        """Start advertising and accepting connections.

        Backend implementations must:
            1. Create the platform-specific GATT server
            2. Register all services and characteristics from self._services
            3. Configure advertisement data from self._manufacturer_data, etc.
            4. Begin advertising

        Raises:
            RuntimeError: If no services have been added

        """

    @abstractmethod
    async def stop(self) -> None:
        """Stop advertising and disconnect all clients."""

    @property
    @abstractmethod
    def is_advertising(self) -> bool:
        """Check if the peripheral is currently advertising.

        The configuration methods and add_service() consult this property
        to reject changes once the peripheral has started.

        Returns:
            True if advertising, False otherwise

        """

    @abstractmethod
    async def update_characteristic(
        self,
        char_uuid: str | BluetoothUUID,
        value: bytearray,
        *,
        notify: bool = True,
    ) -> None:
        """Update a characteristic's value.

        This sets the new value that will be returned when clients read the
        characteristic. If notify=True and the characteristic supports
        notifications, subscribed clients will be notified of the change.

        Args:
            char_uuid: UUID of the characteristic to update
            value: New encoded value (use characteristic.build_value() to encode)
            notify: If True, notify subscribed clients of the change

        Raises:
            KeyError: If characteristic UUID not found
            RuntimeError: If peripheral not started

        """

    @abstractmethod
    async def get_characteristic_value(self, char_uuid: str | BluetoothUUID) -> bytearray:
        """Get the current value of a characteristic.

        Args:
            char_uuid: UUID of the characteristic

        Returns:
            The current encoded value

        Raises:
            KeyError: If characteristic UUID not found

        """

    @property
    def connected_clients(self) -> int:
        """Get the number of currently connected clients.

        This base implementation always raises; backends that track
        connections override it.

        Returns:
            Number of connected BLE centrals

        Raises:
            NotImplementedError: If backend doesn't track connections

        """
        raise NotImplementedError(f"{self.__class__.__name__} does not track connected clients")

490 

491 

# Public API of this module. CharacteristicDefinition and ServiceDefinition
# are imported from bluetooth_sig.types.peripheral_types and re-exported here
# alongside the protocol class.
__all__ = [
    "CharacteristicDefinition",
    "PeripheralManagerProtocol",
    "ServiceDefinition",
]

496]