Coverage for src / bluetooth_sig / stream / pairing.py: 100%
49 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""Stream helpers for pairing dependent characteristic notifications.
3This module provides a generic, backend-agnostic buffer that correlates
4dependent characteristic notifications based on caller-defined grouping keys.
5Useful for Bluetooth SIG profiles where characteristics must be paired by
6sequence numbers, timestamps, or other identifiers.
8"""
10from __future__ import annotations
12import time
13from collections.abc import Callable, Hashable
14from typing import Any
16import msgspec
18from ..core.translator import BluetoothSIGTranslator
class BufferStats(msgspec.Struct, kw_only=True, frozen=True):
    """Immutable, keyword-only counters describing a pairing buffer.

    Attributes:
        pending: How many incomplete groups are buffered right now.
        completed: Lifetime count of groups that were successfully paired.
        evicted: Lifetime count of groups dropped because their TTL expired.
    """

    pending: int
    completed: int
    evicted: int
class DependencyPairingBuffer:
    """Correlate notifications from interdependent characteristics.

    Raw payloads are held per grouping key until every required UUID has been
    seen for that key. The completed group is then parsed as one batch and
    handed to the pairing callback. Arrival order within a group is irrelevant.

    Args:
        translator: BluetoothSIGTranslator used both to parse each incoming
            notification and to batch-parse a completed group.
        required_uuids: UUID strings that together make up one complete group.
        group_key: Called as ``group_key(uuid, parsed_result)``; the hashable
            value it returns decides which group a notification joins.
        on_pair: Called as ``on_pair(results: dict[str, Any])`` with the
            batch-parsed results of each completed group.
        max_age_seconds: Age in seconds at which an incomplete group is
            dropped. ``None`` (the default) turns TTL eviction off.
        clock: Zero-argument callable giving the current time in seconds as a
            float. Defaults to ``time.monotonic``; inject a fake in tests for
            deterministic timing.

    Note:
        BLE subscriptions are out of scope: connecting and enabling
        notifications is the caller's responsibility.
    """

    def __init__(
        self,
        *,
        translator: BluetoothSIGTranslator,
        required_uuids: set[str],
        group_key: Callable[[str, Any], Hashable],
        on_pair: Callable[[dict[str, Any]], None],
        max_age_seconds: float | None = None,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        """Store the collaborators and start with an empty buffer."""
        # Collaborators supplied by the caller.
        self._translator = translator
        self._group_key = group_key
        self._on_pair = on_pair
        self._clock = clock

        # Pairing configuration; the UUID set is copied so later changes to
        # the caller's set do not affect this buffer.
        self._required = set(required_uuids)
        self._max_age_seconds = max_age_seconds

        # Mutable state: raw payloads per group, the time each group was
        # first seen, and lifetime counters.
        self._buffer: dict[Hashable, dict[str, bytes]] = {}
        self._group_timestamps: dict[Hashable, float] = {}
        self._completed_count: int = 0
        self._evicted_count: int = 0

    def ingest(self, uuid: str, data: bytes) -> None:
        """Accept one characteristic notification.

        Expired groups are evicted first (when a TTL is configured). The
        payload is parsed once to derive its grouping key and then stored raw
        under ``uuid`` exactly as given; a repeated UUID within a group
        replaces the earlier payload. Once every required UUID is present the
        group is removed, batch-parsed, and passed to the pairing callback.

        Args:
            uuid: Characteristic UUID string (16-bit or 128-bit).
            data: Raw bytes carried by the notification.
        """
        self._evict_stale()

        key = self._group_key(uuid, self._translator.parse_characteristic(uuid, data))

        pending = self._buffer.setdefault(key, {})
        # Only the first arrival stamps the group; later arrivals keep its age.
        if key not in self._group_timestamps:
            self._group_timestamps[key] = self._clock()
        pending[uuid] = data

        if self._required.difference(pending):
            # Still waiting on at least one required UUID.
            return

        batch = self._buffer.pop(key)
        self._group_timestamps.pop(key)
        self._completed_count += 1

        self._on_pair(self._translator.parse_characteristics(batch))

    def stats(self) -> BufferStats:
        """Report the buffer's current counters.

        Returns:
            BufferStats holding the number of pending groups plus the lifetime
            completed and evicted totals.
        """
        return BufferStats(
            evicted=self._evicted_count,
            completed=self._completed_count,
            pending=len(self._buffer),
        )

    def _evict_stale(self) -> None:
        """Drop every group whose age has reached max_age_seconds."""
        ttl = self._max_age_seconds
        if ttl is None:
            return

        threshold = self._clock() - ttl

        # Collect first so the timestamp dict is not mutated while iterating.
        expired = []
        for key, first_seen in self._group_timestamps.items():
            if first_seen <= threshold:
                expired.append(key)

        for key in expired:
            del self._buffer[key]
            del self._group_timestamps[key]
            self._evicted_count += 1