Coverage for src / bluetooth_sig / device / peripheral.py: 76%
112 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""Peripheral manager protocol for BLE GATT server adapters.
3Defines an async abstract base class that peripheral adapter implementations
4(bless, bluez_peripheral, etc.) must inherit from to create BLE GATT servers
5that broadcast services and characteristics.
7This is the server-side counterpart to ClientManagerProtocol. Where clients
8connect TO devices and READ/PARSE data, peripherals ARE devices that ENCODE
9and BROADCAST data for others to read.
11Adapters must provide async implementations of all abstract methods below.
13TODO: PeripheralDevice exists in peripheral_device.py with core functionality.
14 Remaining gaps to address (see ROADMAP.md Workstream F):
15 - Subscription management (on_subscribe/on_unsubscribe, subscribed_clients tracking)
16 - Client event callbacks (on_client_connected/on_client_disconnected)
17 - Read/write request handling (typed on_read_request/on_write_request)
18 - Descriptor hosting (CCCD, User Description, Presentation Format)
19"""
21from __future__ import annotations
23from abc import ABC, abstractmethod
24from collections.abc import Callable
25from typing import ClassVar
27from typing_extensions import Self
29from bluetooth_sig.types.company import CompanyIdentifier, ManufacturerData
30from bluetooth_sig.types.peripheral_types import CharacteristicDefinition, ServiceDefinition
31from bluetooth_sig.types.uuid import BluetoothUUID
34class PeripheralManagerProtocol(ABC):
35 """Abstract base class for BLE peripheral/GATT server implementations.
37 This protocol defines the interface for creating BLE peripherals that
38 broadcast services and characteristics. Implementations wrap backend
39 libraries like bless, bluez_peripheral, etc.
41 Uses a fluent builder pattern for advertisement configuration - call
42 configuration methods before start() to customise advertising.
44 The workflow is:
45 1. Create peripheral manager with a device name
46 2. Configure advertising (optional): with_manufacturer_data(), with_tx_power(), etc.
47 3. Add services and characteristics (using CharacteristicDefinition)
48 4. Start advertising
49 5. Update characteristic values as needed
50 6. Stop when done
52 Example::
53 >>> from bluetooth_sig.gatt.characteristics import BatteryLevelCharacteristic
54 >>> from bluetooth_sig.gatt.services import BatteryService
55 >>> from bluetooth_sig.types.company import ManufacturerData
56 >>>
57 >>> # Create peripheral with fluent configuration
58 >>> peripheral = SomePeripheralManager("My Sensor")
59 >>> peripheral.with_tx_power(-10).with_connectable(True)
60 >>>
61 >>> # Define a service with battery level
62 >>> char = BatteryLevelCharacteristic()
63 >>> char_def = CharacteristicDefinition.from_characteristic(char, 85)
64 >>>
65 >>> service = ServiceDefinition(
66 ... uuid=BatteryService.get_class_uuid(),
67 ... characteristics=[char_def],
68 ... )
69 >>>
70 >>> await peripheral.add_service(service)
71 >>> await peripheral.start()
72 >>>
73 >>> # Later, update the battery level
74 >>> await peripheral.update_characteristic("2A19", char.build_value(75))
76 """
78 # Class-level flag indicating backend capabilities
79 supports_advertising: ClassVar[bool] = True
81 def __init__(self, name: str) -> None:
82 """Initialize the peripheral manager.
84 Args:
85 name: The advertised device name visible to scanners
87 """
88 self._name = name
90 # Service and characteristic tracking
91 self._services: list[ServiceDefinition] = []
92 self._char_definitions: dict[str, CharacteristicDefinition] = {}
94 # Advertisement configuration (set via fluent methods)
95 self._manufacturer_data: ManufacturerData | None = None
96 self._service_data: dict[BluetoothUUID, bytes] = {}
97 self._tx_power: int | None = None
98 self._is_connectable = True
99 self._is_discoverable = True
101 # Callbacks for read/write handling
102 self._read_callbacks: dict[str, Callable[[], bytearray]] = {}
103 self._write_callbacks: dict[str, Callable[[bytearray], None]] = {}
105 @property
106 def name(self) -> str:
107 """Get the advertised device name.
109 Returns:
110 The device name as it appears to BLE scanners
112 """
113 return self._name
115 @property
116 def services(self) -> list[ServiceDefinition]:
117 """Get the list of registered services.
119 Returns:
120 List of ServiceDefinition objects added to this peripheral.
122 """
123 return self._services
125 @property
126 def manufacturer_data(self) -> ManufacturerData | None:
127 """Get the configured manufacturer data.
129 Returns:
130 ManufacturerData if configured, None otherwise.
132 """
133 return self._manufacturer_data
135 @property
136 def service_data(self) -> dict[BluetoothUUID, bytes]:
137 """Get the configured service data.
139 Returns:
140 Dictionary mapping service UUIDs to data bytes.
142 """
143 return self._service_data
145 @property
146 def tx_power(self) -> int | None:
147 """Get the configured TX power level.
149 Returns:
150 TX power in dBm if configured, None otherwise.
152 """
153 return self._tx_power
155 @property
156 def is_connectable_config(self) -> bool:
157 """Get the connectable configuration.
159 Returns:
160 True if peripheral is configured to accept connections.
162 """
163 return self._is_connectable
165 @property
166 def is_discoverable_config(self) -> bool:
167 """Get the discoverable configuration.
169 Returns:
170 True if peripheral is configured to be discoverable.
172 """
173 return self._is_discoverable
175 def with_manufacturer_data(self, manufacturer_data: ManufacturerData) -> Self:
176 r"""Set manufacturer-specific advertising data.
178 Args:
179 manufacturer_data: ManufacturerData instance from the types module.
181 Returns:
182 Self for method chaining.
184 Raises:
185 RuntimeError: If called after start().
187 Example::
188 >>> from bluetooth_sig.types.company import ManufacturerData
189 >>> mfr = ManufacturerData.from_id_and_payload(0x004C, b"\x02\x15...")
190 >>> peripheral.with_manufacturer_data(mfr)
192 """
193 if self.is_advertising:
194 raise RuntimeError("Cannot configure after peripheral has started")
195 self._manufacturer_data = manufacturer_data
196 return self
198 def with_manufacturer_id(
199 self,
200 company_id: int | CompanyIdentifier,
201 payload: bytes,
202 ) -> Self:
203 r"""Set manufacturer data from company ID and payload.
205 Args:
206 company_id: Bluetooth SIG company identifier (e.g., 0x004C for Apple)
207 or CompanyIdentifier instance.
208 payload: Manufacturer-specific payload bytes.
210 Returns:
211 Self for method chaining.
213 Raises:
214 RuntimeError: If called after start().
216 Example::
217 >>> peripheral.with_manufacturer_id(0x004C, b"\x02\x15...")
219 """
220 if self.is_advertising:
221 raise RuntimeError("Cannot configure after peripheral has started")
222 cid = company_id if isinstance(company_id, int) else company_id.id
223 self._manufacturer_data = ManufacturerData.from_id_and_payload(cid, payload)
224 return self
226 def with_service_data(
227 self,
228 service_uuid: BluetoothUUID,
229 data: bytes,
230 ) -> Self:
231 r"""Add service data to advertisement.
233 Args:
234 service_uuid: BluetoothUUID of the service.
235 data: Service-specific data bytes.
237 Returns:
238 Self for method chaining.
240 Raises:
241 RuntimeError: If called after start().
243 Example::
244 >>> from bluetooth_sig.gatt.services import BatteryService
245 >>> peripheral.with_service_data(
246 ... BatteryService.get_class_uuid(),
247 ... b"\x50", # 80% battery
248 ... )
250 """
251 if self.is_advertising:
252 raise RuntimeError("Cannot configure after peripheral has started")
253 self._service_data[service_uuid] = data
254 return self
256 def with_tx_power(self, power_dbm: int) -> Self:
257 """Set TX power level for advertising.
259 Args:
260 power_dbm: Transmission power in dBm (-127 to +127).
262 Returns:
263 Self for method chaining.
265 Raises:
266 RuntimeError: If called after start().
268 """
269 if self.is_advertising:
270 raise RuntimeError("Cannot configure after peripheral has started")
271 self._tx_power = power_dbm
272 return self
274 def with_connectable(self, connectable: bool) -> Self:
275 """Set whether the peripheral accepts connections.
277 Args:
278 connectable: True to accept connections (default), False for broadcast only.
280 Returns:
281 Self for method chaining.
283 Raises:
284 RuntimeError: If called after start().
286 """
287 if self.is_advertising:
288 raise RuntimeError("Cannot configure after peripheral has started")
289 self._is_connectable = connectable
290 return self
292 def with_discoverable(self, discoverable: bool) -> Self:
293 """Set whether the peripheral is discoverable.
295 Args:
296 discoverable: True to be discoverable (default), False otherwise.
298 Returns:
299 Self for method chaining.
301 Raises:
302 RuntimeError: If called after start().
304 """
305 if self.is_advertising:
306 raise RuntimeError("Cannot configure after peripheral has started")
307 self._is_discoverable = discoverable
308 return self
310 # -------------------------------------------------------------------------
311 # Service Management (Generic Implementation)
312 # -------------------------------------------------------------------------
314 async def add_service(self, service: ServiceDefinition) -> None:
315 """Add a GATT service to the peripheral.
317 Services must be added before calling start(). Each service contains
318 one or more characteristics that clients can interact with.
320 Args:
321 service: The service definition to add
323 Raises:
324 RuntimeError: If called after start()
326 """
327 if self.is_advertising:
328 raise RuntimeError("Cannot add services after peripheral has started")
330 self._services.append(service)
332 # Track characteristic definitions for later lookup
333 for char_def in service.characteristics:
334 uuid_upper = str(char_def.uuid).upper()
335 self._char_definitions[uuid_upper] = char_def
337 # Register any callbacks from the definition
338 if char_def.on_read:
339 self._read_callbacks[uuid_upper] = char_def.on_read
340 if char_def.on_write:
341 self._write_callbacks[uuid_upper] = char_def.on_write
343 def get_characteristic_definition(
344 self,
345 char_uuid: str | BluetoothUUID,
346 ) -> CharacteristicDefinition | None:
347 """Get the characteristic definition by UUID.
349 Args:
350 char_uuid: UUID of the characteristic.
352 Returns:
353 CharacteristicDefinition if found, None otherwise.
355 """
356 uuid_upper = str(char_uuid).upper()
357 return self._char_definitions.get(uuid_upper)
359 def set_read_callback(
360 self,
361 char_uuid: str | BluetoothUUID,
362 callback: Callable[[], bytearray],
363 ) -> None:
364 """Set a callback for dynamic read value generation.
366 When a client reads the characteristic, this callback will be invoked
367 to generate the current value instead of returning the stored value.
369 Args:
370 char_uuid: UUID of the characteristic
371 callback: Function that returns the encoded value to serve
373 Raises:
374 KeyError: If characteristic UUID not found
376 """
377 uuid_str = str(char_uuid).upper()
378 if uuid_str not in self._char_definitions:
379 raise KeyError(f"Characteristic {uuid_str} not found")
380 self._read_callbacks[uuid_str] = callback
382 def set_write_callback(
383 self,
384 char_uuid: str | BluetoothUUID,
385 callback: Callable[[bytearray], None],
386 ) -> None:
387 """Set a callback for handling client writes.
389 When a client writes to the characteristic, this callback will be
390 invoked with the written data.
392 Args:
393 char_uuid: UUID of the characteristic
394 callback: Function called with the written data
396 Raises:
397 KeyError: If characteristic UUID not found
399 """
400 uuid_str = str(char_uuid).upper()
401 if uuid_str not in self._char_definitions:
402 raise KeyError(f"Characteristic {uuid_str} not found")
403 self._write_callbacks[uuid_str] = callback
405 # -------------------------------------------------------------------------
406 # Abstract Methods (Backend-Specific Implementation Required)
407 # -------------------------------------------------------------------------
409 @abstractmethod
410 async def start(self) -> None:
411 """Start advertising and accepting connections.
413 Backend implementations must:
414 1. Create the platform-specific GATT server
415 2. Register all services and characteristics from self._services
416 3. Configure advertisement data from self._manufacturer_data, etc.
417 4. Begin advertising
419 Raises:
420 RuntimeError: If no services have been added
422 """
424 @abstractmethod
425 async def stop(self) -> None:
426 """Stop advertising and disconnect all clients."""
428 @property
429 @abstractmethod
430 def is_advertising(self) -> bool:
431 """Check if the peripheral is currently advertising.
433 Returns:
434 True if advertising, False otherwise
436 """
438 @abstractmethod
439 async def update_characteristic(
440 self,
441 char_uuid: str | BluetoothUUID,
442 value: bytearray,
443 *,
444 notify: bool = True,
445 ) -> None:
446 """Update a characteristic's value.
448 This sets the new value that will be returned when clients read the
449 characteristic. If notify=True and the characteristic supports
450 notifications, subscribed clients will be notified of the change.
452 Args:
453 char_uuid: UUID of the characteristic to update
454 value: New encoded value (use characteristic.build_value() to encode)
455 notify: If True, notify subscribed clients of the change
457 Raises:
458 KeyError: If characteristic UUID not found
459 RuntimeError: If peripheral not started
461 """
463 @abstractmethod
464 async def get_characteristic_value(self, char_uuid: str | BluetoothUUID) -> bytearray:
465 """Get the current value of a characteristic.
467 Args:
468 char_uuid: UUID of the characteristic
470 Returns:
471 The current encoded value
473 Raises:
474 KeyError: If characteristic UUID not found
476 """
478 @property
479 def connected_clients(self) -> int:
480 """Get the number of currently connected clients.
482 Returns:
483 Number of connected BLE centrals
485 Raises:
486 NotImplementedError: If backend doesn't track connections
488 """
489 raise NotImplementedError(f"{self.__class__.__name__} does not track connected clients")
492__all__ = [
493 "CharacteristicDefinition",
494 "PeripheralManagerProtocol",
495 "ServiceDefinition",
496]