Coverage for src / bluetooth_sig / registry / service_discovery / attribute_ids.py: 82%
100 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""Service Discovery Attribute ID Registry for SDP attribute identifiers."""
3from __future__ import annotations
5from pathlib import Path
6from typing import Any, cast
8import msgspec
10from bluetooth_sig.registry.base import BaseGenericRegistry
11from bluetooth_sig.registry.utils import find_bluetooth_sig_path
12from bluetooth_sig.types.registry.profile_types import (
13 AttributeIdEntry,
14 ProtocolParameterEntry,
15)
18class ServiceDiscoveryAttributeRegistry(
19 BaseGenericRegistry["ServiceDiscoveryAttributeRegistry"],
20):
21 """Registry for SDP attribute identifiers with lazy loading.
23 Loads attribute IDs from ``service_discovery/attribute_ids/*.yaml``,
24 ``attribute_id_offsets_for_strings.yaml``, and ``protocol_parameters.yaml``.
26 Thread-safe: Multiple threads can safely access the registry concurrently.
27 """
29 def __init__(self) -> None:
30 """Initialise the service discovery attribute registry."""
31 super().__init__()
32 self._attribute_ids: dict[str, list[AttributeIdEntry]] = {}
33 self._protocol_parameters: list[ProtocolParameterEntry] = []
35 # ------------------------------------------------------------------
36 # Loading
37 # ------------------------------------------------------------------
39 @staticmethod
40 def _parse_hex_value(raw: object) -> int | None:
41 """Parse a hex string like ``'0x0001'`` into an int."""
42 if isinstance(raw, int):
43 return raw
44 if isinstance(raw, str):
45 try:
46 return int(raw, 16) if raw.startswith("0x") else int(raw)
47 except ValueError:
48 return None
49 return None
51 def _load_attribute_ids_file(self, yaml_path: Path, category: str) -> None:
52 """Load a single attribute_ids YAML file into *_attribute_ids[category]*."""
53 if not yaml_path.exists():
54 return
56 with yaml_path.open("r", encoding="utf-8") as fh:
57 data = msgspec.yaml.decode(fh.read())
59 if not isinstance(data, dict):
60 return
62 data_dict = cast("dict[str, Any]", data)
63 entries_raw = data_dict.get("attribute_ids")
64 if not isinstance(entries_raw, list):
65 return
67 entries: list[AttributeIdEntry] = []
68 for entry in entries_raw:
69 if not isinstance(entry, dict):
70 continue
71 name = entry.get("name")
72 value = self._parse_hex_value(entry.get("value"))
73 if name and value is not None:
74 entries.append(AttributeIdEntry(name=str(name), value=value))
76 if entries:
77 self._attribute_ids[category] = entries
79 def _load_protocol_parameters(self, yaml_path: Path) -> None:
80 """Load ``protocol_parameters.yaml``."""
81 if not yaml_path.exists():
82 return
84 with yaml_path.open("r", encoding="utf-8") as fh:
85 data = msgspec.yaml.decode(fh.read())
87 if not isinstance(data, dict):
88 return
90 data_dict = cast("dict[str, Any]", data)
91 params_raw = data_dict.get("protocol_parameters")
92 if not isinstance(params_raw, list):
93 return
95 for entry in params_raw:
96 if not isinstance(entry, dict):
97 continue
98 protocol = entry.get("protocol")
99 name = entry.get("name")
100 index = entry.get("index")
101 if protocol and name and isinstance(index, int):
102 self._protocol_parameters.append(
103 ProtocolParameterEntry(
104 protocol=str(protocol),
105 name=str(name),
106 index=index,
107 ),
108 )
110 def _load(self) -> None:
111 """Perform the actual loading of all service discovery data."""
112 uuids_path = find_bluetooth_sig_path()
113 if not uuids_path:
114 self._loaded = True
115 return
117 sd_path = uuids_path.parent / "service_discovery"
118 if not sd_path.exists():
119 self._loaded = True
120 return
122 # Load attribute_ids/*.yaml
123 attr_dir = sd_path / "attribute_ids"
124 if attr_dir.is_dir():
125 for yaml_file in sorted(attr_dir.glob("*.yaml")):
126 category = yaml_file.stem
127 self._load_attribute_ids_file(yaml_file, category)
129 # Load attribute_id_offsets_for_strings.yaml (same schema)
130 offsets_file = sd_path / "attribute_id_offsets_for_strings.yaml"
131 self._load_attribute_ids_file(offsets_file, "attribute_id_offsets_for_strings")
133 # Load protocol_parameters.yaml
134 self._load_protocol_parameters(sd_path / "protocol_parameters.yaml")
136 self._loaded = True
138 # ------------------------------------------------------------------
139 # Query API
140 # ------------------------------------------------------------------
142 def get_attribute_ids(self, category: str) -> list[AttributeIdEntry]:
143 """Get attribute ID entries for a named category.
145 Args:
146 category: The file stem / category name, e.g. ``"universal_attributes"``,
147 ``"a2dp"``, ``"sdp"``, ``"attribute_id_offsets_for_strings"``.
149 Returns:
150 List of :class:`AttributeIdEntry` or an empty list if not found.
151 """
152 self._ensure_loaded()
153 with self._lock:
154 return list(self._attribute_ids.get(category, []))
156 def get_all_categories(self) -> list[str]:
157 """Return all loaded category names (sorted)."""
158 self._ensure_loaded()
159 with self._lock:
160 return sorted(self._attribute_ids)
162 def get_protocol_parameters(self) -> list[ProtocolParameterEntry]:
163 """Return all protocol parameter entries."""
164 self._ensure_loaded()
165 with self._lock:
166 return list(self._protocol_parameters)
168 def resolve_attribute_name(self, category: str, value: int) -> str | None:
169 """Look up the attribute name for a given numeric value within a category.
171 Args:
172 category: Category name (e.g. ``"universal_attributes"``).
173 value: The numeric attribute ID.
175 Returns:
176 The attribute name or ``None`` if not found.
177 """
178 self._ensure_loaded()
179 with self._lock:
180 for entry in self._attribute_ids.get(category, []):
181 if entry.value == value:
182 return entry.name
183 return None
186# Singleton instance for global use
187service_discovery_attribute_registry = ServiceDiscoveryAttributeRegistry()