Coverage for src / bluetooth_sig / gatt / characteristics / pipeline / parse_pipeline.py: 100%

118 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Parse pipeline for GATT characteristic values. 

2 

3Orchestrates the multi-stage parsing process: length validation → raw integer 

4extraction → special value detection → value decoding → range/type validation. 

5""" 

6 

7from __future__ import annotations 

8 

9import os 

10from typing import Any, TypeVar 

11 

12from ....types import ParseFieldError as FieldError 

13from ....types import SpecialValueResult 

14from ....types.data_types import ValidationAccumulator 

15from ...exceptions import ( 

16 CharacteristicParseError, 

17 ParseFieldError, 

18 SpecialValueDetectedError, 

19) 

20from ..utils.extractors import get_extractor 

21from .validation import CharacteristicValidator 

22 

23T = TypeVar("T") 

24 

25 

26class ParsePipeline: 

27 """Multi-stage parse pipeline for characteristic values. 

28 

29 Stages: 

30 1. Length validation (pre-decode) 

31 2. Raw integer extraction (little-endian per Bluetooth spec) 

32 3. Special value detection (sentinel values like 0x8000) 

33 4. Value decoding (via template or subclass ``_decode_value``) 

34 5. Range validation (post-decode) 

35 6. Type validation 

36 

37 Uses a back-reference to the owning characteristic for: 

38 - ``_decode_value()`` dispatch (Template Method pattern) 

39 - Metadata access (``name``, ``uuid``, ``_template``, ``_spec``) 

40 - Special value resolver 

41 """ 

42 

43 def __init__(self, char: Any, validator: CharacteristicValidator) -> None: # noqa: ANN401 

44 """Initialise with back-reference to the owning characteristic. 

45 

46 Args: 

47 char: BaseCharacteristic instance. 

48 validator: Shared validator instance. 

49 

50 """ 

51 self._char = char 

52 self._validator = validator 

53 

54 # ------------------------------------------------------------------ 

55 # Main entry point 

56 # ------------------------------------------------------------------ 

57 

58 def run( 

59 self, 

60 data: bytes | bytearray, 

61 ctx: Any | None = None, # noqa: ANN401 # CharacteristicContext 

62 validate: bool = True, 

63 ) -> Any: # noqa: ANN401 # Returns T (generic of owning char) 

64 """Execute the full parse pipeline. 

65 

66 Args: 

67 data: Raw bytes from BLE read. 

68 ctx: Optional ``CharacteristicContext`` for dependency-aware parsing. 

69 validate: Whether to run validation stages. 

70 

71 Returns: 

72 Parsed value of type T. 

73 

74 Raises: 

75 SpecialValueDetectedError: If a sentinel value is detected. 

76 CharacteristicParseError: If parsing or validation fails. 

77 

78 """ 

79 char = self._char 

80 data_bytes = bytearray(data) 

81 enable_trace = self._is_trace_enabled() 

82 parse_trace: list[str] = ["Starting parse"] if enable_trace else [] 

83 field_errors: list[FieldError] = [] 

84 validation = ValidationAccumulator() 

85 raw_int: int | None = None 

86 

87 try: 

88 self._perform_length_validation(data_bytes, enable_trace, parse_trace, validation, validate) 

89 raw_int, parsed_value = self._extract_and_check_special(data_bytes, enable_trace, parse_trace, ctx) 

90 except Exception as e: 

91 if enable_trace: 

92 parse_trace.append(f"Parse failed: {type(e).__name__}: {e}") 

93 raise CharacteristicParseError( 

94 message=str(e), 

95 name=char.name, 

96 uuid=char.uuid, 

97 raw_data=bytes(data), 

98 raw_int=raw_int, 

99 field_errors=field_errors, 

100 parse_trace=parse_trace, 

101 validation=validation, 

102 ) from e 

103 

104 if isinstance(parsed_value, SpecialValueResult): 

105 if enable_trace: 

106 parse_trace.append(f"Detected special value: {parsed_value.meaning}") 

107 raise SpecialValueDetectedError( 

108 special_value=parsed_value, 

109 name=char.name, 

110 uuid=char.uuid, 

111 raw_data=bytes(data), 

112 raw_int=raw_int, 

113 ) 

114 

115 try: 

116 decoded_value = self._decode_and_validate(data_bytes, enable_trace, parse_trace, ctx, validation, validate) 

117 except Exception as e: 

118 if enable_trace: 

119 parse_trace.append(f"Parse failed: {type(e).__name__}: {e}") 

120 if isinstance(e, ParseFieldError): 

121 field_errors.append( 

122 FieldError( 

123 field=e.field, 

124 reason=e.field_reason, 

125 offset=e.offset, 

126 raw_slice=bytes(e.data) if hasattr(e, "data") else None, 

127 ) 

128 ) 

129 raise CharacteristicParseError( 

130 message=str(e), 

131 name=char.name, 

132 uuid=char.uuid, 

133 raw_data=bytes(data), 

134 raw_int=raw_int, 

135 field_errors=field_errors, 

136 parse_trace=parse_trace, 

137 validation=validation, 

138 ) from e 

139 

140 if enable_trace: 

141 parse_trace.append("Parse completed successfully") 

142 

143 return decoded_value 

144 

145 # ------------------------------------------------------------------ 

146 # Pipeline stages 

147 # ------------------------------------------------------------------ 

148 

149 def _perform_length_validation( 

150 self, 

151 data_bytes: bytearray, 

152 enable_trace: bool, 

153 parse_trace: list[str], 

154 validation: ValidationAccumulator, 

155 validate: bool, 

156 ) -> None: 

157 """Stage 1: validate data length before parsing.""" 

158 if not validate: 

159 return 

160 if enable_trace: 

161 parse_trace.append(f"Validating data length (got {len(data_bytes)} bytes)") 

162 length_validation = self._validator.validate_length(data_bytes) 

163 validation.errors.extend(length_validation.errors) 

164 validation.warnings.extend(length_validation.warnings) 

165 if not length_validation.valid: 

166 raise ValueError("; ".join(length_validation.errors)) 

167 

168 def _extract_and_check_special( # pylint: disable=unused-argument 

169 self, 

170 data_bytes: bytearray, 

171 enable_trace: bool, 

172 parse_trace: list[str], 

173 ctx: Any | None, # noqa: ANN401 # CharacteristicContext 

174 ) -> tuple[int | None, int | SpecialValueResult | None]: 

175 """Stage 2+3: extract raw int and check for special values.""" 

176 raw_int = self._extract_raw_int(data_bytes, enable_trace, parse_trace) 

177 

178 parsed_value = None 

179 if raw_int is not None: 

180 if enable_trace: 

181 parse_trace.append("Checking for special values") 

182 parsed_value = self._check_special_value(raw_int) 

183 if enable_trace: 

184 if isinstance(parsed_value, SpecialValueResult): 

185 parse_trace.append(f"Found special value: {parsed_value}") 

186 else: 

187 parse_trace.append("Not a special value, proceeding with decode") 

188 

189 return raw_int, parsed_value 

190 

191 def _decode_and_validate( 

192 self, 

193 data_bytes: bytearray, 

194 enable_trace: bool, 

195 parse_trace: list[str], 

196 ctx: Any | None, # noqa: ANN401 # CharacteristicContext 

197 validation: ValidationAccumulator, 

198 validate: bool, 

199 ) -> Any: # noqa: ANN401 # Returns T 

200 """Stage 4+5+6: decode value via template/subclass, then validate.""" 

201 if enable_trace: 

202 parse_trace.append("Decoding value") 

203 decoded_value = self._char._decode_value(data_bytes, ctx, validate=validate) 

204 

205 if validate: 

206 if enable_trace: 

207 parse_trace.append("Validating range") 

208 range_validation = self._validator.validate_range(decoded_value, ctx) 

209 validation.errors.extend(range_validation.errors) 

210 validation.warnings.extend(range_validation.warnings) 

211 if not range_validation.valid: 

212 raise ValueError("; ".join(range_validation.errors)) 

213 if enable_trace: 

214 parse_trace.append("Validating type") 

215 type_validation = self._validator.validate_type(decoded_value) 

216 validation.errors.extend(type_validation.errors) 

217 validation.warnings.extend(type_validation.warnings) 

218 if not type_validation.valid: 

219 raise ValueError("; ".join(type_validation.errors)) 

220 return decoded_value 

221 

222 # ------------------------------------------------------------------ 

223 # Helpers 

224 # ------------------------------------------------------------------ 

225 

226 def _extract_raw_int( 

227 self, 

228 data: bytearray, 

229 enable_trace: bool, 

230 parse_trace: list[str], 

231 ) -> int | None: 

232 """Extract raw integer from bytes using template or YAML extractors.""" 

233 char = self._char 

234 

235 # Priority 1: Template extractor 

236 if char._template is not None and char._template.extractor is not None: 

237 if enable_trace: 

238 parse_trace.append("Extracting raw integer via template extractor") 

239 raw_int: int = char._template.extractor.extract(data, offset=0) 

240 if enable_trace: 

241 parse_trace.append(f"Extracted raw_int: {raw_int}") 

242 return raw_int 

243 

244 # Priority 2: YAML data type extractor 

245 yaml_type = char.get_yaml_data_type() 

246 if yaml_type is not None: 

247 extractor = get_extractor(yaml_type) 

248 if extractor is not None: 

249 if enable_trace: 

250 parse_trace.append(f"Extracting raw integer via YAML type '{yaml_type}'") 

251 raw_int = extractor.extract(data, offset=0) 

252 if enable_trace: 

253 parse_trace.append(f"Extracted raw_int: {raw_int}") 

254 return raw_int 

255 

256 # No extractor available 

257 if enable_trace: 

258 parse_trace.append("No extractor available for raw_int extraction") 

259 return None 

260 

261 def _check_special_value(self, raw_value: int) -> int | SpecialValueResult: 

262 """Check if raw value is a special sentinel value.""" 

263 res: SpecialValueResult | None = self._char._special_resolver.resolve(raw_value) 

264 if res is not None: 

265 return res 

266 return raw_value 

267 

268 def _is_trace_enabled(self) -> bool: 

269 """Check if parse trace is enabled via environment variable or instance attribute.""" 

270 env_value = os.getenv("BLUETOOTH_SIG_ENABLE_PARSE_TRACE", "").lower() 

271 if env_value in ("0", "false", "no"): 

272 return False 

273 return self._char._enable_parse_trace is not False