Coverage for src / bluetooth_sig / core / encoder.py: 79%

127 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Characteristic encoding, value creation, and data validation. 

2 

3Provides encode_characteristic, create_value, validate_characteristic_data, 

4and type introspection for characteristic value types. 

5""" 

6 

7from __future__ import annotations 

8 

9import inspect 

10import logging 

11import struct 

12import typing 

13from typing import Any, TypeVar, overload 

14 

15from ..gatt.characteristics import templates 

16from ..gatt.characteristics.base import BaseCharacteristic 

17from ..gatt.characteristics.registry import CharacteristicRegistry 

18from ..gatt.exceptions import ( 

19 CharacteristicError, 

20 CharacteristicParseError, 

21) 

22from ..types import ( 

23 ValidationResult, 

24) 

25from ..types.uuid import BluetoothUUID 

26from .parser import CharacteristicParser 

27 

# Type variable tying a characteristic class (BaseCharacteristic[T]) to the
# value type accepted by the type-safe encode_characteristic overload.
T = TypeVar("T")

# Module-level logger named after this module; used for debug/exception logging
# in the encoder methods.
logger = logging.getLogger(__name__)

31 

32 

class CharacteristicEncoder:
    """Handles characteristic encoding, value creation, and data validation.

    Takes a CharacteristicParser reference for validate_characteristic_data,
    which needs to attempt a parse to check data format validity.
    """

    def __init__(self, parser: CharacteristicParser) -> None:
        """Initialise with a parser reference for validation support.

        Args:
            parser: CharacteristicParser instance for data validation

        """
        self._parser = parser

    @overload
    def encode_characteristic(
        self,
        char: type[BaseCharacteristic[T]],
        value: T,
        validate: bool = ...,
    ) -> bytes: ...

    @overload
    def encode_characteristic(
        self,
        char: str,
        value: Any,  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe
        validate: bool = ...,
    ) -> bytes: ...

    def encode_characteristic(
        self,
        char: str | type[BaseCharacteristic[T]],
        value: T | Any,  # Runtime UUID dispatch cannot be type-safe
        validate: bool = True,
    ) -> bytes:
        """Encode a value for writing to a characteristic.

        Args:
            char: Characteristic class (type-safe) or UUID string (not type-safe).
            value: The value to encode. On the UUID path a dict is first converted
                to the characteristic's value type (when that type can be determined)
                by calling the type with the dict's items as keyword arguments.
            validate: If True, encodes via ``build_value`` (with validation);
                if False, encodes via ``_encode_value`` (default: True)

        Returns:
            Encoded bytes ready to write to the characteristic

        Raises:
            ValueError: If no characteristic is registered for the given UUID string.
            TypeError: If a dict value cannot be converted to the characteristic's
                value type.
            Exception: Any error raised by the characteristic while building or
                encoding the value (e.g. an encode/validation error) is logged and
                re-raised unchanged.

        """
        # Handle characteristic class input (type-safe path)
        if isinstance(char, type) and issubclass(char, BaseCharacteristic):
            char_instance = char()
            logger.debug("Encoding characteristic class=%s, value=%s", char.__name__, value)
            return self._encode_with(char_instance, value, validate)

        # Handle string UUID input (not type-safe path)
        logger.debug("Encoding characteristic UUID=%s, value=%s", char, value)

        characteristic = CharacteristicRegistry.get_characteristic(char)
        if not characteristic:
            raise ValueError(f"No encoder available for characteristic UUID: {char}")

        logger.debug("Found encoder for UUID=%s: %s", char, type(characteristic).__name__)

        # Handle dict input - convert to proper type
        if isinstance(value, dict):
            value_type = self._get_characteristic_value_type_class(characteristic)
            if value_type and not isinstance(value_type, str):
                # Resolve the display name once, defensively: not every callable
                # returned by type introspection is guaranteed to have __name__.
                type_name = getattr(value_type, "__name__", str(value_type))
                try:
                    value = value_type(**value)
                except (TypeError, ValueError) as e:
                    raise TypeError(f"Failed to convert dict to {type_name} for characteristic {char}: {e}") from e
                logger.debug("Converted dict to %s", type_name)

        return self._encode_with(characteristic, value, validate)

    @staticmethod
    def _encode_with(
        characteristic: BaseCharacteristic[Any],
        value: Any,  # noqa: ANN401  # Value type depends on the characteristic
        validate: bool,
    ) -> bytes:
        """Encode ``value`` with ``characteristic``, logging the outcome.

        Shared by the class-based and UUID-based paths of encode_characteristic.

        Args:
            characteristic: Characteristic instance that performs the encoding
            value: The value to encode
            validate: If True use ``build_value``; otherwise ``_encode_value``

        Returns:
            The encoded payload converted to ``bytes``

        Raises:
            Exception: Whatever the characteristic raises; it is logged with a
                traceback and re-raised unchanged.

        """
        try:
            if validate:
                encoded = characteristic.build_value(value)
                logger.debug("Successfully encoded %s with validation", characteristic.name)
            else:
                encoded = characteristic._encode_value(value)  # pylint: disable=protected-access
                logger.debug("Successfully encoded %s without validation", characteristic.name)
            return bytes(encoded)
        except Exception:
            logger.exception("Encoding failed for %s", characteristic.name)
            raise

    def _get_characteristic_value_type_class(  # pylint: disable=too-many-return-statements,too-many-branches
        self, characteristic: BaseCharacteristic[Any]
    ) -> type[Any] | None:
        """Get the Python type class that a characteristic expects.

        Strategies are tried in order; the first one that yields a result wins:

        1. The resolved return annotation of ``_decode_value`` (falling back to
           the raw, non-string annotation if the hints cannot be resolved).
        2. ``_python_type`` when it is a class, or the name of an attribute of
           the ``templates`` module.
        3. The first generic argument recorded in ``_template.__orig_class__``.
        4. ``info.python_type`` when it is one of int, float, str, bool, bytes.

        Args:
            characteristic: The characteristic instance

        Returns:
            The type class, or None if it can't be determined

        """
        # Try to infer from decode_value return type annotation
        decode = getattr(characteristic, "_decode_value", None)
        if decode is not None:
            try:
                module = inspect.getmodule(characteristic.__class__)
                globalns = getattr(module, "__dict__", {}) if module else {}
                return_type = typing.get_type_hints(decode, globalns=globalns).get("return")
                if return_type and return_type is not type(None):
                    return return_type  # type: ignore[no-any-return]  # Dynamic introspection of _decode_value return annotation
            except (TypeError, AttributeError, NameError):
                # Hints could not be resolved (e.g. unresolvable forward reference).
                # Fall back to the raw annotation; if even the signature cannot be
                # obtained, keep going with the remaining strategies instead of failing.
                try:
                    return_annotation = inspect.signature(decode).return_annotation
                except (TypeError, ValueError):
                    return_annotation = inspect.Signature.empty
                if (
                    return_annotation
                    and return_annotation is not inspect.Signature.empty
                    and not isinstance(return_annotation, str)
                ):
                    return return_annotation  # type: ignore[no-any-return]  # Dynamic introspection fallback via inspect.signature

        manual_type = getattr(characteristic, "_python_type", None)
        if manual_type and isinstance(manual_type, type):
            return manual_type
        if manual_type and isinstance(manual_type, str) and hasattr(templates, manual_type):
            return getattr(templates, manual_type)  # type: ignore[no-any-return]  # Runtime template lookup by string name

        # Try to get from template
        template = getattr(characteristic, "_template", None)
        if template:
            orig_class = getattr(template, "__orig_class__", None)
            if orig_class is not None:
                args = typing.get_args(orig_class)
                if args:
                    return args[0]  # type: ignore[no-any-return]  # Generic type arg extraction from __orig_class__

        # For simple types, check info.python_type
        info = characteristic.info
        if isinstance(info.python_type, type) and info.python_type in (int, float, str, bool, bytes):
            return info.python_type

        return None

    def validate_characteristic_data(self, uuid: str, data: bytes) -> ValidationResult:
        """Validate characteristic data format by attempting to parse it.

        The data is considered valid when the parser accepts it. Parse, value,
        type, struct and characteristic errors are captured in the result rather
        than raised.

        Args:
            uuid: The characteristic UUID
            data: Raw data bytes to validate

        Returns:
            ValidationResult with validation details

        """
        # Keep the try body to the parse call only, so that a failure while
        # looking up the expected length is never misreported as invalid data.
        try:
            self._parser.parse_characteristic(uuid, data)
        except (CharacteristicParseError, ValueError, TypeError, struct.error, CharacteristicError) as e:
            is_valid = False
            error_message = str(e)
        else:
            is_valid = True
            error_message = ""

        return ValidationResult(
            is_valid=is_valid,
            actual_length=len(data),
            expected_length=self._lookup_expected_length(uuid),
            error_message=error_message,
        )

    @staticmethod
    def _lookup_expected_length(uuid: str) -> Any:  # noqa: ANN401  # Type is whatever the characteristic class declares
        """Return the registered characteristic class's ``expected_length``.

        Args:
            uuid: The characteristic UUID

        Returns:
            The class's ``expected_length`` attribute, or None when the UUID is
            invalid, no class is registered, or the attribute is missing.

        """
        try:
            bt_uuid = BluetoothUUID(uuid)
            char_class = CharacteristicRegistry.get_characteristic_class_by_uuid(bt_uuid)
            return char_class.expected_length if char_class else None
        except (ValueError, AttributeError):
            return None

    def create_value(self, uuid: str, **kwargs: Any) -> Any:  # noqa: ANN401
        """Create a properly typed value instance for a characteristic.

        Args:
            uuid: The characteristic UUID
            **kwargs: Field values for the characteristic's type. For simple
                types (int, float, str, bool, bytes), or when the value type
                cannot be determined, exactly one keyword is expected and its
                value is returned as-is.

        Returns:
            Properly typed value instance

        Raises:
            ValueError: If no characteristic is found for the UUID, or the value
                type cannot be determined and kwargs is not a single value
            TypeError: If kwargs don't match the characteristic's expected fields

        """
        characteristic = CharacteristicRegistry.get_characteristic(uuid)
        if not characteristic:
            raise ValueError(f"No characteristic found for UUID: {uuid}")

        value_type = self._get_characteristic_value_type_class(characteristic)

        if not value_type:
            # Unknown type: a single keyword is passed through untouched.
            if len(kwargs) == 1:
                return next(iter(kwargs.values()))
            raise ValueError(
                f"Cannot determine value type for characteristic {uuid}. "
                "Try passing a dict to encode_characteristic() instead."
            )

        type_name = getattr(value_type, "__name__", str(value_type))

        # Handle simple primitive types
        if value_type in (int, float, str, bool, bytes):
            if len(kwargs) != 1:
                raise TypeError(f"Simple type {type_name} expects a single value")
            value = next(iter(kwargs.values()))
            if not isinstance(value, value_type):
                raise TypeError(f"Expected {type_name}, got {type(value).__name__}")
            return value

        # Construct complex type from kwargs
        try:
            return value_type(**kwargs)
        except (TypeError, ValueError) as e:
            raise TypeError(f"Failed to create {type_name} for characteristic {uuid}: {e}") from e

271 except (TypeError, ValueError) as e: 

272 type_name = getattr(value_type, "__name__", str(value_type)) 

273 raise TypeError(f"Failed to create {type_name} for characteristic {uuid}: {e}") from e