Coverage for src / bluetooth_sig / gatt / uuid_registry.py: 87%
305 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
1"""UUID registry loading from Bluetooth SIG YAML files."""
3from __future__ import annotations
5from bluetooth_sig.types.base_types import SIGInfo
7__all__ = [
8 "UuidRegistry",
9 "uuid_registry",
10]
12import threading
14from bluetooth_sig.registry.gss import GssRegistry
15from bluetooth_sig.registry.uuids.units import UnitsRegistry
16from bluetooth_sig.types import CharacteristicInfo, ServiceInfo
17from bluetooth_sig.types.gatt_enums import ValueType
18from bluetooth_sig.types.registry.descriptor_types import DescriptorInfo
19from bluetooth_sig.types.registry.gss_characteristic import GssCharacteristicSpec
20from bluetooth_sig.types.uuid import BluetoothUUID
22from ..registry.utils import find_bluetooth_sig_path, load_yaml_uuids, normalize_uuid_string
23from ..types.registry import CharacteristicSpec, FieldInfo, UnitMetadata
26class UuidRegistry: # pylint: disable=too-many-instance-attributes
27 """Registry for Bluetooth SIG UUIDs with canonical storage + alias indices.
29 This registry stores a number of internal caches and mappings which
30 legitimately exceed the default pylint instance attribute limit. The
31 complexity is intentional and centralised; an inline disable is used to
32 avoid noisy global configuration changes.
33 """
35 def __init__(self) -> None:
36 """Initialize the UUID registry."""
37 self._lock = threading.RLock()
39 # Canonical storage: normalized_uuid -> domain types (single source of truth)
40 self._services: dict[str, ServiceInfo] = {}
41 self._characteristics: dict[str, CharacteristicInfo] = {}
42 self._descriptors: dict[str, DescriptorInfo] = {}
44 # Lightweight alias indices: alias -> normalized_uuid
45 self._service_aliases: dict[str, str] = {}
46 self._characteristic_aliases: dict[str, str] = {}
47 self._descriptor_aliases: dict[str, str] = {}
49 # Preserve SIG entries overridden at runtime so we can restore them
50 self._service_overrides: dict[str, ServiceInfo] = {}
51 self._characteristic_overrides: dict[str, CharacteristicInfo] = {}
52 self._descriptor_overrides: dict[str, DescriptorInfo] = {}
54 # Track runtime-registered UUIDs (replaces origin field checks)
55 self._runtime_uuids: set[str] = set()
57 self._gss_registry: GssRegistry | None = None
59 try:
60 self._load_uuids()
61 except (FileNotFoundError, Exception): # pylint: disable=broad-exception-caught
62 # If YAML loading fails, continue with empty registry
63 pass
65 def _store_service(self, info: ServiceInfo) -> None:
66 """Store service info with canonical storage + aliases."""
67 canonical_key = info.uuid.normalized
69 # Store once in canonical location
70 self._services[canonical_key] = info
72 # Create lightweight alias mappings (normalized to lowercase)
73 aliases = self._generate_aliases(info)
74 for alias in aliases:
75 self._service_aliases[alias.lower()] = canonical_key
77 def _store_characteristic(self, info: CharacteristicInfo) -> None:
78 """Store characteristic info with canonical storage + aliases."""
79 canonical_key = info.uuid.normalized
81 # Store once in canonical location
82 self._characteristics[canonical_key] = info
84 # Create lightweight alias mappings (normalized to lowercase)
85 aliases = self._generate_aliases(info)
86 for alias in aliases:
87 self._characteristic_aliases[alias.lower()] = canonical_key
89 def _store_descriptor(self, info: DescriptorInfo) -> None:
90 """Store descriptor info with canonical storage + aliases."""
91 canonical_key = info.uuid.normalized
93 # Store once in canonical location
94 self._descriptors[canonical_key] = info
96 # Create lightweight alias mappings (normalized to lowercase)
97 aliases = self._generate_aliases(info)
98 for alias in aliases:
99 self._descriptor_aliases[alias.lower()] = canonical_key
101 def _generate_aliases(self, info: SIGInfo) -> set[str]:
102 """Generate name/ID-based alias keys for domain info types (UUID variations handled by BluetoothUUID)."""
103 aliases: set[str] = {
104 info.name.lower(),
105 }
107 if info.id:
108 aliases.add(info.id)
110 if info.id and "service" in info.id:
111 service_name = info.id.replace("org.bluetooth.service.", "")
112 if service_name.endswith("_service"):
113 service_name = service_name[:-8] # Remove _service
114 service_name = service_name.replace("_", " ").title()
115 aliases.add(service_name)
116 # Also add "Service" suffix if not present
117 if not service_name.endswith(" Service"):
118 aliases.add(service_name + " Service")
119 elif info.id and "characteristic" in info.id:
120 char_name = info.id.replace("org.bluetooth.characteristic.", "")
121 char_name = char_name.replace("_", " ").title()
122 aliases.add(char_name)
124 # Add space-separated words from name
125 name_words = info.name.replace("_", " ").replace("-", " ")
126 if " " in name_words:
127 aliases.add(name_words.title())
128 aliases.add(name_words.lower())
130 # Remove empty strings, None values, and the canonical key itself
131 canonical_key = info.uuid.normalized
132 return {alias for alias in aliases if alias and alias.strip() and alias != canonical_key}
134 def _load_uuids(self) -> None: # pylint: disable=too-many-branches
135 """Load all UUIDs from YAML files."""
136 base_path = find_bluetooth_sig_path()
137 if not base_path:
138 return
140 # Load service UUIDs
141 service_yaml = base_path / "service_uuids.yaml"
142 if service_yaml.exists():
143 for uuid_info in load_yaml_uuids(service_yaml):
144 uuid = normalize_uuid_string(uuid_info["uuid"])
146 bt_uuid = BluetoothUUID(uuid)
147 info = ServiceInfo(
148 uuid=bt_uuid,
149 name=uuid_info["name"],
150 id=uuid_info.get("id", ""),
151 )
152 self._store_service(info)
154 # Load characteristic UUIDs
155 characteristic_yaml = base_path / "characteristic_uuids.yaml"
156 if characteristic_yaml.exists():
157 for uuid_info in load_yaml_uuids(characteristic_yaml):
158 uuid = normalize_uuid_string(uuid_info["uuid"])
160 bt_uuid = BluetoothUUID(uuid)
161 char_info = CharacteristicInfo(
162 uuid=bt_uuid,
163 name=uuid_info["name"],
164 id=uuid_info.get("id", ""),
165 unit="", # Will be set from unit mappings if available
166 value_type=ValueType.UNKNOWN,
167 )
168 self._store_characteristic(char_info)
170 # Load descriptor UUIDs
171 descriptor_yaml = base_path / "descriptors.yaml"
172 if descriptor_yaml.exists():
173 for uuid_info in load_yaml_uuids(descriptor_yaml):
174 uuid = normalize_uuid_string(uuid_info["uuid"])
176 bt_uuid = BluetoothUUID(uuid)
177 desc_info = DescriptorInfo(
178 uuid=bt_uuid,
179 name=uuid_info["name"],
180 id=uuid_info.get("id", ""),
181 )
182 self._store_descriptor(desc_info)
184 # Load GSS specifications
185 self._gss_registry = GssRegistry.get_instance()
186 self._load_gss_characteristic_info()
188 def _load_gss_characteristic_info(self) -> None:
189 """Load GSS specs and update characteristics with extracted info."""
190 if self._gss_registry is None:
191 return
193 all_specs = self._gss_registry.get_all_specs()
195 # Group by identifier to avoid duplicate processing
196 processed_ids: set[str] = set()
197 for spec in all_specs.values():
198 if spec.identifier in processed_ids:
199 continue
200 processed_ids.add(spec.identifier)
202 # Extract unit and value_type from structure
203 char_data = {
204 "structure": [
205 {
206 "field": f.field,
207 "type": f.type,
208 "size": f.size,
209 "description": f.description,
210 }
211 for f in spec.structure
212 ]
213 }
214 unit, value_type = self._gss_registry.extract_info_from_gss(char_data)
216 if unit or value_type:
217 self._update_characteristic_with_gss_info(spec.name, spec.identifier, unit, value_type)
219 def _update_characteristic_with_gss_info(
220 self, char_name: str, char_id: str, unit: str | None, value_type: str | None
221 ) -> None:
222 """Update existing characteristic with GSS info."""
223 with self._lock:
224 # Find the canonical entry by checking aliases (normalized to lowercase)
225 canonical_uuid = None
226 for search_key in [char_name, char_id]:
227 canonical_uuid = self._characteristic_aliases.get(search_key.lower())
228 if canonical_uuid:
229 break
231 if not canonical_uuid or canonical_uuid not in self._characteristics:
232 return
234 # Get existing info and create updated version
235 existing_info = self._characteristics[canonical_uuid]
237 # Convert value_type string to ValueType enum if provided
238 new_value_type = existing_info.value_type
239 if value_type:
240 try:
241 new_value_type = ValueType(value_type)
242 except (ValueError, KeyError):
243 new_value_type = existing_info.value_type
245 # Create updated CharacteristicInfo (immutable, so create new instance)
246 updated_info = CharacteristicInfo(
247 uuid=existing_info.uuid,
248 name=existing_info.name,
249 id=existing_info.id,
250 unit=unit or existing_info.unit,
251 value_type=new_value_type,
252 )
254 # Update canonical store (aliases remain the same since UUID/name/id unchanged)
255 self._characteristics[canonical_uuid] = updated_info
257 def _convert_bluetooth_unit_to_readable(self, unit_spec: str) -> str:
258 """Convert Bluetooth SIG unit specification to human-readable symbol.
260 Args:
261 unit_spec: Unit specification (e.g., "thermodynamic_temperature.degree_celsius")
263 Returns:
264 Human-readable symbol (e.g., "°C"), or unit_spec if no mapping found
265 """
266 unit_spec = unit_spec.rstrip(".").lower()
267 unit_id = f"org.bluetooth.unit.{unit_spec}"
269 units_registry = UnitsRegistry.get_instance()
270 unit_info = units_registry.get_info(unit_id)
271 if unit_info and unit_info.symbol:
272 return unit_info.symbol
274 return unit_spec
276 def register_characteristic( # pylint: disable=too-many-arguments,too-many-positional-arguments
277 self,
278 uuid: BluetoothUUID,
279 name: str,
280 identifier: str | None = None,
281 unit: str | None = None,
282 value_type: ValueType | None = None,
283 override: bool = False,
284 ) -> None:
285 """Register a custom characteristic at runtime.
287 Args:
288 uuid: The Bluetooth UUID for the characteristic
289 name: Human-readable name
290 identifier: Optional identifier (auto-generated if not provided)
291 unit: Optional unit of measurement
292 value_type: Optional value type
293 override: If True, allow overriding existing entries
294 """
295 with self._lock:
296 canonical_key = uuid.normalized
298 # Check for conflicts with existing entries
299 if canonical_key in self._characteristics:
300 # Check if it's a SIG characteristic (not in runtime set)
301 if canonical_key not in self._runtime_uuids:
302 if not override:
303 raise ValueError(
304 f"UUID {uuid} conflicts with existing SIG "
305 "characteristic entry. Use override=True to replace."
306 )
307 # Preserve original SIG entry for restoration
308 self._characteristic_overrides.setdefault(canonical_key, self._characteristics[canonical_key])
309 elif not override:
310 # Runtime entry already exists
311 raise ValueError(
312 f"UUID {uuid} already registered as runtime characteristic. Use override=True to replace."
313 )
315 info = CharacteristicInfo(
316 uuid=uuid,
317 name=name,
318 id=identifier or f"runtime.characteristic.{name.lower().replace(' ', '_')}",
319 unit=unit or "",
320 value_type=value_type or ValueType.UNKNOWN,
321 )
323 # Track as runtime-registered UUID
324 self._runtime_uuids.add(canonical_key)
326 self._store_characteristic(info)
328 def register_service(
329 self,
330 uuid: BluetoothUUID,
331 name: str,
332 identifier: str | None = None,
333 override: bool = False,
334 ) -> None:
335 """Register a custom service at runtime.
337 Args:
338 uuid: The Bluetooth UUID for the service
339 name: Human-readable name
340 identifier: Optional identifier (auto-generated if not provided)
341 override: If True, allow overriding existing entries
342 """
343 with self._lock:
344 canonical_key = uuid.normalized
346 # Check for conflicts with existing entries
347 if canonical_key in self._services:
348 # Check if it's a SIG service (not in runtime set)
349 if canonical_key not in self._runtime_uuids:
350 if not override:
351 raise ValueError(
352 f"UUID {uuid} conflicts with existing SIG service entry. Use override=True to replace."
353 )
354 # Preserve original SIG entry for restoration
355 self._service_overrides.setdefault(canonical_key, self._services[canonical_key])
356 elif not override:
357 # Runtime entry already exists
358 raise ValueError(
359 f"UUID {uuid} already registered as runtime service. Use override=True to replace."
360 )
362 info = ServiceInfo(
363 uuid=uuid,
364 name=name,
365 id=identifier or f"runtime.service.{name.lower().replace(' ', '_')}",
366 )
368 # Track as runtime-registered UUID
369 self._runtime_uuids.add(canonical_key)
371 self._store_service(info)
373 def get_service_info(self, key: str | BluetoothUUID) -> ServiceInfo | None:
374 """Get information about a service by UUID, name, or ID."""
375 with self._lock:
376 # Convert BluetoothUUID to canonical key
377 if isinstance(key, BluetoothUUID):
378 canonical_key = key.normalized
379 # Direct canonical lookup
380 if canonical_key in self._services:
381 return self._services[canonical_key]
382 else:
383 search_key = str(key).strip()
385 # Try UUID normalization first
386 try:
387 bt_uuid = BluetoothUUID(search_key)
388 canonical_key = bt_uuid.normalized
389 if canonical_key in self._services:
390 return self._services[canonical_key]
391 except ValueError:
392 pass
394 # Check alias index (normalized to lowercase)
395 alias_key = self._service_aliases.get(search_key.lower())
396 if alias_key and alias_key in self._services:
397 return self._services[alias_key]
399 return None
401 def get_characteristic_info(self, identifier: str | BluetoothUUID) -> CharacteristicInfo | None:
402 """Get information about a characteristic by UUID, name, or ID."""
403 with self._lock:
404 # Convert BluetoothUUID to canonical key
405 if isinstance(identifier, BluetoothUUID):
406 canonical_key = identifier.normalized
407 # Direct canonical lookup
408 if canonical_key in self._characteristics:
409 return self._characteristics[canonical_key]
410 else:
411 search_key = str(identifier).strip()
413 # Try UUID normalization first
414 try:
415 bt_uuid = BluetoothUUID(search_key)
416 canonical_key = bt_uuid.normalized
417 if canonical_key in self._characteristics:
418 return self._characteristics[canonical_key]
419 except ValueError:
420 pass
422 # Check alias index (normalized to lowercase)
423 alias_key = self._characteristic_aliases.get(search_key.lower())
424 if alias_key and alias_key in self._characteristics:
425 return self._characteristics[alias_key]
427 return None
429 def get_descriptor_info(self, identifier: str | BluetoothUUID) -> DescriptorInfo | None:
430 """Get information about a descriptor by UUID, name, or ID."""
431 with self._lock:
432 # Convert BluetoothUUID to canonical key
433 if isinstance(identifier, BluetoothUUID):
434 canonical_key = identifier.normalized
435 # Direct canonical lookup
436 if canonical_key in self._descriptors:
437 return self._descriptors[canonical_key]
438 else:
439 search_key = str(identifier).strip()
441 # Try UUID normalization first
442 try:
443 bt_uuid = BluetoothUUID(search_key)
444 canonical_key = bt_uuid.normalized
445 if canonical_key in self._descriptors:
446 return self._descriptors[canonical_key]
447 except ValueError:
448 pass
450 # Check alias index (normalized to lowercase)
451 alias_key = self._descriptor_aliases.get(search_key.lower())
452 if alias_key and alias_key in self._descriptors:
453 return self._descriptors[alias_key]
455 return None
457 def get_gss_spec(self, identifier: str | BluetoothUUID) -> GssCharacteristicSpec | None:
458 """Get the full GSS characteristic specification with all field metadata.
460 This provides access to the complete YAML structure including all fields,
461 their units, resolutions, ranges, and presence conditions.
463 Args:
464 identifier: Characteristic name, ID, or UUID
466 Returns:
467 GssCharacteristicSpec with full field structure, or None if not found
469 Example:
470 gss = uuid_registry.get_gss_spec("Location and Speed")
471 if gss:
472 for field in gss.structure:
473 print(f"{field.python_name}: unit={field.unit_id}, resolution={field.resolution}")
475 """
476 if self._gss_registry is None:
477 return None
479 with self._lock:
480 # Try direct lookup by name or ID
481 if isinstance(identifier, str):
482 spec = self._gss_registry.get_spec(identifier)
483 if spec:
484 return spec
486 # Try to get CharacteristicInfo to find the ID
487 char_info = self.get_characteristic_info(identifier)
488 if char_info:
489 spec = self._gss_registry.get_spec(char_info.id)
490 if spec:
491 return spec
492 elif isinstance(identifier, BluetoothUUID):
493 # Look up by UUID
494 char_info = self.get_characteristic_info(identifier)
495 if char_info:
496 spec = self._gss_registry.get_spec(char_info.name)
497 if spec:
498 return spec
499 spec = self._gss_registry.get_spec(char_info.id)
500 if spec:
501 return spec
503 return None
505 def resolve_characteristic_spec(self, characteristic_name: str) -> CharacteristicSpec | None: # pylint: disable=too-many-locals
506 """Resolve characteristic specification with rich YAML metadata.
508 This method provides detailed characteristic information including data types,
509 field sizes, units, and descriptions by cross-referencing multiple YAML sources.
511 Args:
512 characteristic_name: Name of the characteristic (e.g., "Temperature", "Battery Level")
514 Returns:
515 CharacteristicSpec with full metadata, or None if not found
517 Example:
518 spec = uuid_registry.resolve_characteristic_spec("Temperature")
519 if spec:
520 print(f"UUID: {spec.uuid}, Unit: {spec.unit_symbol}, Type: {spec.data_type}")
522 """
523 with self._lock:
524 # 1. Get UUID from characteristic registry
525 char_info = self.get_characteristic_info(characteristic_name)
526 if not char_info:
527 return None
529 # 2. Get typed GSS specification if available
530 gss_spec = self.get_gss_spec(characteristic_name)
532 # 3. Extract metadata from GSS specification
533 data_type = None
534 field_size = None
535 unit_id = None
536 unit_symbol = None
537 base_unit = None
538 resolution_text = None
539 description = None
541 if gss_spec:
542 description = gss_spec.description
544 # Only set data_type for single-field characteristics
545 # Multi-field characteristics have complex structures and no single data type
546 if len(gss_spec.structure) == 1:
547 # Use primary field for metadata extraction
548 primary = gss_spec.primary_field
549 if primary:
550 data_type = primary.type
551 field_size = str(primary.fixed_size) if primary.fixed_size else primary.size
553 # Use FieldSpec's unit_id property (auto-parsed from description)
554 if primary.unit_id:
555 unit_id = f"org.bluetooth.unit.{primary.unit_id}"
556 unit_symbol = self._convert_bluetooth_unit_to_readable(primary.unit_id)
557 base_unit = unit_id
559 # Get resolution from FieldSpec
560 if primary.resolution is not None:
561 resolution_text = f"Resolution: {primary.resolution}"
563 # 4. Use existing unit/value_type from CharacteristicInfo if GSS didn't provide them
564 if not unit_symbol and char_info.unit:
565 unit_symbol = char_info.unit
567 return CharacteristicSpec(
568 uuid=char_info.uuid,
569 name=char_info.name,
570 field_info=FieldInfo(data_type=data_type, field_size=field_size),
571 unit_info=UnitMetadata(
572 unit_id=unit_id,
573 unit_symbol=unit_symbol,
574 base_unit=base_unit,
575 resolution_text=resolution_text,
576 ),
577 description=description,
578 structure=gss_spec.structure if gss_spec else [],
579 )
581 def get_signed_from_data_type(self, data_type: str | None) -> bool:
582 """Determine if data type is signed from GSS data type.
584 Args:
585 data_type: GSS data type string (e.g., "sint16", "float32", "uint8")
587 Returns:
588 True if the type represents signed values, False otherwise
590 """
591 if not data_type:
592 return False
593 # Comprehensive signed type detection
594 signed_types = {"float32", "float64", "medfloat16", "medfloat32"}
595 return data_type.startswith("sint") or data_type in signed_types
597 @staticmethod
598 def get_byte_order_hint() -> str:
599 """Get byte order hint for Bluetooth SIG specifications.
601 Returns:
602 "little" - Bluetooth SIG uses little-endian by convention
604 """
605 return "little"
607 def clear_custom_registrations(self) -> None:
608 """Clear all custom registrations (for testing)."""
609 with self._lock:
610 # Use runtime_uuids set to identify what to remove
611 runtime_keys = list(self._runtime_uuids)
613 # Remove runtime entries from canonical stores
614 for key in runtime_keys:
615 self._services.pop(key, None)
616 self._characteristics.pop(key, None)
617 self._descriptors.pop(key, None)
619 # Remove corresponding aliases (alias -> canonical_key where canonical_key is runtime)
620 runtime_service_aliases = [
621 alias for alias, canonical in self._service_aliases.items() if canonical in runtime_keys
622 ]
623 runtime_char_aliases = [
624 alias for alias, canonical in self._characteristic_aliases.items() if canonical in runtime_keys
625 ]
626 runtime_desc_aliases = [
627 alias for alias, canonical in self._descriptor_aliases.items() if canonical in runtime_keys
628 ]
630 for alias in runtime_service_aliases:
631 del self._service_aliases[alias]
632 for alias in runtime_char_aliases:
633 del self._characteristic_aliases[alias]
634 for alias in runtime_desc_aliases:
635 del self._descriptor_aliases[alias]
637 # Restore any preserved SIG entries that were overridden
638 for key in runtime_keys:
639 original = self._service_overrides.pop(key, None)
640 if original is not None:
641 self._store_service(original)
642 original = self._characteristic_overrides.pop(key, None)
643 if original is not None:
644 self._store_characteristic(original)
645 original = self._descriptor_overrides.pop(key, None)
646 if original is not None:
647 self._store_descriptor(original)
649 # Clear the runtime tracking set
650 self._runtime_uuids.clear()
653# Global instance
654uuid_registry = UuidRegistry()