Coverage for src / bluetooth_sig / core / translator.py: 76%
434 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
1# pylint: disable=too-many-lines # TODO split up Comprehensive translator with many methods
2"""Core Bluetooth SIG standards translator functionality."""
4from __future__ import annotations
6import inspect
7import logging
8import typing
9from collections.abc import Mapping
10from graphlib import TopologicalSorter
11from typing import Any, TypeVar, overload
13from ..gatt.characteristics import templates
14from ..gatt.characteristics.base import BaseCharacteristic
15from ..gatt.characteristics.registry import CharacteristicRegistry
16from ..gatt.exceptions import CharacteristicParseError, MissingDependencyError, SpecialValueDetected
17from ..gatt.services import ServiceName
18from ..gatt.services.base import BaseGattService
19from ..gatt.services.registry import GattServiceRegistry
20from ..gatt.uuid_registry import uuid_registry
21from ..types import (
22 CharacteristicContext,
23 CharacteristicInfo,
24 ServiceInfo,
25 SIGInfo,
26 ValidationResult,
27)
28from ..types.gatt_enums import CharacteristicName, ValueType
29from ..types.uuid import BluetoothUUID
# Type alias for the loosely-typed per-characteristic payload consumed by process_services
CharacteristicDataDict = dict[str, Any]
# Type variable linking a characteristic class to its parsed value type in the typed overloads
T = TypeVar("T")
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
40class BluetoothSIGTranslator: # pylint: disable=too-many-public-methods
41 """Pure Bluetooth SIG standards translator for characteristic and service interpretation.
43 This class provides the primary API surface for Bluetooth SIG standards translation,
44 covering characteristic parsing, service discovery, UUID resolution, and registry
45 management.
47 Singleton Pattern:
48 This class is implemented as a singleton to provide a global registry for
49 custom characteristics and services. Access the singleton instance using
50 `BluetoothSIGTranslator.get_instance()` or the module-level `translator` variable.
52 Key features:
53 - Parse raw BLE characteristic data using Bluetooth SIG specifications
54 - Resolve UUIDs to [CharacteristicInfo][bluetooth_sig.types.CharacteristicInfo]
55 and [ServiceInfo][bluetooth_sig.types.ServiceInfo]
56 - Create BaseGattService instances from service UUIDs
57 - Access comprehensive registry of supported characteristics and services
59 Note: This class intentionally has >20 public methods as it serves as the
60 primary API surface for Bluetooth SIG standards translation. The methods are
61 organized by functionality and reducing them would harm API clarity.
62 """
64 _instance: BluetoothSIGTranslator | None = None
65 _instance_lock: bool = False # Simple lock to prevent recursion
def __new__(cls) -> BluetoothSIGTranslator:
    """Return the shared translator object, allocating it on first use."""
    instance = cls._instance
    if instance is None:
        # First construction: allocate the object and remember it on the class.
        instance = super().__new__(cls)
        cls._instance = instance
    return instance
@classmethod
def get_instance(cls) -> BluetoothSIGTranslator:
    """Return the singleton translator, creating it if it does not exist yet.

    Returns:
        The single shared BluetoothSIGTranslator instance

    Example::

        from bluetooth_sig import BluetoothSIGTranslator

        translator = BluetoothSIGTranslator.get_instance()
    """
    instance = cls._instance
    if instance is None:
        # Constructing the class also records the instance; store it explicitly as well.
        instance = cls()
        cls._instance = instance
    return instance
91 def __init__(self) -> None:
92 """Initialize the SIG translator (singleton pattern)."""
93 # Only initialize once
94 if self.__class__._instance_lock:
95 return
96 self.__class__._instance_lock = True
98 self._services: dict[str, BaseGattService] = {}
def __str__(self) -> str:
    """Return a fixed, human-readable label for the translator.

    The text is a constant and does not depend on instance state.
    """
    return "BluetoothSIGTranslator(pure SIG standards)"
@overload
def parse_characteristic(
    self,
    char: type[BaseCharacteristic[T]],
    raw_data: bytes | bytearray,
    ctx: CharacteristicContext | None = ...,
) -> T: ...

@overload
def parse_characteristic(
    self,
    char: str,
    raw_data: bytes | bytearray,
    ctx: CharacteristicContext | None = ...,
) -> Any: ...  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe

def parse_characteristic(
    self,
    char: str | type[BaseCharacteristic[T]],
    raw_data: bytes | bytearray,
    ctx: CharacteristicContext | None = None,
) -> T | Any:  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe
    r"""Parse a characteristic's raw data using Bluetooth SIG standards.

    Args:
        char: Characteristic class (type-safe) or UUID string (not type-safe).
        raw_data: Raw bytes from the characteristic (bytes or bytearray)
        ctx: Optional CharacteristicContext forwarded to the characteristic's parser

    Returns:
        Parsed value. Return type is inferred when passing a characteristic class;
        the UUID-string form returns ``Any``.

    Raises:
        SpecialValueDetected: Special sentinel value detected (logged at debug, re-raised)
        CharacteristicParseError: Parse/validation failure, or no parser is
            registered for the given UUID string

    Example::

        from bluetooth_sig import BluetoothSIGTranslator
        from bluetooth_sig.gatt.characteristics import BatteryLevelCharacteristic

        translator = BluetoothSIGTranslator()

        # Type-safe: pass characteristic class, return type is inferred
        level: int = translator.parse_characteristic(BatteryLevelCharacteristic, b"\x64")

        # Not type-safe: pass UUID string, returns Any
        value = translator.parse_characteristic("2A19", b"\x64")

    """
    # Resolve a parser instance from either form of input, then share one parse path.
    if isinstance(char, type) and issubclass(char, BaseCharacteristic):
        characteristic = char()
        logger.debug("Parsing characteristic class=%s, data_len=%d", char.__name__, len(raw_data))
    else:
        logger.debug("Parsing characteristic UUID=%s, data_len=%d", char, len(raw_data))
        characteristic = CharacteristicRegistry.create_characteristic(char)
        if not characteristic:
            # No parser registered for this UUID: report it as a parse failure.
            logger.info("No parser available for UUID=%s", char)
            raise CharacteristicParseError(
                message=f"No parser available for characteristic UUID: {char}",
                name="Unknown",
                uuid=BluetoothUUID(char),
                raw_data=bytes(raw_data),
            )
        logger.debug("Found parser for UUID=%s: %s", char, type(characteristic).__name__)

    try:
        value = characteristic.parse_value(raw_data, ctx)
    except SpecialValueDetected as e:
        logger.debug("Special value detected for %s: %s", characteristic.name, e.special_value.meaning)
        raise
    except CharacteristicParseError as e:
        logger.warning("Parse failed for %s: %s", characteristic.name, e)
        raise

    logger.debug("Successfully parsed %s: %s", characteristic.name, value)
    return value
@overload
def encode_characteristic(
    self,
    char: type[BaseCharacteristic[T]],
    value: T,
    validate: bool = ...,
) -> bytes: ...

@overload
def encode_characteristic(
    self,
    char: str,
    value: Any,  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe
    validate: bool = ...,
) -> bytes: ...

def encode_characteristic(
    self,
    char: str | type[BaseCharacteristic[T]],
    value: T | Any,  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe
    validate: bool = True,
) -> bytes:
    r"""Turn a value into the bytes to write to a characteristic.

    Args:
        char: Characteristic class (type-safe) or UUID string (not type-safe).
        value: The value to encode. For the UUID-string form a ``dict`` is first
            converted into the characteristic's value type when one can be determined.
        validate: If True, encode through ``build_value``; if False, call the
            characteristic's ``_encode_value`` directly (default: True)

    Returns:
        Encoded bytes ready to write to the characteristic

    Raises:
        ValueError: If no encoder is available for the given UUID string
        TypeError: If a dict value cannot be converted to the expected value type
        Exception: Any error raised by the characteristic's encoder is logged
            and re-raised unchanged

    Example::

        from bluetooth_sig import BluetoothSIGTranslator
        from bluetooth_sig.gatt.characteristics import AlertLevelCharacteristic
        from bluetooth_sig.gatt.characteristics.alert_level import AlertLevel

        translator = BluetoothSIGTranslator()

        # Type-safe: pass characteristic class and typed value
        data: bytes = translator.encode_characteristic(AlertLevelCharacteristic, AlertLevel.HIGH)

        # Not type-safe: pass UUID string
        data = translator.encode_characteristic("2A06", 2)

    """
    if isinstance(char, type) and issubclass(char, BaseCharacteristic):
        # Type-safe path: the caller handed us the characteristic class itself.
        encoder = char()
        logger.debug("Encoding characteristic class=%s, value=%s", char.__name__, value)
    else:
        # UUID path: look the characteristic up at runtime.
        logger.debug("Encoding characteristic UUID=%s, value=%s", char, value)
        encoder = CharacteristicRegistry.create_characteristic(char)
        if not encoder:
            raise ValueError(f"No encoder available for characteristic UUID: {char}")
        logger.debug("Found encoder for UUID=%s: %s", char, type(encoder).__name__)

        if isinstance(value, dict):
            # Dict input is only coerced on the UUID path.
            value_type = self._get_characteristic_value_type_class(encoder)
            if value_type and hasattr(value_type, "__init__") and not isinstance(value_type, str):
                try:
                    value = value_type(**value)
                    logger.debug("Converted dict to %s", value_type.__name__)
                except (TypeError, ValueError) as exc:
                    type_name = getattr(value_type, "__name__", str(value_type))
                    raise TypeError(
                        f"Failed to convert dict to {type_name} for characteristic {char}: {exc}"
                    ) from exc

    try:
        if validate:
            payload = encoder.build_value(value)
            logger.debug("Successfully encoded %s with validation", encoder.name)
        else:
            payload = encoder._encode_value(value)  # pylint: disable=protected-access
            logger.debug("Successfully encoded %s without validation", encoder.name)
        return bytes(payload)
    except Exception as exc:
        logger.error("Encoding failed for %s: %s", encoder.name, exc)
        raise
306 def _get_characteristic_value_type_class( # pylint: disable=too-many-return-statements,too-many-branches
307 self, characteristic: BaseCharacteristic[Any]
308 ) -> type[Any] | None:
309 """Get the Python type class that a characteristic expects.
311 Args:
312 characteristic: The characteristic instance
314 Returns:
315 The type class, or None if it can't be determined
317 """
318 # Try to infer from decode_value return type annotation (resolve string annotations)
319 if hasattr(characteristic, "_decode_value"):
320 try:
321 # Use get_type_hints to resolve string annotations
322 # Need to pass the characteristic's module globals to resolve forward references
323 module = inspect.getmodule(characteristic.__class__)
324 globalns = getattr(module, "__dict__", {}) if module else {}
325 type_hints = typing.get_type_hints(characteristic._decode_value, globalns=globalns) # pylint: disable=protected-access
326 return_type = type_hints.get("return")
327 if return_type and return_type is not type(None):
328 return return_type # type: ignore[no-any-return]
329 except Exception: # pylint: disable=broad-exception-caught
330 # Fallback to direct signature inspection
331 return_type = inspect.signature(characteristic._decode_value).return_annotation # pylint: disable=protected-access
332 sig = inspect.signature(characteristic._decode_value)
333 return_annotation = sig.return_annotation
334 if return_annotation and return_annotation != inspect.Parameter.empty:
335 # Check if it's not just a string annotation
336 if not isinstance(return_annotation, str):
337 return return_annotation # type: ignore[no-any-return]
339 # Try to get from _manual_value_type attribute
340 # pylint: disable=protected-access # Need to inspect manual type info
341 if hasattr(characteristic, "_manual_value_type"):
342 manual_type = characteristic._manual_value_type
343 if manual_type:
344 # If it's a string, try to resolve it from templates module
345 if isinstance(manual_type, str):
346 if hasattr(templates, manual_type):
347 return getattr(templates, manual_type) # type: ignore[no-any-return]
349 # Try to get from template first
350 # pylint: disable=protected-access # Need to inspect template for type info
351 if hasattr(characteristic, "_template") and characteristic._template:
352 template = characteristic._template
353 # Check if template has a value_type annotation
354 if hasattr(template, "__orig_class__"):
355 # Extract type from Generic
356 args = typing.get_args(template.__orig_class__)
357 if args:
358 return args[0] # type: ignore[no-any-return]
360 # For simple types, check info.value_type
361 info = characteristic.info
362 if info.value_type == ValueType.INT:
363 return int
364 if info.value_type == ValueType.FLOAT:
365 return float
366 if info.value_type == ValueType.STRING:
367 return str
368 if info.value_type == ValueType.BOOL:
369 return bool
370 if info.value_type == ValueType.BYTES:
371 return bytes
373 return None
def get_value_type(self, uuid: str) -> ValueType | None:
    """Look up the value type declared for a characteristic.

    Delegates to ``get_characteristic_info_by_uuid`` and returns the
    ``value_type`` field of the result.

    Args:
        uuid: The characteristic UUID (16-bit short form or full 128-bit)

    Returns:
        ValueType enum if characteristic is found, None otherwise

    Example:
        Check what type a characteristic returns::

            from bluetooth_sig import BluetoothSIGTranslator

            translator = BluetoothSIGTranslator()
            value_type = translator.get_value_type("2A19")
            print(value_type)  # ValueType.INT

    """
    info = self.get_characteristic_info_by_uuid(uuid)
    if not info:
        return None
    return info.value_type
def supports(self, uuid: str) -> bool:
    """Report whether a characteristic class is registered for a UUID.

    Args:
        uuid: The characteristic UUID to check

    Returns:
        True if the registry has a class for the UUID; False if it has none
        or the UUID cannot be constructed (ValueError/TypeError)

    Example:
        Check if characteristic is supported::

            from bluetooth_sig import BluetoothSIGTranslator

            translator = BluetoothSIGTranslator()
            if translator.supports("2A19"):
                result = translator.parse_characteristic("2A19", data)

    """
    try:
        registered = CharacteristicRegistry.get_characteristic_class_by_uuid(BluetoothUUID(uuid))
    except (ValueError, TypeError):
        return False
    return registered is not None
def get_characteristic_info_by_uuid(self, uuid: str) -> CharacteristicInfo | None:
    """Return the metadata of the characteristic registered for a UUID.

    The registered class is instantiated without arguments and its ``info``
    property is returned.

    Args:
        uuid: The characteristic UUID (16-bit short form or full 128-bit)

    Returns:
        [CharacteristicInfo][bluetooth_sig.CharacteristicInfo] with metadata, or None
        if the UUID is invalid, no class is registered, or instantiation fails

    Example:
        Get battery level characteristic info::

            from bluetooth_sig import BluetoothSIGTranslator

            translator = BluetoothSIGTranslator()
            info = translator.get_characteristic_info_by_uuid("2A19")
            if info:
                print(f"Name: {info.name}")  # Name: Battery Level

    """
    try:
        parsed_uuid = BluetoothUUID(uuid)
    except ValueError:
        return None

    registered = CharacteristicRegistry.get_characteristic_class_by_uuid(parsed_uuid)
    if not registered:
        return None

    try:
        # A throwaway instance is the only way to read the resolved metadata here.
        return registered().info
    except Exception:  # pylint: disable=broad-exception-caught
        return None
def get_characteristic_uuid_by_name(self, name: CharacteristicName) -> BluetoothUUID | None:
    """Return the UUID of the characteristic identified by a name enum.

    Args:
        name: CharacteristicName enum

    Returns:
        Characteristic UUID or None if not found

    """
    info = self.get_characteristic_info_by_name(name)
    if not info:
        return None
    return info.uuid
def get_service_uuid_by_name(self, name: str | ServiceName) -> BluetoothUUID | None:
    """Return the UUID of a service given its name string or ServiceName enum.

    Args:
        name: Service name or enum

    Returns:
        Service UUID or None if not found

    """
    # Enum members are reduced to their string value before the lookup.
    lookup_name = name
    if isinstance(name, ServiceName):
        lookup_name = name.value

    info = self.get_service_info_by_name(lookup_name)
    if not info:
        return None
    return info.uuid
def get_characteristic_info_by_name(self, name: CharacteristicName) -> CharacteristicInfo | None:
    """Return characteristic metadata for a name enum.

    Args:
        name: CharacteristicName enum

    Returns:
        CharacteristicInfo if found, None otherwise (also None when the
        registered class cannot be instantiated)

    """
    registered = CharacteristicRegistry.get_characteristic_class(name)
    if not registered:
        return None

    # Class-level configured info wins when present.
    configured = registered.get_configured_info()
    if configured:
        return configured

    # Otherwise read the metadata off a temporary instance.
    try:
        return registered().info
    except Exception:  # pylint: disable=broad-exception-caught
        return None
def get_service_info_by_name(self, name: str) -> ServiceInfo | None:
    """Get service info by name instead of UUID.

    The lookup goes through the UUID registry. The returned ServiceInfo has
    an empty ``characteristics`` list.

    Args:
        name: Service name

    Returns:
        ServiceInfo if found, None otherwise. Lookup errors are logged at
        debug level and reported as None rather than raised.

    """
    try:
        uuid_info = uuid_registry.get_service_info(name)
        if uuid_info:
            return ServiceInfo(uuid=uuid_info.uuid, name=uuid_info.name, characteristics=[])
    except Exception:  # pylint: disable=broad-exception-caught
        # Best-effort lookup: keep returning None, but leave a trace for debugging.
        logger.debug("Service lookup by name failed for %r", name, exc_info=True)

    return None
def get_service_info_by_uuid(self, uuid: str) -> ServiceInfo | None:
    """Get information about a service by UUID.

    The registered service class is instantiated without arguments; its
    UUID, name and the ``info`` of each of its characteristics are returned.

    Args:
        uuid: The service UUID

    Returns:
        ServiceInfo with metadata, or None if the UUID is invalid, no service
        class is registered for it, or the service cannot be instantiated

    """
    try:
        bt_uuid = BluetoothUUID(uuid)
    except (ValueError, TypeError):
        # A malformed UUID is "not found", matching the characteristic lookup.
        return None

    service_class = GattServiceRegistry.get_service_class_by_uuid(bt_uuid)
    if not service_class:
        return None

    try:
        temp_service = service_class()
        # Only the characteristic instances are needed, not their keys.
        char_infos: list[CharacteristicInfo] = [
            char_instance.info for char_instance in temp_service.characteristics.values()
        ]
        return ServiceInfo(
            uuid=temp_service.uuid,
            name=temp_service.name,
            characteristics=char_infos,
        )
    except Exception:  # pylint: disable=broad-exception-caught
        return None
def list_supported_characteristics(self) -> dict[str, str]:
    """Map the name of each registered characteristic to its UUID string.

    Only classes whose ``get_configured_info()`` returns a truthy value are included.

    Returns:
        Dictionary mapping characteristic names to UUIDs

    """
    supported: dict[str, str] = {}
    for key, char_class in CharacteristicRegistry.get_all_characteristics().items():
        info = char_class.get_configured_info()
        if not info:
            continue
        # Registry keys with a ``value`` attribute contribute that value; others are stringified.
        label = key.value if hasattr(key, "value") else str(key)
        supported[label] = str(info.uuid)
    return supported
def list_supported_services(self) -> dict[str, str]:
    """Map the name of each registered service to its UUID string.

    Service classes that fail while being instantiated or inspected are skipped.

    Returns:
        Dictionary mapping service names to UUIDs

    """
    supported: dict[str, str] = {}
    for service_class in GattServiceRegistry.get_all_services():
        try:
            instance = service_class()
            # Fall back to the class name when the instance has no _service_name.
            label = getattr(instance, "_service_name", service_class.__name__)
            supported[label] = str(instance.uuid)
        except Exception:  # pylint: disable=broad-exception-caught
            pass
    return supported
def process_services(self, services: dict[str, dict[str, CharacteristicDataDict]]) -> None:
    """Build service instances from discovery data and remember them.

    For every service entry the ``"characteristics"`` mapping (if any) is
    converted to CharacteristicInfo objects, the service is created through
    the service registry, and a truthy result is stored under the string
    form of its BluetoothUUID.

    Args:
        services: Dictionary of service UUIDs to their characteristics

    """
    for service_uuid_str, service_data in services.items():
        service_uuid = BluetoothUUID(service_uuid_str)

        char_infos: dict[BluetoothUUID, CharacteristicInfo] = {}
        for char_uuid_str, char_data in service_data.get("characteristics", {}).items():
            char_uuid = BluetoothUUID(char_uuid_str)

            # Strings are converted to the enum, enum members are kept,
            # anything else defaults to BYTES.
            declared = char_data.get("value_type", "bytes")
            if isinstance(declared, str):
                resolved_type = ValueType(declared)
            else:
                resolved_type = declared if isinstance(declared, ValueType) else ValueType.BYTES

            char_infos[char_uuid] = CharacteristicInfo(
                uuid=char_uuid,
                name=char_data.get("name", ""),
                unit=char_data.get("unit", ""),
                value_type=resolved_type,
            )

        created = GattServiceRegistry.create_service(service_uuid, char_infos)
        if created:
            self._services[str(service_uuid)] = created
def get_service_by_uuid(self, uuid: str) -> BaseGattService | None:
    """Get a service instance by UUID.

    Discovered services are stored under ``str(BluetoothUUID(...))``. The
    given string is tried as-is first; on a miss it is normalised the same
    way and tried again, so other spellings of the same UUID can match.

    Args:
        uuid: The service UUID

    Returns:
        Service instance if found, None otherwise (including when the
        string is not a valid UUID)

    """
    service = self._services.get(uuid)
    if service is not None:
        return service

    try:
        normalized = str(BluetoothUUID(uuid))
    except (ValueError, TypeError):
        return None
    return self._services.get(normalized)
@property
def discovered_services(self) -> list[BaseGattService]:
    """Snapshot of the stored service instances.

    Returns:
        A new list holding the discovered service instances

    """
    return [*self._services.values()]
def clear_services(self) -> None:
    """Clear all discovered services.

    Empties the internal service mapping in place.
    """
    self._services.clear()
def get_sig_info_by_name(self, name: str) -> SIGInfo | None:
    """Get Bluetooth SIG information for a characteristic or service by name.

    The name is tried as a characteristic first (via the UUID registry) and
    then as a service.

    Args:
        name: Characteristic or service name

    Returns:
        CharacteristicInfo or ServiceInfo if found, None otherwise. Errors
        in the characteristic lookup are logged at debug level and the
        service lookup is still attempted.

    """
    try:
        char_info = uuid_registry.get_characteristic_info(name)
        if char_info:
            # Registry value types that ValueType does not accept become UNKNOWN.
            value_type = ValueType.UNKNOWN
            if char_info.value_type:
                try:
                    value_type = ValueType(char_info.value_type)
                except (ValueError, KeyError):
                    value_type = ValueType.UNKNOWN
            return CharacteristicInfo(
                uuid=char_info.uuid,
                name=char_info.name,
                value_type=value_type,
                unit=char_info.unit or "",
            )
    except Exception:  # pylint: disable=broad-exception-caught
        # Best-effort lookup: fall through to the service lookup, but leave a trace.
        logger.debug("Characteristic lookup by name failed for %r", name, exc_info=True)

    # Not a characteristic (or the lookup failed): try the name as a service.
    return self.get_service_info_by_name(name) or None
def get_sig_info_by_uuid(self, uuid: str) -> SIGInfo | None:
    """Resolve a UUID to characteristic info, falling back to service info.

    Args:
        uuid: UUID string (with or without dashes)

    Returns:
        CharacteristicInfo or ServiceInfo if found, None otherwise

    """
    # ``or`` short-circuits, so the service lookup only runs when no characteristic matched.
    return self.get_characteristic_info_by_uuid(uuid) or self.get_service_info_by_uuid(uuid) or None
def parse_characteristics(
    self,
    char_data: dict[str, bytes],
    ctx: CharacteristicContext | None = None,
) -> dict[str, Any]:
    r"""Parse multiple characteristics at once with dependency-aware ordering.

    Characteristics are parsed in an order derived from the
    `required_dependencies` and `optional_dependencies` attributes declared on
    their classes, so independent characteristics are parsed before the ones
    that depend on them. If the ordering cannot be computed (for example
    because of a dependency cycle) a warning is logged and the input order
    is used instead.

    Required dependencies must be available, either already parsed in this
    batch or present in ``ctx.other_characteristics``; otherwise
    MissingDependencyError is raised. Missing optional dependencies are only
    logged at debug level.

    Args:
        char_data: Dictionary mapping UUIDs to raw data bytes
        ctx: Optional CharacteristicContext used as the starting context

    Returns:
        Dictionary mapping UUIDs to parsed values

    Raises:
        MissingDependencyError: If a required dependency is unavailable
        CharacteristicParseError: If parsing fails for any characteristic
        SpecialValueDetected: If a characteristic reports a special sentinel value

    Example:
        Parse multiple environmental characteristics::

            from bluetooth_sig import BluetoothSIGTranslator
            from bluetooth_sig.gatt.exceptions import CharacteristicParseError

            translator = BluetoothSIGTranslator()
            data = {
                "2A6E": b"\x0A\x00",  # Temperature
                "2A6F": b"\x32\x00",  # Humidity
            }
            try:
                results = translator.parse_characteristics(data)
                for uuid, value in results.items():
                    print(f"{uuid}: {value}")
            except CharacteristicParseError as e:
                print(f"Parse failed: {e}")

    """
    return self._parse_characteristics_batch(char_data, ctx)
def _parse_characteristics_batch(
    self,
    char_data: dict[str, bytes],
    ctx: CharacteristicContext | None,
) -> dict[str, Any]:
    """Parse a batch of characteristics in dependency order.

    Each characteristic is parsed with a context whose ``other_characteristics``
    is the mapping of results parsed so far. Parse errors and special-value
    signals from individual characteristics propagate to the caller.
    """
    logger.debug("Batch parsing %d characteristics", len(char_data))

    instances, required_by_uuid, optional_by_uuid = self._prepare_characteristic_dependencies(char_data)
    parse_order = self._resolve_dependency_order(char_data, required_by_uuid, optional_by_uuid)

    parsed: dict[str, Any] = {}
    for uuid_str in parse_order:
        payload = char_data[uuid_str]
        instance = instances.get(uuid_str)

        # Required dependencies must be satisfied before this one is parsed.
        unmet = self._find_missing_required_dependencies(
            uuid_str,
            required_by_uuid.get(uuid_str, []),
            parsed,
            ctx,
        )
        if unmet:
            raise MissingDependencyError(instance.name if instance else "Unknown", unmet)

        self._log_optional_dependency_gaps(
            uuid_str,
            optional_by_uuid.get(uuid_str, []),
            parsed,
            ctx,
        )

        # Failures are deliberately not caught here.
        parsed[uuid_str] = self.parse_characteristic(
            uuid_str,
            payload,
            ctx=self._build_parse_context(ctx, parsed),
        )

    logger.debug("Batch parsing complete: %d results", len(parsed))
    return parsed
def _prepare_characteristic_dependencies(
    self, characteristic_data: Mapping[str, bytes]
) -> tuple[dict[str, BaseCharacteristic[Any]], dict[str, list[str]], dict[str, list[str]]]:
    """Create one characteristic per UUID and gather its declared dependencies.

    UUIDs for which the registry returns None are left out of all three
    mappings. Dependency mappings only get an entry when the declared list
    is non-empty.

    Returns:
        Tuple of (instances by UUID, required dependencies by UUID,
        optional dependencies by UUID)
    """
    instances: dict[str, BaseCharacteristic[Any]] = {}
    required_by_uuid: dict[str, list[str]] = {}
    optional_by_uuid: dict[str, list[str]] = {}

    for uuid in characteristic_data:
        instance = CharacteristicRegistry.create_characteristic(uuid)
        if instance is None:
            continue
        instances[uuid] = instance

        # Read both attributes before any logging, in the same order as declared.
        required_deps = instance.required_dependencies
        optional_deps = instance.optional_dependencies

        if required_deps:
            required_by_uuid[uuid] = required_deps
            logger.debug("Characteristic %s has required dependencies: %s", uuid, required_deps)
        if optional_deps:
            optional_by_uuid[uuid] = optional_deps
            logger.debug("Characteristic %s has optional dependencies: %s", uuid, optional_deps)

    return instances, required_by_uuid, optional_by_uuid
def _resolve_dependency_order(
    self,
    characteristic_data: Mapping[str, bytes],
    uuid_to_required_deps: Mapping[str, list[str]],
    uuid_to_optional_deps: Mapping[str, list[str]],
) -> list[str]:
    """Order the batch so that dependencies come before their dependents.

    Only dependencies that are themselves part of the batch influence the
    order. If sorting raises for any reason, a warning is logged and the
    original key order is returned.
    """
    try:
        graph: TopologicalSorter[str] = TopologicalSorter()
        for uuid in characteristic_data:
            declared = uuid_to_required_deps.get(uuid, []) + uuid_to_optional_deps.get(uuid, [])
            # Dependencies outside the batch cannot be ordered, so they are dropped.
            in_batch = [dep for dep in declared if dep in characteristic_data]
            graph.add(uuid, *in_batch)

        ordered = list(graph.static_order())
        logger.debug("Dependency-sorted parsing order: %s", ordered)
        return ordered
    except Exception as exc:  # pylint: disable=broad-exception-caught
        logger.warning("Dependency sorting failed: %s. Using original order.", exc)
        return list(characteristic_data.keys())
866 def _find_missing_required_dependencies(
867 self,
868 uuid: str,
869 required_deps: list[str],
870 results: Mapping[str, Any],
871 base_context: CharacteristicContext | None,
872 ) -> list[str]:
873 """Determine which required dependencies are unavailable for a characteristic."""
874 if not required_deps:
875 return []
877 missing: list[str] = []
878 other_characteristics = (
879 base_context.other_characteristics if base_context and base_context.other_characteristics else None
880 )
882 for dep_uuid in required_deps:
883 if dep_uuid in results:
884 # If it's in results, it was successfully parsed
885 continue
887 if other_characteristics and dep_uuid in other_characteristics:
888 # If it's in context, assume it's available
889 continue
891 missing.append(dep_uuid)
893 if missing:
894 logger.debug("Characteristic %s missing required dependencies: %s", uuid, missing)
896 return missing
898 def _log_optional_dependency_gaps(
899 self,
900 uuid: str,
901 optional_deps: list[str],
902 results: Mapping[str, Any],
903 base_context: CharacteristicContext | None,
904 ) -> None:
905 """Emit debug logs when optional dependencies are unavailable."""
906 if not optional_deps:
907 return
909 other_characteristics = (
910 base_context.other_characteristics if base_context and base_context.other_characteristics else None
911 )
913 for dep_uuid in optional_deps:
914 if dep_uuid in results:
915 continue
916 if other_characteristics and dep_uuid in other_characteristics:
917 continue
918 logger.debug("Optional dependency %s not available for %s", dep_uuid, uuid)
def _build_parse_context(
    self,
    base_context: CharacteristicContext | None,
    results: Mapping[str, Any],
) -> CharacteristicContext:
    """Create the context handed to a single characteristic parser.

    ``other_characteristics`` is always set to ``results``; the remaining
    fields are copied from ``base_context`` when one is given.
    """
    if base_context is None:
        return CharacteristicContext(other_characteristics=results)

    return CharacteristicContext(
        device_info=base_context.device_info,
        advertisement=base_context.advertisement,
        other_characteristics=results,
        raw_service=base_context.raw_service,
    )
def get_characteristics_info_by_uuids(self, uuids: list[str]) -> dict[str, CharacteristicInfo | None]:
    """Look up characteristic metadata for several UUIDs in one call.

    Args:
        uuids: List of characteristic UUIDs

    Returns:
        Dictionary mapping each given UUID to its CharacteristicInfo
        (or None if not found)

    """
    return {uuid: self.get_characteristic_info_by_uuid(uuid) for uuid in uuids}
952 def validate_characteristic_data(self, uuid: str, data: bytes) -> ValidationResult:
953 """Validate characteristic data format against SIG specifications.
955 Args:
956 uuid: The characteristic UUID
957 data: Raw data bytes to validate
959 Returns:
960 ValidationResult with validation details
962 """
963 try:
964 # Attempt to parse the data - if it succeeds, format is valid
965 self.parse_characteristic(uuid, data)
966 # Try to get expected_length
967 try:
968 bt_uuid = BluetoothUUID(uuid)
969 char_class = CharacteristicRegistry.get_characteristic_class_by_uuid(bt_uuid)
970 expected = char_class.expected_length if char_class else None
971 except Exception: # pylint: disable=broad-exception-caught
972 expected = None
973 return ValidationResult(
974 is_valid=True,
975 actual_length=len(data),
976 expected_length=expected,
977 error_message="",
978 )
979 except Exception as e: # pylint: disable=broad-exception-caught
980 # If parsing failed, data format is invalid
981 # Try to get expected_length even on failure
982 try:
983 bt_uuid = BluetoothUUID(uuid)
984 char_class = CharacteristicRegistry.get_characteristic_class_by_uuid(bt_uuid)
985 expected = char_class.expected_length if char_class else None
986 except Exception: # pylint: disable=broad-exception-caught
987 expected = None
988 return ValidationResult(
989 is_valid=False,
990 actual_length=len(data),
991 expected_length=expected,
992 error_message=str(e),
993 )
995 def get_service_characteristics(self, service_uuid: str) -> list[str]: # pylint: disable=too-many-return-statements
996 """Get the characteristic UUIDs associated with a service.
998 Args:
999 service_uuid: The service UUID
1001 Returns:
1002 List of characteristic UUIDs for this service
1004 """
1005 service_class = GattServiceRegistry.get_service_class_by_uuid(BluetoothUUID(service_uuid))
1006 if not service_class:
1007 return []
1009 try:
1010 temp_service = service_class()
1011 required_chars = temp_service.get_required_characteristics()
1012 return [str(k) for k in required_chars]
1013 except Exception: # pylint: disable=broad-exception-caught
1014 return []
1016 def register_custom_characteristic_class(
1017 self,
1018 uuid_or_name: str,
1019 cls: type[BaseCharacteristic[Any]],
1020 info: CharacteristicInfo | None = None,
1021 override: bool = False,
1022 ) -> None:
1023 """Register a custom characteristic class at runtime.
1025 Args:
1026 uuid_or_name: The characteristic UUID or name
1027 cls: The characteristic class to register
1028 info: Optional CharacteristicInfo with metadata (name, unit, value_type)
1029 override: Whether to override existing registrations
1031 Raises:
1032 TypeError: If cls does not inherit from BaseCharacteristic
1033 ValueError: If UUID conflicts with existing registration and override=False
1035 Example::
1037 from bluetooth_sig import BluetoothSIGTranslator, CharacteristicInfo, ValueType
1038 from bluetooth_sig.types import BluetoothUUID
1040 translator = BluetoothSIGTranslator()
1041 info = CharacteristicInfo(
1042 uuid=BluetoothUUID("12345678-1234-1234-1234-123456789abc"),
1043 name="Custom Temperature",
1044 unit="°C",
1045 value_type=ValueType.FLOAT,
1046 )
1047 translator.register_custom_characteristic_class(str(info.uuid), MyCustomCharacteristic, info=info)
1049 """
1050 # Register the class
1051 CharacteristicRegistry.register_characteristic_class(uuid_or_name, cls, override)
1053 # Register metadata in uuid_registry if provided
1054 if info:
1055 uuid_registry.register_characteristic(
1056 uuid=info.uuid,
1057 name=info.name or cls.__name__,
1058 identifier=info.id,
1059 unit=info.unit,
1060 value_type=info.value_type,
1061 override=override,
1062 )
1064 def register_custom_service_class(
1065 self,
1066 uuid_or_name: str,
1067 cls: type[BaseGattService],
1068 info: ServiceInfo | None = None,
1069 override: bool = False,
1070 ) -> None:
1071 """Register a custom service class at runtime.
1073 Args:
1074 uuid_or_name: The service UUID or name
1075 cls: The service class to register
1076 info: Optional ServiceInfo with metadata (name)
1077 override: Whether to override existing registrations
1079 Raises:
1080 TypeError: If cls does not inherit from BaseGattService
1081 ValueError: If UUID conflicts with existing registration and override=False
1083 Example::
1085 from bluetooth_sig import BluetoothSIGTranslator, ServiceInfo
1086 from bluetooth_sig.types import BluetoothUUID
1088 translator = BluetoothSIGTranslator()
1089 info = ServiceInfo(uuid=BluetoothUUID("12345678-1234-1234-1234-123456789abc"), name="Custom Service")
1090 translator.register_custom_service_class(str(info.uuid), MyCustomService, info=info)
1092 """
1093 # Register the class
1094 GattServiceRegistry.register_service_class(uuid_or_name, cls, override)
1096 # Register metadata in uuid_registry if provided
1097 if info:
1098 uuid_registry.register_service(
1099 uuid=info.uuid,
1100 name=info.name or cls.__name__,
1101 identifier=info.id,
1102 override=override,
1103 )
    # Async-compatible wrappers: awaitable from async code, but the underlying work still runs synchronously
1107 @overload
1108 async def parse_characteristic_async(
1109 self,
1110 char: type[BaseCharacteristic[T]],
1111 raw_data: bytes,
1112 ctx: CharacteristicContext | None = ...,
1113 ) -> T: ...
1115 @overload
1116 async def parse_characteristic_async(
1117 self,
1118 char: str | BluetoothUUID,
1119 raw_data: bytes,
1120 ctx: CharacteristicContext | None = ...,
1121 ) -> Any: ... # noqa: ANN401 # Runtime UUID dispatch cannot be type-safe
1123 async def parse_characteristic_async(
1124 self,
1125 char: str | BluetoothUUID | type[BaseCharacteristic[T]],
1126 raw_data: bytes,
1127 ctx: CharacteristicContext | None = None,
1128 ) -> T | Any: # noqa: ANN401 # Runtime UUID dispatch cannot be type-safe
1129 """Parse characteristic data in an async-compatible manner.
1131 This is an async wrapper that allows characteristic parsing to be used
1132 in async contexts. The actual parsing is performed synchronously as it's
1133 a fast, CPU-bound operation that doesn't benefit from async I/O.
1135 Args:
1136 char: Characteristic class (type-safe) or UUID string/BluetoothUUID (not type-safe).
1137 raw_data: Raw bytes from the characteristic
1138 ctx: Optional context providing device-level info
1140 Returns:
1141 Parsed value. Return type is inferred when passing characteristic class.
1143 Raises:
1144 SpecialValueDetected: Special sentinel value detected
1145 CharacteristicParseError: Parse/validation failure
1147 Example::
1149 async with BleakClient(address) as client:
1150 data = await client.read_gatt_char("2A19")
1152 # Type-safe: pass characteristic class
1153 from bluetooth_sig.gatt.characteristics import BatteryLevelCharacteristic
1155 level: int = await translator.parse_characteristic_async(BatteryLevelCharacteristic, data)
1157 # Not type-safe: pass UUID string
1158 value = await translator.parse_characteristic_async("2A19", data)
1160 """
1161 # Handle characteristic class input (type-safe path)
1162 if isinstance(char, type) and issubclass(char, BaseCharacteristic):
1163 return self.parse_characteristic(char, raw_data, ctx)
1165 # Convert to string for consistency with sync API
1166 uuid_str = str(char) if isinstance(char, BluetoothUUID) else char
1168 # Delegate to sync implementation
1169 return self.parse_characteristic(uuid_str, raw_data, ctx)
1171 async def parse_characteristics_async(
1172 self,
1173 char_data: dict[str, bytes],
1174 ctx: CharacteristicContext | None = None,
1175 ) -> dict[str, Any]:
1176 """Parse multiple characteristics in an async-compatible manner.
1178 This is an async wrapper for batch characteristic parsing. The parsing
1179 is performed synchronously as it's a fast, CPU-bound operation. This method
1180 allows batch parsing to be used naturally in async workflows.
1182 Args:
1183 char_data: Dictionary mapping UUIDs to raw data bytes
1184 ctx: Optional context
1186 Returns:
1187 Dictionary mapping UUIDs to parsed values
1189 Example::
1191 async with BleakClient(address) as client:
1192 # Read multiple characteristics
1193 char_data = {}
1194 for uuid in ["2A19", "2A6E", "2A6F"]:
1195 char_data[uuid] = await client.read_gatt_char(uuid)
1197 # Parse all asynchronously
1198 results = await translator.parse_characteristics_async(char_data)
1199 for uuid, value in results.items():
1200 print(f"{uuid}: {value}")
1201 """
1202 # Delegate directly to sync implementation
1203 # The sync implementation already handles dependency ordering
1204 return self.parse_characteristics(char_data, ctx)
1206 async def encode_characteristic_async(
1207 self,
1208 uuid: str | BluetoothUUID,
1209 value: Any, # noqa: ANN401
1210 validate: bool = True,
1211 ) -> bytes:
1212 """Encode characteristic value in an async-compatible manner.
1214 This is an async wrapper that allows characteristic encoding to be used
1215 in async contexts. The actual encoding is performed synchronously as it's
1216 a fast, CPU-bound operation that doesn't benefit from async I/O.
1218 Args:
1219 uuid: The characteristic UUID (string or BluetoothUUID)
1220 value: The value to encode (dataclass, dict, or primitive)
1221 validate: If True, validates before encoding (default: True)
1223 Returns:
1224 Encoded bytes ready to write
1226 Example::
1228 async with BleakClient(address) as client:
1229 data = await translator.encode_characteristic_async("2A19", 85)
1230 await client.write_gatt_char("2A19", data)
1231 """
1232 # Convert to string for consistency with sync API
1233 uuid_str = str(uuid) if isinstance(uuid, BluetoothUUID) else uuid
1235 # Delegate to sync implementation
1236 return self.encode_characteristic(uuid_str, value, validate)
1238 def create_value(self, uuid: str, **kwargs: Any) -> Any: # noqa: ANN401
1239 """Create a properly typed value instance for a characteristic.
1241 This is a convenience method that constructs the appropriate dataclass
1242 or value type for a characteristic, which can then be passed to
1243 encode_characteristic() or used directly.
1245 Args:
1246 uuid: The characteristic UUID
1247 **kwargs: Field values for the characteristic's type
1249 Returns:
1250 Properly typed value instance
1252 Raises:
1253 ValueError: If UUID is invalid or characteristic not found
1254 TypeError: If kwargs don't match the characteristic's expected fields
1256 Example:
1257 Create complex characteristic values::
1259 from bluetooth_sig import BluetoothSIGTranslator
1261 translator = BluetoothSIGTranslator()
1263 # Create acceleration data
1264 accel = translator.create_value("2C1D", x_axis=1.5, y_axis=0.5, z_axis=9.8)
1266 # Encode and write
1267 data = translator.encode_characteristic("2C1D", accel)
1268 await client.write_gatt_char("2C1D", data)
1270 """
1271 # Create characteristic instance
1272 characteristic = CharacteristicRegistry.create_characteristic(uuid)
1273 if not characteristic:
1274 raise ValueError(f"No characteristic found for UUID: {uuid}")
1276 # Get the value type
1277 value_type = self._get_characteristic_value_type_class(characteristic)
1279 if not value_type:
1280 # For simple types, just return the single value if provided
1281 if len(kwargs) == 1:
1282 return next(iter(kwargs.values()))
1283 raise ValueError(
1284 f"Cannot determine value type for characteristic {uuid}. "
1285 "Try passing a dict to encode_characteristic() instead."
1286 )
1288 # Handle simple primitive types
1289 if value_type in (int, float, str, bool, bytes):
1290 if len(kwargs) == 1:
1291 value = next(iter(kwargs.values()))
1292 if not isinstance(value, value_type):
1293 type_name = getattr(value_type, "__name__", str(value_type))
1294 raise TypeError(f"Expected {type_name}, got {type(value).__name__}")
1295 return value
1296 type_name = getattr(value_type, "__name__", str(value_type))
1297 raise TypeError(f"Simple type {type_name} expects a single value")
1299 # Construct complex type from kwargs
1300 try:
1301 return value_type(**kwargs)
1302 except (TypeError, ValueError) as e:
1303 type_name = getattr(value_type, "__name__", str(value_type))
1304 raise TypeError(f"Failed to create {type_name} for characteristic {uuid}: {e}") from e
# Global instance: constructed once when this module is imported and exposed
# under the name ``BluetoothSIG``.
BluetoothSIG = BluetoothSIGTranslator()