Coverage for src / bluetooth_sig / core / parser.py: 91%
127 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""Characteristic parsing with dependency-aware batch support.
3Provides single and batch characteristic parsing, including topological
4dependency ordering for multi-characteristic reads. Stateless.
5"""
7from __future__ import annotations
9import logging
10from collections.abc import Mapping
11from graphlib import TopologicalSorter
12from typing import Any, TypeVar, overload
14from ..gatt.characteristics.base import BaseCharacteristic
15from ..gatt.characteristics.registry import CharacteristicRegistry
16from ..gatt.exceptions import (
17 CharacteristicParseError,
18 MissingDependencyError,
19 SpecialValueDetectedError,
20)
21from ..types import CharacteristicContext
22from ..types.uuid import BluetoothUUID
T = TypeVar("T")

logger = logging.getLogger(__name__)


class CharacteristicParser:
    """Stateless parser for single and batch characteristic data.

    Handles parse_characteristic (with @overload support), parse_characteristics
    (batch with dependency ordering), and all private batch helpers.
    """

    @overload
    def parse_characteristic(
        self,
        char: type[BaseCharacteristic[T]],
        raw_data: bytes | bytearray,
        ctx: CharacteristicContext | None = ...,
    ) -> T: ...

    @overload
    def parse_characteristic(
        self,
        char: str,
        raw_data: bytes | bytearray,
        ctx: CharacteristicContext | None = ...,
    ) -> Any: ...  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe

    def parse_characteristic(
        self,
        char: str | type[BaseCharacteristic[T]],
        raw_data: bytes | bytearray,
        ctx: CharacteristicContext | None = None,
    ) -> T | Any:  # Runtime UUID dispatch cannot be type-safe
        r"""Parse a characteristic's raw data using Bluetooth SIG standards.

        Args:
            char: Characteristic class (type-safe) or UUID string (not type-safe).
            raw_data: Raw bytes from the characteristic (bytes or bytearray)
            ctx: Optional CharacteristicContext providing device-level info

        Returns:
            Parsed value. Return type is inferred when passing characteristic class.

        Raises:
            SpecialValueDetectedError: Special sentinel value detected
            CharacteristicParseError: Parse/validation failure, or no parser is
                registered for the given UUID string

        """
        # Handle characteristic class input (type-safe path)
        if isinstance(char, type) and issubclass(char, BaseCharacteristic):
            char_instance = char()
            logger.debug("Parsing characteristic class=%s, data_len=%d", char.__name__, len(raw_data))
            return self._parse_with_logging(char_instance, raw_data, ctx)

        # Handle string UUID input (not type-safe path)
        logger.debug("Parsing characteristic UUID=%s, data_len=%d", char, len(raw_data))

        characteristic = CharacteristicRegistry.get_characteristic(char)
        if not characteristic:
            logger.info("No parser available for UUID=%s", char)
            raise CharacteristicParseError(
                message=f"No parser available for characteristic UUID: {char}",
                name="Unknown",
                uuid=BluetoothUUID(char),
                raw_data=bytes(raw_data),
            )

        logger.debug("Found parser for UUID=%s: %s", char, type(characteristic).__name__)
        return self._parse_with_logging(characteristic, raw_data, ctx)

    @staticmethod
    def _parse_with_logging(
        characteristic: BaseCharacteristic[Any],
        raw_data: bytes | bytearray,
        ctx: CharacteristicContext | None,
    ) -> Any:  # noqa: ANN401  # Value type depends on the characteristic
        """Run ``characteristic.parse_value`` and log the outcome.

        Shared by the class-based and UUID-based paths of parse_characteristic
        so both emit identical log records. Parse exceptions are logged and
        re-raised unchanged.
        """
        try:
            value = characteristic.parse_value(raw_data, ctx)
            logger.debug("Successfully parsed %s: %s", characteristic.name, value)
        except SpecialValueDetectedError as e:
            logger.debug("Special value detected for %s: %s", characteristic.name, e.special_value.meaning)
            raise
        except CharacteristicParseError as e:
            logger.warning("Parse failed for %s: %s", characteristic.name, e)
            raise
        else:
            return value

    def parse_characteristics(
        self,
        char_data: dict[str, bytes],
        ctx: CharacteristicContext | None = None,
    ) -> dict[str, Any]:
        r"""Parse multiple characteristics at once with dependency-aware ordering.

        Characteristics are parsed in topological order of their declared
        dependencies. If ordering fails (for example because of a circular
        dependency) a warning is logged and the input order is used instead.

        Args:
            char_data: Dictionary mapping UUIDs to raw data bytes
            ctx: Optional CharacteristicContext used as the starting context

        Returns:
            Dictionary mapping UUIDs to parsed values

        Raises:
            MissingDependencyError: If a required dependency is available
                neither from already-parsed batch results nor from
                ``ctx.other_characteristics``
            CharacteristicParseError: If parsing fails for any characteristic,
                including UUIDs for which no parser is registered

        """
        return self._parse_characteristics_batch(char_data, ctx)

    def _parse_characteristics_batch(
        self,
        char_data: dict[str, bytes],
        ctx: CharacteristicContext | None,
    ) -> dict[str, Any]:
        """Parse multiple characteristics using dependency-aware ordering."""
        logger.debug("Batch parsing %d characteristics", len(char_data))

        uuid_to_characteristic, uuid_to_required_deps, uuid_to_optional_deps = (
            self._prepare_characteristic_dependencies(char_data)
        )

        sorted_uuids = self._resolve_dependency_order(char_data, uuid_to_required_deps, uuid_to_optional_deps)

        results: dict[str, Any] = {}
        for uuid_str in sorted_uuids:
            raw_data = char_data[uuid_str]
            # Only used to name the characteristic in the error below.
            characteristic = uuid_to_characteristic.get(uuid_str)

            missing_required = self._find_missing_required_dependencies(
                uuid_str,
                uuid_to_required_deps.get(uuid_str, []),
                results,
                ctx,
            )
            if missing_required:
                raise MissingDependencyError(characteristic.name if characteristic else "Unknown", missing_required)

            self._log_optional_dependency_gaps(
                uuid_str,
                uuid_to_optional_deps.get(uuid_str, []),
                results,
                ctx,
            )

            # Rebuilt for every characteristic so each parser sees everything
            # parsed before it in the batch.
            parse_context = self._build_parse_context(ctx, results)

            results[uuid_str] = self.parse_characteristic(uuid_str, raw_data, ctx=parse_context)

        logger.debug("Batch parsing complete: %d results", len(results))
        return results

    def _prepare_characteristic_dependencies(
        self, characteristic_data: Mapping[str, bytes]
    ) -> tuple[dict[str, BaseCharacteristic[Any]], dict[str, list[str]], dict[str, list[str]]]:
        """Look up each characteristic and collect its declared dependencies.

        UUIDs without a registered characteristic are skipped here; they are
        reported later when parse_characteristic is called for them.

        Returns:
            Three dicts keyed by UUID: the characteristic instance, its
            required dependencies, and its optional dependencies. The two
            dependency dicts only contain UUIDs with a non-empty declaration.

        """
        uuid_to_characteristic: dict[str, BaseCharacteristic[Any]] = {}
        uuid_to_required_deps: dict[str, list[str]] = {}
        uuid_to_optional_deps: dict[str, list[str]] = {}

        for uuid in characteristic_data:
            characteristic = CharacteristicRegistry.get_characteristic(uuid)
            if characteristic is None:
                continue

            uuid_to_characteristic[uuid] = characteristic

            required = characteristic.required_dependencies
            optional = characteristic.optional_dependencies

            if required:
                uuid_to_required_deps[uuid] = required
                logger.debug("Characteristic %s has required dependencies: %s", uuid, required)
            if optional:
                uuid_to_optional_deps[uuid] = optional
                logger.debug("Characteristic %s has optional dependencies: %s", uuid, optional)

        return uuid_to_characteristic, uuid_to_required_deps, uuid_to_optional_deps

    @staticmethod
    def _resolve_dependency_order(
        characteristic_data: Mapping[str, bytes],
        uuid_to_required_deps: Mapping[str, list[str]],
        uuid_to_optional_deps: Mapping[str, list[str]],
    ) -> list[str]:
        """Topologically sort characteristics based on declared dependencies.

        Only dependencies that are themselves part of ``characteristic_data``
        constrain the order. On any sorting failure (e.g. a dependency cycle)
        a warning is logged and the original key order is returned.
        """
        try:
            sorter: TopologicalSorter[str] = TopologicalSorter()
            for uuid in characteristic_data:
                # Unpack instead of ``+`` so tuple (or other sequence)
                # declarations do not spuriously trigger the fallback order.
                all_deps = [*uuid_to_required_deps.get(uuid, ()), *uuid_to_optional_deps.get(uuid, ())]
                batch_deps = [dep for dep in all_deps if dep in characteristic_data]
                sorter.add(uuid, *batch_deps)

            sorted_uuids = list(sorter.static_order())
            logger.debug("Dependency-sorted parsing order: %s", sorted_uuids)
        except Exception as exc:  # pylint: disable=broad-exception-caught
            # Deliberate best effort: ordering problems must not abort the batch.
            logger.warning("Dependency sorting failed: %s. Using original order.", exc)
            return list(characteristic_data.keys())
        else:
            return sorted_uuids

    @staticmethod
    def _external_characteristics(
        base_context: CharacteristicContext | None,
    ) -> Mapping[str, Any] | None:
        """Return the caller-supplied ``other_characteristics``, or None if absent/empty."""
        if not base_context:
            return None
        return base_context.other_characteristics or None

    @staticmethod
    def _merge_other_characteristics(
        external: Mapping[str, Any] | None,
        results: Mapping[str, Any],
    ) -> Mapping[str, Any]:
        """Combine caller-supplied characteristics with batch results.

        Batch results win on key collisions. When there is nothing to merge,
        ``results`` itself is returned (no copy).
        """
        if not external:
            return results
        merged: dict[str, Any] = dict(external)
        merged.update(results)
        return merged

    @staticmethod
    def _find_missing_required_dependencies(
        uuid: str,
        required_deps: list[str],
        results: Mapping[str, Any],
        base_context: CharacteristicContext | None,
    ) -> list[str]:
        """Determine which required dependencies are unavailable for a characteristic.

        A dependency counts as available when it is already in ``results`` or
        in ``base_context.other_characteristics``.

        Returns:
            The unavailable dependency UUIDs, in declaration order.

        """
        if not required_deps:
            return []

        other_characteristics = CharacteristicParser._external_characteristics(base_context)

        missing = [
            dep_uuid
            for dep_uuid in required_deps
            if dep_uuid not in results and not (other_characteristics and dep_uuid in other_characteristics)
        ]

        if missing:
            logger.debug("Characteristic %s missing required dependencies: %s", uuid, missing)

        return missing

    @staticmethod
    def _log_optional_dependency_gaps(
        uuid: str,
        optional_deps: list[str],
        results: Mapping[str, Any],
        base_context: CharacteristicContext | None,
    ) -> None:
        """Emit debug logs when optional dependencies are unavailable.

        Uses the same availability rule as the required-dependency check:
        present in ``results`` or in ``base_context.other_characteristics``.
        """
        if not optional_deps:
            return

        other_characteristics = CharacteristicParser._external_characteristics(base_context)

        for dep_uuid in optional_deps:
            if dep_uuid in results:
                continue
            if other_characteristics and dep_uuid in other_characteristics:
                continue
            logger.debug("Optional dependency %s not available for %s", dep_uuid, uuid)

    @staticmethod
    def _build_parse_context(
        base_context: CharacteristicContext | None,
        results: Mapping[str, Any],
    ) -> CharacteristicContext:
        """Construct the context passed to per-characteristic parsers.

        ``other_characteristics`` contains the batch results parsed so far plus
        any entries the caller supplied on ``base_context``. The caller's
        entries must be carried over because the dependency checks treat them
        as available; dropping them would let a check pass and then hand the
        parser a context without that dependency.
        """
        other_characteristics = CharacteristicParser._merge_other_characteristics(
            CharacteristicParser._external_characteristics(base_context),
            results,
        )

        if base_context is None:
            return CharacteristicContext(other_characteristics=other_characteristics)

        return CharacteristicContext(
            device_info=base_context.device_info,
            advertisement=base_context.advertisement,
            other_characteristics=other_characteristics,
            raw_service=base_context.raw_service,
        )