Coverage for src/bluetooth_sig/registry/gss.py: 91%

117 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-06-28 01:26 +0000

1"""GSS (GATT Service Specification) registry. 

2 

3This module provides a registry for Bluetooth SIG GSS YAML files, 

4extracting characteristic specifications with field metadata including 

5units, resolutions, value ranges, and presence conditions. 

6""" 

7 

8from __future__ import annotations 

9 

10import logging 

11from pathlib import Path 

12from typing import Any, cast 

13 

14import msgspec 

15 

16from bluetooth_sig.registry.base import BaseGenericRegistry 

17from bluetooth_sig.registry.uuids.units import UnitsRegistry 

18from bluetooth_sig.types.gatt_enums import WIRE_TYPE_MAP 

19from bluetooth_sig.types.registry.gss_characteristic import ( 

20 FieldSpec, 

21 GssCharacteristicSpec, 

22) 

23 

24 

class GssRegistry(BaseGenericRegistry[GssCharacteristicSpec]):
    """Registry for GSS (GATT Service Specification) characteristic definitions.

    Parses Bluetooth SIG GSS YAML files and extracts typed characteristic
    specifications with full field metadata. Singleton access
    (``get_instance()``) and lazy loading (``_ensure_loaded()`` guarded by
    ``_lock``) are provided by ``BaseGenericRegistry``.

    Example::

        registry = GssRegistry.get_instance()
        spec = registry.get_spec("Battery Level")
        if spec:
            for field in spec.structure:
                print(f"{field.python_name}: {field.unit_id}")

    """

    def __init__(self) -> None:
        """Initialize the GSS registry with an empty spec table."""
        super().__init__()
        # Keyed by lower-cased characteristic identifier AND lower-cased name.
        self._specs: dict[str, GssCharacteristicSpec] = {}
        # Resolved on first use by _get_units_registry().
        self._units_registry: UnitsRegistry | None = None

    def _get_units_registry(self) -> UnitsRegistry:
        """Get or lazily initialize the units registry.

        Returns:
            The instance returned by ``UnitsRegistry.get_instance()``
        """
        if self._units_registry is None:
            self._units_registry = UnitsRegistry.get_instance()
        return self._units_registry

    def _load(self) -> None:
        """Load GSS specifications from YAML files.

        Every ``org.bluetooth.characteristic.*.yaml`` file in the GSS
        directory is processed; a file that cannot be parsed is skipped by
        ``_process_gss_file`` so it does not prevent the others from loading.
        """
        gss_path = self._find_gss_path()
        if gss_path is not None:
            for yaml_file in gss_path.glob("org.bluetooth.characteristic.*.yaml"):
                self._process_gss_file(yaml_file)
        self._loaded = True

    def _find_gss_path(self) -> Path | None:
        """Find the GSS specifications directory.

        Returns:
            The first existing candidate directory, or None if neither exists
        """
        module_file = Path(__file__)
        candidates = (
            # Project root location (four levels above this module)
            module_file.parent.parent.parent.parent / "bluetooth_sig" / "gss",
            # Package root location (two levels above this module)
            module_file.parent.parent / "bluetooth_sig" / "gss",
        )
        for candidate in candidates:
            if candidate.exists():
                return candidate
        return None

    def _process_gss_file(self, yaml_file: Path) -> None:
        """Process a single GSS YAML file and store as typed GssCharacteristicSpec.

        Files that cannot be read or decoded are logged and skipped. Files
        whose content does not have the expected shape (a mapping with a
        ``characteristic`` mapping carrying non-empty string ``name`` and
        ``identifier``) are skipped silently.

        Args:
            yaml_file: Path of the GSS YAML file to parse
        """
        try:
            with yaml_file.open("r", encoding="utf-8") as f:
                data = msgspec.yaml.decode(f.read())

            # YAML can decode to any type (scalar, list, None). Check the shape
            # explicitly so a malformed file is skipped instead of raising
            # TypeError/AttributeError and aborting the whole load.
            if not isinstance(data, dict):
                return
            char_data = data.get("characteristic")
            if not isinstance(char_data, dict):
                return

            char_name = char_data.get("name")
            char_id = char_data.get("identifier")
            if not (isinstance(char_name, str) and isinstance(char_id, str)):
                return
            if not char_name or not char_id:
                return

            # A bare "structure:" key decodes to None; treat anything that is
            # not a list as "no fields".
            raw_structure = char_data.get("structure")
            if not isinstance(raw_structure, list):
                raw_structure = []

            # Parse structure into typed FieldSpec list (non-mapping entries are ignored)
            typed_structure: list[FieldSpec] = [
                FieldSpec(
                    field=raw_field.get("field", ""),
                    type=raw_field.get("type", ""),
                    size=str(raw_field.get("size", "")),
                    description=raw_field.get("description", ""),
                )
                for raw_field in raw_structure
                if isinstance(raw_field, dict)
            ]

            # Create typed GssCharacteristicSpec
            gss_spec = GssCharacteristicSpec(
                identifier=char_id,
                name=char_name,
                description=char_data.get("description", ""),
                structure=typed_structure,
            )

            # Store by both ID and name for lookup flexibility
            # Normalise keys to lowercase for case-insensitive lookup
            self._specs[char_id.lower()] = gss_spec
            self._specs[char_name.lower()] = gss_spec

        except (msgspec.DecodeError, OSError, KeyError, UnicodeDecodeError) as e:
            # UnicodeDecodeError: file is not valid UTF-8 (not an OSError subclass).
            logging.getLogger(__name__).warning("Failed to parse GSS YAML file %s: %s", yaml_file, e)

    def get_spec(self, identifier: str) -> GssCharacteristicSpec | None:
        """Get a GSS specification by name or ID.

        Args:
            identifier: Characteristic name or ID (case-insensitive)

        Returns:
            GssCharacteristicSpec if found, None otherwise
        """
        self._ensure_loaded()
        with self._lock:
            return self._specs.get(identifier.lower())

    def get_all_specs(self) -> dict[str, GssCharacteristicSpec]:
        """Get all loaded GSS specifications.

        Returns:
            A shallow copy of the spec table, keyed by lower-cased name and ID
        """
        self._ensure_loaded()
        with self._lock:
            return dict(self._specs)

    def extract_info_from_gss(self, char_data: dict[str, Any]) -> tuple[str | None, type | None]:
        """Extract unit and python_type from GSS characteristic structure.

        The first field whose ``type`` maps to a known Python type supplies
        the type; the first field whose description yields a unit supplies
        the unit. The two may come from different fields.

        Args:
            char_data: Raw characteristic data from YAML

        Returns:
            Tuple of (unit_symbol, python_type) or (None, None) if not found
        """
        structure = char_data.get("structure", [])
        if not isinstance(structure, list) or not structure:
            return None, None

        fields = [cast("dict[str, Any]", raw_field) for raw_field in structure if isinstance(raw_field, dict)]

        unit: str | None = None
        python_type: type | None = None

        for field_dict in fields:
            if python_type is None:
                yaml_type_value = field_dict.get("type")
                if isinstance(yaml_type_value, str):
                    python_type = self._convert_yaml_type_to_python_type(yaml_type_value)

            description_value = field_dict.get("description", "")
            # "Unit:" also matches the "Base Unit:" format, so one test covers both.
            if not unit and isinstance(description_value, str) and "Unit:" in description_value:
                unit = self._extract_unit_from_description(description_value)

            # Both answers are final once set; no need to scan further fields.
            if unit and python_type is not None:
                break

        return unit, python_type

    def _extract_unit_from_description(self, description: str) -> str | None:
        """Extract unit symbol from GSS field description.

        Handles both "Base Unit:" (unit on next line) and "Unit:" (inline) formats.
        Strips all spaces from unit IDs to handle YAML formatting issues.

        Args:
            description: Field description text from GSS YAML

        Returns:
            Human-readable unit symbol, or None if no unit found
        """
        unit_id, _ = self._extract_unit_id_and_line(description)
        if unit_id:
            return self._convert_bluetooth_unit_to_readable(unit_id)
        return None

    def _extract_unit_id_and_line(self, description: str) -> tuple[str | None, str | None]:
        """Extract raw unit ID and line from GSS field description.

        Handles both "Base Unit:" (unit on next line) and "Unit:" (inline) formats.
        Strips all spaces from the unit line to handle YAML formatting issues.

        Args:
            description: Field description text from GSS YAML

        Returns:
            Tuple of (unit_id without org.bluetooth.unit prefix, unit line with spaces removed)
            Returns (None, None) if no unit found
        """
        unit_line = None

        if "Base Unit:" in description:
            # Two formats: "Base Unit:\norg.bluetooth.unit.xxx" or "Base Unit: org.bluetooth.unit.xxx"
            parts = description.split("Base Unit:")[1].split("\n")
            unit_line = parts[0].strip()
            if not unit_line and len(parts) > 1:  # Unit is on next line
                unit_line = parts[1].strip()
        elif "Unit:" in description:
            # Inline format: "Unit: org.bluetooth.unit.xxx"
            unit_line = description.split("Unit:")[1].split("\n", maxsplit=1)[0].strip()

        if not unit_line:
            return None, None

        # Remove all spaces BEFORE looking for the prefix, so stray spaces
        # inside the prefix itself (YAML formatting issues) do not hide the unit.
        cleaned_line = unit_line.replace(" ", "")
        if "org.bluetooth.unit." not in cleaned_line:
            return None, None

        unit_spec = cleaned_line.split("org.bluetooth.unit.")[1].strip()
        return unit_spec, cleaned_line

    def _convert_yaml_type_to_python_type(self, yaml_type: str) -> type | None:
        """Convert YAML wire type string to a Python type.

        Args:
            yaml_type: Wire type name from GSS YAML (looked up case-insensitively)

        Returns:
            The matching entry of WIRE_TYPE_MAP, or None if the type is unknown
        """
        return WIRE_TYPE_MAP.get(yaml_type.lower())

    def _convert_bluetooth_unit_to_readable(self, unit_spec: str) -> str:
        """Convert Bluetooth SIG unit specification to human-readable symbol.

        Args:
            unit_spec: Unit specification from GSS YAML (e.g., "thermodynamic_temperature.degree_celsius")

        Returns:
            Human-readable unit symbol (e.g., "°C"), or the normalised
            (lower-cased, trailing dots removed) unit_spec if no mapping found
        """
        unit_spec = unit_spec.rstrip(".").lower()
        unit_id = f"org.bluetooth.unit.{unit_spec}"

        units_registry = self._get_units_registry()
        unit_info = units_registry.get_info(unit_id)
        if unit_info and unit_info.symbol:
            return unit_info.symbol

        return unit_spec

264 

265 

# Convenience accessor for the shared GssRegistry instance
def get_gss_registry() -> GssRegistry:
    """Fetch the GSS registry via ``GssRegistry.get_instance()``."""
    registry = GssRegistry.get_instance()
    return registry

269 return GssRegistry.get_instance()