Coverage for src / bluetooth_sig / core / parser.py: 91%

127 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Characteristic parsing with dependency-aware batch support. 

2 

3Provides single and batch characteristic parsing, including topological 

4dependency ordering for multi-characteristic reads. Stateless. 

5""" 

6 

from __future__ import annotations

import logging
from collections.abc import Mapping
from graphlib import CycleError, TopologicalSorter
from typing import Any, TypeVar, overload

from ..gatt.characteristics.base import BaseCharacteristic
from ..gatt.characteristics.registry import CharacteristicRegistry
from ..gatt.exceptions import (
    CharacteristicParseError,
    MissingDependencyError,
    SpecialValueDetectedError,
)
from ..types import CharacteristicContext
from ..types.uuid import BluetoothUUID

23 

24T = TypeVar("T") 

25 

26logger = logging.getLogger(__name__) 

27 

28 

class CharacteristicParser:
    """Stateless parser for single and batch characteristic data.

    Provides ``parse_characteristic`` (with ``@overload`` support for the
    type-safe class path and the UUID-string path), ``parse_characteristics``
    (batch parsing with dependency ordering), and the private helpers the
    batch path is built from. No method stores anything on the instance.
    """

    @overload
    def parse_characteristic(
        self,
        char: type[BaseCharacteristic[T]],
        raw_data: bytes | bytearray,
        ctx: CharacteristicContext | None = ...,
    ) -> T: ...

    @overload
    def parse_characteristic(
        self,
        char: str,
        raw_data: bytes | bytearray,
        ctx: CharacteristicContext | None = ...,
    ) -> Any: ...  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe

    def parse_characteristic(
        self,
        char: str | type[BaseCharacteristic[T]],
        raw_data: bytes | bytearray,
        ctx: CharacteristicContext | None = None,
    ) -> T | Any:  # Runtime UUID dispatch cannot be type-safe
        """Parse a characteristic's raw data using Bluetooth SIG standards.

        Args:
            char: Characteristic class (type-safe) or UUID string (not type-safe).
            raw_data: Raw bytes from the characteristic (bytes or bytearray)
            ctx: Optional CharacteristicContext providing device-level info

        Returns:
            Parsed value. Return type is inferred when passing characteristic class.

        Raises:
            SpecialValueDetectedError: Special sentinel value detected
            CharacteristicParseError: Parse/validation failure, or no parser is
                registered for the given UUID string

        """
        # Handle characteristic class input (type-safe path)
        if isinstance(char, type) and issubclass(char, BaseCharacteristic):
            char_instance = char()
            logger.debug("Parsing characteristic class=%s, data_len=%d", char.__name__, len(raw_data))
            return self._parse_and_log(char_instance, raw_data, ctx)

        # Handle string UUID input (not type-safe path)
        logger.debug("Parsing characteristic UUID=%s, data_len=%d", char, len(raw_data))
        characteristic = CharacteristicRegistry.get_characteristic(char)

        # Compare against None (as _prepare_characteristic_dependencies does)
        # so the lookup result is never judged by its truthiness.
        if characteristic is None:
            logger.info("No parser available for UUID=%s", char)
            raise CharacteristicParseError(
                message=f"No parser available for characteristic UUID: {char}",
                name="Unknown",
                uuid=BluetoothUUID(char),
                raw_data=bytes(raw_data),
            )

        logger.debug("Found parser for UUID=%s: %s", char, type(characteristic).__name__)
        return self._parse_and_log(characteristic, raw_data, ctx)

    @staticmethod
    def _parse_and_log(
        characteristic: BaseCharacteristic[Any],
        raw_data: bytes | bytearray,
        ctx: CharacteristicContext | None,
    ) -> Any:  # noqa: ANN401  # Value type depends on the concrete characteristic
        """Run ``parse_value`` on an instance, logging the outcome and re-raising failures."""
        try:
            value = characteristic.parse_value(raw_data, ctx)
        except SpecialValueDetectedError as e:
            logger.debug("Special value detected for %s: %s", characteristic.name, e.special_value.meaning)
            raise
        except CharacteristicParseError as e:
            logger.warning("Parse failed for %s: %s", characteristic.name, e)
            raise
        logger.debug("Successfully parsed %s: %s", characteristic.name, value)
        return value

    def parse_characteristics(
        self,
        char_data: Mapping[str, bytes | bytearray],
        ctx: CharacteristicContext | None = None,
    ) -> dict[str, Any]:
        """Parse multiple characteristics at once with dependency-aware ordering.

        Characteristics are parsed in topological order of their declared
        dependencies. If the declared dependencies form a cycle, a warning is
        logged and the input order of ``char_data`` is used instead.

        Args:
            char_data: Mapping of UUIDs to raw data bytes
            ctx: Optional CharacteristicContext used as the starting context

        Returns:
            Dictionary mapping UUIDs to parsed values

        Raises:
            MissingDependencyError: If a required dependency is neither already
                parsed in this batch nor present in ``ctx.other_characteristics``
            CharacteristicParseError: If parsing fails for any characteristic

        """
        return self._parse_characteristics_batch(char_data, ctx)

    def _parse_characteristics_batch(
        self,
        char_data: Mapping[str, bytes | bytearray],
        ctx: CharacteristicContext | None,
    ) -> dict[str, Any]:
        """Parse multiple characteristics using dependency-aware ordering."""
        logger.debug("Batch parsing %d characteristics", len(char_data))

        uuid_to_characteristic, uuid_to_required_deps, uuid_to_optional_deps = (
            self._prepare_characteristic_dependencies(char_data)
        )
        sorted_uuids = self._resolve_dependency_order(char_data, uuid_to_required_deps, uuid_to_optional_deps)

        base_context = ctx
        results: dict[str, Any] = {}

        for uuid_str in sorted_uuids:
            raw_data = char_data[uuid_str]
            characteristic = uuid_to_characteristic.get(uuid_str)

            missing_required = self._find_missing_required_dependencies(
                uuid_str,
                uuid_to_required_deps.get(uuid_str, []),
                results,
                base_context,
            )
            if missing_required:
                raise MissingDependencyError(characteristic.name if characteristic else "Unknown", missing_required)

            self._log_optional_dependency_gaps(
                uuid_str,
                uuid_to_optional_deps.get(uuid_str, []),
                results,
                base_context,
            )

            # Each parser sees everything parsed so far in this batch.
            parse_context = self._build_parse_context(base_context, results)
            results[uuid_str] = self.parse_characteristic(uuid_str, raw_data, ctx=parse_context)

        logger.debug("Batch parsing complete: %d results", len(results))
        return results

    def _prepare_characteristic_dependencies(
        self, characteristic_data: Mapping[str, bytes | bytearray]
    ) -> tuple[dict[str, BaseCharacteristic[Any]], dict[str, list[str]], dict[str, list[str]]]:
        """Look up each UUID in the registry and collect its declared dependencies.

        UUIDs without a registered characteristic are skipped here; they are
        reported later, when ``parse_characteristic`` is called for them.

        Returns:
            ``(uuid -> characteristic, uuid -> required deps, uuid -> optional deps)``.
            The two dependency maps only contain UUIDs with a non-empty list.

        """
        uuid_to_characteristic: dict[str, BaseCharacteristic[Any]] = {}
        uuid_to_required_deps: dict[str, list[str]] = {}
        uuid_to_optional_deps: dict[str, list[str]] = {}

        for uuid in characteristic_data:
            characteristic = CharacteristicRegistry.get_characteristic(uuid)
            if characteristic is None:
                continue

            uuid_to_characteristic[uuid] = characteristic

            required = characteristic.required_dependencies
            optional = characteristic.optional_dependencies

            if required:
                uuid_to_required_deps[uuid] = required
                logger.debug("Characteristic %s has required dependencies: %s", uuid, required)
            if optional:
                uuid_to_optional_deps[uuid] = optional
                logger.debug("Characteristic %s has optional dependencies: %s", uuid, optional)

        return uuid_to_characteristic, uuid_to_required_deps, uuid_to_optional_deps

    @staticmethod
    def _resolve_dependency_order(
        characteristic_data: Mapping[str, bytes | bytearray],
        uuid_to_required_deps: Mapping[str, list[str]],
        uuid_to_optional_deps: Mapping[str, list[str]],
    ) -> list[str]:
        """Topologically sort characteristics based on declared dependencies.

        Only dependencies that are themselves part of the batch constrain the
        order. When the dependencies form a cycle, a warning is logged and the
        input order of ``characteristic_data`` is returned unchanged.
        """
        sorter: TopologicalSorter[str] = TopologicalSorter()
        for uuid in characteristic_data:
            # Unpacking works for any iterable of UUIDs, not only lists.
            declared = (*uuid_to_required_deps.get(uuid, ()), *uuid_to_optional_deps.get(uuid, ()))
            sorter.add(uuid, *(dep for dep in declared if dep in characteristic_data))

        try:
            sorted_uuids = list(sorter.static_order())
        except CycleError as exc:
            # Best-effort: a cycle must not prevent parsing altogether.
            logger.warning("Dependency sorting failed: %s. Using original order.", exc)
            return list(characteristic_data)

        logger.debug("Dependency-sorted parsing order: %s", sorted_uuids)
        return sorted_uuids

    @staticmethod
    def _unavailable_dependencies(
        deps: list[str],
        results: Mapping[str, Any],
        base_context: CharacteristicContext | None,
    ) -> list[str]:
        """Return the entries of ``deps`` found neither in ``results`` nor in the base context."""
        external = base_context.other_characteristics if base_context else None
        return [dep for dep in deps if dep not in results and not (external and dep in external)]

    @staticmethod
    def _find_missing_required_dependencies(
        uuid: str,
        required_deps: list[str],
        results: Mapping[str, Any],
        base_context: CharacteristicContext | None,
    ) -> list[str]:
        """Determine which required dependencies are unavailable for a characteristic.

        A dependency counts as available when it is already in ``results`` or
        in ``base_context.other_characteristics``.
        """
        if not required_deps:
            return []

        missing = CharacteristicParser._unavailable_dependencies(required_deps, results, base_context)
        if missing:
            logger.debug("Characteristic %s missing required dependencies: %s", uuid, missing)
        return missing

    @staticmethod
    def _log_optional_dependency_gaps(
        uuid: str,
        optional_deps: list[str],
        results: Mapping[str, Any],
        base_context: CharacteristicContext | None,
    ) -> None:
        """Emit debug logs when optional dependencies are unavailable."""
        if not optional_deps:
            return

        for dep_uuid in CharacteristicParser._unavailable_dependencies(optional_deps, results, base_context):
            logger.debug("Optional dependency %s not available for %s", dep_uuid, uuid)

    @staticmethod
    def _build_parse_context(
        base_context: CharacteristicContext | None,
        results: Mapping[str, Any],
    ) -> CharacteristicContext:
        """Construct the context passed to per-characteristic parsers.

        Device info, advertisement and raw service are carried over from
        ``base_context``. ``other_characteristics`` combines the entries the
        caller supplied with the values parsed so far; on a key clash the
        freshly parsed value wins.
        """
        if base_context is None:
            return CharacteristicContext(other_characteristics=results)

        # The dependency check accepts entries from the caller's context, so
        # they must stay visible to the parser instead of being replaced.
        inherited = base_context.other_characteristics
        visible = {**inherited, **results} if inherited else results

        return CharacteristicContext(
            device_info=base_context.device_info,
            advertisement=base_context.advertisement,
            other_characteristics=visible,
            raw_service=base_context.raw_service,
        )

298 

299 return CharacteristicContext(other_characteristics=results)