Coverage for src / bluetooth_sig / device / characteristic_io.py: 70%
110 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""Characteristic I/O operations for BLE devices.
3Encapsulates read, write, and notification operations for GATT characteristics,
4including type-safe overloads for class-based and string/enum-based access.
5"""
from __future__ import annotations

import logging
import string
from collections.abc import Callable
from typing import Any, TypeVar, cast, overload

from ..gatt.characteristics import CharacteristicName
from ..gatt.characteristics.base import BaseCharacteristic
from ..gatt.characteristics.registry import CharacteristicRegistry
from ..gatt.context import CharacteristicContext, DeviceInfo
from ..types.uuid import BluetoothUUID
from .client import ClientManagerProtocol
from .dependency_resolver import DependencyResolutionMode, DependencyResolver
from .protocols import SIGTranslatorProtocol
# Module-level logger; used for failed batch operations and for exceptions
# raised by user-supplied notification callbacks.
logger = logging.getLogger(__name__)

# Type variable for generic characteristic return types
T = TypeVar("T")
class CharacteristicIO:
    """Read, write, and notification operations for GATT characteristics.

    Encapsulates the I/O logic extracted from Device, handling both type-safe
    (class-based) and dynamic (string/enum-based) characteristic access patterns.

    Uses ``DependencyResolver`` for automatic dependency resolution before reads,
    and a ``device_info_factory`` callable to get current ``DeviceInfo`` without
    a back-reference to the owning Device.
    """

    # Characters accepted in a UUID string once dashes are removed.
    _HEX_DIGITS = frozenset(string.hexdigits)
    # Dash-free hex lengths accepted as a UUID string (4, 8 or 32 hex digits).
    _UUID_HEX_LENGTHS = (4, 8, 32)

    def __init__(
        self,
        connection_manager: ClientManagerProtocol,
        translator: SIGTranslatorProtocol,
        dep_resolver: DependencyResolver,
        device_info_factory: Callable[[], DeviceInfo],
    ) -> None:
        """Initialise with connection manager, translator, resolver, and info factory.

        Args:
            connection_manager: Connection manager for BLE I/O
            translator: Translator for parsing/encoding characteristics
            dep_resolver: Resolver for characteristic dependencies
            device_info_factory: Callable returning current DeviceInfo

        """
        self._connection_manager = connection_manager
        self._translator = translator
        self._dep_resolver = dep_resolver
        self._device_info_factory = device_info_factory

    # ------------------------------------------------------------------
    # Read
    # ------------------------------------------------------------------

    @overload
    async def read(
        self,
        char: type[BaseCharacteristic[T]],
        resolution_mode: DependencyResolutionMode = ...,
    ) -> T | None: ...

    @overload
    async def read(
        self,
        char: str | CharacteristicName,
        resolution_mode: DependencyResolutionMode = ...,
    ) -> Any | None: ...  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe

    async def read(
        self,
        char: str | CharacteristicName | type[BaseCharacteristic[T]],
        resolution_mode: DependencyResolutionMode = DependencyResolutionMode.NORMAL,
    ) -> T | Any | None:  # Runtime UUID dispatch cannot be type-safe
        """Read a characteristic value from the device.

        Args:
            char: UUID string, enum, or characteristic class to read.
                Passing the class enables type-safe return values.
            resolution_mode: How to handle automatic dependency resolution:
                - NORMAL: Auto-resolve dependencies, use cache when available (default)
                - SKIP_DEPENDENCIES: Skip dependency resolution and validation
                - FORCE_REFRESH: Re-read dependencies from device, ignoring cache

        Returns:
            The value produced by the characteristic's ``parse_value`` (class
            input) or by the translator's ``parse_characteristic`` (string/enum
            input), returned as-is.

        Raises:
            ValueError: If a string/enum identifier cannot be resolved to a UUID.
            Exception: Errors raised by the dependency resolver, the connection
                manager, or the parser are not caught here and propagate.

        """
        # Handle characteristic class input (type-safe path)
        if isinstance(char, type) and issubclass(char, BaseCharacteristic):
            char_class: type[BaseCharacteristic[Any]] = char
            char_instance = char_class()
            ctx = await self._build_context(char_class, resolution_mode)
            raw = await self._connection_manager.read_gatt_char(char_instance.uuid)
            return char_instance.parse_value(raw, ctx=ctx)

        # Handle string/enum input (not type-safe path)
        resolved_uuid = self._resolve_characteristic_name(char)

        # Dependencies can only be resolved when the registry knows the class.
        char_class_lookup = CharacteristicRegistry.get_characteristic_class_by_uuid(resolved_uuid)
        ctx = await self._build_context(char_class_lookup, resolution_mode)

        raw = await self._connection_manager.read_gatt_char(resolved_uuid)
        return self._translator.parse_characteristic(str(resolved_uuid), raw, ctx=ctx)

    async def _build_context(
        self,
        char_class: type[BaseCharacteristic[Any]] | None,
        resolution_mode: DependencyResolutionMode,
    ) -> CharacteristicContext | None:
        """Resolve the dependency context for a read, or return None when not applicable.

        No context is built when the characteristic class is unknown or when the
        caller asked for ``SKIP_DEPENDENCIES``.
        """
        if not char_class or resolution_mode == DependencyResolutionMode.SKIP_DEPENDENCIES:
            return None
        device_info = self._device_info_factory()
        return await self._dep_resolver.resolve(char_class, resolution_mode, device_info)

    # ------------------------------------------------------------------
    # Write
    # ------------------------------------------------------------------

    @overload
    async def write(
        self,
        char: type[BaseCharacteristic[T]],
        data: T,
        response: bool = ...,
    ) -> None: ...

    @overload
    async def write(
        self,
        char: str | CharacteristicName,
        data: bytes,
        response: bool = ...,
    ) -> None: ...

    async def write(
        self,
        char: str | CharacteristicName | type[BaseCharacteristic[T]],
        data: bytes | T,
        response: bool = True,
    ) -> None:
        r"""Write data to a characteristic on the device.

        Args:
            char: UUID string, enum, or characteristic class to write to.
                Passing the class enables type-safe value encoding.
            data: Raw bytes (for string/enum) or typed value (for characteristic class).
                When using characteristic class, the value is encoded using build_value().
            response: If True, use write-with-response (wait for acknowledgment).
                If False, use write-without-response (faster but no confirmation).
                Default is True for reliability.

        Raises:
            TypeError: If ``char`` is a string/enum and ``data`` is not bytes/bytearray.
            ValueError: If a string/enum identifier cannot be resolved to a UUID.
            Exception: Errors raised by ``build_value`` (class input) or by the
                connection manager are not caught here and propagate.

        """
        # Handle characteristic class input (type-safe path)
        if isinstance(char, type) and issubclass(char, BaseCharacteristic):
            char_instance = char()
            # data is typed value T, encode it
            encoded = char_instance.build_value(data)  # type: ignore[arg-type]  # T is erased at runtime; overload ensures type safety at call site
            await self._connection_manager.write_gatt_char(char_instance.uuid, bytes(encoded), response=response)
            return

        # Handle string/enum input (not type-safe path)
        # data must be bytes in this path
        if not isinstance(data, (bytes, bytearray)):
            raise TypeError(f"When using string/enum char_name, data must be bytes, got {type(data).__name__}")

        resolved_uuid = self._resolve_characteristic_name(char)
        # Convert rather than merely cast: a bytearray passes the check above, and
        # the connection manager is handed immutable bytes, as in the typed path.
        payload = bytes(cast("bytes | bytearray", data))
        await self._connection_manager.write_gatt_char(resolved_uuid, payload, response=response)

    # ------------------------------------------------------------------
    # Notifications
    # ------------------------------------------------------------------

    @overload
    async def start_notify(
        self,
        char: type[BaseCharacteristic[T]],
        callback: Callable[[T], None],
    ) -> None: ...

    @overload
    async def start_notify(
        self,
        char: str | CharacteristicName,
        callback: Callable[[Any], None],
    ) -> None: ...

    async def start_notify(
        self,
        char: str | CharacteristicName | type[BaseCharacteristic[T]],
        callback: Callable[[T], None] | Callable[[Any], None],
    ) -> None:
        """Start notifications for a characteristic.

        Each notification payload is parsed and the parsed value is passed to
        ``callback``. Exceptions raised by ``callback`` are logged and suppressed;
        exceptions raised while parsing are not caught here.

        Args:
            char: UUID string, enum, or characteristic class to monitor.
                Passing the class enables type-safe callbacks.
            callback: Function to call when notifications are received.
                Callback parameter type is inferred from characteristic class.

        Raises:
            ValueError: If a string/enum identifier cannot be resolved to a UUID.

        """
        deliver = self._deliver

        # Handle characteristic class input (type-safe path)
        if isinstance(char, type) and issubclass(char, BaseCharacteristic):
            char_instance = char()

            def _typed_cb(sender: str, data: bytes) -> None:
                del sender  # Required by callback interface
                deliver(callback, char_instance.parse_value(data))

            await self._connection_manager.start_notify(char_instance.uuid, _typed_cb)
            return

        # Handle string/enum input (not type-safe path)
        resolved_uuid = self._resolve_characteristic_name(char)
        translator = self._translator

        def _internal_cb(sender: str, data: bytes) -> None:
            deliver(callback, translator.parse_characteristic(sender, data))

        await self._connection_manager.start_notify(resolved_uuid, _internal_cb)

    @staticmethod
    def _deliver(callback: Callable[[Any], None], parsed: object) -> None:
        """Invoke a user notification callback, logging any exception it raises."""
        try:
            callback(parsed)
        except Exception:  # pylint: disable=broad-exception-caught
            logger.exception("Notification callback raised an exception")

    async def stop_notify(
        self,
        char_name: str | CharacteristicName | type[BaseCharacteristic[Any]],
    ) -> None:
        """Stop notifications for a characteristic.

        Args:
            char_name: UUID string, enum, or characteristic class. The class form
                mirrors ``start_notify`` so a class-based subscription can be
                stopped with the same argument.

        Raises:
            ValueError: If a string/enum identifier cannot be resolved to a UUID.

        """
        if isinstance(char_name, type) and issubclass(char_name, BaseCharacteristic):
            resolved_uuid = char_name().uuid
        else:
            resolved_uuid = self._resolve_characteristic_name(char_name)
        await self._connection_manager.stop_notify(resolved_uuid)

    # ------------------------------------------------------------------
    # Batch operations
    # ------------------------------------------------------------------

    async def read_multiple(self, char_names: list[str | CharacteristicName]) -> dict[str, Any | None]:
        """Read multiple characteristics in batch.

        Reads are performed one after another. A failure on one characteristic
        is logged and recorded as ``None``; it does not abort the batch.

        Args:
            char_names: List of UUID strings or enums to read

        Returns:
            Dictionary mapping characteristic UUIDs to parsed values. An
            identifier that cannot be resolved to a UUID is keyed by
            ``str(identifier)`` with value ``None``.

        """
        results: dict[str, Any | None] = {}
        for char_name in char_names:
            # Compute the key first so the except branch never has to resolve
            # the name again (which would re-raise for an unknown name).
            key = self._result_key(char_name)
            try:
                results[key] = await self.read(char_name)
            except Exception as exc:  # pylint: disable=broad-exception-caught
                results[key] = None
                logger.warning("Failed to read characteristic %s: %s", char_name, exc)

        return results

    async def write_multiple(
        self, data_map: dict[str | CharacteristicName, bytes], response: bool = True
    ) -> dict[str, bool]:
        """Write to multiple characteristics in batch.

        Writes are performed one after another. A failure on one characteristic
        is logged and recorded as ``False``; it does not abort the batch.

        Args:
            data_map: Dictionary mapping UUID strings/enums to data bytes
            response: If True, use write-with-response for all writes.
                If False, use write-without-response for all writes.

        Returns:
            Dictionary mapping characteristic UUIDs to success status. An
            identifier that cannot be resolved to a UUID is keyed by
            ``str(identifier)`` with value ``False``.

        """
        results: dict[str, bool] = {}
        for char_name, data in data_map.items():
            # Compute the key first so the except branch never has to resolve
            # the name again (which would re-raise for an unknown name).
            key = self._result_key(char_name)
            try:
                await self.write(char_name, data, response=response)
            except Exception as exc:  # pylint: disable=broad-exception-caught
                results[key] = False
                logger.warning("Failed to write characteristic %s: %s", char_name, exc)
            else:
                results[key] = True

        return results

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _result_key(self, identifier: str | CharacteristicName) -> str:
        """Return the batch-result key for an identifier.

        The key is the resolved UUID as a string, or ``str(identifier)`` when the
        identifier cannot be resolved.
        """
        try:
            return str(self._resolve_characteristic_name(identifier))
        except ValueError:
            return str(identifier)

    def _resolve_characteristic_name(self, identifier: str | CharacteristicName) -> BluetoothUUID:
        """Resolve a characteristic UUID string or enum to its UUID.

        Enum members are looked up through the translator first; if that yields
        nothing, the enum's value is treated like a string. Strings are accepted
        only when, ignoring dashes and surrounding whitespace, they consist of
        4, 8 or 32 hexadecimal digits.

        Args:
            identifier: Characteristic UUID string or enum

        Returns:
            The characteristic's ``BluetoothUUID``

        Raises:
            ValueError: If the identifier cannot be resolved

        """
        if isinstance(identifier, CharacteristicName):
            # For enum inputs, ask the translator for the UUID
            uuid = self._translator.get_characteristic_uuid_by_name(identifier)
            if uuid:
                return uuid
            candidate = identifier.value
        else:
            candidate = identifier

        # Strip for both input kinds so padded UUID strings behave like enum values.
        candidate = candidate.strip()
        hex_only = candidate.replace("-", "")
        if len(hex_only) in self._UUID_HEX_LENGTHS and self._HEX_DIGITS.issuperset(hex_only):
            return BluetoothUUID(candidate)

        raise ValueError(f"Unknown characteristic name: '{identifier}'")