Coverage for src / bluetooth_sig / registry / gss.py: 92%

124 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""GSS (GATT Service Specification) registry. 

2 

3This module provides a registry for Bluetooth SIG GSS YAML files, 

4extracting characteristic specifications with field metadata including 

5units, resolutions, value ranges, and presence conditions. 

6""" 

7 

8from __future__ import annotations 

9 

10import logging 

11from pathlib import Path 

12from typing import Any, cast 

13 

14import msgspec 

15 

16from bluetooth_sig.registry.base import BaseGenericRegistry 

17from bluetooth_sig.registry.uuids.units import UnitsRegistry 

18from bluetooth_sig.types.gatt_enums import WIRE_TYPE_MAP 

19from bluetooth_sig.types.registry.gss_characteristic import ( 

20 FieldSpec, 

21 GssCharacteristicSpec, 

22) 

23 

24 

25class GssRegistry(BaseGenericRegistry[GssCharacteristicSpec]): 

26 """Registry for GSS (GATT Service Specification) characteristic definitions. 

27 

28 Parses Bluetooth SIG GSS YAML files and extracts typed characteristic 

29 specifications with full field metadata. Implements singleton pattern 

30 with thread-safe lazy loading. 

31 

32 Example:: 

33 registry = GssRegistry.get_instance() 

34 spec = registry.get_spec("Battery Level") 

35 if spec: 

36 for field in spec.structure: 

37 print(f"{field.python_name}: {field.unit_id}") 

38 

39 """ 

40 

41 _instance: GssRegistry | None = None 

42 

43 def __init__(self) -> None: 

44 """Initialize the GSS registry.""" 

45 super().__init__() 

46 self._specs: dict[str, GssCharacteristicSpec] = {} 

47 self._units_registry: UnitsRegistry | None = None 

48 

49 @classmethod 

50 def get_instance(cls) -> GssRegistry: 

51 """Get the singleton instance of the GSS registry.""" 

52 if cls._instance is None: 

53 with cls._lock: 

54 if cls._instance is None: 

55 cls._instance = cls() 

56 return cls._instance 

57 

58 def _get_units_registry(self) -> UnitsRegistry: 

59 """Get or lazily initialize the units registry. 

60 

61 Returns: 

62 The UnitsRegistry singleton instance 

63 """ 

64 if self._units_registry is None: 

65 # get_instance() returns BaseUUIDRegistry; narrow to concrete subclass 

66 self._units_registry = cast("UnitsRegistry", UnitsRegistry.get_instance()) 

67 return self._units_registry 

68 

69 def _load(self) -> None: 

70 """Load GSS specifications from YAML files.""" 

71 gss_path = self._find_gss_path() 

72 if gss_path: 

73 for yaml_file in gss_path.glob("org.bluetooth.characteristic.*.yaml"): 

74 self._process_gss_file(yaml_file) 

75 self._loaded = True 

76 

77 def _find_gss_path(self) -> Path | None: 

78 """Find the GSS specifications directory.""" 

79 # Try project root location 

80 project_root = Path(__file__).parent.parent.parent.parent 

81 gss_path = project_root / "bluetooth_sig" / "gss" 

82 

83 if gss_path.exists(): 

84 return gss_path 

85 

86 # Try package root location 

87 pkg_root = Path(__file__).parent.parent 

88 gss_path = pkg_root / "bluetooth_sig" / "gss" 

89 

90 return gss_path if gss_path.exists() else None 

91 

92 def _process_gss_file(self, yaml_file: Path) -> None: 

93 """Process a single GSS YAML file and store as typed GssCharacteristicSpec.""" 

94 try: 

95 with yaml_file.open("r", encoding="utf-8") as f: 

96 data = msgspec.yaml.decode(f.read()) 

97 

98 if not data or "characteristic" not in data: 

99 return 

100 

101 char_data = data["characteristic"] 

102 char_name = char_data.get("name") 

103 char_id = char_data.get("identifier") 

104 

105 if not char_name or not char_id: 

106 return 

107 

108 # Parse structure into typed FieldSpec list 

109 raw_structure = char_data.get("structure", []) 

110 typed_structure: list[FieldSpec] = [] 

111 for raw_field in raw_structure: 

112 if isinstance(raw_field, dict): 

113 typed_structure.append( 

114 FieldSpec( 

115 field=raw_field.get("field", ""), 

116 type=raw_field.get("type", ""), 

117 size=str(raw_field.get("size", "")), 

118 description=raw_field.get("description", ""), 

119 ) 

120 ) 

121 

122 # Create typed GssCharacteristicSpec 

123 gss_spec = GssCharacteristicSpec( 

124 identifier=char_id, 

125 name=char_name, 

126 description=char_data.get("description", ""), 

127 structure=typed_structure, 

128 ) 

129 

130 # Store by both ID and name for lookup flexibility 

131 # Normalise keys to lowercase for case-insensitive lookup 

132 if char_id: 

133 self._specs[char_id.lower()] = gss_spec 

134 if char_name: 

135 self._specs[char_name.lower()] = gss_spec 

136 

137 except (msgspec.DecodeError, OSError, KeyError) as e: 

138 logging.warning("Failed to parse GSS YAML file %s: %s", yaml_file, e) 

139 

140 def get_spec(self, identifier: str) -> GssCharacteristicSpec | None: 

141 """Get a GSS specification by name or ID. 

142 

143 Args: 

144 identifier: Characteristic name or ID (case-insensitive) 

145 

146 Returns: 

147 GssCharacteristicSpec if found, None otherwise 

148 """ 

149 self._ensure_loaded() 

150 with self._lock: 

151 return self._specs.get(identifier.lower()) 

152 

153 def get_all_specs(self) -> dict[str, GssCharacteristicSpec]: 

154 """Get all loaded GSS specifications. 

155 

156 Returns: 

157 Dictionary of all specifications keyed by name and ID 

158 """ 

159 self._ensure_loaded() 

160 with self._lock: 

161 return dict(self._specs) 

162 

163 def extract_info_from_gss(self, char_data: dict[str, Any]) -> tuple[str | None, type | None]: 

164 """Extract unit and python_type from GSS characteristic structure. 

165 

166 Args: 

167 char_data: Raw characteristic data from YAML 

168 

169 Returns: 

170 Tuple of (unit_symbol, python_type) or (None, None) if not found 

171 """ 

172 structure = char_data.get("structure", []) 

173 if not isinstance(structure, list) or not structure: 

174 return None, None 

175 

176 typed_structure: list[dict[str, Any]] = [] 

177 for raw_field in structure: 

178 if isinstance(raw_field, dict): 

179 typed_structure.append(cast("dict[str, Any]", raw_field)) 

180 

181 if not typed_structure: 

182 return None, None 

183 

184 unit = None 

185 python_type: type | None = None 

186 

187 for field in typed_structure: 

188 field_dict: dict[str, Any] = field 

189 

190 if not python_type and isinstance(field_dict.get("type"), str): 

191 yaml_type_value = cast("str", field_dict["type"]) 

192 python_type = self._convert_yaml_type_to_python_type(yaml_type_value) 

193 

194 description_value = field_dict.get("description", "") 

195 if not isinstance(description_value, str): 

196 continue 

197 

198 # Extract unit from either "Base Unit:" or "Unit:" format 

199 if not unit and ("Base Unit:" in description_value or "Unit:" in description_value): 

200 unit = self._extract_unit_from_description(description_value) 

201 

202 return unit, python_type 

203 

204 def _extract_unit_from_description(self, description: str) -> str | None: 

205 """Extract unit symbol from GSS field description. 

206 

207 Handles both "Base Unit:" (unit on next line) and "Unit:" (inline) formats. 

208 Strips all spaces from unit IDs to handle YAML formatting issues. 

209 

210 Args: 

211 description: Field description text from GSS YAML 

212 

213 Returns: 

214 Human-readable unit symbol, or None if no unit found 

215 """ 

216 unit_id, _ = self._extract_unit_id_and_line(description) 

217 if unit_id: 

218 return self._convert_bluetooth_unit_to_readable(unit_id) 

219 return None 

220 

221 def _extract_unit_id_and_line(self, description: str) -> tuple[str | None, str | None]: 

222 """Extract raw unit ID and line from GSS field description. 

223 

224 Handles both "Base Unit:" (unit on next line) and "Unit:" (inline) formats. 

225 Strips all spaces from unit IDs to handle YAML formatting issues. 

226 

227 Args: 

228 description: Field description text from GSS YAML 

229 

230 Returns: 

231 Tuple of (unit_id without org.bluetooth.unit prefix, full unit line) 

232 Returns (None, None) if no unit found 

233 """ 

234 unit_line = None 

235 

236 if "Base Unit:" in description: 

237 # Two formats: "Base Unit:\norg.bluetooth.unit.xxx" or "Base Unit: org.bluetooth.unit.xxx" 

238 parts = description.split("Base Unit:")[1].split("\n") 

239 unit_line = parts[0].strip() 

240 if not unit_line and len(parts) > 1: # Unit is on next line 

241 unit_line = parts[1].strip() 

242 elif "Unit:" in description: 

243 # Inline format: "Unit: org.bluetooth.unit.xxx" 

244 unit_line = description.split("Unit:")[1].split("\n", maxsplit=1)[0].strip() 

245 

246 if unit_line and "org.bluetooth.unit." in unit_line: 

247 # Remove all spaces (handles YAML formatting issues) 

248 cleaned_line = unit_line.replace(" ", "") 

249 unit_spec = cleaned_line.split("org.bluetooth.unit.")[1].strip() 

250 return unit_spec, cleaned_line 

251 

252 return None, None 

253 

254 def _convert_yaml_type_to_python_type(self, yaml_type: str) -> type | None: 

255 """Convert YAML wire type string to a Python type.""" 

256 return WIRE_TYPE_MAP.get(yaml_type.lower()) 

257 

258 def _convert_bluetooth_unit_to_readable(self, unit_spec: str) -> str: 

259 """Convert Bluetooth SIG unit specification to human-readable symbol. 

260 

261 Args: 

262 unit_spec: Unit specification from GSS YAML (e.g., "thermodynamic_temperature.degree_celsius") 

263 

264 Returns: 

265 Human-readable unit symbol (e.g., "°C"), or unit_spec if no mapping found 

266 """ 

267 unit_spec = unit_spec.rstrip(".").lower() 

268 unit_id = f"org.bluetooth.unit.{unit_spec}" 

269 

270 units_registry = self._get_units_registry() 

271 unit_info = units_registry.get_info(unit_id) 

272 if unit_info and unit_info.symbol: 

273 return unit_info.symbol 

274 

275 return unit_spec 

276 

277 

278# Singleton instance for convenient access 

279gss_registry = GssRegistry.get_instance()