Coverage for src / bluetooth_sig / core / translator.py: 76%

434 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 20:14 +0000

1# pylint: disable=too-many-lines # TODO split up Comprehensive translator with many methods 

2"""Core Bluetooth SIG standards translator functionality.""" 

3 

4from __future__ import annotations 

5 

6import inspect 

7import logging 

8import typing 

9from collections.abc import Mapping 

10from graphlib import TopologicalSorter 

11from typing import Any, TypeVar, overload 

12 

13from ..gatt.characteristics import templates 

14from ..gatt.characteristics.base import BaseCharacteristic 

15from ..gatt.characteristics.registry import CharacteristicRegistry 

16from ..gatt.exceptions import CharacteristicParseError, MissingDependencyError, SpecialValueDetected 

17from ..gatt.services import ServiceName 

18from ..gatt.services.base import BaseGattService 

19from ..gatt.services.registry import GattServiceRegistry 

20from ..gatt.uuid_registry import uuid_registry 

21from ..types import ( 

22 CharacteristicContext, 

23 CharacteristicInfo, 

24 ServiceInfo, 

25 SIGInfo, 

26 ValidationResult, 

27) 

28from ..types.gatt_enums import CharacteristicName, ValueType 

29from ..types.uuid import BluetoothUUID 

30 

# Type alias for the per-characteristic payload dictionaries accepted by
# BluetoothSIGTranslator.process_services (string keys, arbitrary values).
CharacteristicDataDict = dict[str, Any]

# Type variable linking a BaseCharacteristic[T] class to the value type used
# in the typed overloads of parse_characteristic / encode_characteristic.
T = TypeVar("T")

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

38 

39 

40class BluetoothSIGTranslator: # pylint: disable=too-many-public-methods 

41 """Pure Bluetooth SIG standards translator for characteristic and service interpretation. 

42 

43 This class provides the primary API surface for Bluetooth SIG standards translation, 

44 covering characteristic parsing, service discovery, UUID resolution, and registry 

45 management. 

46 

47 Singleton Pattern: 

48 This class is implemented as a singleton to provide a global registry for 

49 custom characteristics and services. Access the singleton instance using 

50 `BluetoothSIGTranslator.get_instance()` or the module-level `translator` variable. 

51 

52 Key features: 

53 - Parse raw BLE characteristic data using Bluetooth SIG specifications 

54 - Resolve UUIDs to [CharacteristicInfo][bluetooth_sig.types.CharacteristicInfo] 

55 and [ServiceInfo][bluetooth_sig.types.ServiceInfo] 

56 - Create BaseGattService instances from service UUIDs 

57 - Access comprehensive registry of supported characteristics and services 

58 

59 Note: This class intentionally has >20 public methods as it serves as the 

60 primary API surface for Bluetooth SIG standards translation. The methods are 

61 organized by functionality and reducing them would harm API clarity. 

62 """ 

63 

64 _instance: BluetoothSIGTranslator | None = None 

65 _instance_lock: bool = False # Simple lock to prevent recursion 

66 

def __new__(cls) -> BluetoothSIGTranslator:
    """Return the shared translator object, allocating it on first use.

    Returns:
        The object stored in ``cls._instance``. A new object is allocated and
        stored there only while that attribute is still ``None``.
    """
    existing = cls._instance
    if existing is not None:
        return existing

    cls._instance = super().__new__(cls)
    return cls._instance

72 

@classmethod
def get_instance(cls) -> BluetoothSIGTranslator:
    """Return the singleton translator, constructing it if necessary.

    Returns:
        The singleton BluetoothSIGTranslator instance

    Example::

        from bluetooth_sig import BluetoothSIGTranslator

        # Fetch the shared instance
        translator = BluetoothSIGTranslator.get_instance()
    """
    if cls._instance is not None:
        return cls._instance

    # First access: build the instance and remember it on the class.
    cls._instance = cls()
    return cls._instance

90 

def __init__(self) -> None:
    """Set up per-instance state the first time the singleton is constructed.

    The class-level ``_instance_lock`` flag records that initialisation has
    already run; later constructor calls leave the existing ``_services``
    mapping untouched.
    """
    owner = self.__class__
    if not owner._instance_lock:
        owner._instance_lock = True
        self._services: dict[str, BaseGattService] = {}

99 

def __str__(self) -> str:
    """Describe the translator with a fixed, human-readable label."""
    label = "BluetoothSIGTranslator(pure SIG standards)"
    return label

103 

@overload
def parse_characteristic(
    self,
    char: type[BaseCharacteristic[T]],
    raw_data: bytes | bytearray,
    ctx: CharacteristicContext | None = ...,
) -> T: ...

@overload
def parse_characteristic(
    self,
    char: str,
    raw_data: bytes | bytearray,
    ctx: CharacteristicContext | None = ...,
) -> Any: ...  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe

def parse_characteristic(
    self,
    char: str | type[BaseCharacteristic[T]],
    raw_data: bytes | bytearray,
    ctx: CharacteristicContext | None = None,
) -> T | Any:  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe
    r"""Decode raw characteristic bytes into a Python value.

    Args:
        char: Either a characteristic class (typed result) or a UUID string
            (result typed as ``Any``). A class is instantiated directly; a
            string is resolved through ``CharacteristicRegistry``.
        raw_data: Raw bytes read from the characteristic (bytes or bytearray)
        ctx: Optional CharacteristicContext forwarded to the parser

    Returns:
        Whatever the characteristic's ``parse_value`` returns.

    Raises:
        SpecialValueDetected: Re-raised unchanged after a debug log entry.
        CharacteristicParseError: Re-raised when parsing fails, and raised
            directly when no parser is registered for a UUID string.

    Example::

        from bluetooth_sig import BluetoothSIGTranslator
        from bluetooth_sig.gatt.characteristics import BatteryLevelCharacteristic

        translator = BluetoothSIGTranslator()

        # Type-safe: pass characteristic class, return type is inferred
        level: int = translator.parse_characteristic(BatteryLevelCharacteristic, b"\x64")

        # Not type-safe: pass UUID string, returns Any
        value = translator.parse_characteristic("2A19", b"\x64")

    """
    # Step 1: obtain a parser instance, from the class or from the registry.
    if isinstance(char, type) and issubclass(char, BaseCharacteristic):
        parser = char()
        logger.debug("Parsing characteristic class=%s, data_len=%d", char.__name__, len(raw_data))
    else:
        logger.debug("Parsing characteristic UUID=%s, data_len=%d", char, len(raw_data))
        parser = CharacteristicRegistry.create_characteristic(char)
        if not parser:
            # Nothing registered for this UUID: report it as a parse error.
            logger.info("No parser available for UUID=%s", char)
            raise CharacteristicParseError(
                message=f"No parser available for characteristic UUID: {char}",
                name="Unknown",
                uuid=BluetoothUUID(char),
                raw_data=bytes(raw_data),
            )
        logger.debug("Found parser for UUID=%s: %s", char, type(parser).__name__)

    # Step 2: parse; failures are logged here and then propagated unchanged.
    try:
        value = parser.parse_value(raw_data, ctx)
        logger.debug("Successfully parsed %s: %s", parser.name, value)
        return value
    except SpecialValueDetected as e:
        logger.debug("Special value detected for %s: %s", parser.name, e.special_value.meaning)
        raise
    except CharacteristicParseError as e:
        logger.warning("Parse failed for %s: %s", parser.name, e)
        raise

201 

@overload
def encode_characteristic(
    self,
    char: type[BaseCharacteristic[T]],
    value: T,
    validate: bool = ...,
) -> bytes: ...

@overload
def encode_characteristic(
    self,
    char: str,
    value: Any,  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe
    validate: bool = ...,
) -> bytes: ...

def encode_characteristic(
    self,
    char: str | type[BaseCharacteristic[T]],
    value: T | Any,  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe
    validate: bool = True,
) -> bytes:
    r"""Turn a Python value into the bytes to write to a characteristic.

    Args:
        char: Either a characteristic class (typed) or a UUID string (untyped).
        value: The value to encode. On the UUID-string path a ``dict`` is
            first expanded into the characteristic's expected value type
            when that type can be determined.
        validate: When True (default) ``build_value`` is used; when False
            the characteristic's ``_encode_value`` is called directly.

    Returns:
        Encoded bytes ready to write to the characteristic

    Raises:
        ValueError: If no encoder is registered for the UUID string
        TypeError: If a dict value cannot be converted to the expected type
        Exception: Any error raised by the encoder is logged and re-raised

    Example::

        from bluetooth_sig import BluetoothSIGTranslator
        from bluetooth_sig.gatt.characteristics import AlertLevelCharacteristic
        from bluetooth_sig.gatt.characteristics.alert_level import AlertLevel

        translator = BluetoothSIGTranslator()

        # Type-safe: pass characteristic class and typed value
        data: bytes = translator.encode_characteristic(AlertLevelCharacteristic, AlertLevel.HIGH)

        # Not type-safe: pass UUID string
        data = translator.encode_characteristic("2A06", 2)

    """
    # Step 1: obtain an encoder instance, from the class or from the registry.
    if isinstance(char, type) and issubclass(char, BaseCharacteristic):
        encoder = char()
        logger.debug("Encoding characteristic class=%s, value=%s", char.__name__, value)
    else:
        logger.debug("Encoding characteristic UUID=%s, value=%s", char, value)
        encoder = CharacteristicRegistry.create_characteristic(char)
        if not encoder:
            raise ValueError(f"No encoder available for characteristic UUID: {char}")
        logger.debug("Found encoder for UUID=%s: %s", char, type(encoder).__name__)

        # Dict payloads are only coerced on the untyped (UUID string) path.
        if isinstance(value, dict):
            target_type = self._get_characteristic_value_type_class(encoder)
            if target_type and hasattr(target_type, "__init__") and not isinstance(target_type, str):
                try:
                    value = target_type(**value)
                    logger.debug("Converted dict to %s", target_type.__name__)
                except (TypeError, ValueError) as e:
                    type_name = getattr(target_type, "__name__", str(target_type))
                    raise TypeError(f"Failed to convert dict to {type_name} for characteristic {char}: {e}") from e

    # Step 2: encode, with or without validation; log and propagate failures.
    try:
        if validate:
            payload = encoder.build_value(value)
            logger.debug("Successfully encoded %s with validation", encoder.name)
        else:
            payload = encoder._encode_value(value)  # pylint: disable=protected-access
            logger.debug("Successfully encoded %s without validation", encoder.name)
        return bytes(payload)
    except Exception as e:
        logger.error("Encoding failed for %s: %s", encoder.name, e)
        raise

305 

def _get_characteristic_value_type_class(  # pylint: disable=too-many-return-statements,too-many-branches
    self, characteristic: BaseCharacteristic[Any]
) -> type[Any] | None:
    """Work out which Python type a characteristic's value is expected to be.

    Sources are consulted in this order, returning the first usable answer:

    1. The return annotation of ``_decode_value`` (string annotations are
       resolved against the characteristic's own module).
    2. A string ``_manual_value_type`` naming an attribute of the
       ``templates`` module.
    3. The first generic argument recorded on ``_template.__orig_class__``.
    4. A primitive mapped from ``characteristic.info.value_type``.

    Args:
        characteristic: The characteristic instance to inspect

    Returns:
        The type class, or None if it can't be determined

    """
    # pylint: disable=protected-access  # Type discovery relies on private hooks

    # 1. Return annotation of the decoder.
    if hasattr(characteristic, "_decode_value"):
        decoder = characteristic._decode_value
        try:
            # Forward references need the defining module's globals to resolve.
            module = inspect.getmodule(characteristic.__class__)
            globalns = getattr(module, "__dict__", {}) if module else {}
            return_type = typing.get_type_hints(decoder, globalns=globalns).get("return")
            if return_type and return_type is not type(None):
                return return_type  # type: ignore[no-any-return]
        except Exception:  # pylint: disable=broad-exception-caught
            # Hints could not be resolved; fall back to the raw annotation.
            # A failure here must not escape: the later sources still apply.
            try:
                annotation = inspect.signature(decoder).return_annotation
            except (TypeError, ValueError):
                annotation = inspect.Signature.empty
            # An unresolved string annotation is useless as a constructor.
            if annotation and annotation is not inspect.Signature.empty and not isinstance(annotation, str):
                return annotation  # type: ignore[no-any-return]

    # 2. Manually declared type name, looked up in the templates module.
    manual_type = getattr(characteristic, "_manual_value_type", None)
    if isinstance(manual_type, str) and hasattr(templates, manual_type):
        return getattr(templates, manual_type)  # type: ignore[no-any-return]

    # 3. Generic argument of the template instance, when recorded.
    template = getattr(characteristic, "_template", None)
    if template and hasattr(template, "__orig_class__"):
        args = typing.get_args(template.__orig_class__)
        if args:
            return args[0]  # type: ignore[no-any-return]

    # 4. Primitive types derived from the declared ValueType.
    declared = characteristic.info.value_type
    primitive_types = (
        (ValueType.INT, int),
        (ValueType.FLOAT, float),
        (ValueType.STRING, str),
        (ValueType.BOOL, bool),
        (ValueType.BYTES, bytes),
    )
    for value_type, python_type in primitive_types:
        if declared == value_type:
            return python_type

    return None

374 

def get_value_type(self, uuid: str) -> ValueType | None:
    """Look up the declared ValueType of a characteristic.

    Delegates to ``get_characteristic_info_by_uuid`` and returns the
    ``value_type`` field of the resulting info object.

    Args:
        uuid: The characteristic UUID (16-bit short form or full 128-bit)

    Returns:
        ValueType enum if characteristic is found, None otherwise

    Example:
        Check what type a characteristic returns::

            from bluetooth_sig import BluetoothSIGTranslator

            translator = BluetoothSIGTranslator()
            value_type = translator.get_value_type("2A19")
            print(value_type)  # ValueType.INT

    """
    char_info = self.get_characteristic_info_by_uuid(uuid)
    if not char_info:
        return None
    return char_info.value_type

399 

def supports(self, uuid: str) -> bool:
    """Report whether a characteristic class is registered for a UUID.

    Args:
        uuid: The characteristic UUID to check

    Returns:
        True if the registry yields a characteristic class for the UUID.
        False if it does not, or if building the ``BluetoothUUID`` or the
        lookup raises ``ValueError``/``TypeError``.

    Example:
        Check if characteristic is supported::

            from bluetooth_sig import BluetoothSIGTranslator

            translator = BluetoothSIGTranslator()
            if translator.supports("2A19"):
                result = translator.parse_characteristic("2A19", data)

    """
    try:
        registered = CharacteristicRegistry.get_characteristic_class_by_uuid(BluetoothUUID(uuid))
    except (ValueError, TypeError):
        return False
    return registered is not None

425 

def get_characteristic_info_by_uuid(self, uuid: str) -> CharacteristicInfo | None:
    """Fetch characteristic metadata for a UUID.

    The UUID is resolved to a registered characteristic class, a throwaway
    instance of that class is created, and its ``info`` property is returned.

    Args:
        uuid: The characteristic UUID (16-bit short form or full 128-bit)

    Returns:
        [CharacteristicInfo][bluetooth_sig.CharacteristicInfo] with metadata,
        or None when the UUID is rejected with ``ValueError``, no class is
        registered for it, or instantiating the class fails.

    Example:
        Get battery level characteristic info::

            from bluetooth_sig import BluetoothSIGTranslator

            translator = BluetoothSIGTranslator()
            info = translator.get_characteristic_info_by_uuid("2A19")
            if info:
                print(f"Name: {info.name}")  # Name: Battery Level

    """
    try:
        parsed_uuid = BluetoothUUID(uuid)
    except ValueError:
        return None

    registered_class = CharacteristicRegistry.get_characteristic_class_by_uuid(parsed_uuid)
    if not registered_class:
        return None

    # A temporary, argument-free instance is enough to expose the metadata.
    try:
        return registered_class().info
    except Exception:  # pylint: disable=broad-exception-caught
        return None

464 

def get_characteristic_uuid_by_name(self, name: CharacteristicName) -> BluetoothUUID | None:
    """Resolve a characteristic name enum to its UUID.

    Args:
        name: CharacteristicName enum

    Returns:
        The ``uuid`` of the info found by ``get_characteristic_info_by_name``,
        or None if no info is found

    """
    char_info = self.get_characteristic_info_by_name(name)
    if not char_info:
        return None
    return char_info.uuid

477 

def get_service_uuid_by_name(self, name: str | ServiceName) -> BluetoothUUID | None:
    """Resolve a service name (string or enum) to its UUID.

    Args:
        name: Service name or enum; an enum member is reduced to its
            ``value`` before the lookup

    Returns:
        Service UUID or None if not found

    """
    if isinstance(name, ServiceName):
        lookup_key = name.value
    else:
        lookup_key = name

    service_info = self.get_service_info_by_name(lookup_key)
    if not service_info:
        return None
    return service_info.uuid

491 

def get_characteristic_info_by_name(self, name: CharacteristicName) -> CharacteristicInfo | None:
    """Fetch characteristic metadata for a name enum.

    Args:
        name: CharacteristicName enum

    Returns:
        CharacteristicInfo if found, None otherwise (including when the
        registered class cannot be instantiated)

    """
    registered_class = CharacteristicRegistry.get_characteristic_class(name)
    if not registered_class:
        return None

    # Class-level configured info, when present, wins over instance info.
    configured = registered_class.get_configured_info()
    if configured:
        return configured

    # Otherwise read the metadata from a temporary instance.
    try:
        return registered_class().info
    except Exception:  # pylint: disable=broad-exception-caught
        return None

517 

def get_service_info_by_name(self, name: str) -> ServiceInfo | None:
    """Get service info by name instead of UUID.

    The lookup goes through ``uuid_registry``; the returned ServiceInfo
    carries the registry's UUID and name and an empty characteristics list.

    Args:
        name: Service name

    Returns:
        ServiceInfo if found, None otherwise. Lookup errors are treated as
        "not found" but are logged at debug level rather than discarded.

    """
    try:
        uuid_info = uuid_registry.get_service_info(name)
        if uuid_info:
            return ServiceInfo(uuid=uuid_info.uuid, name=uuid_info.name, characteristics=[])
    except Exception:  # pylint: disable=broad-exception-caught
        # Best-effort lookup: keep returning None, but leave a trace.
        logger.debug("Service info lookup failed for name=%r", name, exc_info=True)

    return None

537 

def get_service_info_by_uuid(self, uuid: str) -> ServiceInfo | None:
    """Get information about a service by UUID.

    Args:
        uuid: The service UUID

    Returns:
        ServiceInfo with metadata, or None when the UUID string is rejected
        with ``ValueError``, no service class is registered for it, or the
        service cannot be instantiated.

    """
    # A malformed UUID means "not found", matching
    # get_characteristic_info_by_uuid, so get_sig_info_by_uuid cannot raise
    # halfway through its characteristic-then-service lookup.
    try:
        bt_uuid = BluetoothUUID(uuid)
    except ValueError:
        return None

    service_class = GattServiceRegistry.get_service_class_by_uuid(bt_uuid)
    if not service_class:
        return None

    try:
        temp_service = service_class()
        # Flatten the service's characteristic mapping into their info objects.
        char_infos: list[CharacteristicInfo] = [
            char_instance.info for char_instance in temp_service.characteristics.values()
        ]
        return ServiceInfo(
            uuid=temp_service.uuid,
            name=temp_service.name,
            characteristics=char_infos,
        )
    except Exception:  # pylint: disable=broad-exception-caught
        return None

566 

def list_supported_characteristics(self) -> dict[str, str]:
    """Enumerate registered characteristics that expose configured info.

    Returns:
        Dictionary mapping characteristic names to UUIDs. Classes whose
        ``get_configured_info()`` returns nothing are left out.

    """
    registry = CharacteristicRegistry.get_all_characteristics()
    return {
        # Enum keys are reduced to their value; anything else is stringified.
        (name.value if hasattr(name, "value") else str(name)): str(configured.uuid)
        for name, char_class in registry.items()
        if (configured := char_class.get_configured_info())
    }

583 

def list_supported_services(self) -> dict[str, str]:
    """Enumerate registered services by instantiating each one.

    Returns:
        Dictionary mapping service names to UUIDs. The name is the
        instance's ``_service_name`` attribute, or the class name when that
        attribute is absent. Services that fail to instantiate are skipped.

    """
    listing: dict[str, str] = {}
    for svc_class in GattServiceRegistry.get_all_services():
        try:
            instance = svc_class()
            label = getattr(instance, "_service_name", svc_class.__name__)
            listing[label] = str(instance.uuid)
        except Exception:  # pylint: disable=broad-exception-caught
            continue
    return listing

600 

def process_services(self, services: dict[str, dict[str, CharacteristicDataDict]]) -> None:
    """Build and store service instances from discovery data.

    For every entry, the nested ``"characteristics"`` mapping is turned into
    CharacteristicInfo objects and handed to
    ``GattServiceRegistry.create_service``. Services that are created are
    stored under the string form of their ``BluetoothUUID``.

    Args:
        services: Dictionary of service UUIDs to their characteristics

    """
    for service_uuid_str, service_data in services.items():
        service_uuid = BluetoothUUID(service_uuid_str)

        char_infos: dict[BluetoothUUID, CharacteristicInfo] = {}
        for char_uuid_str, char_data in service_data.get("characteristics", {}).items():
            char_uuid = BluetoothUUID(char_uuid_str)

            # "value_type" may be a string, a ValueType, or anything else
            # (which falls back to BYTES); a missing key defaults to "bytes".
            declared = char_data.get("value_type", "bytes")
            if isinstance(declared, str):
                resolved_type = ValueType(declared)
            elif isinstance(declared, ValueType):
                resolved_type = declared
            else:
                resolved_type = ValueType.BYTES

            char_infos[char_uuid] = CharacteristicInfo(
                uuid=char_uuid,
                name=char_data.get("name", ""),
                unit=char_data.get("unit", ""),
                value_type=resolved_type,
            )

        created = GattServiceRegistry.create_service(service_uuid, char_infos)
        if created:
            self._services[str(service_uuid)] = created

631 

def get_service_by_uuid(self, uuid: str) -> BaseGattService | None:
    """Get a service instance by UUID.

    ``process_services`` stores instances under ``str(BluetoothUUID(...))``,
    so a caller-supplied spelling may differ from the stored key. The exact
    string is tried first; on a miss the UUID is normalised the same way as
    at storage time and looked up again.

    Args:
        uuid: The service UUID

    Returns:
        Service instance if found, None otherwise (including when the UUID
        string cannot be turned into a ``BluetoothUUID``)

    """
    service = self._services.get(uuid)
    if service is not None:
        return service

    try:
        normalized_key = str(BluetoothUUID(uuid))
    except (ValueError, TypeError):
        return None
    return self._services.get(normalized_key)

643 

@property
def discovered_services(self) -> list[BaseGattService]:
    """Snapshot of the service instances currently stored.

    Returns:
        A new list holding the values of the internal service mapping

    """
    return [*self._services.values()]

653 

def clear_services(self) -> None:
    """Forget every stored service by emptying the internal mapping in place."""
    stored = self._services
    stored.clear()

657 

def get_sig_info_by_name(self, name: str) -> SIGInfo | None:
    """Get Bluetooth SIG information for a characteristic or service by name.

    The name is first looked up as a characteristic in ``uuid_registry``;
    if that yields nothing (or fails), it is looked up as a service.

    Args:
        name: Characteristic or service name

    Returns:
        CharacteristicInfo or ServiceInfo if found, None otherwise. Errors
        during the characteristic lookup are logged at debug level and the
        service lookup is still attempted.

    """
    try:
        char_info = uuid_registry.get_characteristic_info(name)
        if char_info:
            # An absent or unrecognised registry value type becomes UNKNOWN.
            value_type = ValueType.UNKNOWN
            if char_info.value_type:
                try:
                    value_type = ValueType(char_info.value_type)
                except (ValueError, KeyError):
                    value_type = ValueType.UNKNOWN
            return CharacteristicInfo(
                uuid=char_info.uuid,
                name=char_info.name,
                value_type=value_type,
                unit=char_info.unit or "",
            )
    except Exception:  # pylint: disable=broad-exception-caught
        # Best-effort lookup: fall through to the service lookup, but leave a trace.
        logger.debug("Characteristic info lookup failed for name=%r", name, exc_info=True)

    # Not a characteristic: try the same name as a service.
    service_info = self.get_service_info_by_name(name)
    if service_info:
        return service_info

    return None

694 

def get_sig_info_by_uuid(self, uuid: str) -> SIGInfo | None:
    """Look a UUID up as a characteristic, then as a service.

    Args:
        uuid: UUID string (with or without dashes)

    Returns:
        CharacteristicInfo or ServiceInfo if found, None otherwise

    """
    # Characteristics take precedence over services.
    as_characteristic = self.get_characteristic_info_by_uuid(uuid)
    if as_characteristic:
        return as_characteristic

    as_service = self.get_service_info_by_uuid(uuid)
    return as_service if as_service else None

716 

def parse_characteristics(
    self,
    char_data: dict[str, bytes],
    ctx: CharacteristicContext | None = None,
) -> dict[str, Any]:
    r"""Parse multiple characteristics at once with dependency-aware ordering.

    Characteristics are parsed in an order derived from the
    `required_dependencies` and `optional_dependencies` declared on their
    classes, so that dependencies present in the batch are parsed first.

    Required dependencies must be available, either already parsed in this
    batch or present in ``ctx.other_characteristics``; otherwise parsing
    stops with MissingDependencyError. Optional dependencies are used when
    available and only logged when absent.

    If the dependency graph cannot be sorted (for example because it
    contains a cycle), a warning is logged and the original order of
    ``char_data`` is used instead; no error is raised for the cycle itself.

    Args:
        char_data: Dictionary mapping UUIDs to raw data bytes
        ctx: Optional CharacteristicContext used as the starting context

    Returns:
        Dictionary mapping UUIDs to parsed values

    Raises:
        MissingDependencyError: If a required dependency is unavailable
        CharacteristicParseError: If parsing fails for any characteristic
        SpecialValueDetected: If a characteristic reports a special value

    Example:
        Parse multiple environmental characteristics::

            from bluetooth_sig import BluetoothSIGTranslator

            translator = BluetoothSIGTranslator()
            data = {
                "2A6E": b"\x0A\x00",  # Temperature
                "2A6F": b"\x32\x00",  # Humidity
            }
            try:
                results = translator.parse_characteristics(data)
                for uuid, value in results.items():
                    print(f"{uuid}: {value}")
            except CharacteristicParseError as e:
                print(f"Parse failed: {e}")

    """
    return self._parse_characteristics_batch(char_data, ctx)

763 

def _parse_characteristics_batch(
    self,
    char_data: dict[str, bytes],
    ctx: CharacteristicContext | None,
) -> dict[str, Any]:
    """Parse a batch of characteristics in dependency order.

    Args:
        char_data: Dictionary mapping UUIDs to raw data bytes
        ctx: Caller-supplied context, or None

    Returns:
        Dictionary mapping UUIDs to parsed values

    Raises:
        MissingDependencyError: If a required dependency is neither already
            parsed nor available through ``ctx``
        CharacteristicParseError: Propagated from ``parse_characteristic``
        SpecialValueDetected: Propagated from ``parse_characteristic``
    """
    logger.debug("Batch parsing %d characteristics", len(char_data))

    # Instantiate each characteristic once and collect its declared dependencies.
    instances, required_map, optional_map = self._prepare_characteristic_dependencies(char_data)

    # Work out the order in which the batch must be parsed.
    parse_order = self._resolve_dependency_order(char_data, required_map, optional_map)

    parsed: dict[str, Any] = {}
    for uuid_str in parse_order:
        raw_data = char_data[uuid_str]
        instance = instances.get(uuid_str)

        unmet = self._find_missing_required_dependencies(
            uuid_str,
            required_map.get(uuid_str, []),
            parsed,
            ctx,
        )
        if unmet:
            raise MissingDependencyError(instance.name if instance else "Unknown", unmet)

        self._log_optional_dependency_gaps(
            uuid_str,
            optional_map.get(uuid_str, []),
            parsed,
            ctx,
        )

        # Each parse sees the caller's context plus everything parsed so far.
        parse_context = self._build_parse_context(ctx, parsed)

        # Parse errors and special values propagate to the caller unchanged.
        parsed[uuid_str] = self.parse_characteristic(uuid_str, raw_data, ctx=parse_context)

    logger.debug("Batch parsing complete: %d results", len(parsed))
    return parsed

816 

def _prepare_characteristic_dependencies(
    self, characteristic_data: Mapping[str, bytes]
) -> tuple[dict[str, BaseCharacteristic[Any]], dict[str, list[str]], dict[str, list[str]]]:
    """Create one characteristic per UUID and gather its declared dependencies.

    UUIDs for which the registry creates no characteristic are left out of
    all three mappings. A UUID appears in a dependency mapping only when
    the corresponding dependency list is non-empty.

    Args:
        characteristic_data: Mapping of UUID strings to raw bytes; only the
            keys are used here

    Returns:
        A tuple ``(instances, required, optional)`` keyed by UUID string
    """
    instances: dict[str, BaseCharacteristic[Any]] = {}
    required_map: dict[str, list[str]] = {}
    optional_map: dict[str, list[str]] = {}

    for uuid in characteristic_data:
        instance = CharacteristicRegistry.create_characteristic(uuid)
        if instance is None:
            continue
        instances[uuid] = instance

        required_deps = instance.required_dependencies
        optional_deps = instance.optional_dependencies

        if required_deps:
            required_map[uuid] = required_deps
            logger.debug("Characteristic %s has required dependencies: %s", uuid, required_deps)
        if optional_deps:
            optional_map[uuid] = optional_deps
            logger.debug("Characteristic %s has optional dependencies: %s", uuid, optional_deps)

    return instances, required_map, optional_map

843 

def _resolve_dependency_order(
    self,
    characteristic_data: Mapping[str, bytes],
    uuid_to_required_deps: Mapping[str, list[str]],
    uuid_to_optional_deps: Mapping[str, list[str]],
) -> list[str]:
    """Order the batch so that dependencies come before their dependents.

    Only dependencies that are themselves part of ``characteristic_data``
    influence the ordering. If sorting fails for any reason, a warning is
    logged and the original key order is returned instead.

    Args:
        characteristic_data: Mapping whose keys are the UUIDs to order
        uuid_to_required_deps: Required dependency UUIDs per UUID
        uuid_to_optional_deps: Optional dependency UUIDs per UUID

    Returns:
        List of the UUIDs in the order they should be parsed
    """
    try:
        # node -> predecessors, restricted to dependencies present in the batch
        graph: dict[str, list[str]] = {
            uuid: [
                dep
                for dep in uuid_to_required_deps.get(uuid, []) + uuid_to_optional_deps.get(uuid, [])
                if dep in characteristic_data
            ]
            for uuid in characteristic_data
        }
        sorter: TopologicalSorter[str] = TopologicalSorter(graph)
        ordered = list(sorter.static_order())
        logger.debug("Dependency-sorted parsing order: %s", ordered)
        return ordered
    except Exception as exc:  # pylint: disable=broad-exception-caught
        logger.warning("Dependency sorting failed: %s. Using original order.", exc)
        return list(characteristic_data.keys())

865 

def _find_missing_required_dependencies(
    self,
    uuid: str,
    required_deps: list[str],
    results: Mapping[str, Any],
    base_context: CharacteristicContext | None,
) -> list[str]:
    """List the required dependencies that are not available for a characteristic.

    A dependency counts as available when its UUID is a key of ``results`` or
    of ``base_context.other_characteristics`` (when that is present and
    non-empty).

    Args:
        uuid: UUID of the characteristic being checked (used for logging only)
        required_deps: Required dependency UUIDs declared by the characteristic
        results: Results gathered so far, keyed by UUID
        base_context: Optional caller-supplied context

    Returns:
        The unavailable dependency UUIDs, in the order they were declared

    """
    if not required_deps:
        return []

    context_chars = None
    if base_context and base_context.other_characteristics:
        context_chars = base_context.other_characteristics

    # Presence in results or in the caller's context is taken as "available"
    missing: list[str] = [
        dep_uuid
        for dep_uuid in required_deps
        if dep_uuid not in results and not (context_chars and dep_uuid in context_chars)
    ]

    if missing:
        logger.debug("Characteristic %s missing required dependencies: %s", uuid, missing)

    return missing

897 

def _log_optional_dependency_gaps(
    self,
    uuid: str,
    optional_deps: list[str],
    results: Mapping[str, Any],
    base_context: CharacteristicContext | None,
) -> None:
    """Log, at debug level, each optional dependency that is not available.

    A dependency counts as available when its UUID is a key of ``results`` or
    of ``base_context.other_characteristics`` (when that is present and
    non-empty). Nothing is returned and nothing is raised for a gap.

    Args:
        uuid: UUID of the characteristic being checked (used for logging only)
        optional_deps: Optional dependency UUIDs declared by the characteristic
        results: Results gathered so far, keyed by UUID
        base_context: Optional caller-supplied context

    """
    if optional_deps:
        context_chars = None
        if base_context and base_context.other_characteristics:
            context_chars = base_context.other_characteristics

        for dep_uuid in optional_deps:
            if dep_uuid in results or (context_chars and dep_uuid in context_chars):
                continue
            logger.debug("Optional dependency %s not available for %s", dep_uuid, uuid)

919 

def _build_parse_context(
    self,
    base_context: CharacteristicContext | None,
    results: Mapping[str, Any],
) -> CharacteristicContext:
    """Create the context handed to an individual characteristic parser.

    Args:
        base_context: Optional caller-supplied context whose ``device_info``,
            ``advertisement`` and ``raw_service`` are carried over
        results: Mapping exposed to the parser as ``other_characteristics``

    Returns:
        A new CharacteristicContext whose ``other_characteristics`` is ``results``

    """
    # NOTE(review): base_context.other_characteristics is replaced by `results`
    # here, not merged, although the dependency checks treat entries in it as
    # available - confirm the caller seeds `results` accordingly.
    if base_context is None:
        return CharacteristicContext(other_characteristics=results)

    return CharacteristicContext(
        device_info=base_context.device_info,
        advertisement=base_context.advertisement,
        other_characteristics=results,
        raw_service=base_context.raw_service,
    )

935 

def get_characteristics_info_by_uuids(self, uuids: list[str]) -> dict[str, CharacteristicInfo | None]:
    """Look up characteristic information for several UUIDs at once.

    Args:
        uuids: Characteristic UUIDs to look up

    Returns:
        Dictionary mapping each given UUID to the result of
        ``get_characteristic_info_by_uuid`` for it (``None`` if not found)

    """
    return {char_uuid: self.get_characteristic_info_by_uuid(char_uuid) for char_uuid in uuids}

951 

def validate_characteristic_data(self, uuid: str, data: bytes) -> ValidationResult:
    """Validate characteristic data format against SIG specifications.

    The data is considered valid when ``parse_characteristic`` completes
    without raising. Any exception raised by the parse is captured and its
    text reported as the error message; this method does not re-raise it.

    Args:
        uuid: The characteristic UUID
        data: Raw data bytes to validate

    Returns:
        ValidationResult with ``is_valid``, the actual data length, the
        characteristic class's ``expected_length`` (``None`` when the class
        cannot be resolved) and an error message (empty on success)

    """
    error_message = ""
    try:
        # A successful parse is the validity criterion
        self.parse_characteristic(uuid, data)
        is_valid = True
    except Exception as e:  # pylint: disable=broad-exception-caught
        is_valid = False
        error_message = str(e)

    # Best-effort metadata lookup, performed once for both outcomes; a failure
    # here must not change the validation verdict.
    try:
        bt_uuid = BluetoothUUID(uuid)
        char_class = CharacteristicRegistry.get_characteristic_class_by_uuid(bt_uuid)
        expected = char_class.expected_length if char_class else None
    except Exception:  # pylint: disable=broad-exception-caught
        expected = None

    return ValidationResult(
        is_valid=is_valid,
        actual_length=len(data),
        expected_length=expected,
        error_message=error_message,
    )

994 

def get_service_characteristics(self, service_uuid: str) -> list[str]:  # pylint: disable=too-many-return-statements
    """Get the required characteristics declared by a service.

    The service class registered for ``service_uuid`` is instantiated and the
    keys returned by its ``get_required_characteristics()`` are converted with
    ``str()``.

    Args:
        service_uuid: The service UUID

    Returns:
        String form of each required-characteristic key, or an empty list when
        no service class is registered for the UUID or when instantiating or
        querying the service raises

    """
    # NOTE(review): BluetoothUUID(service_uuid) is evaluated outside the try,
    # so an exception from constructing the UUID propagates to the caller.
    service_class = GattServiceRegistry.get_service_class_by_uuid(BluetoothUUID(service_uuid))
    if service_class:
        try:
            required = service_class().get_required_characteristics()
            return [str(key) for key in required]
        except Exception:  # pylint: disable=broad-exception-caught
            return []

    return []

1015 

def register_custom_characteristic_class(
    self,
    uuid_or_name: str,
    cls: type[BaseCharacteristic[Any]],
    info: CharacteristicInfo | None = None,
    override: bool = False,
) -> None:
    """Register a custom characteristic class at runtime.

    The class is always registered with ``CharacteristicRegistry``. When
    ``info`` is supplied, its metadata is additionally registered with
    ``uuid_registry``, using ``cls.__name__`` as the name if ``info.name`` is
    empty.

    Args:
        uuid_or_name: The characteristic UUID or name
        cls: The characteristic class to register
        info: Optional CharacteristicInfo with metadata (name, unit, value_type)
        override: Whether to override existing registrations

    Raises:
        TypeError: If cls does not inherit from BaseCharacteristic
        ValueError: If UUID conflicts with existing registration and override=False

    Example::

        from bluetooth_sig import BluetoothSIGTranslator, CharacteristicInfo, ValueType
        from bluetooth_sig.types import BluetoothUUID

        translator = BluetoothSIGTranslator()
        info = CharacteristicInfo(
            uuid=BluetoothUUID("12345678-1234-1234-1234-123456789abc"),
            name="Custom Temperature",
            unit="°C",
            value_type=ValueType.FLOAT,
        )
        translator.register_custom_characteristic_class(str(info.uuid), MyCustomCharacteristic, info=info)

    """
    # The class registration happens regardless of whether metadata is given
    CharacteristicRegistry.register_characteristic_class(uuid_or_name, cls, override)

    if not info:
        return

    # Metadata goes to the UUID registry so lookups can describe the custom type
    uuid_registry.register_characteristic(
        uuid=info.uuid,
        name=info.name or cls.__name__,
        identifier=info.id,
        unit=info.unit,
        value_type=info.value_type,
        override=override,
    )

1063 

def register_custom_service_class(
    self,
    uuid_or_name: str,
    cls: type[BaseGattService],
    info: ServiceInfo | None = None,
    override: bool = False,
) -> None:
    """Register a custom service class at runtime.

    The class is always registered with ``GattServiceRegistry``. When ``info``
    is supplied, its metadata is additionally registered with
    ``uuid_registry``, using ``cls.__name__`` as the name if ``info.name`` is
    empty.

    Args:
        uuid_or_name: The service UUID or name
        cls: The service class to register
        info: Optional ServiceInfo with metadata (name)
        override: Whether to override existing registrations

    Raises:
        TypeError: If cls does not inherit from BaseGattService
        ValueError: If UUID conflicts with existing registration and override=False

    Example::

        from bluetooth_sig import BluetoothSIGTranslator, ServiceInfo
        from bluetooth_sig.types import BluetoothUUID

        translator = BluetoothSIGTranslator()
        info = ServiceInfo(uuid=BluetoothUUID("12345678-1234-1234-1234-123456789abc"), name="Custom Service")
        translator.register_custom_service_class(str(info.uuid), MyCustomService, info=info)

    """
    # The class registration happens regardless of whether metadata is given
    GattServiceRegistry.register_service_class(uuid_or_name, cls, override)

    if not info:
        return

    # Metadata goes to the UUID registry so lookups can describe the custom service
    uuid_registry.register_service(
        uuid=info.uuid,
        name=info.name or cls.__name__,
        identifier=info.id,
        override=override,
    )

1104 

1105 # Async methods for non-blocking operation in async contexts 

1106 

@overload
async def parse_characteristic_async(
    self,
    char: type[BaseCharacteristic[T]],
    raw_data: bytes,
    ctx: CharacteristicContext | None = ...,
) -> T: ...

@overload
async def parse_characteristic_async(
    self,
    char: str | BluetoothUUID,
    raw_data: bytes,
    ctx: CharacteristicContext | None = ...,
) -> Any: ...  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe

async def parse_characteristic_async(
    self,
    char: str | BluetoothUUID | type[BaseCharacteristic[T]],
    raw_data: bytes,
    ctx: CharacteristicContext | None = None,
) -> T | Any:  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe
    """Parse characteristic data from within async code.

    This coroutine is a thin wrapper around ``parse_characteristic``: it never
    awaits anything, and the parsing itself runs synchronously.

    Args:
        char: Characteristic class (type-safe) or UUID string/BluetoothUUID (not type-safe).
        raw_data: Raw bytes from the characteristic
        ctx: Optional context providing device-level info

    Returns:
        Parsed value. Return type is inferred when passing characteristic class.

    Raises:
        SpecialValueDetected: Special sentinel value detected
        CharacteristicParseError: Parse/validation failure

    Example::

        async with BleakClient(address) as client:
            data = await client.read_gatt_char("2A19")

            # Type-safe: pass characteristic class
            from bluetooth_sig.gatt.characteristics import BatteryLevelCharacteristic

            level: int = await translator.parse_characteristic_async(BatteryLevelCharacteristic, data)

            # Not type-safe: pass UUID string
            value = await translator.parse_characteristic_async("2A19", data)

    """
    # Type-safe path: a characteristic class is forwarded untouched
    if isinstance(char, type) and issubclass(char, BaseCharacteristic):
        return self.parse_characteristic(char, raw_data, ctx)

    # UUID path: a BluetoothUUID is stringified first; anything else is passed
    # through as given.
    return self.parse_characteristic(
        str(char) if isinstance(char, BluetoothUUID) else char,
        raw_data,
        ctx,
    )

1170 

async def parse_characteristics_async(
    self,
    char_data: dict[str, bytes],
    ctx: CharacteristicContext | None = None,
) -> dict[str, Any]:
    """Parse a batch of characteristics from within async code.

    This coroutine is a thin wrapper around ``parse_characteristics``: it
    never awaits anything, and the batch is parsed synchronously.

    Args:
        char_data: Dictionary mapping UUIDs to raw data bytes
        ctx: Optional context

    Returns:
        Dictionary mapping UUIDs to parsed values

    Example::

        async with BleakClient(address) as client:
            # Read multiple characteristics
            char_data = {}
            for uuid in ["2A19", "2A6E", "2A6F"]:
                char_data[uuid] = await client.read_gatt_char(uuid)

            # Parse all asynchronously
            results = await translator.parse_characteristics_async(char_data)
            for uuid, value in results.items():
                print(f"{uuid}: {value}")
    """
    # Everything, including dependency ordering, is left to the sync batch parser
    parsed = self.parse_characteristics(char_data, ctx)
    return parsed

1205 

async def encode_characteristic_async(
    self,
    uuid: str | BluetoothUUID,
    value: Any,  # noqa: ANN401
    validate: bool = True,
) -> bytes:
    """Encode a characteristic value from within async code.

    This coroutine is a thin wrapper around ``encode_characteristic``: it
    never awaits anything, and the encoding itself runs synchronously.

    Args:
        uuid: The characteristic UUID (string or BluetoothUUID)
        value: The value to encode (dataclass, dict, or primitive)
        validate: If True, validates before encoding (default: True)

    Returns:
        Encoded bytes ready to write

    Example::

        async with BleakClient(address) as client:
            data = await translator.encode_characteristic_async("2A19", 85)
            await client.write_gatt_char("2A19", data)
    """
    # The sync API is called with a string, so a BluetoothUUID is stringified first
    if isinstance(uuid, BluetoothUUID):
        uuid_text = str(uuid)
    else:
        uuid_text = uuid

    return self.encode_characteristic(uuid_text, value, validate)

1237 

def create_value(self, uuid: str, **kwargs: Any) -> Any:  # noqa: ANN401
    """Build a value instance of the type a characteristic works with.

    The result can be passed to encode_characteristic() or used directly.
    Three cases are handled:

    * no value type can be determined: a single keyword argument is returned
      as-is;
    * the value type is ``int``, ``float``, ``str``, ``bool`` or ``bytes``: a
      single keyword argument is returned after an ``isinstance`` check;
    * any other value type: it is called with ``**kwargs``.

    Args:
        uuid: The characteristic UUID
        **kwargs: Field values for the characteristic's type

    Returns:
        Properly typed value instance

    Raises:
        ValueError: If no characteristic is found for the UUID, or the value
            type cannot be determined and kwargs is not a single value
        TypeError: If kwargs don't match the characteristic's expected fields

    Example:
        Create complex characteristic values::

            from bluetooth_sig import BluetoothSIGTranslator

            translator = BluetoothSIGTranslator()

            # Create acceleration data
            accel = translator.create_value("2C1D", x_axis=1.5, y_axis=0.5, z_axis=9.8)

            # Encode and write
            data = translator.encode_characteristic("2C1D", accel)
            await client.write_gatt_char("2C1D", data)

    """
    characteristic = CharacteristicRegistry.create_characteristic(uuid)
    if not characteristic:
        raise ValueError(f"No characteristic found for UUID: {uuid}")

    value_type = self._get_characteristic_value_type_class(characteristic)

    # Unknown value type: the only thing that can be returned is a lone value
    if not value_type:
        if len(kwargs) != 1:
            raise ValueError(
                f"Cannot determine value type for characteristic {uuid}. "
                "Try passing a dict to encode_characteristic() instead."
            )
        (lone_value,) = kwargs.values()
        return lone_value

    # Primitive value type: exactly one keyword argument of that type is expected
    if value_type in (int, float, str, bool, bytes):
        if len(kwargs) != 1:
            type_name = getattr(value_type, "__name__", str(value_type))
            raise TypeError(f"Simple type {type_name} expects a single value")
        (candidate,) = kwargs.values()
        if isinstance(candidate, value_type):
            return candidate
        type_name = getattr(value_type, "__name__", str(value_type))
        raise TypeError(f"Expected {type_name}, got {type(candidate).__name__}")

    # Structured value type: let its constructor consume the keyword arguments
    try:
        return value_type(**kwargs)
    except (TypeError, ValueError) as e:
        type_name = getattr(value_type, "__name__", str(value_type))
        raise TypeError(f"Failed to create {type_name} for characteristic {uuid}: {e}") from e

1305 

1306 

# Global instance: a module-level BluetoothSIGTranslator created at import time.
# NOTE(review): the class docstring describes the class as a singleton, so this
# is presumably the same object later BluetoothSIGTranslator() calls return - confirm.
BluetoothSIG = BluetoothSIGTranslator()