Coverage for src / bluetooth_sig / gatt / characteristics / pipeline / parse_pipeline.py: 100%
118 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""Parse pipeline for GATT characteristic values.
3Orchestrates the multi-stage parsing process: length validation → raw integer
4extraction → special value detection → value decoding → range/type validation.
5"""
7from __future__ import annotations
9import os
10from typing import Any, TypeVar
12from ....types import ParseFieldError as FieldError
13from ....types import SpecialValueResult
14from ....types.data_types import ValidationAccumulator
15from ...exceptions import (
16 CharacteristicParseError,
17 ParseFieldError,
18 SpecialValueDetectedError,
19)
20from ..utils.extractors import get_extractor
21from .validation import CharacteristicValidator
23T = TypeVar("T")
26class ParsePipeline:
27 """Multi-stage parse pipeline for characteristic values.
29 Stages:
30 1. Length validation (pre-decode)
31 2. Raw integer extraction (little-endian per Bluetooth spec)
32 3. Special value detection (sentinel values like 0x8000)
33 4. Value decoding (via template or subclass ``_decode_value``)
34 5. Range validation (post-decode)
35 6. Type validation
37 Uses a back-reference to the owning characteristic for:
38 - ``_decode_value()`` dispatch (Template Method pattern)
39 - Metadata access (``name``, ``uuid``, ``_template``, ``_spec``)
40 - Special value resolver
41 """
43 def __init__(self, char: Any, validator: CharacteristicValidator) -> None: # noqa: ANN401
44 """Initialise with back-reference to the owning characteristic.
46 Args:
47 char: BaseCharacteristic instance.
48 validator: Shared validator instance.
50 """
51 self._char = char
52 self._validator = validator
54 # ------------------------------------------------------------------
55 # Main entry point
56 # ------------------------------------------------------------------
58 def run(
59 self,
60 data: bytes | bytearray,
61 ctx: Any | None = None, # noqa: ANN401 # CharacteristicContext
62 validate: bool = True,
63 ) -> Any: # noqa: ANN401 # Returns T (generic of owning char)
64 """Execute the full parse pipeline.
66 Args:
67 data: Raw bytes from BLE read.
68 ctx: Optional ``CharacteristicContext`` for dependency-aware parsing.
69 validate: Whether to run validation stages.
71 Returns:
72 Parsed value of type T.
74 Raises:
75 SpecialValueDetectedError: If a sentinel value is detected.
76 CharacteristicParseError: If parsing or validation fails.
78 """
79 char = self._char
80 data_bytes = bytearray(data)
81 enable_trace = self._is_trace_enabled()
82 parse_trace: list[str] = ["Starting parse"] if enable_trace else []
83 field_errors: list[FieldError] = []
84 validation = ValidationAccumulator()
85 raw_int: int | None = None
87 try:
88 self._perform_length_validation(data_bytes, enable_trace, parse_trace, validation, validate)
89 raw_int, parsed_value = self._extract_and_check_special(data_bytes, enable_trace, parse_trace, ctx)
90 except Exception as e:
91 if enable_trace:
92 parse_trace.append(f"Parse failed: {type(e).__name__}: {e}")
93 raise CharacteristicParseError(
94 message=str(e),
95 name=char.name,
96 uuid=char.uuid,
97 raw_data=bytes(data),
98 raw_int=raw_int,
99 field_errors=field_errors,
100 parse_trace=parse_trace,
101 validation=validation,
102 ) from e
104 if isinstance(parsed_value, SpecialValueResult):
105 if enable_trace:
106 parse_trace.append(f"Detected special value: {parsed_value.meaning}")
107 raise SpecialValueDetectedError(
108 special_value=parsed_value,
109 name=char.name,
110 uuid=char.uuid,
111 raw_data=bytes(data),
112 raw_int=raw_int,
113 )
115 try:
116 decoded_value = self._decode_and_validate(data_bytes, enable_trace, parse_trace, ctx, validation, validate)
117 except Exception as e:
118 if enable_trace:
119 parse_trace.append(f"Parse failed: {type(e).__name__}: {e}")
120 if isinstance(e, ParseFieldError):
121 field_errors.append(
122 FieldError(
123 field=e.field,
124 reason=e.field_reason,
125 offset=e.offset,
126 raw_slice=bytes(e.data) if hasattr(e, "data") else None,
127 )
128 )
129 raise CharacteristicParseError(
130 message=str(e),
131 name=char.name,
132 uuid=char.uuid,
133 raw_data=bytes(data),
134 raw_int=raw_int,
135 field_errors=field_errors,
136 parse_trace=parse_trace,
137 validation=validation,
138 ) from e
140 if enable_trace:
141 parse_trace.append("Parse completed successfully")
143 return decoded_value
145 # ------------------------------------------------------------------
146 # Pipeline stages
147 # ------------------------------------------------------------------
149 def _perform_length_validation(
150 self,
151 data_bytes: bytearray,
152 enable_trace: bool,
153 parse_trace: list[str],
154 validation: ValidationAccumulator,
155 validate: bool,
156 ) -> None:
157 """Stage 1: validate data length before parsing."""
158 if not validate:
159 return
160 if enable_trace:
161 parse_trace.append(f"Validating data length (got {len(data_bytes)} bytes)")
162 length_validation = self._validator.validate_length(data_bytes)
163 validation.errors.extend(length_validation.errors)
164 validation.warnings.extend(length_validation.warnings)
165 if not length_validation.valid:
166 raise ValueError("; ".join(length_validation.errors))
168 def _extract_and_check_special( # pylint: disable=unused-argument
169 self,
170 data_bytes: bytearray,
171 enable_trace: bool,
172 parse_trace: list[str],
173 ctx: Any | None, # noqa: ANN401 # CharacteristicContext
174 ) -> tuple[int | None, int | SpecialValueResult | None]:
175 """Stage 2+3: extract raw int and check for special values."""
176 raw_int = self._extract_raw_int(data_bytes, enable_trace, parse_trace)
178 parsed_value = None
179 if raw_int is not None:
180 if enable_trace:
181 parse_trace.append("Checking for special values")
182 parsed_value = self._check_special_value(raw_int)
183 if enable_trace:
184 if isinstance(parsed_value, SpecialValueResult):
185 parse_trace.append(f"Found special value: {parsed_value}")
186 else:
187 parse_trace.append("Not a special value, proceeding with decode")
189 return raw_int, parsed_value
191 def _decode_and_validate(
192 self,
193 data_bytes: bytearray,
194 enable_trace: bool,
195 parse_trace: list[str],
196 ctx: Any | None, # noqa: ANN401 # CharacteristicContext
197 validation: ValidationAccumulator,
198 validate: bool,
199 ) -> Any: # noqa: ANN401 # Returns T
200 """Stage 4+5+6: decode value via template/subclass, then validate."""
201 if enable_trace:
202 parse_trace.append("Decoding value")
203 decoded_value = self._char._decode_value(data_bytes, ctx, validate=validate)
205 if validate:
206 if enable_trace:
207 parse_trace.append("Validating range")
208 range_validation = self._validator.validate_range(decoded_value, ctx)
209 validation.errors.extend(range_validation.errors)
210 validation.warnings.extend(range_validation.warnings)
211 if not range_validation.valid:
212 raise ValueError("; ".join(range_validation.errors))
213 if enable_trace:
214 parse_trace.append("Validating type")
215 type_validation = self._validator.validate_type(decoded_value)
216 validation.errors.extend(type_validation.errors)
217 validation.warnings.extend(type_validation.warnings)
218 if not type_validation.valid:
219 raise ValueError("; ".join(type_validation.errors))
220 return decoded_value
222 # ------------------------------------------------------------------
223 # Helpers
224 # ------------------------------------------------------------------
226 def _extract_raw_int(
227 self,
228 data: bytearray,
229 enable_trace: bool,
230 parse_trace: list[str],
231 ) -> int | None:
232 """Extract raw integer from bytes using template or YAML extractors."""
233 char = self._char
235 # Priority 1: Template extractor
236 if char._template is not None and char._template.extractor is not None:
237 if enable_trace:
238 parse_trace.append("Extracting raw integer via template extractor")
239 raw_int: int = char._template.extractor.extract(data, offset=0)
240 if enable_trace:
241 parse_trace.append(f"Extracted raw_int: {raw_int}")
242 return raw_int
244 # Priority 2: YAML data type extractor
245 yaml_type = char.get_yaml_data_type()
246 if yaml_type is not None:
247 extractor = get_extractor(yaml_type)
248 if extractor is not None:
249 if enable_trace:
250 parse_trace.append(f"Extracting raw integer via YAML type '{yaml_type}'")
251 raw_int = extractor.extract(data, offset=0)
252 if enable_trace:
253 parse_trace.append(f"Extracted raw_int: {raw_int}")
254 return raw_int
256 # No extractor available
257 if enable_trace:
258 parse_trace.append("No extractor available for raw_int extraction")
259 return None
261 def _check_special_value(self, raw_value: int) -> int | SpecialValueResult:
262 """Check if raw value is a special sentinel value."""
263 res: SpecialValueResult | None = self._char._special_resolver.resolve(raw_value)
264 if res is not None:
265 return res
266 return raw_value
268 def _is_trace_enabled(self) -> bool:
269 """Check if parse trace is enabled via environment variable or instance attribute."""
270 env_value = os.getenv("BLUETOOTH_SIG_ENABLE_PARSE_TRACE", "").lower()
271 if env_value in ("0", "false", "no"):
272 return False
273 return self._char._enable_parse_trace is not False