Coverage for src/bluetooth_sig/registry/units.py: 94%

52 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 00:10 +0000

1"""Units registry for Bluetooth SIG unit definitions.""" 

2 

3from __future__ import annotations 

4 

5import threading 

6 

7import msgspec 

8 

9from bluetooth_sig.types.uuid import BluetoothUUID 

10 

11from .utils import find_bluetooth_sig_path, load_yaml_uuids, normalize_uuid_string, parse_bluetooth_uuid 

12 

13 

class UnitInfo(msgspec.Struct, frozen=True, kw_only=True):
    """Information about a Bluetooth SIG unit.

    Declared as a frozen, keyword-only ``msgspec.Struct``: instances are
    built with keyword arguments and their fields are not reassigned
    afterwards.
    """

    # UUID of the unit, held as a BluetoothUUID instance.
    uuid: BluetoothUUID
    # Unit name as a string.
    name: str
    # Unit identifier string (e.g. "org.bluetooth.unit.celsius").
    id: str

20 

21 

class UnitsRegistry:
    """Registry for Bluetooth SIG unit UUIDs.

    Every loaded unit is indexed three ways so it can be found by UUID
    (upper-cased short form), by name (lower-cased) or by ID. All access to
    the indexes goes through a re-entrant lock.
    """

    def __init__(self) -> None:
        """Initialize the units registry and try to populate it from YAML.

        Loading is best-effort: if it fails for any reason the error is
        logged at DEBUG level and the registry keeps whatever entries were
        stored before the failure (possibly none).
        """
        self._lock = threading.RLock()
        self._units: dict[str, UnitInfo] = {}  # upper-cased short-form UUID -> UnitInfo
        self._units_by_name: dict[str, UnitInfo] = {}  # lower_name -> UnitInfo
        self._units_by_id: dict[str, UnitInfo] = {}  # id -> UnitInfo

        try:
            self._load_units()
        except Exception:  # pylint: disable=broad-exception-caught
            # Deliberately broad: a missing or malformed YAML file must never
            # prevent the registry from being constructed. Record the reason
            # instead of discarding it so the failure can be diagnosed.
            import logging  # pylint: disable=import-outside-toplevel

            logging.getLogger(__name__).debug(
                "Failed to load Bluetooth SIG units; continuing with the entries loaded so far",
                exc_info=True,
            )

    def _load_units(self) -> None:
        """Load unit UUIDs from the ``units.yaml`` file.

        Returns without doing anything when the Bluetooth SIG base path
        cannot be found or ``units.yaml`` does not exist there. Exceptions
        raised while reading or converting entries propagate to the caller.
        """
        base_path = find_bluetooth_sig_path()
        if not base_path:
            return

        units_yaml = base_path / "units.yaml"
        if not units_yaml.exists():
            return

        # Hold the same lock the lookup methods use while mutating the indexes.
        with self._lock:
            for unit_info in load_yaml_uuids(units_yaml):
                uuid = normalize_uuid_string(unit_info["uuid"])

                bt_uuid = BluetoothUUID(uuid)
                info = UnitInfo(uuid=bt_uuid, name=unit_info["name"], id=unit_info["id"])
                # Store using short form as key for easy lookup
                self._units[bt_uuid.short_form.upper()] = info
                # Also store by name and id for reverse lookup
                self._units_by_name[unit_info["name"].lower()] = info
                self._units_by_id[unit_info["id"]] = info

    def get_unit_info(self, uuid: str | int | BluetoothUUID) -> UnitInfo | None:
        """Get unit information by UUID.

        Args:
            uuid: 16-bit UUID as string (with or without 0x), int, or BluetoothUUID

        Returns:
            UnitInfo object, or None if not found or if parsing the UUID
            raises ValueError
        """
        # Parsing does not touch shared state, so do it before taking the lock.
        try:
            bt_uuid = parse_bluetooth_uuid(uuid)
            # Get the short form (16-bit) for lookup
            short_key = bt_uuid.short_form.upper()
        except ValueError:
            return None

        with self._lock:
            return self._units.get(short_key)

    def get_unit_info_by_name(self, name: str) -> UnitInfo | None:
        """Get unit information by name.

        Args:
            name: Unit name (case-insensitive)

        Returns:
            UnitInfo object, or None if not found
        """
        with self._lock:
            return self._units_by_name.get(name.lower())

    def get_unit_info_by_id(self, unit_id: str) -> UnitInfo | None:
        """Get unit information by ID.

        Args:
            unit_id: Unit ID (e.g., "org.bluetooth.unit.celsius")

        Returns:
            UnitInfo object, or None if not found
        """
        with self._lock:
            return self._units_by_id.get(unit_id)

    def is_unit_uuid(self, uuid: str | int | BluetoothUUID) -> bool:
        """Check if a UUID is a registered unit UUID.

        Args:
            uuid: UUID to check

        Returns:
            True if the UUID is a unit UUID, False otherwise
        """
        return self.get_unit_info(uuid) is not None

    def get_all_units(self) -> list[UnitInfo]:
        """Get all registered units.

        Returns:
            A new list of all UnitInfo objects; mutating it does not affect
            the registry
        """
        with self._lock:
            return list(self._units.values())

120 

121 

# Global instance, created when this module is imported. The constructor
# attempts to load the unit definitions and tolerates loading failures, so
# this name is always bound even if no units could be loaded.
units_registry = UnitsRegistry()