Coverage for src / bluetooth_sig / device / characteristic_io.py: 70%
109 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-03 16:41 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-03 16:41 +0000
1"""Characteristic I/O operations for BLE devices.
3Encapsulates read, write, and notification operations for GATT characteristics,
4including type-safe overloads for class-based and string/enum-based access.
5"""
7from __future__ import annotations
9import logging
10from collections.abc import Callable
11from typing import Any, TypeVar, cast, overload
13from ..gatt.characteristics.base import BaseCharacteristic
14from ..gatt.characteristics.registry import CharacteristicName, CharacteristicRegistry
15from ..gatt.context import CharacteristicContext, DeviceInfo
16from ..types.uuid import BluetoothUUID
17from .client import ClientManagerProtocol
18from .dependency_resolver import DependencyResolutionMode, DependencyResolver
19from .protocols import SIGTranslatorProtocol
21logger = logging.getLogger(__name__)
23# Type variable for generic characteristic return types
24T = TypeVar("T")
27class CharacteristicIO:
28 """Read, write, and notification operations for GATT characteristics.
30 Encapsulates the I/O logic extracted from Device, handling both type-safe
31 (class-based) and dynamic (string/enum-based) characteristic access patterns.
33 Uses ``DependencyResolver`` for automatic dependency resolution before reads,
34 and a ``device_info_factory`` callable to get current ``DeviceInfo`` without
35 a back-reference to the owning Device.
36 """
38 def __init__(
39 self,
40 connection_manager: ClientManagerProtocol,
41 translator: SIGTranslatorProtocol,
42 dep_resolver: DependencyResolver,
43 device_info_factory: Callable[[], DeviceInfo],
44 ) -> None:
45 """Initialise with connection manager, translator, resolver, and info factory.
47 Args:
48 connection_manager: Connection manager for BLE I/O
49 translator: Translator for parsing/encoding characteristics
50 dep_resolver: Resolver for characteristic dependencies
51 device_info_factory: Callable returning current DeviceInfo
53 """
54 self._connection_manager = connection_manager
55 self._translator = translator
56 self._dep_resolver = dep_resolver
57 self._device_info_factory = device_info_factory
59 # ------------------------------------------------------------------
60 # Read
61 # ------------------------------------------------------------------
63 @overload
64 async def read(
65 self,
66 char: type[BaseCharacteristic[T]],
67 resolution_mode: DependencyResolutionMode = ...,
68 ) -> T | None: ...
70 @overload
71 async def read(
72 self,
73 char: str | CharacteristicName,
74 resolution_mode: DependencyResolutionMode = ...,
75 ) -> Any | None: ... # noqa: ANN401 # Runtime UUID dispatch cannot be type-safe
77 async def read(
78 self,
79 char: str | CharacteristicName | type[BaseCharacteristic[T]],
80 resolution_mode: DependencyResolutionMode = DependencyResolutionMode.NORMAL,
81 ) -> T | Any | None: # Runtime UUID dispatch cannot be type-safe
82 """Read a characteristic value from the device.
84 Args:
85 char: Name, enum, or characteristic class to read.
86 Passing the class enables type-safe return values.
87 resolution_mode: How to handle automatic dependency resolution:
88 - NORMAL: Auto-resolve dependencies, use cache when available (default)
89 - SKIP_DEPENDENCIES: Skip dependency resolution and validation
90 - FORCE_REFRESH: Re-read dependencies from device, ignoring cache
92 Returns:
93 Parsed characteristic value or None if read fails.
94 Return type is inferred from characteristic class when provided.
96 Raises:
97 RuntimeError: If no connection manager is attached
98 ValueError: If required dependencies cannot be resolved
100 """
101 # Handle characteristic class input (type-safe path)
102 if isinstance(char, type) and issubclass(char, BaseCharacteristic):
103 char_class: type[BaseCharacteristic[Any]] = char
104 char_instance = char_class()
105 resolved_uuid = char_instance.uuid
107 ctx: CharacteristicContext | None = None
108 if resolution_mode != DependencyResolutionMode.SKIP_DEPENDENCIES:
109 device_info = self._device_info_factory()
110 ctx = await self._dep_resolver.resolve(char_class, resolution_mode, device_info)
112 raw = await self._connection_manager.read_gatt_char(resolved_uuid)
113 return char_instance.parse_value(raw, ctx=ctx)
115 # Handle string/enum input (not type-safe path)
116 resolved_uuid = self._resolve_characteristic_name(char)
118 char_class_lookup = CharacteristicRegistry.get_characteristic_class_by_uuid(resolved_uuid)
120 # Resolve dependencies if characteristic class is known
121 ctx = None
122 if char_class_lookup and resolution_mode != DependencyResolutionMode.SKIP_DEPENDENCIES:
123 device_info = self._device_info_factory()
124 ctx = await self._dep_resolver.resolve(char_class_lookup, resolution_mode, device_info)
126 # Read the characteristic
127 raw = await self._connection_manager.read_gatt_char(resolved_uuid)
128 return self._translator.parse_characteristic(str(resolved_uuid), raw, ctx=ctx)
130 # ------------------------------------------------------------------
131 # Write
132 # ------------------------------------------------------------------
134 @overload
135 async def write(
136 self,
137 char: type[BaseCharacteristic[T]],
138 data: T,
139 response: bool = ...,
140 ) -> None: ...
142 @overload
143 async def write(
144 self,
145 char: str | CharacteristicName,
146 data: bytes,
147 response: bool = ...,
148 ) -> None: ...
150 async def write(
151 self,
152 char: str | CharacteristicName | type[BaseCharacteristic[T]],
153 data: bytes | T,
154 response: bool = True,
155 ) -> None:
156 r"""Write data to a characteristic on the device.
158 Args:
159 char: Name, enum, or characteristic class to write to.
160 Passing the class enables type-safe value encoding.
161 data: Raw bytes (for string/enum) or typed value (for characteristic class).
162 When using characteristic class, the value is encoded using build_value().
163 response: If True, use write-with-response (wait for acknowledgment).
164 If False, use write-without-response (faster but no confirmation).
165 Default is True for reliability.
167 Raises:
168 RuntimeError: If no connection manager is attached
169 CharacteristicEncodeError: If encoding fails (when using characteristic class)
171 """
172 # Handle characteristic class input (type-safe path)
173 if isinstance(char, type) and issubclass(char, BaseCharacteristic):
174 char_instance = char()
175 resolved_uuid = char_instance.uuid
176 # data is typed value T, encode it
177 encoded = char_instance.build_value(data) # type: ignore[arg-type] # T is erased at runtime; overload ensures type safety at call site
178 await self._connection_manager.write_gatt_char(resolved_uuid, bytes(encoded), response=response)
179 return
181 # Handle string/enum input (not type-safe path)
182 # data must be bytes in this path
183 if not isinstance(data, (bytes, bytearray)):
184 raise TypeError(f"When using string/enum char_name, data must be bytes, got {type(data).__name__}")
186 resolved_uuid = self._resolve_characteristic_name(char)
187 # cast is safe: isinstance check above ensures data is bytes/bytearray
188 await self._connection_manager.write_gatt_char(resolved_uuid, cast("bytes", data), response=response)
190 # ------------------------------------------------------------------
191 # Notifications
192 # ------------------------------------------------------------------
194 @overload
195 async def start_notify(
196 self,
197 char: type[BaseCharacteristic[T]],
198 callback: Callable[[T], None],
199 ) -> None: ...
201 @overload
202 async def start_notify(
203 self,
204 char: str | CharacteristicName,
205 callback: Callable[[Any], None],
206 ) -> None: ...
208 async def start_notify(
209 self,
210 char: str | CharacteristicName | type[BaseCharacteristic[T]],
211 callback: Callable[[T], None] | Callable[[Any], None],
212 ) -> None:
213 """Start notifications for a characteristic.
215 Args:
216 char: Name, enum, or characteristic class to monitor.
217 Passing the class enables type-safe callbacks.
218 callback: Function to call when notifications are received.
219 Callback parameter type is inferred from characteristic class.
221 Raises:
222 RuntimeError: If no connection manager is attached
224 """
225 # Handle characteristic class input (type-safe path)
226 if isinstance(char, type) and issubclass(char, BaseCharacteristic):
227 char_instance = char()
228 resolved_uuid = char_instance.uuid
230 def _typed_cb(sender: str, data: bytes) -> None:
231 del sender # Required by callback interface
232 parsed = char_instance.parse_value(data)
233 try:
234 callback(parsed)
235 except Exception: # pylint: disable=broad-exception-caught
236 logger.exception("Notification callback raised an exception")
238 await self._connection_manager.start_notify(resolved_uuid, _typed_cb)
239 return
241 # Handle string/enum input (not type-safe path)
242 resolved_uuid = self._resolve_characteristic_name(char)
243 translator = self._translator
245 def _internal_cb(sender: str, data: bytes) -> None:
246 parsed = translator.parse_characteristic(sender, data)
247 try:
248 callback(parsed)
249 except Exception: # pylint: disable=broad-exception-caught
250 logger.exception("Notification callback raised an exception")
252 await self._connection_manager.start_notify(resolved_uuid, _internal_cb)
254 async def stop_notify(self, char_name: str | CharacteristicName) -> None:
255 """Stop notifications for a characteristic.
257 Args:
258 char_name: Characteristic name or UUID
260 """
261 resolved_uuid = self._resolve_characteristic_name(char_name)
262 await self._connection_manager.stop_notify(resolved_uuid)
264 # ------------------------------------------------------------------
265 # Batch operations
266 # ------------------------------------------------------------------
268 async def read_multiple(self, char_names: list[str | CharacteristicName]) -> dict[str, Any | None]:
269 """Read multiple characteristics in batch.
271 Args:
272 char_names: List of characteristic names or enums to read
274 Returns:
275 Dictionary mapping characteristic UUIDs to parsed values
277 """
278 results: dict[str, Any | None] = {}
279 for char_name in char_names:
280 try:
281 value = await self.read(char_name)
282 resolved_uuid = self._resolve_characteristic_name(char_name)
283 results[str(resolved_uuid)] = value
284 except Exception as exc: # pylint: disable=broad-exception-caught
285 resolved_uuid = self._resolve_characteristic_name(char_name)
286 results[str(resolved_uuid)] = None
287 logger.warning("Failed to read characteristic %s: %s", char_name, exc)
289 return results
291 async def write_multiple(
292 self, data_map: dict[str | CharacteristicName, bytes], response: bool = True
293 ) -> dict[str, bool]:
294 """Write to multiple characteristics in batch.
296 Args:
297 data_map: Dictionary mapping characteristic names/enums to data bytes
298 response: If True, use write-with-response for all writes.
299 If False, use write-without-response for all writes.
301 Returns:
302 Dictionary mapping characteristic UUIDs to success status
304 """
305 results: dict[str, bool] = {}
306 for char_name, data in data_map.items():
307 try:
308 await self.write(char_name, data, response=response)
309 resolved_uuid = self._resolve_characteristic_name(char_name)
310 results[str(resolved_uuid)] = True
311 except Exception as exc: # pylint: disable=broad-exception-caught
312 resolved_uuid = self._resolve_characteristic_name(char_name)
313 results[str(resolved_uuid)] = False
314 logger.warning("Failed to write characteristic %s: %s", char_name, exc)
316 return results
318 # ------------------------------------------------------------------
319 # Internal helpers
320 # ------------------------------------------------------------------
322 def _resolve_characteristic_name(self, identifier: str | CharacteristicName) -> BluetoothUUID:
323 """Resolve a characteristic name or enum to its UUID.
325 Args:
326 identifier: Characteristic name string or enum
328 Returns:
329 Characteristic UUID string
331 Raises:
332 ValueError: If the characteristic name cannot be resolved
334 """
335 if isinstance(identifier, CharacteristicName):
336 # For enum inputs, ask the translator for the UUID
337 uuid = self._translator.get_characteristic_uuid_by_name(identifier)
338 if uuid:
339 return uuid
340 norm = identifier.value.strip()
341 else:
342 norm = identifier
343 stripped = norm.replace("-", "")
344 if len(stripped) in (4, 8, 32) and all(c in "0123456789abcdefABCDEF" for c in stripped):
345 return BluetoothUUID(norm)
347 raise ValueError(f"Unknown characteristic name: '{identifier}'")