Coverage for src/bluetooth_sig/registry/units.py: 94%
52 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
1"""Units registry for Bluetooth SIG unit definitions."""
3from __future__ import annotations
5import threading
7import msgspec
9from bluetooth_sig.types.uuid import BluetoothUUID
11from .utils import find_bluetooth_sig_path, load_yaml_uuids, normalize_uuid_string, parse_bluetooth_uuid
14class UnitInfo(msgspec.Struct, frozen=True, kw_only=True):
15 """Information about a Bluetooth SIG unit."""
17 uuid: BluetoothUUID
18 name: str
19 id: str
22class UnitsRegistry:
23 """Registry for Bluetooth SIG unit UUIDs."""
25 def __init__(self) -> None:
26 """Initialize the units registry."""
27 self._lock = threading.RLock()
28 self._units: dict[str, UnitInfo] = {} # normalized_uuid -> UnitInfo
29 self._units_by_name: dict[str, UnitInfo] = {} # lower_name -> UnitInfo
30 self._units_by_id: dict[str, UnitInfo] = {} # id -> UnitInfo
32 try:
33 self._load_units()
34 except (FileNotFoundError, Exception): # pylint: disable=broad-exception-caught
35 # If YAML loading fails, continue with empty registry
36 pass
38 def _load_units(self) -> None:
39 """Load unit UUIDs from YAML file."""
40 base_path = find_bluetooth_sig_path()
41 if not base_path:
42 return
44 # Load unit UUIDs
45 units_yaml = base_path / "units.yaml"
46 if units_yaml.exists():
47 for unit_info in load_yaml_uuids(units_yaml):
48 uuid = normalize_uuid_string(unit_info["uuid"])
50 bt_uuid = BluetoothUUID(uuid)
51 info = UnitInfo(uuid=bt_uuid, name=unit_info["name"], id=unit_info["id"])
52 # Store using short form as key for easy lookup
53 self._units[bt_uuid.short_form.upper()] = info
54 # Also store by name and id for reverse lookup
55 self._units_by_name[unit_info["name"].lower()] = info
56 self._units_by_id[unit_info["id"]] = info
58 def get_unit_info(self, uuid: str | int | BluetoothUUID) -> UnitInfo | None:
59 """Get unit information by UUID.
61 Args:
62 uuid: 16-bit UUID as string (with or without 0x), int, or BluetoothUUID
64 Returns:
65 UnitInfo object, or None if not found
66 """
67 with self._lock:
68 try:
69 bt_uuid = parse_bluetooth_uuid(uuid)
71 # Get the short form (16-bit) for lookup
72 short_key = bt_uuid.short_form.upper()
73 return self._units.get(short_key)
74 except ValueError:
75 return None
77 def get_unit_info_by_name(self, name: str) -> UnitInfo | None:
78 """Get unit information by name.
80 Args:
81 name: Unit name (case-insensitive)
83 Returns:
84 UnitInfo object, or None if not found
85 """
86 with self._lock:
87 return self._units_by_name.get(name.lower())
89 def get_unit_info_by_id(self, unit_id: str) -> UnitInfo | None:
90 """Get unit information by ID.
92 Args:
93 unit_id: Unit ID (e.g., "org.bluetooth.unit.celsius")
95 Returns:
96 UnitInfo object, or None if not found
97 """
98 with self._lock:
99 return self._units_by_id.get(unit_id)
101 def is_unit_uuid(self, uuid: str | int | BluetoothUUID) -> bool:
102 """Check if a UUID is a registered unit UUID.
104 Args:
105 uuid: UUID to check
107 Returns:
108 True if the UUID is a unit UUID, False otherwise
109 """
110 return self.get_unit_info(uuid) is not None
112 def get_all_units(self) -> list[UnitInfo]:
113 """Get all registered units.
115 Returns:
116 List of all UnitInfo objects
117 """
118 with self._lock:
119 return list(self._units.values())
122# Global instance
123units_registry = UnitsRegistry()