Coverage for src / bluetooth_sig / stream / pairing.py: 100%

49 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Stream helpers for pairing dependent characteristic notifications. 

2 

3This module provides a generic, backend-agnostic buffer that correlates 

4dependent characteristic notifications based on caller-defined grouping keys. 

5Useful for Bluetooth SIG profiles where characteristics must be paired by 

6sequence numbers, timestamps, or other identifiers. 

7 

8""" 

9 

10from __future__ import annotations 

11 

12import time 

13from collections.abc import Callable, Hashable 

14from typing import Any 

15 

16import msgspec 

17 

18from ..core.translator import BluetoothSIGTranslator 

19 

20 

21class BufferStats(msgspec.Struct, frozen=True, kw_only=True): 

22 """Snapshot of pairing buffer statistics. 

23 

24 Attributes: 

25 pending: Number of incomplete groups currently buffered. 

26 completed: Total number of groups successfully paired since creation. 

27 evicted: Total number of groups evicted due to TTL expiry since creation. 

28 """ 

29 

30 pending: int 

31 completed: int 

32 evicted: int 

33 

34 

35class DependencyPairingBuffer: 

36 """Buffer and pair dependent characteristic notifications. 

37 

38 Buffers incoming notifications until all required UUIDs for a grouping key 

39 are present, then batch-parses and invokes the callback. Order-independent. 

40 

41 Args: 

42 translator: BluetoothSIGTranslator instance for parsing characteristics. 

43 required_uuids: Set of UUID strings that must be present to form a complete pair. 

44 group_key: Function that extracts a grouping key from each parsed notification. 

45 Called as ``group_key(uuid, parsed_result)`` and must return a hashable value. 

46 on_pair: Callback invoked with complete parsed pairs as 

47 ``on_pair(results: dict[str, Any])``. 

48 max_age_seconds: Maximum age in seconds for buffered groups before eviction. 

49 ``None`` disables TTL eviction (default). 

50 clock: Callable returning current time as a float (seconds). Defaults to 

51 ``time.monotonic``. Override in tests for deterministic timing. 

52 

53 Note: 

54 Does not manage BLE subscriptions. Callers handle connection and notification setup. 

55 """ 

56 

57 def __init__( 

58 self, 

59 *, 

60 translator: BluetoothSIGTranslator, 

61 required_uuids: set[str], 

62 group_key: Callable[[str, Any], Hashable], 

63 on_pair: Callable[[dict[str, Any]], None], 

64 max_age_seconds: float | None = None, 

65 clock: Callable[[], float] = time.monotonic, 

66 ) -> None: 

67 """Initialize the pairing buffer.""" 

68 self._translator = translator 

69 self._required = set(required_uuids) 

70 self._group_key = group_key 

71 self._on_pair = on_pair 

72 self._max_age_seconds = max_age_seconds 

73 self._clock = clock 

74 self._buffer: dict[Hashable, dict[str, bytes]] = {} 

75 self._group_timestamps: dict[Hashable, float] = {} 

76 self._completed_count: int = 0 

77 self._evicted_count: int = 0 

78 

79 def ingest(self, uuid: str, data: bytes) -> None: 

80 """Ingest a single characteristic notification. 

81 

82 Evicts stale groups (if TTL is configured) before processing. 

83 

84 Args: 

85 uuid: Characteristic UUID string (16-bit or 128-bit). 

86 data: Raw bytes from the characteristic notification. 

87 """ 

88 self._evict_stale() 

89 

90 parsed = self._translator.parse_characteristic(uuid, data) 

91 group_id = self._group_key(uuid, parsed) 

92 

93 group = self._buffer.setdefault(group_id, {}) 

94 if group_id not in self._group_timestamps: 

95 self._group_timestamps[group_id] = self._clock() 

96 group[uuid] = data 

97 

98 if self._required.issubset(group.keys()): 

99 batch = dict(group) 

100 del self._buffer[group_id] 

101 del self._group_timestamps[group_id] 

102 self._completed_count += 1 

103 

104 results = self._translator.parse_characteristics(batch) 

105 self._on_pair(results) 

106 

107 def stats(self) -> BufferStats: 

108 """Return a snapshot of buffer statistics. 

109 

110 Returns: 

111 BufferStats with current pending count and lifetime completed/evicted totals. 

112 """ 

113 return BufferStats( 

114 pending=len(self._buffer), 

115 completed=self._completed_count, 

116 evicted=self._evicted_count, 

117 ) 

118 

119 def _evict_stale(self) -> None: 

120 """Remove groups older than max_age_seconds.""" 

121 if self._max_age_seconds is None: 

122 return 

123 

124 now = self._clock() 

125 cutoff = now - self._max_age_seconds 

126 stale_keys = [key for key, timestamp in self._group_timestamps.items() if timestamp <= cutoff] 

127 

128 for key in stale_keys: 

129 del self._buffer[key] 

130 del self._group_timestamps[key] 

131 self._evicted_count += 1