Coverage for src/bluetooth_sig/gatt/characteristics/pipeline/parse_pipeline.py: 100%
120 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 01:26 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 01:26 +0000
"""Parse pipeline for GATT characteristic values.

Orchestrates the multi-stage parsing process: length validation → raw integer
extraction → special value detection → value decoding → range/type validation.
"""
7from __future__ import annotations
9import os
10from typing import Any, TypeVar
12from ....types import ParseFieldError as FieldError
13from ....types import SpecialValueResult
14from ....types.data_types import ValidationAccumulator
15from ...exceptions import (
16 CharacteristicParseError,
17 ParseFieldError,
18 SpecialValueDetectedError,
19)
20from ..utils.extractors import get_extractor
21from .validation import CharacteristicValidator
23T = TypeVar("T")
class ParsePipeline:
    """Multi-stage parse pipeline for characteristic values.

    Stages:
        1. Length validation (pre-decode)
        2. Raw integer extraction (little-endian per Bluetooth spec)
        3. Special value detection (sentinel values like 0x8000)
        4. Value decoding (via template or subclass ``_decode_value``)
        5. Range validation (post-decode)
        6. Type validation

    Uses a back-reference to the owning characteristic for:
        - ``_decode_value()`` dispatch (Template Method pattern)
        - Metadata access (``name``, ``uuid``, ``_template``, ``_spec``)
        - Special value resolver

    When tracing is enabled (see ``_is_trace_enabled``), each stage appends a
    human-readable step to a trace list that is attached to any
    ``CharacteristicParseError`` raised by ``run``.
    """

    def __init__(self, char: Any, validator: CharacteristicValidator) -> None:  # noqa: ANN401
        """Initialise with back-reference to the owning characteristic.

        Args:
            char: BaseCharacteristic instance.
            validator: Shared validator instance.

        """
        self._char = char
        self._validator = validator

    # ------------------------------------------------------------------
    # Main entry point
    # ------------------------------------------------------------------

    def run(
        self,
        data: bytes | bytearray,
        ctx: Any | None = None,  # noqa: ANN401  # CharacteristicContext
        validate: bool = True,
    ) -> Any:  # noqa: ANN401  # Returns T (generic of owning char)
        """Execute the full parse pipeline.

        Args:
            data: Raw bytes from BLE read.
            ctx: Optional ``CharacteristicContext`` for dependency-aware parsing.
            validate: Whether to run validation stages.

        Returns:
            Parsed value of type T.

        Raises:
            SpecialValueDetectedError: If a sentinel value is detected.
            CharacteristicParseError: If parsing or validation fails.

        """
        char = self._char
        # Work on a private mutable copy; the caller's buffer is only read.
        data_bytes = bytearray(data)
        enable_trace = self._is_trace_enabled()
        parse_trace: list[str] = ["Starting parse"] if enable_trace else []
        field_errors: list[FieldError] = []
        validation = ValidationAccumulator()
        raw_int: int | None = None

        # Stages 1-3: any failure here is reported as a CharacteristicParseError.
        try:
            self._perform_length_validation(data_bytes, enable_trace, parse_trace, validation, validate)
            raw_int, parsed_value = self._extract_and_check_special(data_bytes, enable_trace, parse_trace, ctx)
        except Exception as e:
            raise self._wrap_failure(
                e, data, raw_int, field_errors, parse_trace, validation, enable_trace
            ) from e

        # A sentinel short-circuits decoding and is surfaced as its own error type.
        if isinstance(parsed_value, SpecialValueResult):
            if enable_trace:
                parse_trace.append(f"Detected special value: {parsed_value.meaning}")
            raise SpecialValueDetectedError(
                special_value=parsed_value,
                name=char.name,
                uuid=char.uuid,
                raw_data=bytes(data),
                raw_int=raw_int,
            )

        # Stages 4-6.
        try:
            decoded_value = self._decode_and_validate(data_bytes, enable_trace, parse_trace, ctx, validation, validate)
        except SpecialValueDetectedError:
            # Raised by the decoder itself: propagate untouched, do not re-wrap.
            raise
        except Exception as e:
            if isinstance(e, ParseFieldError):
                # ``data`` may be absent or None; never let bytes(None) raise a
                # TypeError here and mask the real parse failure.
                failed_bytes = getattr(e, "data", None)
                field_errors.append(
                    FieldError(
                        field=e.field,
                        reason=e.field_reason,
                        offset=e.offset,
                        raw_slice=bytes(failed_bytes) if failed_bytes is not None else None,
                    )
                )
            raise self._wrap_failure(
                e, data, raw_int, field_errors, parse_trace, validation, enable_trace
            ) from e

        if enable_trace:
            parse_trace.append("Parse completed successfully")

        return decoded_value

    def _wrap_failure(
        self,
        error: Exception,
        data: bytes | bytearray,
        raw_int: int | None,
        field_errors: list[FieldError],
        parse_trace: list[str],
        validation: ValidationAccumulator,
        enable_trace: bool,
    ) -> CharacteristicParseError:
        """Record the failure in the trace and build the wrapping parse error.

        The caller is expected to ``raise ... from error`` so the original
        exception stays attached as the cause.
        """
        if enable_trace:
            parse_trace.append(f"Parse failed: {type(error).__name__}: {error}")
        return CharacteristicParseError(
            message=str(error),
            name=self._char.name,
            uuid=self._char.uuid,
            raw_data=bytes(data),
            raw_int=raw_int,
            field_errors=field_errors,
            parse_trace=parse_trace,
            validation=validation,
        )

    # ------------------------------------------------------------------
    # Pipeline stages
    # ------------------------------------------------------------------

    def _perform_length_validation(
        self,
        data_bytes: bytearray,
        enable_trace: bool,
        parse_trace: list[str],
        validation: ValidationAccumulator,
        validate: bool,
    ) -> None:
        """Stage 1: validate data length before parsing.

        Does nothing when ``validate`` is false. Otherwise the validator's
        errors and warnings are merged into ``validation``.

        Raises:
            ValueError: If the validator reports the length as invalid; the
                message joins the validator's errors with ``"; "``.

        """
        if not validate:
            return
        if enable_trace:
            parse_trace.append(f"Validating data length (got {len(data_bytes)} bytes)")
        length_validation = self._validator.validate_length(data_bytes)
        validation.errors.extend(length_validation.errors)
        validation.warnings.extend(length_validation.warnings)
        if not length_validation.valid:
            raise ValueError("; ".join(length_validation.errors))

    def _extract_and_check_special(  # pylint: disable=unused-argument
        self,
        data_bytes: bytearray,
        enable_trace: bool,
        parse_trace: list[str],
        ctx: Any | None,  # noqa: ANN401  # CharacteristicContext
    ) -> tuple[int | None, int | SpecialValueResult | None]:
        """Stage 2+3: extract raw int and check for special values.

        Returns:
            ``(raw_int, parsed_value)``. ``parsed_value`` is ``None`` when no
            raw integer could be extracted, a ``SpecialValueResult`` when the
            resolver matched, and the raw integer itself otherwise.

        """
        raw_int = self._extract_raw_int(data_bytes, enable_trace, parse_trace)

        parsed_value = None
        if raw_int is not None:
            if enable_trace:
                parse_trace.append("Checking for special values")
            parsed_value = self._check_special_value(raw_int)
            if enable_trace:
                if isinstance(parsed_value, SpecialValueResult):
                    parse_trace.append(f"Found special value: {parsed_value}")
                else:
                    parse_trace.append("Not a special value, proceeding with decode")

        return raw_int, parsed_value

    def _decode_and_validate(
        self,
        data_bytes: bytearray,
        enable_trace: bool,
        parse_trace: list[str],
        ctx: Any | None,  # noqa: ANN401  # CharacteristicContext
        validation: ValidationAccumulator,
        validate: bool,
    ) -> Any:  # noqa: ANN401  # Returns T
        """Stage 4+5+6: decode value via template/subclass, then validate.

        Range validation runs before type validation; the first failing check
        aborts the pipeline.

        Raises:
            ValueError: If range or type validation reports the value invalid.

        """
        if enable_trace:
            parse_trace.append("Decoding value")
        decoded_value = self._char._decode_value(data_bytes, ctx, validate=validate)

        if validate:
            if enable_trace:
                parse_trace.append("Validating range")
            range_validation = self._validator.validate_range(decoded_value, ctx)
            validation.errors.extend(range_validation.errors)
            validation.warnings.extend(range_validation.warnings)
            if not range_validation.valid:
                raise ValueError("; ".join(range_validation.errors))
            if enable_trace:
                parse_trace.append("Validating type")
            type_validation = self._validator.validate_type(decoded_value)
            validation.errors.extend(type_validation.errors)
            validation.warnings.extend(type_validation.warnings)
            if not type_validation.valid:
                raise ValueError("; ".join(type_validation.errors))
        return decoded_value

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _extract_raw_int(
        self,
        data: bytearray,
        enable_trace: bool,
        parse_trace: list[str],
    ) -> int | None:
        """Extract raw integer from bytes using template or YAML extractors.

        Returns:
            The extracted integer, or ``None`` when neither the template nor
            the YAML data type provides an extractor.

        """
        char = self._char
        extractor = None
        source = ""

        if char._template is not None and char._template.extractor is not None:
            # Priority 1: Template extractor
            extractor = char._template.extractor
            source = "template extractor"
        else:
            # Priority 2: YAML data type extractor
            yaml_type = char.get_yaml_data_type()
            if yaml_type is not None:
                extractor = get_extractor(yaml_type)
                source = f"YAML type '{yaml_type}'"

        if extractor is None:
            # No extractor available
            if enable_trace:
                parse_trace.append("No extractor available for raw_int extraction")
            return None

        if enable_trace:
            parse_trace.append(f"Extracting raw integer via {source}")
        raw_int: int = extractor.extract(data, offset=0)
        if enable_trace:
            parse_trace.append(f"Extracted raw_int: {raw_int}")
        return raw_int

    def _check_special_value(self, raw_value: int) -> int | SpecialValueResult:
        """Check if raw value is a special sentinel value.

        Returns:
            The resolver's result when it matches, otherwise ``raw_value``
            unchanged.

        """
        res: SpecialValueResult | None = self._char._special_resolver.resolve(raw_value)
        if res is not None:
            return res
        return raw_value

    def _is_trace_enabled(self) -> bool:
        """Check if parse trace is enabled via environment variable or instance attribute.

        The ``BLUETOOTH_SIG_ENABLE_PARSE_TRACE`` environment variable wins when
        set to ``0``, ``false`` or ``no`` (case-insensitive, surrounding
        whitespace ignored). Otherwise tracing is on unless the characteristic's
        ``_enable_parse_trace`` attribute is exactly ``False``.
        """
        env_value = os.getenv("BLUETOOTH_SIG_ENABLE_PARSE_TRACE", "").strip().lower()
        if env_value in ("0", "false", "no"):
            return False
        return self._char._enable_parse_trace is not False