Coverage for src / bluetooth_sig / stream / pairing.py: 100%

21 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 20:14 +0000

1"""Stream helpers for pairing dependent characteristic notifications. 

2 

3This module provides a generic, backend-agnostic buffer that correlates 

4dependent characteristic notifications based on caller-defined grouping keys. 

5Useful for Bluetooth SIG profiles where characteristics must be paired by 

6sequence numbers, timestamps, or other identifiers. 

7 

8""" 

9 

10from __future__ import annotations 

11 

12from collections.abc import Hashable 

13from typing import Any, Callable 

14 

15from ..core.translator import BluetoothSIGTranslator 

16 

17 

18class DependencyPairingBuffer: 

19 """Buffer and pair dependent characteristic notifications. 

20 

21 Buffers incoming notifications until all required UUIDs for a grouping key 

22 are present, then batch-parses and invokes the callback. Order-independent. 

23 

24 Args: 

25 translator: BluetoothSIGTranslator instance for parsing characteristics. 

26 required_uuids: Set of UUID strings that must be present to form a complete pair. 

27 group_key: Function that extracts a grouping key from each parsed notification. 

28 Called as ``group_key(uuid, parsed_result)`` and must return a hashable value. 

29 on_pair: Callback invoked with complete parsed pairs as 

30 ``on_pair(results: dict[str, Any])``. 

31 

32 Note: 

33 Does not manage BLE subscriptions. Callers handle connection and notification setup. 

34 """ 

35 

36 def __init__( 

37 self, 

38 *, 

39 translator: BluetoothSIGTranslator, 

40 required_uuids: set[str], 

41 group_key: Callable[[str, Any], Hashable], 

42 on_pair: Callable[[dict[str, Any]], None], 

43 ) -> None: 

44 """Initialize the pairing buffer.""" 

45 self._translator = translator 

46 self._required = set(required_uuids) 

47 self._group_key = group_key 

48 self._on_pair = on_pair 

49 self._buffer: dict[Hashable, dict[str, bytes]] = {} 

50 

51 def ingest(self, uuid: str, data: bytes) -> None: 

52 """Ingest a single characteristic notification. 

53 

54 Args: 

55 uuid: Characteristic UUID string (16-bit or 128-bit). 

56 data: Raw bytes from the characteristic notification. 

57 """ 

58 parsed = self._translator.parse_characteristic(uuid, data) 

59 group_id = self._group_key(uuid, parsed) 

60 

61 group = self._buffer.setdefault(group_id, {}) 

62 group[uuid] = data 

63 

64 if self._required.issubset(group.keys()): 

65 batch = dict(group) 

66 del self._buffer[group_id] 

67 

68 results = self._translator.parse_characteristics(batch) 

69 self._on_pair(results)