Coverage for src/bluetooth_sig/registry/gss.py: 91%
117 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 01:26 +0000
« prev ^ index » next coverage.py v7.14.3, created at 2026-06-28 01:26 +0000
1"""GSS (GATT Service Specification) registry.
3This module provides a registry for Bluetooth SIG GSS YAML files,
4extracting characteristic specifications with field metadata including
5units, resolutions, value ranges, and presence conditions.
6"""
8from __future__ import annotations
10import logging
11from pathlib import Path
12from typing import Any, cast
14import msgspec
16from bluetooth_sig.registry.base import BaseGenericRegistry
17from bluetooth_sig.registry.uuids.units import UnitsRegistry
18from bluetooth_sig.types.gatt_enums import WIRE_TYPE_MAP
19from bluetooth_sig.types.registry.gss_characteristic import (
20 FieldSpec,
21 GssCharacteristicSpec,
22)
class GssRegistry(BaseGenericRegistry[GssCharacteristicSpec]):
    """Registry for GSS (GATT Service Specification) characteristic definitions.

    Parses Bluetooth SIG GSS YAML files and extracts typed characteristic
    specifications with full field metadata. Implements singleton pattern
    with thread-safe lazy loading.

    Each specification is stored under both its lower-cased identifier and
    its lower-cased name, so lookups accept either key case-insensitively.

    Example::

        registry = GssRegistry.get_instance()
        spec = registry.get_spec("Battery Level")
        if spec:
            for field in spec.structure:
                print(f"{field.python_name}: {field.unit_id}")

    """

    def __init__(self) -> None:
        """Initialize the GSS registry with no specs and no units registry."""
        super().__init__()
        # Lower-cased characteristic name/identifier -> parsed specification
        self._specs: dict[str, GssCharacteristicSpec] = {}
        # Resolved on first use by _get_units_registry()
        self._units_registry: UnitsRegistry | None = None

    def _get_units_registry(self) -> UnitsRegistry:
        """Get or lazily initialize the units registry.

        Returns:
            The instance returned by ``UnitsRegistry.get_instance()``,
            cached on this registry after the first call
        """
        if self._units_registry is None:
            self._units_registry = UnitsRegistry.get_instance()
        return self._units_registry

    def _load(self) -> None:
        """Load GSS specifications from YAML files.

        Every ``org.bluetooth.characteristic.*.yaml`` file in the GSS
        directory is processed. The registry is marked as loaded even when
        no GSS directory is found, so the search is not repeated.
        """
        gss_path = self._find_gss_path()
        if gss_path:
            for yaml_file in gss_path.glob("org.bluetooth.characteristic.*.yaml"):
                self._process_gss_file(yaml_file)
        self._loaded = True

    def _find_gss_path(self) -> Path | None:
        """Find the GSS specifications directory.

        Returns:
            The first existing ``bluetooth_sig/gss`` directory, looked up
            first four levels above this file and then two levels above
            it, or None if neither exists
        """
        here = Path(__file__)

        # Try project root location
        project_root = here.parent.parent.parent.parent
        gss_path = project_root / "bluetooth_sig" / "gss"
        if gss_path.exists():
            return gss_path

        # Try package root location
        pkg_root = here.parent.parent
        gss_path = pkg_root / "bluetooth_sig" / "gss"
        return gss_path if gss_path.exists() else None

    def _process_gss_file(self, yaml_file: Path) -> None:
        """Process a single GSS YAML file and store as typed GssCharacteristicSpec.

        Files that cannot be read or decoded are logged and skipped. Files
        that decode to an unexpected shape (non-mapping root, missing or
        non-mapping ``characteristic``, missing or non-string ``name`` /
        ``identifier``) are skipped silently. A missing or non-list
        ``structure`` is treated as an empty structure.

        Args:
            yaml_file: Path of the GSS YAML file to parse
        """
        try:
            with yaml_file.open("r", encoding="utf-8") as f:
                data = msgspec.yaml.decode(f.read())

            # Valid YAML may still decode to a list, scalar or None; only a
            # mapping holding a "characteristic" mapping is usable here.
            if not isinstance(data, dict):
                return
            char_data = data.get("characteristic")
            if not isinstance(char_data, dict):
                return

            char_name = char_data.get("name")
            char_id = char_data.get("identifier")
            # Both keys are lower-cased below, so they must be non-empty strings
            if not (isinstance(char_name, str) and char_name):
                return
            if not (isinstance(char_id, str) and char_id):
                return

            # Parse structure into typed FieldSpec list; an empty
            # "structure:" key decodes to None rather than a list.
            raw_structure = char_data.get("structure")
            if not isinstance(raw_structure, list):
                raw_structure = []
            typed_structure: list[FieldSpec] = [
                FieldSpec(
                    field=raw_field.get("field", ""),
                    type=raw_field.get("type", ""),
                    size=str(raw_field.get("size", "")),
                    description=raw_field.get("description", ""),
                )
                for raw_field in raw_structure
                if isinstance(raw_field, dict)
            ]

            # Create typed GssCharacteristicSpec
            gss_spec = GssCharacteristicSpec(
                identifier=char_id,
                name=char_name,
                description=char_data.get("description", ""),
                structure=typed_structure,
            )

            # Store by both ID and name for lookup flexibility
            # Normalise keys to lowercase for case-insensitive lookup
            self._specs[char_id.lower()] = gss_spec
            self._specs[char_name.lower()] = gss_spec

        except (msgspec.DecodeError, UnicodeDecodeError, OSError, KeyError) as e:
            # Use a named logger: the module-level logging.warning() helper
            # calls basicConfig() on an unconfigured root logger, which a
            # library should not do on behalf of the application.
            logging.getLogger(__name__).warning("Failed to parse GSS YAML file %s: %s", yaml_file, e)

    def get_spec(self, identifier: str) -> GssCharacteristicSpec | None:
        """Get a GSS specification by name or ID.

        Args:
            identifier: Characteristic name or ID (case-insensitive)

        Returns:
            GssCharacteristicSpec if found, None otherwise
        """
        self._ensure_loaded()
        with self._lock:
            return self._specs.get(identifier.lower())

    def get_all_specs(self) -> dict[str, GssCharacteristicSpec]:
        """Get all loaded GSS specifications.

        Returns:
            A shallow copy of the internal mapping, keyed by lower-cased
            name and lower-cased ID (each spec appears under both keys)
        """
        self._ensure_loaded()
        with self._lock:
            return dict(self._specs)

    def extract_info_from_gss(self, char_data: dict[str, Any]) -> tuple[str | None, type | None]:
        """Extract unit and python_type from GSS characteristic structure.

        The python type comes from the first field whose ``type`` is a
        string; the unit comes from the first field whose description
        yields one. Both searches run over the fields in order.

        Args:
            char_data: Raw characteristic data from YAML

        Returns:
            Tuple of (unit_symbol, python_type) or (None, None) if not found
        """
        structure = char_data.get("structure", [])
        if not isinstance(structure, list) or not structure:
            return None, None

        # Only mapping entries carry field metadata; ignore anything else
        fields = [cast("dict[str, Any]", raw_field) for raw_field in structure if isinstance(raw_field, dict)]
        if not fields:
            return None, None

        unit: str | None = None
        python_type: type | None = None

        for field_dict in fields:
            yaml_type = field_dict.get("type")
            if python_type is None and isinstance(yaml_type, str):
                python_type = self._convert_yaml_type_to_python_type(yaml_type)

            description_value = field_dict.get("description", "")
            if not isinstance(description_value, str):
                continue

            # "Unit:" also matches the "Base Unit:" format, so a single
            # membership test covers both description styles.
            if not unit and "Unit:" in description_value:
                unit = self._extract_unit_from_description(description_value)

        return unit, python_type

    def _extract_unit_from_description(self, description: str) -> str | None:
        """Extract unit symbol from GSS field description.

        Handles both "Base Unit:" (unit on next line) and "Unit:" (inline) formats.
        Strips all spaces from unit IDs to handle YAML formatting issues.

        Args:
            description: Field description text from GSS YAML

        Returns:
            Human-readable unit symbol, or None if no unit found
        """
        unit_id, _ = self._extract_unit_id_and_line(description)
        if unit_id:
            return self._convert_bluetooth_unit_to_readable(unit_id)
        return None

    def _extract_unit_id_and_line(self, description: str) -> tuple[str | None, str | None]:
        """Extract raw unit ID and line from GSS field description.

        Handles both "Base Unit:" (unit on next line) and "Unit:" (inline) formats.
        Strips all spaces from unit IDs to handle YAML formatting issues.

        Args:
            description: Field description text from GSS YAML

        Returns:
            Tuple of (unit_id without the ``org.bluetooth.unit.`` prefix,
            unit line with all spaces removed).
            Returns (None, None) if no unit found
        """
        unit_line: str | None = None

        if "Base Unit:" in description:
            # Two formats: "Base Unit:\norg.bluetooth.unit.xxx" or "Base Unit: org.bluetooth.unit.xxx"
            parts = description.split("Base Unit:")[1].split("\n")
            unit_line = parts[0].strip()
            if not unit_line and len(parts) > 1:  # Unit is on next line
                unit_line = parts[1].strip()
        elif "Unit:" in description:
            # Inline format: "Unit: org.bluetooth.unit.xxx"
            unit_line = description.split("Unit:")[1].split("\n", maxsplit=1)[0].strip()

        if unit_line and "org.bluetooth.unit." in unit_line:
            # Remove all spaces (handles YAML formatting issues)
            cleaned_line = unit_line.replace(" ", "")
            unit_spec = cleaned_line.split("org.bluetooth.unit.")[1].strip()
            return unit_spec, cleaned_line

        return None, None

    def _convert_yaml_type_to_python_type(self, yaml_type: str) -> type | None:
        """Convert YAML wire type string to a Python type.

        Args:
            yaml_type: Wire type name from GSS YAML (matched case-insensitively)

        Returns:
            The ``WIRE_TYPE_MAP`` entry for the lower-cased name, or None
            if the name is not in the map
        """
        return WIRE_TYPE_MAP.get(yaml_type.lower())

    def _convert_bluetooth_unit_to_readable(self, unit_spec: str) -> str:
        """Convert Bluetooth SIG unit specification to human-readable symbol.

        Args:
            unit_spec: Unit specification from GSS YAML (e.g., "thermodynamic_temperature.degree_celsius")

        Returns:
            Human-readable unit symbol (e.g., "°C"), or the normalised
            (lower-cased, trailing dots removed) unit_spec if no mapping found
        """
        unit_spec = unit_spec.rstrip(".").lower()
        unit_id = f"org.bluetooth.unit.{unit_spec}"

        units_registry = self._get_units_registry()
        unit_info = units_registry.get_info(unit_id)
        if unit_info and unit_info.symbol:
            return unit_info.symbol

        return unit_spec
# Convenience accessor for the shared registry instance
def get_gss_registry() -> GssRegistry:
    """Return the GssRegistry instance provided by ``GssRegistry.get_instance()``."""
    registry = GssRegistry.get_instance()
    return registry