Coverage for src / bluetooth_sig / gatt / uuid_registry.py: 88%
308 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""UUID registry loading from Bluetooth SIG YAML files."""
3from __future__ import annotations
5import contextlib
6import logging
7import threading
9from bluetooth_sig.registry.gss import GssRegistry
10from bluetooth_sig.registry.uuids.units import UnitsRegistry
11from bluetooth_sig.types import CharacteristicInfo, ServiceInfo
12from bluetooth_sig.types.base_types import SIGInfo
13from bluetooth_sig.types.registry.descriptor_types import DescriptorInfo
14from bluetooth_sig.types.registry.gss_characteristic import GssCharacteristicSpec
15from bluetooth_sig.types.uuid import BluetoothUUID
17from ..registry.utils import find_bluetooth_sig_path, load_yaml_uuids, normalize_uuid_string
18from ..types.registry import CharacteristicSpec, FieldInfo, UnitMetadata
20__all__ = [
21 "UuidRegistry",
22 "uuid_registry",
23]
25logger = logging.getLogger(__name__)
28class UuidRegistry: # pylint: disable=too-many-instance-attributes
29 """Registry for Bluetooth SIG UUIDs with canonical storage + alias indices.
31 This registry stores a number of internal caches and mappings which
32 legitimately exceed the default pylint instance attribute limit. The
33 complexity is intentional and centralised; an inline disable is used to
34 avoid noisy global configuration changes.
35 """
37 def __init__(self) -> None:
38 """Initialize the UUID registry."""
39 self._lock = threading.RLock()
41 # Canonical storage: normalized_uuid -> domain types (single source of truth)
42 self._services: dict[str, ServiceInfo] = {}
43 self._characteristics: dict[str, CharacteristicInfo] = {}
44 self._descriptors: dict[str, DescriptorInfo] = {}
46 # Lightweight alias indices: alias -> normalized_uuid
47 self._service_aliases: dict[str, str] = {}
48 self._characteristic_aliases: dict[str, str] = {}
49 self._descriptor_aliases: dict[str, str] = {}
51 # Preserve SIG entries overridden at runtime so we can restore them
52 self._service_overrides: dict[str, ServiceInfo] = {}
53 self._characteristic_overrides: dict[str, CharacteristicInfo] = {}
54 self._descriptor_overrides: dict[str, DescriptorInfo] = {}
56 # Track runtime-registered UUIDs (replaces origin field checks)
57 self._runtime_uuids: set[str] = set()
59 self._gss_registry: GssRegistry | None = None
61 with contextlib.suppress(FileNotFoundError, Exception):
62 # If YAML loading fails, continue with empty registry
63 self._load_uuids()
65 def _store_service(self, info: ServiceInfo) -> None:
66 """Store service info with canonical storage + aliases."""
67 canonical_key = info.uuid.normalized
69 # Store once in canonical location
70 self._services[canonical_key] = info
72 # Create lightweight alias mappings (normalized to lowercase)
73 aliases = self._generate_aliases(info)
74 for alias in aliases:
75 self._service_aliases[alias.lower()] = canonical_key
77 def _store_characteristic(self, info: CharacteristicInfo) -> None:
78 """Store characteristic info with canonical storage + aliases."""
79 canonical_key = info.uuid.normalized
81 # Store once in canonical location
82 self._characteristics[canonical_key] = info
84 # Create lightweight alias mappings (normalized to lowercase)
85 aliases = self._generate_aliases(info)
86 for alias in aliases:
87 self._characteristic_aliases[alias.lower()] = canonical_key
89 def _store_descriptor(self, info: DescriptorInfo) -> None:
90 """Store descriptor info with canonical storage + aliases."""
91 canonical_key = info.uuid.normalized
93 # Store once in canonical location
94 self._descriptors[canonical_key] = info
96 # Create lightweight alias mappings (normalized to lowercase)
97 aliases = self._generate_aliases(info)
98 for alias in aliases:
99 self._descriptor_aliases[alias.lower()] = canonical_key
101 def _generate_aliases(self, info: SIGInfo) -> set[str]:
102 """Generate name/ID-based alias keys for domain info types (UUID variations handled by BluetoothUUID)."""
103 aliases: set[str] = {
104 info.name.lower(),
105 }
107 if info.id:
108 aliases.add(info.id)
110 if info.id and "service" in info.id:
111 service_name = info.id.replace("org.bluetooth.service.", "")
112 if service_name.endswith("_service"):
113 service_name = service_name[:-8] # Remove _service
114 service_name = service_name.replace("_", " ").title()
115 aliases.add(service_name)
116 # Also add "Service" suffix if not present
117 if not service_name.endswith(" Service"):
118 aliases.add(service_name + " Service")
119 elif info.id and "characteristic" in info.id:
120 char_name = info.id.replace("org.bluetooth.characteristic.", "")
121 char_name = char_name.replace("_", " ").title()
122 aliases.add(char_name)
124 # Add space-separated words from name
125 name_words = info.name.replace("_", " ").replace("-", " ")
126 if " " in name_words:
127 aliases.add(name_words.title())
128 aliases.add(name_words.lower())
130 # Remove empty strings, None values, and the canonical key itself
131 canonical_key = info.uuid.normalized
132 return {alias for alias in aliases if alias and alias.strip() and alias != canonical_key}
134 def _load_uuids(self) -> None: # pylint: disable=too-many-branches
135 """Load all UUIDs from YAML files."""
136 base_path = find_bluetooth_sig_path()
137 if not base_path:
138 return
140 # Load service UUIDs
141 service_yaml = base_path / "service_uuids.yaml"
142 if service_yaml.exists():
143 for uuid_info in load_yaml_uuids(service_yaml):
144 uuid = normalize_uuid_string(uuid_info["uuid"])
146 bt_uuid = BluetoothUUID(uuid)
147 info = ServiceInfo(
148 uuid=bt_uuid,
149 name=uuid_info["name"],
150 id=uuid_info.get("id", ""),
151 )
152 self._store_service(info)
154 # Load characteristic UUIDs
155 characteristic_yaml = base_path / "characteristic_uuids.yaml"
156 if characteristic_yaml.exists():
157 for uuid_info in load_yaml_uuids(characteristic_yaml):
158 uuid = normalize_uuid_string(uuid_info["uuid"])
160 bt_uuid = BluetoothUUID(uuid)
161 char_info = CharacteristicInfo(
162 uuid=bt_uuid,
163 name=uuid_info["name"],
164 id=uuid_info.get("id", ""),
165 unit="", # Will be set from unit mappings if available
166 )
167 self._store_characteristic(char_info)
169 # Load descriptor UUIDs
170 descriptor_yaml = base_path / "descriptors.yaml"
171 if descriptor_yaml.exists():
172 for uuid_info in load_yaml_uuids(descriptor_yaml):
173 uuid = normalize_uuid_string(uuid_info["uuid"])
175 bt_uuid = BluetoothUUID(uuid)
176 desc_info = DescriptorInfo(
177 uuid=bt_uuid,
178 name=uuid_info["name"],
179 id=uuid_info.get("id", ""),
180 )
181 self._store_descriptor(desc_info)
183 # Load GSS specifications
184 self._gss_registry = GssRegistry.get_instance()
185 self._load_gss_characteristic_info()
187 def _load_gss_characteristic_info(self) -> None:
188 """Load GSS specs and update characteristics with extracted info."""
189 if self._gss_registry is None:
190 return
192 all_specs = self._gss_registry.get_all_specs()
194 # Group by identifier to avoid duplicate processing
195 processed_ids: set[str] = set()
196 for spec in all_specs.values():
197 if spec.identifier in processed_ids:
198 continue
199 processed_ids.add(spec.identifier)
201 # Extract unit and value_type from structure
202 char_data = {
203 "structure": [
204 {
205 "field": f.field,
206 "type": f.type,
207 "size": f.size,
208 "description": f.description,
209 }
210 for f in spec.structure
211 ]
212 }
213 unit, value_type = self._gss_registry.extract_info_from_gss(char_data)
215 # Multi-field structs have per-field units; no single representative
216 # unit, and the first field's scalar wire type (e.g. int) is not
217 # representative of the struct-valued characteristic.
218 if len(spec.structure) > 1:
219 unit = None
220 value_type = None
222 if unit or value_type:
223 self._update_characteristic_with_gss_info(spec.name, spec.identifier, unit, value_type)
225 def _update_characteristic_with_gss_info(
226 self, char_name: str, char_id: str, unit: str | None, python_type: type | None
227 ) -> None:
228 """Update existing characteristic with GSS info."""
229 with self._lock:
230 # Find the canonical entry by checking aliases (normalized to lowercase)
231 canonical_uuid = None
232 for search_key in [char_name, char_id]:
233 canonical_uuid = self._characteristic_aliases.get(search_key.lower())
234 if canonical_uuid:
235 break
237 if not canonical_uuid or canonical_uuid not in self._characteristics:
238 return
240 # Get existing info and create updated version
241 existing_info = self._characteristics[canonical_uuid]
243 # Use provided python_type or keep existing
244 new_python_type = python_type if python_type is not None else existing_info.python_type
246 # Create updated CharacteristicInfo (immutable, so create new instance)
247 updated_info = CharacteristicInfo(
248 uuid=existing_info.uuid,
249 name=existing_info.name,
250 id=existing_info.id,
251 unit=unit or existing_info.unit,
252 python_type=new_python_type,
253 )
255 # Update canonical store (aliases remain the same since UUID/name/id unchanged)
256 self._characteristics[canonical_uuid] = updated_info
258 def _convert_bluetooth_unit_to_readable(self, unit_spec: str) -> str:
259 """Convert Bluetooth SIG unit specification to human-readable symbol.
261 Args:
262 unit_spec: Unit specification (e.g., "thermodynamic_temperature.degree_celsius")
264 Returns:
265 Human-readable symbol (e.g., "°C"), or unit_spec if no mapping found
266 """
267 unit_spec = unit_spec.rstrip(".").lower()
268 unit_id = f"org.bluetooth.unit.{unit_spec}"
270 units_registry = UnitsRegistry.get_instance()
271 unit_info = units_registry.get_info(unit_id)
272 if unit_info and unit_info.symbol:
273 return unit_info.symbol
275 return unit_spec
277 def register_characteristic( # pylint: disable=too-many-arguments,too-many-positional-arguments
278 self,
279 uuid: BluetoothUUID,
280 name: str,
281 identifier: str | None = None,
282 unit: str | None = None,
283 python_type: type | str | None = None,
284 override: bool = False,
285 ) -> None:
286 """Register a custom characteristic at runtime.
288 Args:
289 uuid: The Bluetooth UUID for the characteristic
290 name: Human-readable name
291 identifier: Optional identifier (auto-generated if not provided)
292 unit: Optional unit of measurement
293 python_type: Optional Python type for the value
294 override: If True, allow overriding existing entries
295 """
296 with self._lock:
297 canonical_key = uuid.normalized
299 # Check for conflicts with existing entries
300 if canonical_key in self._characteristics:
301 # Check if it's a SIG characteristic (not in runtime set)
302 if canonical_key not in self._runtime_uuids:
303 if not override:
304 raise ValueError(
305 f"UUID {uuid} conflicts with existing SIG "
306 "characteristic entry. Use override=True to replace."
307 )
308 # Preserve original SIG entry for restoration
309 self._characteristic_overrides.setdefault(canonical_key, self._characteristics[canonical_key])
310 elif not override:
311 # Runtime entry already exists
312 raise ValueError(
313 f"UUID {uuid} already registered as runtime characteristic. Use override=True to replace."
314 )
316 info = CharacteristicInfo(
317 uuid=uuid,
318 name=name,
319 id=identifier or f"runtime.characteristic.{name.lower().replace(' ', '_')}",
320 unit=unit or "",
321 python_type=python_type,
322 )
324 # Track as runtime-registered UUID
325 self._runtime_uuids.add(canonical_key)
327 self._store_characteristic(info)
329 def register_service(
330 self,
331 uuid: BluetoothUUID,
332 name: str,
333 identifier: str | None = None,
334 override: bool = False,
335 ) -> None:
336 """Register a custom service at runtime.
338 Args:
339 uuid: The Bluetooth UUID for the service
340 name: Human-readable name
341 identifier: Optional identifier (auto-generated if not provided)
342 override: If True, allow overriding existing entries
343 """
344 with self._lock:
345 canonical_key = uuid.normalized
347 # Check for conflicts with existing entries
348 if canonical_key in self._services:
349 # Check if it's a SIG service (not in runtime set)
350 if canonical_key not in self._runtime_uuids:
351 if not override:
352 raise ValueError(
353 f"UUID {uuid} conflicts with existing SIG service entry. Use override=True to replace."
354 )
355 # Preserve original SIG entry for restoration
356 self._service_overrides.setdefault(canonical_key, self._services[canonical_key])
357 elif not override:
358 # Runtime entry already exists
359 raise ValueError(
360 f"UUID {uuid} already registered as runtime service. Use override=True to replace."
361 )
363 info = ServiceInfo(
364 uuid=uuid,
365 name=name,
366 id=identifier or f"runtime.service.{name.lower().replace(' ', '_')}",
367 )
369 # Track as runtime-registered UUID
370 self._runtime_uuids.add(canonical_key)
372 self._store_service(info)
374 def get_service_info(self, key: str | BluetoothUUID) -> ServiceInfo | None:
375 """Get information about a service by UUID, name, or ID."""
376 with self._lock:
377 # Convert BluetoothUUID to canonical key
378 if isinstance(key, BluetoothUUID):
379 canonical_key = key.normalized
380 # Direct canonical lookup
381 if canonical_key in self._services:
382 return self._services[canonical_key]
383 else:
384 search_key = str(key).strip()
386 # Try UUID normalization first
387 try:
388 bt_uuid = BluetoothUUID(search_key)
389 canonical_key = bt_uuid.normalized
390 if canonical_key in self._services:
391 return self._services[canonical_key]
392 except ValueError:
393 pass # UUID normalization failed, continue to alias lookup
395 # Check alias index (normalized to lowercase)
396 alias_key = self._service_aliases.get(search_key.lower())
397 if alias_key and alias_key in self._services:
398 return self._services[alias_key]
400 return None
402 def get_characteristic_info(self, identifier: str | BluetoothUUID) -> CharacteristicInfo | None:
403 """Get information about a characteristic by UUID, name, or ID."""
404 with self._lock:
405 # Convert BluetoothUUID to canonical key
406 if isinstance(identifier, BluetoothUUID):
407 canonical_key = identifier.normalized
408 # Direct canonical lookup
409 if canonical_key in self._characteristics:
410 return self._characteristics[canonical_key]
411 else:
412 search_key = str(identifier).strip()
414 # Try UUID normalization first
415 try:
416 bt_uuid = BluetoothUUID(search_key)
417 canonical_key = bt_uuid.normalized
418 if canonical_key in self._characteristics:
419 return self._characteristics[canonical_key]
420 except ValueError:
421 pass # UUID normalization failed, continue to alias lookup
423 # Check alias index (normalized to lowercase)
424 alias_key = self._characteristic_aliases.get(search_key.lower())
425 if alias_key and alias_key in self._characteristics:
426 return self._characteristics[alias_key]
428 return None
430 def get_descriptor_info(self, identifier: str | BluetoothUUID) -> DescriptorInfo | None:
431 """Get information about a descriptor by UUID, name, or ID."""
432 with self._lock:
433 # Convert BluetoothUUID to canonical key
434 if isinstance(identifier, BluetoothUUID):
435 canonical_key = identifier.normalized
436 # Direct canonical lookup
437 if canonical_key in self._descriptors:
438 return self._descriptors[canonical_key]
439 else:
440 search_key = str(identifier).strip()
442 # Try UUID normalization first
443 try:
444 bt_uuid = BluetoothUUID(search_key)
445 canonical_key = bt_uuid.normalized
446 if canonical_key in self._descriptors:
447 return self._descriptors[canonical_key]
448 except ValueError:
449 pass # UUID normalization failed, continue to alias lookup
451 # Check alias index (normalized to lowercase)
452 alias_key = self._descriptor_aliases.get(search_key.lower())
453 if alias_key and alias_key in self._descriptors:
454 return self._descriptors[alias_key]
456 return None
458 def get_gss_spec(self, identifier: str | BluetoothUUID) -> GssCharacteristicSpec | None:
459 """Get the full GSS characteristic specification with all field metadata.
461 This provides access to the complete YAML structure including all fields,
462 their units, resolutions, ranges, and presence conditions.
464 Args:
465 identifier: Characteristic name, ID, or UUID
467 Returns:
468 GssCharacteristicSpec with full field structure, or None if not found
470 Example::
471 gss = uuid_registry.get_gss_spec("Location and Speed")
472 if gss:
473 for field in gss.structure:
474 print(f"{field.python_name}: unit={field.unit_id}, resolution={field.resolution}")
476 """
477 if self._gss_registry is None:
478 return None
480 with self._lock:
481 # Try direct lookup by name or ID
482 if isinstance(identifier, str):
483 spec = self._gss_registry.get_spec(identifier)
484 if spec:
485 return spec
487 # Try to get CharacteristicInfo to find the ID
488 char_info = self.get_characteristic_info(identifier)
489 if char_info:
490 spec = self._gss_registry.get_spec(char_info.id)
491 if spec:
492 return spec
493 elif isinstance(identifier, BluetoothUUID):
494 # Look up by UUID
495 char_info = self.get_characteristic_info(identifier)
496 if char_info:
497 spec = self._gss_registry.get_spec(char_info.name)
498 if spec:
499 return spec
500 spec = self._gss_registry.get_spec(char_info.id)
501 if spec:
502 return spec
504 return None
506 def resolve_characteristic_spec(self, characteristic_name: str) -> CharacteristicSpec | None: # pylint: disable=too-many-locals
507 """Resolve characteristic specification with rich YAML metadata.
509 This method provides detailed characteristic information including data types,
510 field sizes, units, and descriptions by cross-referencing multiple YAML sources.
512 Args:
513 characteristic_name: Name of the characteristic (e.g., "Temperature", "Battery Level")
515 Returns:
516 CharacteristicSpec with full metadata, or None if not found
518 Example::
519 spec = uuid_registry.resolve_characteristic_spec("Temperature")
520 if spec:
521 print(f"UUID: {spec.uuid}, Unit: {spec.unit_symbol}, Type: {spec.data_type}")
523 """
524 with self._lock:
525 # 1. Get UUID from characteristic registry
526 char_info = self.get_characteristic_info(characteristic_name)
527 if not char_info:
528 return None
530 # 2. Get typed GSS specification if available
531 gss_spec = self.get_gss_spec(characteristic_name)
533 # 3. Extract metadata from GSS specification
534 data_type = None
535 field_size = None
536 unit_id = None
537 unit_symbol = None
538 unit_readable_name = None
539 base_unit = None
540 resolution_text = None
541 description = None
543 if gss_spec:
544 description = gss_spec.description
546 # Only set data_type for single-field characteristics
547 # Multi-field characteristics have complex structures and no single data type
548 if len(gss_spec.structure) == 1:
549 # Use primary field for metadata extraction
550 primary = gss_spec.primary_field
551 if primary:
552 data_type = primary.type
553 field_size = str(primary.fixed_size) if primary.fixed_size else primary.size
555 # Use FieldSpec's unit_id property (auto-parsed from description)
556 if primary.unit_id:
557 unit_id = f"org.bluetooth.unit.{primary.unit_id}"
558 unit_symbol = self._convert_bluetooth_unit_to_readable(primary.unit_id)
559 # Preserve the human-readable long-form name
560 unit_info_obj = UnitsRegistry.get_instance().get_info(unit_id)
561 if unit_info_obj:
562 unit_readable_name = unit_info_obj.readable_name
563 base_unit = unit_id
565 # Get resolution from FieldSpec
566 if primary.resolution is not None:
567 resolution_text = f"Resolution: {primary.resolution}"
569 # 4. Use existing unit from CharacteristicInfo if GSS didn't provide one.
570 # Multi-field structs have per-field units; don't promote one to top-level.
571 is_multi_field = gss_spec is not None and len(gss_spec.structure) > 1
572 if not unit_symbol and char_info.unit and not is_multi_field:
573 unit_symbol = char_info.unit
575 return CharacteristicSpec(
576 uuid=char_info.uuid,
577 name=char_info.name,
578 field_info=FieldInfo(data_type=data_type, field_size=field_size),
579 unit_info=UnitMetadata(
580 unit_id=unit_id,
581 unit_symbol=unit_symbol,
582 unit_name=unit_readable_name,
583 base_unit=base_unit,
584 resolution_text=resolution_text,
585 ),
586 description=description,
587 structure=gss_spec.structure if gss_spec else [],
588 )
590 def get_signed_from_data_type(self, data_type: str | None) -> bool:
591 """Determine if data type is signed from GSS data type.
593 Args:
594 data_type: GSS data type string (e.g., "sint16", "float32", "uint8")
596 Returns:
597 True if the type represents signed values, False otherwise
599 """
600 if not data_type:
601 return False
602 # Comprehensive signed type detection
603 signed_types = {"float32", "float64", "medfloat16", "medfloat32"}
604 return data_type.startswith("sint") or data_type in signed_types
606 @staticmethod
607 def get_byte_order_hint() -> str:
608 """Get byte order hint for Bluetooth SIG specifications.
610 Returns:
611 "little" - Bluetooth SIG uses little-endian by convention
613 """
614 return "little"
616 def clear_custom_registrations(self) -> None:
617 """Clear all custom registrations (for testing)."""
618 with self._lock:
619 # Use runtime_uuids set to identify what to remove
620 runtime_keys = list(self._runtime_uuids)
622 # Remove runtime entries from canonical stores
623 for key in runtime_keys:
624 self._services.pop(key, None)
625 self._characteristics.pop(key, None)
626 self._descriptors.pop(key, None)
628 # Remove corresponding aliases (alias -> canonical_key where canonical_key is runtime)
629 runtime_service_aliases = [
630 alias for alias, canonical in self._service_aliases.items() if canonical in runtime_keys
631 ]
632 runtime_char_aliases = [
633 alias for alias, canonical in self._characteristic_aliases.items() if canonical in runtime_keys
634 ]
635 runtime_desc_aliases = [
636 alias for alias, canonical in self._descriptor_aliases.items() if canonical in runtime_keys
637 ]
639 for alias in runtime_service_aliases:
640 del self._service_aliases[alias]
641 for alias in runtime_char_aliases:
642 del self._characteristic_aliases[alias]
643 for alias in runtime_desc_aliases:
644 del self._descriptor_aliases[alias]
646 # Restore any preserved SIG entries that were overridden
647 for key in runtime_keys:
648 original = self._service_overrides.pop(key, None)
649 if original is not None:
650 self._store_service(original)
651 original = self._characteristic_overrides.pop(key, None)
652 if original is not None:
653 self._store_characteristic(original)
654 original = self._descriptor_overrides.pop(key, None)
655 if original is not None:
656 self._store_descriptor(original)
658 # Clear the runtime tracking set
659 self._runtime_uuids.clear()
662# Global instance
663uuid_registry = UuidRegistry()