Coverage for src / bluetooth_sig / gatt / characteristics / pipeline / encode_pipeline.py: 69%

86 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Encode pipeline for GATT characteristic values. 

2 

3Orchestrates the multi-stage encoding process: type validation → range 

4validation → value encoding → length validation. 

5""" 

6 

7from __future__ import annotations 

8 

9import os 

10from typing import Any 

11 

12from ....types import SpecialValueResult 

13from ....types.data_types import ValidationAccumulator 

14from ...exceptions import CharacteristicEncodeError 

15from ..utils.extractors import get_extractor 

16from .validation import CharacteristicValidator 

17 

18 

19class EncodePipeline: 

20 """Multi-stage encode pipeline for characteristic values. 

21 

22 Stages: 

23 1. Type validation 

24 2. Range validation (numeric types only) 

25 3. Value encoding (via template or subclass ``_encode_value``) 

26 4. Length validation (post-encode) 

27 

28 Uses a back-reference to the owning characteristic for: 

29 - ``_encode_value()`` dispatch (Template Method pattern) 

30 - Metadata access (``name``, ``uuid``, ``_template``, ``_spec``) 

31 - Special value resolver 

32 """ 

33 

34 def __init__(self, char: Any, validator: CharacteristicValidator) -> None: # noqa: ANN401 

35 """Initialise with back-reference to the owning characteristic. 

36 

37 Args: 

38 char: BaseCharacteristic instance. 

39 validator: Shared validator instance. 

40 

41 """ 

42 self._char = char 

43 self._validator = validator 

44 

45 # ------------------------------------------------------------------ 

46 # Main entry point 

47 # ------------------------------------------------------------------ 

48 

49 def run( # pylint: disable=too-many-branches 

50 self, 

51 data: Any, # noqa: ANN401 # T | SpecialValueResult 

52 validate: bool = True, 

53 ) -> bytearray: 

54 """Execute the full encode pipeline. 

55 

56 Args: 

57 data: Value to encode (type T) or ``SpecialValueResult``. 

58 validate: Enable validation (type, range, length checks). 

59 Special values bypass validation. 

60 

61 Returns: 

62 Encoded bytes ready for BLE write. 

63 

64 Raises: 

65 CharacteristicEncodeError: If encoding or validation fails. 

66 

67 """ 

68 char = self._char 

69 enable_trace = self._is_trace_enabled() 

70 build_trace: list[str] = ["Starting build"] if enable_trace else [] 

71 validation = ValidationAccumulator() 

72 

73 # Special value encoding — bypass validation 

74 if isinstance(data, SpecialValueResult): 

75 if enable_trace: 

76 build_trace.append(f"Encoding special value: {data.meaning}") 

77 try: 

78 return self._pack_raw_int(data.raw_value) 

79 except Exception as e: 

80 raise CharacteristicEncodeError( 

81 message=f"Failed to encode special value: {e}", 

82 name=char.name, 

83 uuid=char.uuid, 

84 value=data, 

85 validation=None, 

86 ) from e 

87 

88 try: 

89 # Type validation 

90 if validate: 

91 if enable_trace: 

92 build_trace.append("Validating type") 

93 type_validation = self._validator.validate_type(data) 

94 validation.errors.extend(type_validation.errors) 

95 validation.warnings.extend(type_validation.warnings) 

96 if not type_validation.valid: 

97 raise TypeError("; ".join(type_validation.errors)) # noqa: TRY301 

98 

99 # Range validation for numeric types 

100 if validate and isinstance(data, (int, float)): 

101 if enable_trace: 

102 build_trace.append("Validating range") 

103 range_validation = self._validator.validate_range(data, ctx=None) 

104 validation.errors.extend(range_validation.errors) 

105 validation.warnings.extend(range_validation.warnings) 

106 if not range_validation.valid: 

107 raise ValueError("; ".join(range_validation.errors)) # noqa: TRY301 

108 

109 # Encode 

110 if enable_trace: 

111 build_trace.append("Encoding value") 

112 encoded: bytearray = char._encode_value(data) 

113 

114 # Length validation 

115 if validate: 

116 if enable_trace: 

117 build_trace.append("Validating encoded length") 

118 length_validation = self._validator.validate_length(encoded) 

119 validation.errors.extend(length_validation.errors) 

120 validation.warnings.extend(length_validation.warnings) 

121 if not length_validation.valid: 

122 raise ValueError("; ".join(length_validation.errors)) # noqa: TRY301 

123 

124 if enable_trace: 

125 build_trace.append("Build completed successfully") 

126 

127 except Exception as e: 

128 if enable_trace: 

129 build_trace.append(f"Build failed: {type(e).__name__}: {e}") 

130 

131 raise CharacteristicEncodeError( 

132 message=str(e), 

133 name=char.name, 

134 uuid=char.uuid, 

135 value=data, 

136 validation=validation, 

137 ) from e 

138 else: 

139 return encoded 

140 

141 # ------------------------------------------------------------------ 

142 # Special value encoding helpers 

143 # ------------------------------------------------------------------ 

144 

145 def encode_special(self, value_type: Any) -> bytearray: # noqa: ANN401 # SpecialValueType 

146 """Encode a special value type to bytes (reverse lookup). 

147 

148 Args: 

149 value_type: ``SpecialValueType`` enum member. 

150 

151 Returns: 

152 Encoded bytes for the special value. 

153 

154 Raises: 

155 ValueError: If no raw value of that type is defined. 

156 

157 """ 

158 raw = self._char._special_resolver.get_raw_for_type(value_type) 

159 if raw is None: 

160 raise ValueError(f"No special value of type {value_type.name} defined for this characteristic") 

161 return self._pack_raw_int(raw) 

162 

163 def encode_special_by_meaning(self, meaning: str) -> bytearray: 

164 """Encode a special value by a partial meaning string match. 

165 

166 Args: 

167 meaning: Partial meaning string to match. 

168 

169 Returns: 

170 Encoded bytes for the matching special value. 

171 

172 Raises: 

173 ValueError: If no matching special value is found. 

174 

175 """ 

176 raw = self._char._special_resolver.get_raw_for_meaning(meaning) 

177 if raw is None: 

178 raise ValueError(f"No special value matching '{meaning}' defined for this characteristic") 

179 return self._pack_raw_int(raw) 

180 

181 # ------------------------------------------------------------------ 

182 # Helpers 

183 # ------------------------------------------------------------------ 

184 

185 def _pack_raw_int(self, raw: int) -> bytearray: 

186 """Pack a raw integer to bytes using template extractor or YAML extractor.""" 

187 char = self._char 

188 # Priority 1: template extractor 

189 if char._template is not None: 

190 extractor = getattr(char._template, "extractor", None) 

191 if extractor is not None: 

192 return bytearray(extractor.pack(raw)) 

193 

194 # Priority 2: YAML-derived extractor 

195 yaml_type = char.get_yaml_data_type() 

196 if yaml_type is not None: 

197 extractor = get_extractor(yaml_type) 

198 if extractor is not None: 

199 return bytearray(extractor.pack(raw)) 

200 

201 raise ValueError("No extractor available to pack raw integer for this characteristic") 

202 

203 def _is_trace_enabled(self) -> bool: 

204 """Check if build trace is enabled.""" 

205 env_value = os.getenv("BLUETOOTH_SIG_ENABLE_PARSE_TRACE", "").lower() 

206 if env_value in ("0", "false", "no"): 

207 return False 

208 return self._char._enable_parse_trace is not False