Coverage for src/bluetooth_sig/device/characteristic_io.py: 70%

110 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""Characteristic I/O operations for BLE devices. 

2 

3Encapsulates read, write, and notification operations for GATT characteristics, 

4including type-safe overloads for class-based and string/enum-based access. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10from collections.abc import Callable 

11from typing import Any, TypeVar, cast, overload 

12 

13from ..gatt.characteristics import CharacteristicName 

14from ..gatt.characteristics.base import BaseCharacteristic 

15from ..gatt.characteristics.registry import CharacteristicRegistry 

16from ..gatt.context import CharacteristicContext, DeviceInfo 

17from ..types.uuid import BluetoothUUID 

18from .client import ClientManagerProtocol 

19from .dependency_resolver import DependencyResolutionMode, DependencyResolver 

20from .protocols import SIGTranslatorProtocol 

21 

22logger = logging.getLogger(__name__) 

23 

24# Type variable for generic characteristic return types 

25T = TypeVar("T") 

26 

27 

class CharacteristicIO:
    """Read, write, and notification operations for GATT characteristics.

    Encapsulates the I/O logic extracted from Device, handling both type-safe
    (class-based) and dynamic (string/enum-based) characteristic access patterns.

    Uses ``DependencyResolver`` for automatic dependency resolution before reads,
    and a ``device_info_factory`` callable to get current ``DeviceInfo`` without
    a back-reference to the owning Device.
    """

    # Characters accepted when deciding whether an identifier is a raw hex UUID.
    _HEX_DIGITS = frozenset("0123456789abcdefABCDEF")
    # Hex-digit counts (dashes removed) treated as 16-bit, 32-bit and 128-bit UUIDs.
    _UUID_HEX_LENGTHS = (4, 8, 32)

    def __init__(
        self,
        connection_manager: ClientManagerProtocol,
        translator: SIGTranslatorProtocol,
        dep_resolver: DependencyResolver,
        device_info_factory: Callable[[], DeviceInfo],
    ) -> None:
        """Initialise with connection manager, translator, resolver, and info factory.

        Args:
            connection_manager: Connection manager for BLE I/O
            translator: Translator for parsing/encoding characteristics
            dep_resolver: Resolver for characteristic dependencies
            device_info_factory: Callable returning current DeviceInfo

        """
        self._connection_manager = connection_manager
        self._translator = translator
        self._dep_resolver = dep_resolver
        self._device_info_factory = device_info_factory

    # ------------------------------------------------------------------
    # Read
    # ------------------------------------------------------------------

    @overload
    async def read(
        self,
        char: type[BaseCharacteristic[T]],
        resolution_mode: DependencyResolutionMode = ...,
    ) -> T | None: ...

    @overload
    async def read(
        self,
        char: str | CharacteristicName,
        resolution_mode: DependencyResolutionMode = ...,
    ) -> Any | None: ...  # noqa: ANN401  # Runtime UUID dispatch cannot be type-safe

    async def read(
        self,
        char: str | CharacteristicName | type[BaseCharacteristic[T]],
        resolution_mode: DependencyResolutionMode = DependencyResolutionMode.NORMAL,
    ) -> T | Any | None:  # Runtime UUID dispatch cannot be type-safe
        """Read a characteristic value from the device.

        Args:
            char: Name, enum, or characteristic class to read.
                Passing the class enables type-safe return values.
            resolution_mode: How to handle automatic dependency resolution:
                - NORMAL: Auto-resolve dependencies, use cache when available (default)
                - SKIP_DEPENDENCIES: Skip dependency resolution and validation
                - FORCE_REFRESH: Re-read dependencies from device, ignoring cache

        Returns:
            Parsed characteristic value or None if read fails.
            Return type is inferred from characteristic class when provided.

        Raises:
            RuntimeError: If no connection manager is attached
            ValueError: If required dependencies cannot be resolved, or if a
                string/enum identifier cannot be resolved to a UUID

        """
        # Handle characteristic class input (type-safe path)
        if isinstance(char, type) and issubclass(char, BaseCharacteristic):
            char_class: type[BaseCharacteristic[Any]] = char
            char_instance = char_class()
            resolved_uuid = char_instance.uuid

            # Dependencies are resolved before the read so the parser gets context.
            ctx: CharacteristicContext | None = None
            if resolution_mode != DependencyResolutionMode.SKIP_DEPENDENCIES:
                device_info = self._device_info_factory()
                ctx = await self._dep_resolver.resolve(char_class, resolution_mode, device_info)

            raw = await self._connection_manager.read_gatt_char(resolved_uuid)
            return char_instance.parse_value(raw, ctx=ctx)

        # Handle string/enum input (not type-safe path)
        resolved_uuid = self._resolve_characteristic_name(char)

        char_class_lookup = CharacteristicRegistry.get_characteristic_class_by_uuid(resolved_uuid)

        # Resolve dependencies only if the registry knows a class for this UUID
        ctx = None
        if char_class_lookup and resolution_mode != DependencyResolutionMode.SKIP_DEPENDENCIES:
            device_info = self._device_info_factory()
            ctx = await self._dep_resolver.resolve(char_class_lookup, resolution_mode, device_info)

        # Read the characteristic and let the translator parse it by UUID
        raw = await self._connection_manager.read_gatt_char(resolved_uuid)
        return self._translator.parse_characteristic(str(resolved_uuid), raw, ctx=ctx)

    # ------------------------------------------------------------------
    # Write
    # ------------------------------------------------------------------

    @overload
    async def write(
        self,
        char: type[BaseCharacteristic[T]],
        data: T,
        response: bool = ...,
    ) -> None: ...

    @overload
    async def write(
        self,
        char: str | CharacteristicName,
        data: bytes,
        response: bool = ...,
    ) -> None: ...

    async def write(
        self,
        char: str | CharacteristicName | type[BaseCharacteristic[T]],
        data: bytes | T,
        response: bool = True,
    ) -> None:
        r"""Write data to a characteristic on the device.

        Args:
            char: Name, enum, or characteristic class to write to.
                Passing the class enables type-safe value encoding.
            data: Raw bytes (for string/enum) or typed value (for characteristic class).
                When using characteristic class, the value is encoded using build_value().
            response: If True, use write-with-response (wait for acknowledgment).
                If False, use write-without-response (faster but no confirmation).
                Default is True for reliability.

        Raises:
            RuntimeError: If no connection manager is attached
            CharacteristicEncodeError: If encoding fails (when using characteristic class)
            TypeError: If a string/enum identifier is used and data is not bytes/bytearray
            ValueError: If a string/enum identifier cannot be resolved to a UUID

        """
        # Handle characteristic class input (type-safe path)
        if isinstance(char, type) and issubclass(char, BaseCharacteristic):
            char_instance = char()
            resolved_uuid = char_instance.uuid
            # data is typed value T, encode it
            encoded = char_instance.build_value(data)  # type: ignore[arg-type]  # T is erased at runtime; overload ensures type safety at call site
            await self._connection_manager.write_gatt_char(resolved_uuid, bytes(encoded), response=response)
            return

        # Handle string/enum input (not type-safe path)
        # data must be bytes in this path
        if not isinstance(data, (bytes, bytearray)):
            raise TypeError(f"When using string/enum char_name, data must be bytes, got {type(data).__name__}")

        resolved_uuid = self._resolve_characteristic_name(char)
        # cast is safe: isinstance check above ensures data is bytes/bytearray
        await self._connection_manager.write_gatt_char(resolved_uuid, cast("bytes", data), response=response)

    # ------------------------------------------------------------------
    # Notifications
    # ------------------------------------------------------------------

    @overload
    async def start_notify(
        self,
        char: type[BaseCharacteristic[T]],
        callback: Callable[[T], None],
    ) -> None: ...

    @overload
    async def start_notify(
        self,
        char: str | CharacteristicName,
        callback: Callable[[Any], None],
    ) -> None: ...

    async def start_notify(
        self,
        char: str | CharacteristicName | type[BaseCharacteristic[T]],
        callback: Callable[[T], None] | Callable[[Any], None],
    ) -> None:
        """Start notifications for a characteristic.

        The user callback is wrapped so that any exception it raises is logged
        instead of propagating into the connection manager. Parsing happens
        outside that guard, so a parse error is not swallowed here.

        Args:
            char: Name, enum, or characteristic class to monitor.
                Passing the class enables type-safe callbacks.
            callback: Function to call when notifications are received.
                Callback parameter type is inferred from characteristic class.

        Raises:
            RuntimeError: If no connection manager is attached
            ValueError: If a string/enum identifier cannot be resolved to a UUID

        """
        # Handle characteristic class input (type-safe path)
        if isinstance(char, type) and issubclass(char, BaseCharacteristic):
            char_instance = char()
            resolved_uuid = char_instance.uuid

            def _typed_cb(sender: str, data: bytes) -> None:
                del sender  # Required by callback interface
                parsed = char_instance.parse_value(data)
                try:
                    callback(parsed)
                except Exception:  # pylint: disable=broad-exception-caught
                    logger.exception("Notification callback raised an exception")

            await self._connection_manager.start_notify(resolved_uuid, _typed_cb)
            return

        # Handle string/enum input (not type-safe path)
        resolved_uuid = self._resolve_characteristic_name(char)
        translator = self._translator

        def _internal_cb(sender: str, data: bytes) -> None:
            # NOTE(review): parsing is keyed on ``sender`` as delivered by the
            # connection manager, not on ``resolved_uuid`` - confirm the backend
            # passes a UUID string here.
            parsed = translator.parse_characteristic(sender, data)
            try:
                callback(parsed)
            except Exception:  # pylint: disable=broad-exception-caught
                logger.exception("Notification callback raised an exception")

        await self._connection_manager.start_notify(resolved_uuid, _internal_cb)

    async def stop_notify(
        self,
        char_name: str | CharacteristicName | type[BaseCharacteristic[Any]],
    ) -> None:
        """Stop notifications for a characteristic.

        Accepts the same identifier forms as ``start_notify`` so a subscription
        started with a characteristic class can be stopped with that class.

        Args:
            char_name: Characteristic name, enum, UUID string, or characteristic class

        Raises:
            ValueError: If a string/enum identifier cannot be resolved to a UUID

        """
        # Characteristic class input: take the UUID from an instance, as start_notify does
        if isinstance(char_name, type) and issubclass(char_name, BaseCharacteristic):
            await self._connection_manager.stop_notify(char_name().uuid)
            return

        resolved_uuid = self._resolve_characteristic_name(char_name)
        await self._connection_manager.stop_notify(resolved_uuid)

    # ------------------------------------------------------------------
    # Batch operations
    # ------------------------------------------------------------------

    async def read_multiple(self, char_names: list[str | CharacteristicName]) -> dict[str, Any | None]:
        """Read multiple characteristics in batch.

        Best-effort: a failure on one characteristic (including an identifier
        that cannot be resolved) is logged and recorded as ``None``; the
        remaining characteristics are still read.

        Args:
            char_names: List of characteristic names or enums to read

        Returns:
            Dictionary mapping characteristic UUIDs to parsed values. An entry
            whose identifier cannot be resolved is keyed by the identifier text.

        """
        results: dict[str, Any | None] = {}
        for char_name in char_names:
            # Compute the key up front with a non-raising helper so the except
            # handler below can never raise and abort the batch.
            key = self._batch_result_key(char_name)
            try:
                results[key] = await self.read(char_name)
            except Exception as exc:  # pylint: disable=broad-exception-caught
                results[key] = None
                logger.warning("Failed to read characteristic %s: %s", char_name, exc)

        return results

    async def write_multiple(
        self, data_map: dict[str | CharacteristicName, bytes], response: bool = True
    ) -> dict[str, bool]:
        """Write to multiple characteristics in batch.

        Best-effort: a failure on one characteristic (including an identifier
        that cannot be resolved) is logged and recorded as ``False``; the
        remaining writes are still attempted.

        Args:
            data_map: Dictionary mapping characteristic names/enums to data bytes
            response: If True, use write-with-response for all writes.
                If False, use write-without-response for all writes.

        Returns:
            Dictionary mapping characteristic UUIDs to success status. An entry
            whose identifier cannot be resolved is keyed by the identifier text.

        """
        results: dict[str, bool] = {}
        for char_name, data in data_map.items():
            # Same reasoning as read_multiple: key first, then the fallible I/O.
            key = self._batch_result_key(char_name)
            try:
                await self.write(char_name, data, response=response)
            except Exception as exc:  # pylint: disable=broad-exception-caught
                results[key] = False
                logger.warning("Failed to write characteristic %s: %s", char_name, exc)
            else:
                results[key] = True

        return results

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _batch_result_key(self, identifier: str | CharacteristicName) -> str:
        """Return the result-dict key for one batch entry without raising.

        Args:
            identifier: Characteristic name string or enum

        Returns:
            The resolved UUID as a string, or the identifier's own text when
            it cannot be resolved.

        """
        try:
            return str(self._resolve_characteristic_name(identifier))
        except Exception:  # pylint: disable=broad-exception-caught
            # Batch operations are best-effort: report the failure under the
            # caller's own identifier rather than aborting the whole batch.
            if isinstance(identifier, CharacteristicName):
                return str(identifier.value)
            return str(identifier)

    def _resolve_characteristic_name(self, identifier: str | CharacteristicName) -> BluetoothUUID:
        """Resolve a characteristic name or enum to its UUID.

        Enum members are looked up through the translator first. If that
        yields nothing, or if the identifier is a plain string, the text is
        accepted only when it looks like a hex UUID of 4, 8 or 32 hex digits
        (dashes ignored).

        Args:
            identifier: Characteristic name string or enum

        Returns:
            The characteristic UUID as a ``BluetoothUUID``

        Raises:
            ValueError: If the characteristic name cannot be resolved

        """
        if isinstance(identifier, CharacteristicName):
            # For enum inputs, ask the translator for the UUID
            uuid = self._translator.get_characteristic_uuid_by_name(identifier)
            if uuid:
                return uuid
            norm = identifier.value.strip()
        else:
            # Strip surrounding whitespace, matching the enum fallback above
            norm = identifier.strip()

        stripped = norm.replace("-", "")
        if len(stripped) in self._UUID_HEX_LENGTHS and all(c in self._HEX_DIGITS for c in stripped):
            return BluetoothUUID(norm)

        raise ValueError(f"Unknown characteristic name: '{identifier}'")

347 

348 raise ValueError(f"Unknown characteristic name: '{identifier}'")