Coverage for src / bluetooth_sig / registry / gss.py: 92%
124 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
1"""GSS (GATT Service Specification) registry.
3This module provides a registry for Bluetooth SIG GSS YAML files,
4extracting characteristic specifications with field metadata including
5units, resolutions, value ranges, and presence conditions.
6"""
8from __future__ import annotations
10import logging
11from pathlib import Path
12from typing import Any, cast
14import msgspec
16from bluetooth_sig.registry.base import BaseGenericRegistry
17from bluetooth_sig.registry.uuids.units import UnitsRegistry
18from bluetooth_sig.types.gatt_enums import DataType
19from bluetooth_sig.types.registry.gss_characteristic import (
20 FieldSpec,
21 GssCharacteristicSpec,
22)
25class GssRegistry(BaseGenericRegistry[GssCharacteristicSpec]):
26 """Registry for GSS (GATT Service Specification) characteristic definitions.
28 Parses Bluetooth SIG GSS YAML files and extracts typed characteristic
29 specifications with full field metadata. Implements singleton pattern
30 with thread-safe lazy loading.
32 Example:
33 registry = GssRegistry.get_instance()
34 spec = registry.get_spec("Battery Level")
35 if spec:
36 for field in spec.structure:
37 print(f"{field.python_name}: {field.unit_id}")
39 """
41 _instance: GssRegistry | None = None
43 def __init__(self) -> None:
44 """Initialize the GSS registry."""
45 super().__init__()
46 self._specs: dict[str, GssCharacteristicSpec] = {}
47 self._units_registry: UnitsRegistry | None = None
49 @classmethod
50 def get_instance(cls) -> GssRegistry:
51 """Get the singleton instance of the GSS registry."""
52 if cls._instance is None:
53 with cls._lock:
54 if cls._instance is None:
55 cls._instance = cls()
56 return cls._instance
58 def _get_units_registry(self) -> UnitsRegistry:
59 """Get or lazily initialize the units registry.
61 Returns:
62 The UnitsRegistry singleton instance
63 """
64 if self._units_registry is None:
65 # Cast needed because get_instance returns base class type
66 self._units_registry = UnitsRegistry.get_instance() # type: ignore[assignment]
67 return self._units_registry # type: ignore[return-value]
69 def _load(self) -> None:
70 """Load GSS specifications from YAML files."""
71 gss_path = self._find_gss_path()
72 if gss_path:
73 for yaml_file in gss_path.glob("org.bluetooth.characteristic.*.yaml"):
74 self._process_gss_file(yaml_file)
75 self._loaded = True
77 def _find_gss_path(self) -> Path | None:
78 """Find the GSS specifications directory."""
79 # Try project root location
80 project_root = Path(__file__).parent.parent.parent.parent
81 gss_path = project_root / "bluetooth_sig" / "gss"
83 if gss_path.exists():
84 return gss_path
86 # Try package root location
87 pkg_root = Path(__file__).parent.parent
88 gss_path = pkg_root / "bluetooth_sig" / "gss"
90 return gss_path if gss_path.exists() else None
92 def _process_gss_file(self, yaml_file: Path) -> None:
93 """Process a single GSS YAML file and store as typed GssCharacteristicSpec."""
94 try:
95 with yaml_file.open("r", encoding="utf-8") as f:
96 data = msgspec.yaml.decode(f.read())
98 if not data or "characteristic" not in data:
99 return
101 char_data = data["characteristic"]
102 char_name = char_data.get("name")
103 char_id = char_data.get("identifier")
105 if not char_name or not char_id:
106 return
108 # Parse structure into typed FieldSpec list
109 raw_structure = char_data.get("structure", [])
110 typed_structure: list[FieldSpec] = []
111 for raw_field in raw_structure:
112 if isinstance(raw_field, dict):
113 typed_structure.append(
114 FieldSpec(
115 field=raw_field.get("field", ""),
116 type=raw_field.get("type", ""),
117 size=str(raw_field.get("size", "")),
118 description=raw_field.get("description", ""),
119 )
120 )
122 # Create typed GssCharacteristicSpec
123 gss_spec = GssCharacteristicSpec(
124 identifier=char_id,
125 name=char_name,
126 description=char_data.get("description", ""),
127 structure=typed_structure,
128 )
130 # Store by both ID and name for lookup flexibility
131 if char_id:
132 self._specs[char_id] = gss_spec
133 if char_name:
134 self._specs[char_name] = gss_spec
136 except (msgspec.DecodeError, OSError, KeyError) as e:
137 logging.warning("Failed to parse GSS YAML file %s: %s", yaml_file, e)
139 def get_spec(self, identifier: str) -> GssCharacteristicSpec | None:
140 """Get a GSS specification by name or ID.
142 Args:
143 identifier: Characteristic name or ID
145 Returns:
146 GssCharacteristicSpec if found, None otherwise
147 """
148 self._ensure_loaded()
149 with self._lock:
150 return self._specs.get(identifier)
152 def get_all_specs(self) -> dict[str, GssCharacteristicSpec]:
153 """Get all loaded GSS specifications.
155 Returns:
156 Dictionary of all specifications keyed by name and ID
157 """
158 self._ensure_loaded()
159 with self._lock:
160 return dict(self._specs)
162 def extract_info_from_gss(self, char_data: dict[str, Any]) -> tuple[str | None, str | None]:
163 """Extract unit and value_type from GSS characteristic structure.
165 Args:
166 char_data: Raw characteristic data from YAML
168 Returns:
169 Tuple of (unit_symbol, value_type) or (None, None) if not found
170 """
171 structure = char_data.get("structure", [])
172 if not isinstance(structure, list) or not structure:
173 return None, None
175 typed_structure: list[dict[str, Any]] = []
176 for raw_field in structure:
177 if isinstance(raw_field, dict):
178 typed_structure.append(cast(dict[str, Any], raw_field))
180 if not typed_structure:
181 return None, None
183 unit = None
184 value_type = None
186 for field in typed_structure:
187 field_dict: dict[str, Any] = field
189 if not value_type and isinstance(field_dict.get("type"), str):
190 yaml_type_value = cast(str, field_dict["type"])
191 value_type = self._convert_yaml_type_to_python_type(yaml_type_value)
193 description_value = field_dict.get("description", "")
194 if not isinstance(description_value, str):
195 continue
197 # Extract unit from either "Base Unit:" or "Unit:" format
198 if not unit and ("Base Unit:" in description_value or "Unit:" in description_value):
199 unit = self._extract_unit_from_description(description_value)
201 return unit, value_type
203 def _extract_unit_from_description(self, description: str) -> str | None:
204 """Extract unit symbol from GSS field description.
206 Handles both "Base Unit:" (unit on next line) and "Unit:" (inline) formats.
207 Strips all spaces from unit IDs to handle YAML formatting issues.
209 Args:
210 description: Field description text from GSS YAML
212 Returns:
213 Human-readable unit symbol, or None if no unit found
214 """
215 unit_id, _ = self._extract_unit_id_and_line(description)
216 if unit_id:
217 return self._convert_bluetooth_unit_to_readable(unit_id)
218 return None
220 def _extract_unit_id_and_line(self, description: str) -> tuple[str | None, str | None]:
221 """Extract raw unit ID and line from GSS field description.
223 Handles both "Base Unit:" (unit on next line) and "Unit:" (inline) formats.
224 Strips all spaces from unit IDs to handle YAML formatting issues.
226 Args:
227 description: Field description text from GSS YAML
229 Returns:
230 Tuple of (unit_id without org.bluetooth.unit prefix, full unit line)
231 Returns (None, None) if no unit found
232 """
233 unit_line = None
235 if "Base Unit:" in description:
236 # Format: "Base Unit:\norg.bluetooth.unit.xxx" or "Base Unit: org.bluetooth.unit.xxx"
237 parts = description.split("Base Unit:")[1].split("\n")
238 unit_line = parts[0].strip()
239 if not unit_line and len(parts) > 1: # Unit is on next line
240 unit_line = parts[1].strip()
241 elif "Unit:" in description:
242 # Format: "Unit: org.bluetooth.unit.xxx" (inline)
243 unit_line = description.split("Unit:")[1].split("\n")[0].strip()
245 if unit_line and "org.bluetooth.unit." in unit_line:
246 # Remove all spaces (handles YAML formatting issues)
247 cleaned_line = unit_line.replace(" ", "")
248 unit_spec = cleaned_line.split("org.bluetooth.unit.")[1].strip()
249 return unit_spec, cleaned_line
251 return None, None
253 def _convert_yaml_type_to_python_type(self, yaml_type: str) -> str:
254 """Convert YAML type to Python type string."""
255 return DataType.from_string(yaml_type).to_python_type()
257 def _convert_bluetooth_unit_to_readable(self, unit_spec: str) -> str:
258 """Convert Bluetooth SIG unit specification to human-readable symbol.
260 Args:
261 unit_spec: Unit specification from GSS YAML (e.g., "thermodynamic_temperature.degree_celsius")
263 Returns:
264 Human-readable unit symbol (e.g., "°C"), or unit_spec if no mapping found
265 """
266 unit_spec = unit_spec.rstrip(".").lower()
267 unit_id = f"org.bluetooth.unit.{unit_spec}"
269 units_registry = self._get_units_registry()
270 unit_info = units_registry.get_info(unit_id)
271 if unit_info and unit_info.symbol:
272 return unit_info.symbol
274 return unit_spec
277# Singleton instance for convenient access
278gss_registry = GssRegistry.get_instance()