Coverage for src / bluetooth_sig / gatt / characteristics / pipeline / encode_pipeline.py: 69%
86 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""Encode pipeline for GATT characteristic values.
3Orchestrates the multi-stage encoding process: type validation → range
4validation → value encoding → length validation.
5"""
7from __future__ import annotations
9import os
10from typing import Any
12from ....types import SpecialValueResult
13from ....types.data_types import ValidationAccumulator
14from ...exceptions import CharacteristicEncodeError
15from ..utils.extractors import get_extractor
16from .validation import CharacteristicValidator
19class EncodePipeline:
20 """Multi-stage encode pipeline for characteristic values.
22 Stages:
23 1. Type validation
24 2. Range validation (numeric types only)
25 3. Value encoding (via template or subclass ``_encode_value``)
26 4. Length validation (post-encode)
28 Uses a back-reference to the owning characteristic for:
29 - ``_encode_value()`` dispatch (Template Method pattern)
30 - Metadata access (``name``, ``uuid``, ``_template``, ``_spec``)
31 - Special value resolver
32 """
34 def __init__(self, char: Any, validator: CharacteristicValidator) -> None: # noqa: ANN401
35 """Initialise with back-reference to the owning characteristic.
37 Args:
38 char: BaseCharacteristic instance.
39 validator: Shared validator instance.
41 """
42 self._char = char
43 self._validator = validator
45 # ------------------------------------------------------------------
46 # Main entry point
47 # ------------------------------------------------------------------
49 def run( # pylint: disable=too-many-branches
50 self,
51 data: Any, # noqa: ANN401 # T | SpecialValueResult
52 validate: bool = True,
53 ) -> bytearray:
54 """Execute the full encode pipeline.
56 Args:
57 data: Value to encode (type T) or ``SpecialValueResult``.
58 validate: Enable validation (type, range, length checks).
59 Special values bypass validation.
61 Returns:
62 Encoded bytes ready for BLE write.
64 Raises:
65 CharacteristicEncodeError: If encoding or validation fails.
67 """
68 char = self._char
69 enable_trace = self._is_trace_enabled()
70 build_trace: list[str] = ["Starting build"] if enable_trace else []
71 validation = ValidationAccumulator()
73 # Special value encoding — bypass validation
74 if isinstance(data, SpecialValueResult):
75 if enable_trace:
76 build_trace.append(f"Encoding special value: {data.meaning}")
77 try:
78 return self._pack_raw_int(data.raw_value)
79 except Exception as e:
80 raise CharacteristicEncodeError(
81 message=f"Failed to encode special value: {e}",
82 name=char.name,
83 uuid=char.uuid,
84 value=data,
85 validation=None,
86 ) from e
88 try:
89 # Type validation
90 if validate:
91 if enable_trace:
92 build_trace.append("Validating type")
93 type_validation = self._validator.validate_type(data)
94 validation.errors.extend(type_validation.errors)
95 validation.warnings.extend(type_validation.warnings)
96 if not type_validation.valid:
97 raise TypeError("; ".join(type_validation.errors)) # noqa: TRY301
99 # Range validation for numeric types
100 if validate and isinstance(data, (int, float)):
101 if enable_trace:
102 build_trace.append("Validating range")
103 range_validation = self._validator.validate_range(data, ctx=None)
104 validation.errors.extend(range_validation.errors)
105 validation.warnings.extend(range_validation.warnings)
106 if not range_validation.valid:
107 raise ValueError("; ".join(range_validation.errors)) # noqa: TRY301
109 # Encode
110 if enable_trace:
111 build_trace.append("Encoding value")
112 encoded: bytearray = char._encode_value(data)
114 # Length validation
115 if validate:
116 if enable_trace:
117 build_trace.append("Validating encoded length")
118 length_validation = self._validator.validate_length(encoded)
119 validation.errors.extend(length_validation.errors)
120 validation.warnings.extend(length_validation.warnings)
121 if not length_validation.valid:
122 raise ValueError("; ".join(length_validation.errors)) # noqa: TRY301
124 if enable_trace:
125 build_trace.append("Build completed successfully")
127 except Exception as e:
128 if enable_trace:
129 build_trace.append(f"Build failed: {type(e).__name__}: {e}")
131 raise CharacteristicEncodeError(
132 message=str(e),
133 name=char.name,
134 uuid=char.uuid,
135 value=data,
136 validation=validation,
137 ) from e
138 else:
139 return encoded
141 # ------------------------------------------------------------------
142 # Special value encoding helpers
143 # ------------------------------------------------------------------
145 def encode_special(self, value_type: Any) -> bytearray: # noqa: ANN401 # SpecialValueType
146 """Encode a special value type to bytes (reverse lookup).
148 Args:
149 value_type: ``SpecialValueType`` enum member.
151 Returns:
152 Encoded bytes for the special value.
154 Raises:
155 ValueError: If no raw value of that type is defined.
157 """
158 raw = self._char._special_resolver.get_raw_for_type(value_type)
159 if raw is None:
160 raise ValueError(f"No special value of type {value_type.name} defined for this characteristic")
161 return self._pack_raw_int(raw)
163 def encode_special_by_meaning(self, meaning: str) -> bytearray:
164 """Encode a special value by a partial meaning string match.
166 Args:
167 meaning: Partial meaning string to match.
169 Returns:
170 Encoded bytes for the matching special value.
172 Raises:
173 ValueError: If no matching special value is found.
175 """
176 raw = self._char._special_resolver.get_raw_for_meaning(meaning)
177 if raw is None:
178 raise ValueError(f"No special value matching '{meaning}' defined for this characteristic")
179 return self._pack_raw_int(raw)
181 # ------------------------------------------------------------------
182 # Helpers
183 # ------------------------------------------------------------------
185 def _pack_raw_int(self, raw: int) -> bytearray:
186 """Pack a raw integer to bytes using template extractor or YAML extractor."""
187 char = self._char
188 # Priority 1: template extractor
189 if char._template is not None:
190 extractor = getattr(char._template, "extractor", None)
191 if extractor is not None:
192 return bytearray(extractor.pack(raw))
194 # Priority 2: YAML-derived extractor
195 yaml_type = char.get_yaml_data_type()
196 if yaml_type is not None:
197 extractor = get_extractor(yaml_type)
198 if extractor is not None:
199 return bytearray(extractor.pack(raw))
201 raise ValueError("No extractor available to pack raw integer for this characteristic")
203 def _is_trace_enabled(self) -> bool:
204 """Check if build trace is enabled."""
205 env_value = os.getenv("BLUETOOTH_SIG_ENABLE_PARSE_TRACE", "").lower()
206 if env_value in ("0", "false", "no"):
207 return False
208 return self._char._enable_parse_trace is not False