Coverage for src/bluetooth_sig/gatt/characteristics/pipeline/parse_pipeline.py: 100%

120 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-28 01:26 +0000

1"""Parse pipeline for GATT characteristic values. 

2 

3Orchestrates the multi-stage parsing process: length validation → raw integer 

4extraction → special value detection → value decoding → range/type validation. 

5""" 

6 

7from __future__ import annotations 

8 

9import os 

10from typing import Any, TypeVar 

11 

12from ....types import ParseFieldError as FieldError 

13from ....types import SpecialValueResult 

14from ....types.data_types import ValidationAccumulator 

15from ...exceptions import ( 

16 CharacteristicParseError, 

17 ParseFieldError, 

18 SpecialValueDetectedError, 

19) 

20from ..utils.extractors import get_extractor 

21from .validation import CharacteristicValidator 

22 

23T = TypeVar("T") 

24 

25 

26class ParsePipeline: 

27 """Multi-stage parse pipeline for characteristic values. 

28 

29 Stages: 

30 1. Length validation (pre-decode) 

31 2. Raw integer extraction (little-endian per Bluetooth spec) 

32 3. Special value detection (sentinel values like 0x8000) 

33 4. Value decoding (via template or subclass ``_decode_value``) 

34 5. Range validation (post-decode) 

35 6. Type validation 

36 

37 Uses a back-reference to the owning characteristic for: 

38 - ``_decode_value()`` dispatch (Template Method pattern) 

39 - Metadata access (``name``, ``uuid``, ``_template``, ``_spec``) 

40 - Special value resolver 

41 """ 

42 

43 def __init__(self, char: Any, validator: CharacteristicValidator) -> None: # noqa: ANN401 

44 """Initialise with back-reference to the owning characteristic. 

45 

46 Args: 

47 char: BaseCharacteristic instance. 

48 validator: Shared validator instance. 

49 

50 """ 

51 self._char = char 

52 self._validator = validator 

53 

54 # ------------------------------------------------------------------ 

55 # Main entry point 

56 # ------------------------------------------------------------------ 

57 

58 def run( 

59 self, 

60 data: bytes | bytearray, 

61 ctx: Any | None = None, # noqa: ANN401 # CharacteristicContext 

62 validate: bool = True, 

63 ) -> Any: # noqa: ANN401 # Returns T (generic of owning char) 

64 """Execute the full parse pipeline. 

65 

66 Args: 

67 data: Raw bytes from BLE read. 

68 ctx: Optional ``CharacteristicContext`` for dependency-aware parsing. 

69 validate: Whether to run validation stages. 

70 

71 Returns: 

72 Parsed value of type T. 

73 

74 Raises: 

75 SpecialValueDetectedError: If a sentinel value is detected. 

76 CharacteristicParseError: If parsing or validation fails. 

77 

78 """ 

79 char = self._char 

80 data_bytes = bytearray(data) 

81 enable_trace = self._is_trace_enabled() 

82 parse_trace: list[str] = ["Starting parse"] if enable_trace else [] 

83 field_errors: list[FieldError] = [] 

84 validation = ValidationAccumulator() 

85 raw_int: int | None = None 

86 

87 try: 

88 self._perform_length_validation(data_bytes, enable_trace, parse_trace, validation, validate) 

89 raw_int, parsed_value = self._extract_and_check_special(data_bytes, enable_trace, parse_trace, ctx) 

90 except Exception as e: 

91 if enable_trace: 

92 parse_trace.append(f"Parse failed: {type(e).__name__}: {e}") 

93 raise CharacteristicParseError( 

94 message=str(e), 

95 name=char.name, 

96 uuid=char.uuid, 

97 raw_data=bytes(data), 

98 raw_int=raw_int, 

99 field_errors=field_errors, 

100 parse_trace=parse_trace, 

101 validation=validation, 

102 ) from e 

103 

104 if isinstance(parsed_value, SpecialValueResult): 

105 if enable_trace: 

106 parse_trace.append(f"Detected special value: {parsed_value.meaning}") 

107 raise SpecialValueDetectedError( 

108 special_value=parsed_value, 

109 name=char.name, 

110 uuid=char.uuid, 

111 raw_data=bytes(data), 

112 raw_int=raw_int, 

113 ) 

114 

115 try: 

116 decoded_value = self._decode_and_validate(data_bytes, enable_trace, parse_trace, ctx, validation, validate) 

117 except SpecialValueDetectedError: 

118 raise 

119 except Exception as e: 

120 if enable_trace: 

121 parse_trace.append(f"Parse failed: {type(e).__name__}: {e}") 

122 if isinstance(e, ParseFieldError): 

123 field_errors.append( 

124 FieldError( 

125 field=e.field, 

126 reason=e.field_reason, 

127 offset=e.offset, 

128 raw_slice=bytes(e.data) if hasattr(e, "data") else None, 

129 ) 

130 ) 

131 raise CharacteristicParseError( 

132 message=str(e), 

133 name=char.name, 

134 uuid=char.uuid, 

135 raw_data=bytes(data), 

136 raw_int=raw_int, 

137 field_errors=field_errors, 

138 parse_trace=parse_trace, 

139 validation=validation, 

140 ) from e 

141 

142 if enable_trace: 

143 parse_trace.append("Parse completed successfully") 

144 

145 return decoded_value 

146 

147 # ------------------------------------------------------------------ 

148 # Pipeline stages 

149 # ------------------------------------------------------------------ 

150 

151 def _perform_length_validation( 

152 self, 

153 data_bytes: bytearray, 

154 enable_trace: bool, 

155 parse_trace: list[str], 

156 validation: ValidationAccumulator, 

157 validate: bool, 

158 ) -> None: 

159 """Stage 1: validate data length before parsing.""" 

160 if not validate: 

161 return 

162 if enable_trace: 

163 parse_trace.append(f"Validating data length (got {len(data_bytes)} bytes)") 

164 length_validation = self._validator.validate_length(data_bytes) 

165 validation.errors.extend(length_validation.errors) 

166 validation.warnings.extend(length_validation.warnings) 

167 if not length_validation.valid: 

168 raise ValueError("; ".join(length_validation.errors)) 

169 

170 def _extract_and_check_special( # pylint: disable=unused-argument 

171 self, 

172 data_bytes: bytearray, 

173 enable_trace: bool, 

174 parse_trace: list[str], 

175 ctx: Any | None, # noqa: ANN401 # CharacteristicContext 

176 ) -> tuple[int | None, int | SpecialValueResult | None]: 

177 """Stage 2+3: extract raw int and check for special values.""" 

178 raw_int = self._extract_raw_int(data_bytes, enable_trace, parse_trace) 

179 

180 parsed_value = None 

181 if raw_int is not None: 

182 if enable_trace: 

183 parse_trace.append("Checking for special values") 

184 parsed_value = self._check_special_value(raw_int) 

185 if enable_trace: 

186 if isinstance(parsed_value, SpecialValueResult): 

187 parse_trace.append(f"Found special value: {parsed_value}") 

188 else: 

189 parse_trace.append("Not a special value, proceeding with decode") 

190 

191 return raw_int, parsed_value 

192 

193 def _decode_and_validate( 

194 self, 

195 data_bytes: bytearray, 

196 enable_trace: bool, 

197 parse_trace: list[str], 

198 ctx: Any | None, # noqa: ANN401 # CharacteristicContext 

199 validation: ValidationAccumulator, 

200 validate: bool, 

201 ) -> Any: # noqa: ANN401 # Returns T 

202 """Stage 4+5+6: decode value via template/subclass, then validate.""" 

203 if enable_trace: 

204 parse_trace.append("Decoding value") 

205 decoded_value = self._char._decode_value(data_bytes, ctx, validate=validate) 

206 

207 if validate: 

208 if enable_trace: 

209 parse_trace.append("Validating range") 

210 range_validation = self._validator.validate_range(decoded_value, ctx) 

211 validation.errors.extend(range_validation.errors) 

212 validation.warnings.extend(range_validation.warnings) 

213 if not range_validation.valid: 

214 raise ValueError("; ".join(range_validation.errors)) 

215 if enable_trace: 

216 parse_trace.append("Validating type") 

217 type_validation = self._validator.validate_type(decoded_value) 

218 validation.errors.extend(type_validation.errors) 

219 validation.warnings.extend(type_validation.warnings) 

220 if not type_validation.valid: 

221 raise ValueError("; ".join(type_validation.errors)) 

222 return decoded_value 

223 

224 # ------------------------------------------------------------------ 

225 # Helpers 

226 # ------------------------------------------------------------------ 

227 

228 def _extract_raw_int( 

229 self, 

230 data: bytearray, 

231 enable_trace: bool, 

232 parse_trace: list[str], 

233 ) -> int | None: 

234 """Extract raw integer from bytes using template or YAML extractors.""" 

235 char = self._char 

236 

237 # Priority 1: Template extractor 

238 if char._template is not None and char._template.extractor is not None: 

239 if enable_trace: 

240 parse_trace.append("Extracting raw integer via template extractor") 

241 raw_int: int = char._template.extractor.extract(data, offset=0) 

242 if enable_trace: 

243 parse_trace.append(f"Extracted raw_int: {raw_int}") 

244 return raw_int 

245 

246 # Priority 2: YAML data type extractor 

247 yaml_type = char.get_yaml_data_type() 

248 if yaml_type is not None: 

249 extractor = get_extractor(yaml_type) 

250 if extractor is not None: 

251 if enable_trace: 

252 parse_trace.append(f"Extracting raw integer via YAML type '{yaml_type}'") 

253 raw_int = extractor.extract(data, offset=0) 

254 if enable_trace: 

255 parse_trace.append(f"Extracted raw_int: {raw_int}") 

256 return raw_int 

257 

258 # No extractor available 

259 if enable_trace: 

260 parse_trace.append("No extractor available for raw_int extraction") 

261 return None 

262 

263 def _check_special_value(self, raw_value: int) -> int | SpecialValueResult: 

264 """Check if raw value is a special sentinel value.""" 

265 res: SpecialValueResult | None = self._char._special_resolver.resolve(raw_value) 

266 if res is not None: 

267 return res 

268 return raw_value 

269 

270 def _is_trace_enabled(self) -> bool: 

271 """Check if parse trace is enabled via environment variable or instance attribute.""" 

272 env_value = os.getenv("BLUETOOTH_SIG_ENABLE_PARSE_TRACE", "").lower() 

273 if env_value in ("0", "false", "no"): 

274 return False 

275 return self._char._enable_parse_trace is not False