Coverage for src / bluetooth_sig / registry / gss.py: 92%
124 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""GSS (GATT Service Specification) registry.
3This module provides a registry for Bluetooth SIG GSS YAML files,
4extracting characteristic specifications with field metadata including
5units, resolutions, value ranges, and presence conditions.
6"""
8from __future__ import annotations
10import logging
11from pathlib import Path
12from typing import Any, cast
14import msgspec
16from bluetooth_sig.registry.base import BaseGenericRegistry
17from bluetooth_sig.registry.uuids.units import UnitsRegistry
18from bluetooth_sig.types.gatt_enums import WIRE_TYPE_MAP
19from bluetooth_sig.types.registry.gss_characteristic import (
20 FieldSpec,
21 GssCharacteristicSpec,
22)
class GssRegistry(BaseGenericRegistry[GssCharacteristicSpec]):
    """Registry for GSS (GATT Service Specification) characteristic definitions.

    Parses Bluetooth SIG GSS YAML files and extracts typed characteristic
    specifications with field metadata. Access goes through a shared
    singleton obtained from :meth:`get_instance`; public lookups call
    ``_ensure_loaded()`` before reading the spec table.

    Example::
        registry = GssRegistry.get_instance()
        spec = registry.get_spec("Battery Level")
        if spec:
            for field in spec.structure:
                print(f"{field.python_name}: {field.unit_id}")

    """

    _instance: GssRegistry | None = None

    def __init__(self) -> None:
        """Initialize the GSS registry with an empty spec table."""
        super().__init__()
        # Each spec is stored twice: under its lowercase identifier and
        # under its lowercase name (see _process_gss_file).
        self._specs: dict[str, GssCharacteristicSpec] = {}
        # Resolved on first use by _get_units_registry().
        self._units_registry: UnitsRegistry | None = None

    @classmethod
    def get_instance(cls) -> GssRegistry:
        """Get the singleton instance of the GSS registry.

        Returns:
            The shared GssRegistry, created on first call
        """
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created
                # the instance between the first check and lock acquisition.
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def _get_units_registry(self) -> UnitsRegistry:
        """Get or lazily initialize the units registry.

        Returns:
            The UnitsRegistry singleton instance
        """
        if self._units_registry is None:
            # get_instance() returns BaseUUIDRegistry; narrow to concrete subclass
            self._units_registry = cast("UnitsRegistry", UnitsRegistry.get_instance())
        return self._units_registry

    def _load(self) -> None:
        """Load GSS specifications from YAML files.

        Sets ``_loaded`` even when no GSS directory is found, so a missing
        directory results in an empty registry rather than repeated scans.
        """
        gss_path = self._find_gss_path()
        if gss_path:
            # glob() yields entries in filesystem order; sorting makes the
            # resulting table (and which file wins on a duplicate key)
            # deterministic across platforms.
            for yaml_file in sorted(gss_path.glob("org.bluetooth.characteristic.*.yaml")):
                self._process_gss_file(yaml_file)
        self._loaded = True

    def _find_gss_path(self) -> Path | None:
        """Find the GSS specifications directory.

        Returns:
            The first existing ``bluetooth_sig/gss`` directory among the
            candidate roots, or None if neither exists
        """
        module_file = Path(__file__)
        candidate_roots = (
            # Project root location (four levels above this module)
            module_file.parent.parent.parent.parent,
            # Package root location (two levels above this module)
            module_file.parent.parent,
        )
        for root in candidate_roots:
            gss_path = root / "bluetooth_sig" / "gss"
            if gss_path.exists():
                return gss_path
        return None

    def _process_gss_file(self, yaml_file: Path) -> None:
        """Process a single GSS YAML file and store as typed GssCharacteristicSpec.

        Files that cannot be read or decoded are logged and skipped. Files
        that decode but do not have the expected shape (mapping with a
        ``characteristic`` mapping carrying string ``name`` and
        ``identifier``) are skipped silently, so one malformed file never
        aborts loading of the remaining files.

        Args:
            yaml_file: Path to the GSS YAML file to parse
        """
        try:
            data = msgspec.yaml.decode(yaml_file.read_text(encoding="utf-8"))
        except (msgspec.DecodeError, OSError, UnicodeDecodeError) as e:
            logging.warning("Failed to parse GSS YAML file %s: %s", yaml_file, e)
            return

        if not isinstance(data, dict):
            return

        char_data = data.get("characteristic")
        if not isinstance(char_data, dict):
            return

        char_name = char_data.get("name")
        char_id = char_data.get("identifier")
        # Both keys are lowercased below, so they must be non-empty strings.
        if not isinstance(char_name, str) or not isinstance(char_id, str):
            return
        if not char_name or not char_id:
            return

        # "structure:" may be absent or explicitly null in YAML; treat
        # anything that is not a list as "no fields".
        raw_structure = char_data.get("structure")
        if not isinstance(raw_structure, list):
            raw_structure = []

        # Parse structure into typed FieldSpec list, ignoring non-mapping entries
        typed_structure: list[FieldSpec] = [
            FieldSpec(
                field=raw_field.get("field", ""),
                type=raw_field.get("type", ""),
                size=str(raw_field.get("size", "")),
                description=raw_field.get("description", ""),
            )
            for raw_field in raw_structure
            if isinstance(raw_field, dict)
        ]

        gss_spec = GssCharacteristicSpec(
            identifier=char_id,
            name=char_name,
            description=char_data.get("description", ""),
            structure=typed_structure,
        )

        # Store by both ID and name for lookup flexibility.
        # Normalise keys to lowercase for case-insensitive lookup.
        self._specs[char_id.lower()] = gss_spec
        self._specs[char_name.lower()] = gss_spec

    def get_spec(self, identifier: str) -> GssCharacteristicSpec | None:
        """Get a GSS specification by name or ID.

        Args:
            identifier: Characteristic name or ID (case-insensitive)

        Returns:
            GssCharacteristicSpec if found, None otherwise
        """
        self._ensure_loaded()
        with self._lock:
            return self._specs.get(identifier.lower())

    def get_all_specs(self) -> dict[str, GssCharacteristicSpec]:
        """Get all loaded GSS specifications.

        Returns:
            A shallow copy of the spec table, keyed by lowercase name and
            lowercase ID (each spec therefore appears under two keys)
        """
        self._ensure_loaded()
        with self._lock:
            return dict(self._specs)

    def extract_info_from_gss(self, char_data: dict[str, Any]) -> tuple[str | None, type | None]:
        """Extract unit and python_type from GSS characteristic structure.

        The python type comes from the first field whose ``type`` is a string
        (even if that string has no mapping); the unit comes from the first
        field whose description yields a non-empty unit.

        Args:
            char_data: Raw characteristic data from YAML

        Returns:
            Tuple of (unit_symbol, python_type) or (None, None) if not found
        """
        structure = char_data.get("structure", [])
        if not isinstance(structure, list) or not structure:
            return None, None

        fields: list[dict[str, Any]] = [entry for entry in structure if isinstance(entry, dict)]
        if not fields:
            return None, None

        unit: str | None = None
        python_type: type | None = None

        for field in fields:
            yaml_type = field.get("type")
            if python_type is None and isinstance(yaml_type, str):
                python_type = self._convert_yaml_type_to_python_type(yaml_type)

            description = field.get("description", "")
            if not isinstance(description, str):
                continue

            # "Unit:" also matches the "Base Unit:" format, so one
            # substring test covers both.
            if not unit and "Unit:" in description:
                unit = self._extract_unit_from_description(description)

            if unit and python_type is not None:
                # Both values resolved; later fields cannot change them.
                break

        return unit, python_type

    def _extract_unit_from_description(self, description: str) -> str | None:
        """Extract unit symbol from GSS field description.

        Handles both "Base Unit:" (unit on next line) and "Unit:" (inline) formats.
        Strips all spaces from unit IDs to handle YAML formatting issues.

        Args:
            description: Field description text from GSS YAML

        Returns:
            Human-readable unit symbol, or None if no unit found
        """
        unit_id, _ = self._extract_unit_id_and_line(description)
        if unit_id:
            return self._convert_bluetooth_unit_to_readable(unit_id)
        return None

    def _extract_unit_id_and_line(self, description: str) -> tuple[str | None, str | None]:
        """Extract raw unit ID and line from GSS field description.

        Handles both "Base Unit:" (unit on next line) and "Unit:" (inline) formats.
        Strips all spaces from unit IDs to handle YAML formatting issues.

        Args:
            description: Field description text from GSS YAML

        Returns:
            Tuple of (unit_id without org.bluetooth.unit prefix, full unit line)
            Returns (None, None) if no unit found
        """
        unit_line = None

        if "Base Unit:" in description:
            # Two formats: "Base Unit:\norg.bluetooth.unit.xxx" or "Base Unit: org.bluetooth.unit.xxx"
            parts = description.split("Base Unit:")[1].split("\n")
            unit_line = parts[0].strip()
            if not unit_line and len(parts) > 1:  # Unit is on next line
                unit_line = parts[1].strip()
        elif "Unit:" in description:
            # Inline format: "Unit: org.bluetooth.unit.xxx"
            unit_line = description.split("Unit:")[1].split("\n", maxsplit=1)[0].strip()

        if unit_line and "org.bluetooth.unit." in unit_line:
            # Remove all spaces (handles YAML formatting issues)
            cleaned_line = unit_line.replace(" ", "")
            unit_spec = cleaned_line.split("org.bluetooth.unit.")[1].strip()
            return unit_spec, cleaned_line

        return None, None

    def _convert_yaml_type_to_python_type(self, yaml_type: str) -> type | None:
        """Convert YAML wire type string to a Python type.

        Args:
            yaml_type: Wire type name from GSS YAML (matched case-insensitively)

        Returns:
            The mapped type from WIRE_TYPE_MAP, or None if there is no mapping
        """
        return WIRE_TYPE_MAP.get(yaml_type.lower())

    def _convert_bluetooth_unit_to_readable(self, unit_spec: str) -> str:
        """Convert Bluetooth SIG unit specification to human-readable symbol.

        Args:
            unit_spec: Unit specification from GSS YAML (e.g., "thermodynamic_temperature.degree_celsius")

        Returns:
            Human-readable unit symbol (e.g., "°C"), or the normalised
            (lowercased, trailing dots removed) unit_spec if no mapping found
        """
        unit_spec = unit_spec.rstrip(".").lower()
        unit_id = f"org.bluetooth.unit.{unit_spec}"

        units_registry = self._get_units_registry()
        unit_info = units_registry.get_info(unit_id)
        if unit_info and unit_info.symbol:
            return unit_info.symbol

        return unit_spec
# Module-level shared instance for convenient access: importing this name
# gives the same object as calling GssRegistry.get_instance() directly.
# The call runs once, when this module is first imported.
gss_registry = GssRegistry.get_instance()