Coverage for src / bluetooth_sig / registry / service_discovery / attribute_ids.py: 82%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Service Discovery Attribute ID Registry for SDP attribute identifiers.""" 

2 

3from __future__ import annotations 

4 

5from pathlib import Path 

6from typing import Any, cast 

7 

8import msgspec 

9 

10from bluetooth_sig.registry.base import BaseGenericRegistry 

11from bluetooth_sig.registry.utils import find_bluetooth_sig_path 

12from bluetooth_sig.types.registry.profile_types import ( 

13 AttributeIdEntry, 

14 ProtocolParameterEntry, 

15) 

16 

17 

18class ServiceDiscoveryAttributeRegistry( 

19 BaseGenericRegistry["ServiceDiscoveryAttributeRegistry"], 

20): 

21 """Registry for SDP attribute identifiers with lazy loading. 

22 

23 Loads attribute IDs from ``service_discovery/attribute_ids/*.yaml``, 

24 ``attribute_id_offsets_for_strings.yaml``, and ``protocol_parameters.yaml``. 

25 

26 Thread-safe: Multiple threads can safely access the registry concurrently. 

27 """ 

28 

29 def __init__(self) -> None: 

30 """Initialise the service discovery attribute registry.""" 

31 super().__init__() 

32 self._attribute_ids: dict[str, list[AttributeIdEntry]] = {} 

33 self._protocol_parameters: list[ProtocolParameterEntry] = [] 

34 

35 # ------------------------------------------------------------------ 

36 # Loading 

37 # ------------------------------------------------------------------ 

38 

39 @staticmethod 

40 def _parse_hex_value(raw: object) -> int | None: 

41 """Parse a hex string like ``'0x0001'`` into an int.""" 

42 if isinstance(raw, int): 

43 return raw 

44 if isinstance(raw, str): 

45 try: 

46 return int(raw, 16) if raw.startswith("0x") else int(raw) 

47 except ValueError: 

48 return None 

49 return None 

50 

51 def _load_attribute_ids_file(self, yaml_path: Path, category: str) -> None: 

52 """Load a single attribute_ids YAML file into *_attribute_ids[category]*.""" 

53 if not yaml_path.exists(): 

54 return 

55 

56 with yaml_path.open("r", encoding="utf-8") as fh: 

57 data = msgspec.yaml.decode(fh.read()) 

58 

59 if not isinstance(data, dict): 

60 return 

61 

62 data_dict = cast("dict[str, Any]", data) 

63 entries_raw = data_dict.get("attribute_ids") 

64 if not isinstance(entries_raw, list): 

65 return 

66 

67 entries: list[AttributeIdEntry] = [] 

68 for entry in entries_raw: 

69 if not isinstance(entry, dict): 

70 continue 

71 name = entry.get("name") 

72 value = self._parse_hex_value(entry.get("value")) 

73 if name and value is not None: 

74 entries.append(AttributeIdEntry(name=str(name), value=value)) 

75 

76 if entries: 

77 self._attribute_ids[category] = entries 

78 

79 def _load_protocol_parameters(self, yaml_path: Path) -> None: 

80 """Load ``protocol_parameters.yaml``.""" 

81 if not yaml_path.exists(): 

82 return 

83 

84 with yaml_path.open("r", encoding="utf-8") as fh: 

85 data = msgspec.yaml.decode(fh.read()) 

86 

87 if not isinstance(data, dict): 

88 return 

89 

90 data_dict = cast("dict[str, Any]", data) 

91 params_raw = data_dict.get("protocol_parameters") 

92 if not isinstance(params_raw, list): 

93 return 

94 

95 for entry in params_raw: 

96 if not isinstance(entry, dict): 

97 continue 

98 protocol = entry.get("protocol") 

99 name = entry.get("name") 

100 index = entry.get("index") 

101 if protocol and name and isinstance(index, int): 

102 self._protocol_parameters.append( 

103 ProtocolParameterEntry( 

104 protocol=str(protocol), 

105 name=str(name), 

106 index=index, 

107 ), 

108 ) 

109 

110 def _load(self) -> None: 

111 """Perform the actual loading of all service discovery data.""" 

112 uuids_path = find_bluetooth_sig_path() 

113 if not uuids_path: 

114 self._loaded = True 

115 return 

116 

117 sd_path = uuids_path.parent / "service_discovery" 

118 if not sd_path.exists(): 

119 self._loaded = True 

120 return 

121 

122 # Load attribute_ids/*.yaml 

123 attr_dir = sd_path / "attribute_ids" 

124 if attr_dir.is_dir(): 

125 for yaml_file in sorted(attr_dir.glob("*.yaml")): 

126 category = yaml_file.stem 

127 self._load_attribute_ids_file(yaml_file, category) 

128 

129 # Load attribute_id_offsets_for_strings.yaml (same schema) 

130 offsets_file = sd_path / "attribute_id_offsets_for_strings.yaml" 

131 self._load_attribute_ids_file(offsets_file, "attribute_id_offsets_for_strings") 

132 

133 # Load protocol_parameters.yaml 

134 self._load_protocol_parameters(sd_path / "protocol_parameters.yaml") 

135 

136 self._loaded = True 

137 

138 # ------------------------------------------------------------------ 

139 # Query API 

140 # ------------------------------------------------------------------ 

141 

142 def get_attribute_ids(self, category: str) -> list[AttributeIdEntry]: 

143 """Get attribute ID entries for a named category. 

144 

145 Args: 

146 category: The file stem / category name, e.g. ``"universal_attributes"``, 

147 ``"a2dp"``, ``"sdp"``, ``"attribute_id_offsets_for_strings"``. 

148 

149 Returns: 

150 List of :class:`AttributeIdEntry` or an empty list if not found. 

151 """ 

152 self._ensure_loaded() 

153 with self._lock: 

154 return list(self._attribute_ids.get(category, [])) 

155 

156 def get_all_categories(self) -> list[str]: 

157 """Return all loaded category names (sorted).""" 

158 self._ensure_loaded() 

159 with self._lock: 

160 return sorted(self._attribute_ids) 

161 

162 def get_protocol_parameters(self) -> list[ProtocolParameterEntry]: 

163 """Return all protocol parameter entries.""" 

164 self._ensure_loaded() 

165 with self._lock: 

166 return list(self._protocol_parameters) 

167 

168 def resolve_attribute_name(self, category: str, value: int) -> str | None: 

169 """Look up the attribute name for a given numeric value within a category. 

170 

171 Args: 

172 category: Category name (e.g. ``"universal_attributes"``). 

173 value: The numeric attribute ID. 

174 

175 Returns: 

176 The attribute name or ``None`` if not found. 

177 """ 

178 self._ensure_loaded() 

179 with self._lock: 

180 for entry in self._attribute_ids.get(category, []): 

181 if entry.value == value: 

182 return entry.name 

183 return None 

184 

185 

186# Singleton instance for global use 

187service_discovery_attribute_registry = ServiceDiscoveryAttributeRegistry()