Coverage for src / bluetooth_sig / device / peripheral_device.py: 100%

89 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""High-level peripheral (GATT server) abstraction. 

2 

3Provides :class:`PeripheralDevice`, the server-side counterpart to 

4:class:`Device`. Where ``Device`` connects TO remote peripherals and reads 

5GATT data, ``PeripheralDevice`` **hosts** GATT services and encodes values 

6for remote centrals to read. 

7 

8Composes :class:`PeripheralManagerProtocol` with ``BaseCharacteristic`` 

9instances that handle value encoding via ``build_value()``. 

10""" 

11 

12from __future__ import annotations 

13 

14import logging 

15 

16# Any is required: BaseCharacteristic is generic over its value type (T), but 

17# PeripheralDevice hosts heterogeneous characteristics with different T types 

18# in a single dict, so the container must erase the type parameter to Any. 

19from typing import Any 

20 

21from bluetooth_sig.gatt.characteristics.base import BaseCharacteristic 

22from bluetooth_sig.types.gatt_enums import GattProperty 

23from bluetooth_sig.types.peripheral_types import CharacteristicDefinition, ServiceDefinition 

24from bluetooth_sig.types.uuid import BluetoothUUID 

25 

26from .peripheral import PeripheralManagerProtocol 

27 

28logger = logging.getLogger(__name__) 

29 

30 

31class HostedCharacteristic: 

32 """Tracks a hosted characteristic with its definition and class instance. 

33 

34 Attributes: 

35 definition: The GATT characteristic definition registered on the peripheral. 

36 characteristic: The SIG characteristic class instance used for encoding/decoding. 

37 last_value: The last Python value that was encoded and set on this characteristic. 

38 

39 """ 

40 

41 __slots__ = ("characteristic", "definition", "last_value") 

42 

43 def __init__( 

44 self, 

45 definition: CharacteristicDefinition, 

46 characteristic: BaseCharacteristic[Any], 

47 initial_value: Any = None, # noqa: ANN401 

48 ) -> None: 

49 """Initialise a hosted characteristic record. 

50 

51 Args: 

52 definition: The GATT characteristic definition registered on the peripheral. 

53 characteristic: The SIG characteristic class instance for encoding/decoding. 

54 initial_value: Optional initial Python value set on this characteristic. 

55 

56 """ 

57 self.definition = definition 

58 self.characteristic = characteristic 

59 self.last_value: Any = initial_value 

60 

61 

62class PeripheralDevice: 

63 """High-level BLE peripheral abstraction using composition pattern. 

64 

65 Coordinates between :class:`PeripheralManagerProtocol` (backend) and 

66 ``BaseCharacteristic`` instances (encoding) so callers work with typed 

67 Python values. 

68 

69 Encoding is handled directly by the characteristic's ``build_value()`` 

70 method — no translator is needed on the peripheral (server) side. 

71 

72 The workflow mirrors :class:`Device` but for the server role: 

73 

74 1. Create a ``PeripheralDevice`` wrapping a backend. 

75 2. Add services with :meth:`add_service` (typed helpers encode initial values). 

76 3. Start advertising with :meth:`start`. 

77 4. Update characteristic values with :meth:`update_value`. 

78 5. Stop with :meth:`stop`. 

79 

80 Example:: 

81 

82 >>> from bluetooth_sig.gatt.characteristics import BatteryLevelCharacteristic 

83 >>> from bluetooth_sig.gatt.services import BatteryService 

84 >>> 

85 >>> peripheral = PeripheralDevice(backend) 

86 >>> battery_char = BatteryLevelCharacteristic() 

87 >>> peripheral.add_characteristic( 

88 ... service_uuid=BatteryService.get_class_uuid(), 

89 ... characteristic=battery_char, 

90 ... initial_value=85, 

91 ... ) 

92 >>> await peripheral.start() 

93 >>> await peripheral.update_value(battery_char, 72) 

94 

95 """ 

96 

97 def __init__( 

98 self, 

99 peripheral_manager: PeripheralManagerProtocol, 

100 ) -> None: 

101 """Initialise PeripheralDevice. 

102 

103 Args: 

104 peripheral_manager: Backend implementing PeripheralManagerProtocol. 

105 

106 """ 

107 self._manager = peripheral_manager 

108 

109 # UUID (normalised upper-case) → HostedCharacteristic 

110 self._hosted: dict[str, HostedCharacteristic] = {} 

111 

112 # Service UUID → ServiceDefinition (tracks services added via helpers) 

113 self._pending_services: dict[str, ServiceDefinition] = {} 

114 

115 # ------------------------------------------------------------------ 

116 # Properties 

117 # ------------------------------------------------------------------ 

118 

119 @property 

120 def name(self) -> str: 

121 """Advertised device name.""" 

122 return self._manager.name 

123 

124 @property 

125 def is_advertising(self) -> bool: 

126 """Whether the peripheral is currently advertising.""" 

127 return self._manager.is_advertising 

128 

129 @property 

130 def services(self) -> list[ServiceDefinition]: 

131 """Registered GATT services.""" 

132 return self._manager.services 

133 

134 @property 

135 def hosted_characteristics(self) -> dict[str, HostedCharacteristic]: 

136 """Map of UUID → HostedCharacteristic for all hosted characteristics.""" 

137 return dict(self._hosted) 

138 

139 # ------------------------------------------------------------------ 

140 # Service & Characteristic Registration 

141 # ------------------------------------------------------------------ 

142 

143 def add_characteristic( 

144 self, 

145 service_uuid: str | BluetoothUUID, 

146 characteristic: BaseCharacteristic[Any], 

147 initial_value: Any, # noqa: ANN401 

148 *, 

149 properties: GattProperty | None = None, 

150 ) -> CharacteristicDefinition: 

151 """Register a characteristic on a service, encoding the initial value. 

152 

153 If the service has not been seen before, a new primary 

154 :class:`ServiceDefinition` is created automatically. 

155 

156 Args: 

157 service_uuid: UUID of the parent service. 

158 characteristic: SIG characteristic class instance. 

159 initial_value: Python value to encode as the initial value. 

160 properties: GATT properties. Defaults to ``READ | NOTIFY``. 

161 

162 Returns: 

163 The created :class:`CharacteristicDefinition`. 

164 

165 Raises: 

166 RuntimeError: If the peripheral has already started advertising. 

167 

168 """ 

169 if self._manager.is_advertising: 

170 raise RuntimeError("Cannot add characteristics after peripheral has started") 

171 

172 char_def = CharacteristicDefinition.from_characteristic( 

173 characteristic, 

174 initial_value, 

175 properties=properties, 

176 ) 

177 

178 svc_key = str(service_uuid).upper() 

179 if svc_key not in self._pending_services: 

180 self._pending_services[svc_key] = ServiceDefinition( 

181 uuid=BluetoothUUID(svc_key), 

182 characteristics=[], 

183 ) 

184 self._pending_services[svc_key].characteristics.append(char_def) 

185 

186 uuid_key = str(char_def.uuid).upper() 

187 self._hosted[uuid_key] = HostedCharacteristic( 

188 definition=char_def, 

189 characteristic=characteristic, 

190 initial_value=initial_value, 

191 ) 

192 

193 return char_def 

194 

195 async def add_service(self, service: ServiceDefinition) -> None: 

196 """Register a pre-built service definition directly. 

197 

198 For full control over the service definition. If you prefer typed 

199 helpers, use :meth:`add_characteristic` instead. 

200 

201 Args: 

202 service: Complete service definition. 

203 

204 Raises: 

205 RuntimeError: If the peripheral has already started advertising. 

206 

207 """ 

208 await self._manager.add_service(service) 

209 

210 # ------------------------------------------------------------------ 

211 # Lifecycle 

212 # ------------------------------------------------------------------ 

213 

214 async def start(self) -> None: 

215 """Register pending services on the backend and start advertising. 

216 

217 All services added via :meth:`add_characteristic` are flushed to the 

218 backend before ``start()`` is called on the manager. 

219 

220 Raises: 

221 RuntimeError: If the peripheral has already started. 

222 

223 """ 

224 # Flush pending services to the backend 

225 for service_def in self._pending_services.values(): 

226 await self._manager.add_service(service_def) 

227 self._pending_services.clear() 

228 

229 await self._manager.start() 

230 

231 async def stop(self) -> None: 

232 """Stop advertising and disconnect all clients.""" 

233 await self._manager.stop() 

234 

235 # ------------------------------------------------------------------ 

236 # Value Updates 

237 # ------------------------------------------------------------------ 

238 

239 async def update_value( 

240 self, 

241 characteristic: BaseCharacteristic[Any] | str | BluetoothUUID, 

242 value: Any, # noqa: ANN401 

243 *, 

244 notify: bool = True, 

245 ) -> None: 

246 """Encode a typed value and push it to the hosted characteristic. 

247 

248 Args: 

249 characteristic: The characteristic instance, UUID string, or BluetoothUUID. 

250 value: Python value to encode via ``build_value()``. 

251 notify: Whether to notify subscribed centrals. Default ``True``. 

252 

253 Raises: 

254 KeyError: If the characteristic is not hosted on this peripheral. 

255 RuntimeError: If the peripheral has not started. 

256 

257 """ 

258 uuid_key = self._resolve_uuid_key(characteristic) 

259 hosted = self._hosted.get(uuid_key) 

260 if hosted is None: 

261 raise KeyError(f"Characteristic {uuid_key} is not hosted on this peripheral") 

262 

263 encoded = hosted.characteristic.build_value(value) 

264 hosted.last_value = value 

265 

266 await self._manager.update_characteristic(uuid_key, encoded, notify=notify) 

267 

268 async def update_raw( 

269 self, 

270 char_uuid: str | BluetoothUUID, 

271 raw_value: bytearray, 

272 *, 

273 notify: bool = True, 

274 ) -> None: 

275 """Push pre-encoded bytes to a hosted characteristic. 

276 

277 Use this when you already have the encoded value or the 

278 characteristic does not have a SIG class registered. 

279 

280 Args: 

281 char_uuid: UUID of the characteristic. 

282 raw_value: Pre-encoded bytes to set. 

283 notify: Whether to notify subscribed centrals. 

284 

285 Raises: 

286 KeyError: If the characteristic UUID is not hosted. 

287 RuntimeError: If the peripheral has not started. 

288 

289 """ 

290 uuid_key = str(char_uuid).upper() 

291 await self._manager.update_characteristic(uuid_key, raw_value, notify=notify) 

292 

293 async def get_current_value( 

294 self, 

295 characteristic: BaseCharacteristic[Any] | str | BluetoothUUID, 

296 ) -> Any: # noqa: ANN401 

297 """Get the last Python value set for a hosted characteristic. 

298 

299 Args: 

300 characteristic: The characteristic instance, UUID string, or BluetoothUUID. 

301 

302 Returns: 

303 The last value passed to :meth:`update_value`, or the initial value. 

304 

305 Raises: 

306 KeyError: If the characteristic is not hosted. 

307 

308 """ 

309 uuid_key = self._resolve_uuid_key(characteristic) 

310 hosted = self._hosted.get(uuid_key) 

311 if hosted is None: 

312 raise KeyError(f"Characteristic {uuid_key} is not hosted on this peripheral") 

313 return hosted.last_value 

314 

315 # ------------------------------------------------------------------ 

316 # Fluent Configuration Delegation 

317 # ------------------------------------------------------------------ 

318 

319 def with_manufacturer_data(self, company_id: int, payload: bytes) -> PeripheralDevice: 

320 """Configure manufacturer data for advertising. 

321 

322 Args: 

323 company_id: Bluetooth SIG company identifier. 

324 payload: Manufacturer-specific payload bytes. 

325 

326 Returns: 

327 Self for method chaining. 

328 

329 """ 

330 self._manager.with_manufacturer_id(company_id, payload) 

331 return self 

332 

333 def with_tx_power(self, power_dbm: int) -> PeripheralDevice: 

334 """Set TX power level. 

335 

336 Args: 

337 power_dbm: Transmission power in dBm. 

338 

339 Returns: 

340 Self for method chaining. 

341 

342 """ 

343 self._manager.with_tx_power(power_dbm) 

344 return self 

345 

346 def with_connectable(self, connectable: bool) -> PeripheralDevice: 

347 """Set whether the peripheral accepts connections. 

348 

349 Args: 

350 connectable: True to accept connections. 

351 

352 Returns: 

353 Self for method chaining. 

354 

355 """ 

356 self._manager.with_connectable(connectable) 

357 return self 

358 

359 def with_discoverable(self, discoverable: bool) -> PeripheralDevice: 

360 """Set whether the peripheral is discoverable. 

361 

362 Args: 

363 discoverable: True to be discoverable. 

364 

365 Returns: 

366 Self for method chaining. 

367 

368 """ 

369 self._manager.with_discoverable(discoverable) 

370 return self 

371 

372 # ------------------------------------------------------------------ 

373 # Internals 

374 # ------------------------------------------------------------------ 

375 

376 def _resolve_uuid_key(self, characteristic: BaseCharacteristic[Any] | str | BluetoothUUID) -> str: 

377 """Normalise a characteristic reference to an upper-case UUID string.""" 

378 if isinstance(characteristic, BaseCharacteristic): 

379 return str(characteristic.uuid).upper() 

380 return str(characteristic).upper() 

381 

382 def __repr__(self) -> str: 

383 """Return a developer-friendly representation.""" 

384 state = "advertising" if self.is_advertising else "stopped" 

385 return ( 

386 f"PeripheralDevice(name={self.name!r}, " 

387 f"state={state}, " 

388 f"services={len(self.services)}, " 

389 f"characteristics={len(self._hosted)})" 

390 ) 

391 

392 

393__all__ = [ 

394 "HostedCharacteristic", 

395 "PeripheralDevice", 

396]