Coverage for src/bluetooth_sig/registry/gss.py: 92%

124 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 20:14 +0000

1"""GSS (GATT Service Specification) registry. 

2 

3This module provides a registry for Bluetooth SIG GSS YAML files, 

4extracting characteristic specifications with field metadata including 

5units, resolutions, value ranges, and presence conditions. 

6""" 

7 

8from __future__ import annotations 

9 

10import logging 

11from pathlib import Path 

12from typing import Any, cast 

13 

14import msgspec 

15 

16from bluetooth_sig.registry.base import BaseGenericRegistry 

17from bluetooth_sig.registry.uuids.units import UnitsRegistry 

18from bluetooth_sig.types.gatt_enums import DataType 

19from bluetooth_sig.types.registry.gss_characteristic import ( 

20 FieldSpec, 

21 GssCharacteristicSpec, 

22) 

23 

24 

class GssRegistry(BaseGenericRegistry[GssCharacteristicSpec]):
    """Registry for GSS (GATT Service Specification) characteristic definitions.

    Parses Bluetooth SIG GSS YAML files and extracts typed characteristic
    specifications with full field metadata. Implements singleton pattern
    with thread-safe lazy loading.

    Example:
        registry = GssRegistry.get_instance()
        spec = registry.get_spec("Battery Level")
        if spec:
            for field in spec.structure:
                print(f"{field.python_name}: {field.unit_id}")

    """

    _instance: GssRegistry | None = None

    def __init__(self) -> None:
        """Initialize the GSS registry with no specs and no units registry bound yet."""
        super().__init__()
        # Maps BOTH characteristic identifier and characteristic name to the same spec object.
        self._specs: dict[str, GssCharacteristicSpec] = {}
        # Resolved on first use by _get_units_registry().
        self._units_registry: UnitsRegistry | None = None

    @classmethod
    def get_instance(cls) -> GssRegistry:
        """Get the singleton instance of the GSS registry.

        Returns:
            The shared GssRegistry instance, created on first call.
        """
        if cls._instance is None:
            # Re-check under the lock so that only one caller constructs the instance.
            with cls._lock:
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def _get_units_registry(self) -> UnitsRegistry:
        """Get or lazily initialize the units registry.

        Returns:
            The UnitsRegistry singleton instance
        """
        if self._units_registry is None:
            # get_instance() is annotated with the base registry type, so narrow it
            # explicitly instead of silencing the type checker.
            self._units_registry = cast(UnitsRegistry, UnitsRegistry.get_instance())
        return self._units_registry

    def _load(self) -> None:
        """Load GSS specifications from YAML files.

        Every ``org.bluetooth.characteristic.*.yaml`` file in the GSS directory is
        processed. Files are visited in sorted order so that, should two files
        declare the same name or identifier, the entry that wins does not depend
        on filesystem enumeration order. ``_loaded`` is set even when no GSS
        directory is found.
        """
        gss_path = self._find_gss_path()
        if gss_path is not None:
            for yaml_file in sorted(gss_path.glob("org.bluetooth.characteristic.*.yaml")):
                self._process_gss_file(yaml_file)
        self._loaded = True

    def _find_gss_path(self) -> Path | None:
        """Find the GSS specifications directory.

        Returns:
            The first existing candidate directory, or None if neither exists.
        """
        module_file = Path(__file__)
        candidates = (
            # Project root location (four levels above this module).
            module_file.parent.parent.parent.parent / "bluetooth_sig" / "gss",
            # Package root location (two levels above this module).
            module_file.parent.parent / "bluetooth_sig" / "gss",
        )
        for candidate in candidates:
            if candidate.exists():
                return candidate
        return None

    def _process_gss_file(self, yaml_file: Path) -> None:
        """Process a single GSS YAML file and store as typed GssCharacteristicSpec.

        Files that cannot be read or decoded are logged and skipped. Files whose
        content does not have the expected shape (no ``characteristic`` mapping,
        or a missing name/identifier) are skipped silently. A malformed file never
        aborts loading of the remaining files.

        Args:
            yaml_file: Path of the GSS YAML file to parse
        """
        try:
            data = msgspec.yaml.decode(yaml_file.read_text(encoding="utf-8"))
        except (msgspec.DecodeError, OSError, UnicodeDecodeError) as e:
            # Use a named logger: calling logging.warning() on the root logger from
            # library code implicitly configures the application's root logger.
            logging.getLogger(__name__).warning("Failed to parse GSS YAML file %s: %s", yaml_file, e)
            return

        # The YAML document may legally be a scalar or a list; only a mapping is usable.
        if not isinstance(data, dict):
            return
        char_data = data.get("characteristic")
        if not isinstance(char_data, dict):
            return

        char_name = char_data.get("name")
        char_id = char_data.get("identifier")
        if not char_name or not char_id:
            return

        # A missing, null or non-list "structure" is treated as "no fields".
        raw_structure = char_data.get("structure")
        if not isinstance(raw_structure, list):
            raw_structure = []

        # Parse structure into typed FieldSpec list, ignoring non-mapping entries.
        typed_structure: list[FieldSpec] = [
            FieldSpec(
                field=raw_field.get("field", ""),
                type=raw_field.get("type", ""),
                size=str(raw_field.get("size", "")),
                description=raw_field.get("description", ""),
            )
            for raw_field in raw_structure
            if isinstance(raw_field, dict)
        ]

        gss_spec = GssCharacteristicSpec(
            identifier=char_id,
            name=char_name,
            description=char_data.get("description", ""),
            structure=typed_structure,
        )

        # Store by both ID and name for lookup flexibility.
        self._specs[char_id] = gss_spec
        self._specs[char_name] = gss_spec

    def get_spec(self, identifier: str) -> GssCharacteristicSpec | None:
        """Get a GSS specification by name or ID.

        Args:
            identifier: Characteristic name or ID

        Returns:
            GssCharacteristicSpec if found, None otherwise
        """
        self._ensure_loaded()
        with self._lock:
            return self._specs.get(identifier)

    def get_all_specs(self) -> dict[str, GssCharacteristicSpec]:
        """Get all loaded GSS specifications.

        Returns:
            A shallow copy of the specifications dictionary, keyed by name and ID
            (each spec therefore appears under two keys)
        """
        self._ensure_loaded()
        with self._lock:
            return dict(self._specs)

    def extract_info_from_gss(self, char_data: dict[str, Any]) -> tuple[str | None, str | None]:
        """Extract unit and value_type from GSS characteristic structure.

        The value type comes from the first field carrying a string ``type``; the
        unit comes from the first field whose description yields one.

        Args:
            char_data: Raw characteristic data from YAML

        Returns:
            Tuple of (unit_symbol, value_type) or (None, None) if not found
        """
        structure = char_data.get("structure", [])
        if not isinstance(structure, list):
            return None, None

        unit: str | None = None
        value_type: str | None = None

        for field in structure:
            if not isinstance(field, dict):
                continue

            yaml_type = field.get("type")
            if not value_type and isinstance(yaml_type, str):
                value_type = self._convert_yaml_type_to_python_type(yaml_type)

            # "Unit:" also matches the "Base Unit:" format, so one test covers both.
            description = field.get("description", "")
            if not unit and isinstance(description, str) and "Unit:" in description:
                unit = self._extract_unit_from_description(description)

            if unit and value_type:
                break  # Both found; later fields cannot change the result.

        return unit, value_type

    def _extract_unit_from_description(self, description: str) -> str | None:
        """Extract unit symbol from GSS field description.

        Handles both "Base Unit:" (unit on next line) and "Unit:" (inline) formats.
        Strips all spaces from unit IDs to handle YAML formatting issues.

        Args:
            description: Field description text from GSS YAML

        Returns:
            Human-readable unit symbol, or None if no unit found
        """
        unit_id, _ = self._extract_unit_id_and_line(description)
        if unit_id:
            return self._convert_bluetooth_unit_to_readable(unit_id)
        return None

    def _extract_unit_id_and_line(self, description: str) -> tuple[str | None, str | None]:
        """Extract raw unit ID and line from GSS field description.

        Handles both "Base Unit:" (unit on next line) and "Unit:" (inline) formats.
        Strips all spaces from unit IDs to handle YAML formatting issues.

        Args:
            description: Field description text from GSS YAML

        Returns:
            Tuple of (unit_id without org.bluetooth.unit prefix, full unit line)
            Returns (None, None) if no unit found
        """
        unit_line: str | None = None

        if "Base Unit:" in description:
            # Format: "Base Unit:\norg.bluetooth.unit.xxx" or "Base Unit: org.bluetooth.unit.xxx"
            following_lines = description.split("Base Unit:")[1].split("\n")
            unit_line = following_lines[0].strip()
            if not unit_line and len(following_lines) > 1:  # Unit is on next line
                unit_line = following_lines[1].strip()
        elif "Unit:" in description:
            # Format: "Unit: org.bluetooth.unit.xxx" (inline)
            unit_line = description.split("Unit:")[1].split("\n")[0].strip()

        if unit_line and "org.bluetooth.unit." in unit_line:
            # Remove all spaces (handles YAML formatting issues)
            cleaned_line = unit_line.replace(" ", "")
            unit_spec = cleaned_line.split("org.bluetooth.unit.")[1].strip()
            return unit_spec, cleaned_line

        return None, None

    def _convert_yaml_type_to_python_type(self, yaml_type: str) -> str:
        """Convert YAML type to Python type string.

        Args:
            yaml_type: Type name as written in the GSS YAML field

        Returns:
            The result of DataType.from_string(yaml_type).to_python_type()
        """
        return DataType.from_string(yaml_type).to_python_type()

    def _convert_bluetooth_unit_to_readable(self, unit_spec: str) -> str:
        """Convert Bluetooth SIG unit specification to human-readable symbol.

        Args:
            unit_spec: Unit specification from GSS YAML (e.g., "thermodynamic_temperature.degree_celsius")

        Returns:
            Human-readable unit symbol (e.g., "°C"), or the normalized (lower-cased,
            trailing-dot-stripped) unit_spec if no mapping found
        """
        # Normalize: descriptions may end the unit with a sentence period or use mixed case.
        unit_spec = unit_spec.rstrip(".").lower()
        unit_id = f"org.bluetooth.unit.{unit_spec}"

        unit_info = self._get_units_registry().get_info(unit_id)
        if unit_info and unit_info.symbol:
            return unit_info.symbol

        return unit_spec

275 

276 

# Shared module-level handle to the GssRegistry singleton, bound at import time.
gss_registry: GssRegistry = GssRegistry.get_instance()