Coverage for src / bluetooth_sig / core / encoder.py: 79%
127 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""Characteristic encoding, value creation, and data validation.
3Provides encode_characteristic, create_value, validate_characteristic_data,
4and type introspection for characteristic value types.
5"""
7from __future__ import annotations
9import inspect
10import logging
11import struct
12import typing
13from typing import Any, TypeVar, overload
15from ..gatt.characteristics import templates
16from ..gatt.characteristics.base import BaseCharacteristic
17from ..gatt.characteristics.registry import CharacteristicRegistry
18from ..gatt.exceptions import (
19 CharacteristicError,
20 CharacteristicParseError,
21)
22from ..types import (
23 ValidationResult,
24)
25from ..types.uuid import BluetoothUUID
26from .parser import CharacteristicParser
# Generic placeholder for the value type carried by BaseCharacteristic[T]; the
# type-safe encode_characteristic overload uses it to tie `char` to `value`.
28T = TypeVar("T")
# Module-level logger, named after this module via __name__.
30logger = logging.getLogger(__name__)
class CharacteristicEncoder:
    """Handles characteristic encoding, value creation, and data validation.

    Takes a CharacteristicParser reference for validate_characteristic_data,
    which needs to attempt a parse to check data format validity.
    """

    def __init__(self, parser: CharacteristicParser) -> None:
        """Initialise with a parser reference for validation support.

        Args:
            parser: CharacteristicParser instance for data validation

        """
        self._parser = parser

    @overload
    def encode_characteristic(
        self,
        char: type[BaseCharacteristic[T]],
        value: T,
        validate: bool = ...,
    ) -> bytes: ...

    @overload
    def encode_characteristic(
        self,
        char: str,
        value: Any,  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe
        validate: bool = ...,
    ) -> bytes: ...

    def encode_characteristic(
        self,
        char: str | type[BaseCharacteristic[T]],
        value: T | Any,  # Runtime UUID dispatch cannot be type-safe
        validate: bool = True,
    ) -> bytes:
        r"""Encode a value for writing to a characteristic.

        Args:
            char: Characteristic class (type-safe) or UUID string (not type-safe).
            value: The value to encode. Type is checked when using characteristic class.
                On the UUID path a ``dict`` is first converted to the
                characteristic's value type (when that type can be determined)
                by passing its items as keyword arguments.
            validate: If True, validates the value before encoding (default: True)

        Returns:
            Encoded bytes ready to write to the characteristic

        Raises:
            ValueError: If UUID is invalid, characteristic not found, or value is invalid
            TypeError: If value type doesn't match characteristic's expected type,
                or a dict value cannot be converted to the expected type
            CharacteristicEncodeError: If encoding fails

        """
        # Handle characteristic class input (type-safe path)
        if isinstance(char, type) and issubclass(char, BaseCharacteristic):
            char_instance = char()
            logger.debug("Encoding characteristic class=%s, value=%s", char.__name__, value)
            return self._encode_with(char_instance, value, validate)

        # Handle string UUID input (not type-safe path)
        logger.debug("Encoding characteristic UUID=%s, value=%s", char, value)

        characteristic = CharacteristicRegistry.get_characteristic(char)
        if not characteristic:
            raise ValueError(f"No encoder available for characteristic UUID: {char}")

        logger.debug("Found encoder for UUID=%s: %s", char, type(characteristic).__name__)

        # Handle dict input - convert to proper type
        if isinstance(value, dict):
            value_type = self._get_characteristic_value_type_class(characteristic)
            if value_type and hasattr(value_type, "__init__") and not isinstance(value_type, str):
                # Resolve the display name defensively: the discovered "type" comes from
                # annotation introspection and is not guaranteed to expose __name__.
                type_name = getattr(value_type, "__name__", str(value_type))
                try:
                    value = value_type(**value)
                except (TypeError, ValueError) as e:
                    raise TypeError(f"Failed to convert dict to {type_name} for characteristic {char}: {e}") from e
                logger.debug("Converted dict to %s", type_name)

        # Encode using build_value (with validation) or encode_value (without)
        return self._encode_with(characteristic, value, validate)

    @staticmethod
    def _encode_with(
        characteristic: BaseCharacteristic[Any],
        value: Any,  # noqa: ANN401  # Value type depends on the characteristic
        validate: bool,
    ) -> bytes:
        """Encode ``value`` with ``characteristic``; log and re-raise any failure.

        Args:
            characteristic: Characteristic instance that performs the encoding
            value: Value handed to the characteristic
            validate: If True use ``build_value``; otherwise call ``_encode_value`` directly

        Returns:
            The encoded payload converted to ``bytes``

        Raises:
            Exception: Whatever the characteristic raises; it is logged, then re-raised unchanged

        """
        try:
            if validate:
                encoded = characteristic.build_value(value)
                logger.debug("Successfully encoded %s with validation", characteristic.name)
            else:
                encoded = characteristic._encode_value(value)  # pylint: disable=protected-access
                logger.debug("Successfully encoded %s without validation", characteristic.name)
            return bytes(encoded)
        except Exception:
            logger.exception("Encoding failed for %s", characteristic.name)
            raise

    def _get_characteristic_value_type_class(  # pylint: disable=too-many-return-statements,too-many-branches
        self, characteristic: BaseCharacteristic[Any]
    ) -> type[Any] | None:
        """Get the Python type class that a characteristic expects.

        Strategies are tried in order, and the first one that yields a result wins:

        1. The resolved return annotation of ``_decode_value``.
        2. The raw (non-string) return annotation from ``inspect.signature``; only
           attempted when resolving the type hints raised.
        3. ``_python_type``, either a type or the name of an attribute of ``templates``.
        4. The first generic argument of ``_template.__orig_class__``.
        5. ``info.python_type`` when it is one of int, float, str, bool, bytes.

        Args:
            characteristic: The characteristic instance

        Returns:
            The type class, or None if it can't be determined

        """
        # Try to infer from decode_value return type annotation
        if hasattr(characteristic, "_decode_value"):
            decode = characteristic._decode_value  # pylint: disable=protected-access
            try:
                module = inspect.getmodule(characteristic.__class__)
                globalns = getattr(module, "__dict__", {}) if module else {}
                type_hints = typing.get_type_hints(decode, globalns=globalns)
                return_type = type_hints.get("return")
                if return_type and return_type is not type(None):
                    return return_type  # type: ignore[no-any-return]  # Dynamic introspection of _decode_value return annotation
            except (TypeError, AttributeError, NameError):
                # Hint resolution failed (e.g. an unresolvable forward reference);
                # fall back to the unevaluated annotation. inspect.signature can
                # itself raise, and this method promises None rather than an error,
                # so treat that as "no annotation" and continue with later strategies.
                try:
                    return_annotation = inspect.signature(decode).return_annotation
                except (TypeError, ValueError):
                    return_annotation = inspect.Signature.empty
                if (
                    return_annotation
                    and return_annotation is not inspect.Signature.empty
                    and not isinstance(return_annotation, str)
                ):
                    return return_annotation  # type: ignore[no-any-return]  # Dynamic introspection fallback via inspect.signature

        manual_type = characteristic._python_type  # pylint: disable=protected-access
        if manual_type and isinstance(manual_type, type):
            return manual_type
        if manual_type and isinstance(manual_type, str) and hasattr(templates, manual_type):
            return getattr(templates, manual_type)  # type: ignore[no-any-return]  # Runtime template lookup by string name

        # Try to get from template
        if hasattr(characteristic, "_template") and characteristic._template:  # pylint: disable=protected-access
            template = characteristic._template  # pylint: disable=protected-access
            if hasattr(template, "__orig_class__"):
                args = typing.get_args(template.__orig_class__)
                if args:
                    return args[0]  # type: ignore[no-any-return]  # Generic type arg extraction from __orig_class__

        # For simple types, check info.python_type
        info = characteristic.info
        if isinstance(info.python_type, type) and info.python_type in (int, float, str, bool, bytes):
            return info.python_type

        return None

    @staticmethod
    def _lookup_expected_length(uuid: str) -> Any:  # noqa: ANN401  # Type is whatever the characteristic class declares
        """Return ``expected_length`` of the characteristic class registered for ``uuid``.

        Args:
            uuid: The characteristic UUID

        Returns:
            The class's ``expected_length``, or None when no class is registered
            or the lookup raises ValueError/AttributeError

        """
        try:
            bt_uuid = BluetoothUUID(uuid)
            char_class = CharacteristicRegistry.get_characteristic_class_by_uuid(bt_uuid)
            return char_class.expected_length if char_class else None
        except (ValueError, AttributeError):
            return None

    def validate_characteristic_data(self, uuid: str, data: bytes) -> ValidationResult:
        """Validate characteristic data format against SIG specifications.

        The data is considered valid when the parser accepts it. Parse failures of
        the handled exception types are reported in the result instead of raised;
        any other exception propagates.

        Args:
            uuid: The characteristic UUID
            data: Raw data bytes to validate

        Returns:
            ValidationResult with validation details

        """
        try:
            self._parser.parse_characteristic(uuid, data)
            expected = self._lookup_expected_length(uuid)
            return ValidationResult(
                is_valid=True,
                actual_length=len(data),
                expected_length=expected,
                error_message="",
            )
        except (CharacteristicParseError, ValueError, TypeError, struct.error, CharacteristicError) as e:
            expected = self._lookup_expected_length(uuid)
            return ValidationResult(
                is_valid=False,
                actual_length=len(data),
                expected_length=expected,
                error_message=str(e),
            )

    def create_value(self, uuid: str, **kwargs: Any) -> Any:  # noqa: ANN401
        """Create a properly typed value instance for a characteristic.

        When the value type cannot be determined, a single keyword argument is
        returned as-is. For primitive value types (int, float, str, bool, bytes)
        exactly one keyword argument is expected and is type-checked with
        ``isinstance``. Otherwise the value type is called with ``**kwargs``.

        Args:
            uuid: The characteristic UUID
            **kwargs: Field values for the characteristic's type

        Returns:
            Properly typed value instance

        Raises:
            ValueError: If UUID is invalid or characteristic not found, or the
                value type is unknown and more or fewer than one kwarg was given
            TypeError: If kwargs don't match the characteristic's expected fields

        """
        characteristic = CharacteristicRegistry.get_characteristic(uuid)
        if not characteristic:
            raise ValueError(f"No characteristic found for UUID: {uuid}")

        value_type = self._get_characteristic_value_type_class(characteristic)

        if not value_type:
            if len(kwargs) == 1:
                return next(iter(kwargs.values()))
            raise ValueError(
                f"Cannot determine value type for characteristic {uuid}. "
                "Try passing a dict to encode_characteristic() instead."
            )

        # Shared by every error message below; the discovered type may lack __name__.
        type_name = getattr(value_type, "__name__", str(value_type))

        # Handle simple primitive types
        if value_type in (int, float, str, bool, bytes):
            if len(kwargs) == 1:
                value = next(iter(kwargs.values()))
                if not isinstance(value, value_type):
                    raise TypeError(f"Expected {type_name}, got {type(value).__name__}")
                return value
            raise TypeError(f"Simple type {type_name} expects a single value")

        # Construct complex type from kwargs
        try:
            return value_type(**kwargs)
        except (TypeError, ValueError) as e:
            raise TypeError(f"Failed to create {type_name} for characteristic {uuid}: {e}") from e