Coverage for src / bluetooth_sig / gatt / characteristics / ln_control_point.py: 79%

134 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""LN Control Point characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from datetime import datetime 

6from enum import IntEnum 

7 

8import msgspec 

9 

10from ..constants import UINT8_MAX, UINT16_MAX 

11from ..context import CharacteristicContext 

12from .base import BaseCharacteristic 

13from .utils import DataParser, IEEE11073Parser 

14 

# Smallest payload (op code byte included) that carries a complete operand
# for each operation. Shorter payloads are decoded as "op code only".
_MIN_LEN_CUMULATIVE_VALUE = 1 + 4  # op code + uint32 operand
_MIN_LEN_MASK_CONTENT = 1 + 2  # op code + uint16 operand
_MIN_LEN_NAV_CONTROL = 1 + 1  # op code + one value byte
_MIN_LEN_ROUTE_NUMBER = 1 + 1  # op code + one route-number byte
_MIN_LEN_FIX_RATE = 1 + 1  # op code + one fix-rate byte
_MIN_LEN_ELEVATION = 1 + 4  # op code + int32 operand
_MIN_LEN_RESPONSE = 1 + 1 + 1  # op code + request op code + response value

# Layout of the optional parameter that may follow a response header
_RESPONSE_PARAM_OFFSET = 3  # index of the first response-parameter byte
_PARAM_LEN_UINT8 = 1  # parameter is a single byte
_PARAM_LEN_UINT16 = 2  # parameter is a 16-bit integer
_PARAM_LEN_UINT32 = 4  # parameter is a 32-bit integer
_PARAM_LEN_TIMESTAMP = 7  # parameter is a 7-byte timestamp

30 

31 

class LNControlPointOpCode(IntEnum):
    """Operation codes carried in the first byte of an LN Control Point value.

    Only the codes listed here are defined; 0x00, 0x09-0x1F and 0x21-0xFF are
    Reserved for Future Use and are intentionally not members.
    """

    SET_CUMULATIVE_VALUE = 0x01
    MASK_LOCATION_AND_SPEED_CHARACTERISTIC_CONTENT = 0x02
    NAVIGATION_CONTROL = 0x03
    REQUEST_NUMBER_OF_ROUTES = 0x04
    REQUEST_NAME_OF_ROUTE = 0x05
    SELECT_ROUTE = 0x06
    SET_FIX_RATE = 0x07
    SET_ELEVATION = 0x08
    RESPONSE_CODE = 0x20

    def __str__(self) -> str:
        """Return the display label for this operation code."""
        kind = type(self)
        labels = {
            kind.SET_CUMULATIVE_VALUE: "Set Cumulative Value",
            kind.MASK_LOCATION_AND_SPEED_CHARACTERISTIC_CONTENT: "Mask Location and Speed Characteristic Content",
            kind.NAVIGATION_CONTROL: "Navigation Control",
            kind.REQUEST_NUMBER_OF_ROUTES: "Request Number of Routes",
            kind.REQUEST_NAME_OF_ROUTE: "Request Name of Route",
            kind.SELECT_ROUTE: "Select Route",
            kind.SET_FIX_RATE: "Set Fix Rate",
            kind.SET_ELEVATION: "Set Elevation",
            kind.RESPONSE_CODE: "Response Code",
        }
        return labels[self]

62 

63 

class LNControlPointResponseValue(IntEnum):
    """Result codes reported in an LN Control Point response.

    0x00 and 0x05-0xFF are Reserved for Future Use and are intentionally not
    members.
    """

    SUCCESS = 0x01
    OP_CODE_NOT_SUPPORTED = 0x02
    INVALID_OPERAND = 0x03
    OPERATION_FAILED = 0x04

    def __str__(self) -> str:
        """Return the display label for this response value."""
        kind = type(self)
        labels = {
            kind.SUCCESS: "Success",
            kind.OP_CODE_NOT_SUPPORTED: "Op Code not supported",
            kind.INVALID_OPERAND: "Invalid Operand",
            kind.OPERATION_FAILED: "Operation Failed",
        }
        return labels[self]

83 

84 

class LNControlPointData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods,too-many-instance-attributes
    """Immutable, keyword-only record of a decoded LN Control Point value.

    ``op_code`` is mandatory. Every other field defaults to ``None`` and is
    meaningful only for the op code it belongs to.
    """

    op_code: LNControlPointOpCode

    # Operands of the request op codes
    cumulative_value: int | None = None
    content_mask: int | None = None
    navigation_control_value: int | None = None
    route_number: int | None = None
    route_name: str | None = None
    fix_rate: int | None = None
    elevation: float | None = None

    # Fields that only apply when op_code is RESPONSE_CODE
    request_op_code: LNControlPointOpCode | None = None
    response_value: LNControlPointResponseValue | None = None
    response_parameter: int | str | datetime | bytearray | None = None

    def __post_init__(self) -> None:
        """Reject an op code that does not fit in one unsigned byte.

        Raises:
            ValueError: If ``op_code`` is negative or greater than ``UINT8_MAX``.

        """
        if self.op_code < 0 or self.op_code > UINT8_MAX:
            raise ValueError("Op code must be a uint8 value (0-UINT8_MAX)")

106 

107 

class LNControlPointCharacteristic(BaseCharacteristic[LNControlPointData]):
    """LN Control Point characteristic.

    Used to enable device-specific procedures related to the exchange of location and navigation information.
    """

    min_length = 1  # Op Code(1) minimum
    max_length = 18  # Op Code(1) + Parameter(max 17) maximum
    allow_variable_length: bool = True  # Variable parameter length

    def _decode_value(  # pylint: disable=too-many-locals,too-many-branches,too-many-statements  # Control point with many op codes
        self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
    ) -> LNControlPointData:
        """Parse LN control point data according to Bluetooth specification.

        Format: Op Code(1) + Parameter(0-17).

        An operand is only decoded when the payload is long enough to hold it;
        a shorter payload yields a result with just ``op_code`` set.

        Args:
            data: Raw bytearray from BLE characteristic
            ctx: Optional context providing surrounding context (may be None);
                not used by this implementation
            validate: Whether to validate ranges (default True); not used by
                this implementation

        Returns:
            LNControlPointData containing parsed control point data

        Raises:
            ValueError: If the op code byte, or the request op code / response
                value of a response, is not a defined enum member.

        """
        op_code = LNControlPointOpCode(data[0])
        payload_len = len(data)

        # Optional fields stay None unless the matching op code supplies them.
        cumulative_value: int | None = None
        content_mask: int | None = None
        navigation_control_value: int | None = None
        route_number: int | None = None
        fix_rate: int | None = None
        elevation: float | None = None
        request_op_code: LNControlPointOpCode | None = None
        response_value: LNControlPointResponseValue | None = None
        response_parameter: int | str | datetime | bytearray | None = None

        if op_code == LNControlPointOpCode.SET_CUMULATIVE_VALUE:
            if payload_len >= _MIN_LEN_CUMULATIVE_VALUE:
                cumulative_value = DataParser.parse_int32(data, 1, signed=False)
        elif op_code == LNControlPointOpCode.MASK_LOCATION_AND_SPEED_CHARACTERISTIC_CONTENT:
            if payload_len >= _MIN_LEN_MASK_CONTENT:
                content_mask = DataParser.parse_int16(data, 1, signed=False)
        elif op_code == LNControlPointOpCode.NAVIGATION_CONTROL:
            if payload_len >= _MIN_LEN_NAV_CONTROL:
                navigation_control_value = data[1]
        elif op_code in (LNControlPointOpCode.REQUEST_NAME_OF_ROUTE, LNControlPointOpCode.SELECT_ROUTE):
            if payload_len >= _MIN_LEN_ROUTE_NUMBER:
                route_number = data[1]
        elif op_code == LNControlPointOpCode.SET_FIX_RATE:
            if payload_len >= _MIN_LEN_FIX_RATE:
                fix_rate = data[1]
        elif op_code == LNControlPointOpCode.SET_ELEVATION:
            if payload_len >= _MIN_LEN_ELEVATION:
                # Unit is 1/100 m
                elevation = DataParser.parse_int32(data, 1, signed=True) / 100.0
        elif op_code == LNControlPointOpCode.RESPONSE_CODE and payload_len >= _MIN_LEN_RESPONSE:
            request_op_code = LNControlPointOpCode(data[1])
            response_value = LNControlPointResponseValue(data[2])
            if payload_len > _RESPONSE_PARAM_OFFSET:
                parameter_length = payload_len - _RESPONSE_PARAM_OFFSET
                # The request op code decides the format where it is known ...
                if request_op_code == LNControlPointOpCode.REQUEST_NUMBER_OF_ROUTES:
                    response_parameter = DataParser.parse_int16(data, _RESPONSE_PARAM_OFFSET, signed=False)
                elif request_op_code == LNControlPointOpCode.REQUEST_NAME_OF_ROUTE:
                    # Best effort: undecodable bytes are dropped rather than raising.
                    response_parameter = data[_RESPONSE_PARAM_OFFSET:].decode("utf-8", errors="ignore")
                # ... otherwise the parameter length is used to guess it.
                elif parameter_length == _PARAM_LEN_UINT8:
                    response_parameter = data[_RESPONSE_PARAM_OFFSET]
                elif parameter_length == _PARAM_LEN_UINT16:
                    response_parameter = DataParser.parse_int16(data, _RESPONSE_PARAM_OFFSET, signed=False)
                elif parameter_length == _PARAM_LEN_UINT32:
                    response_parameter = DataParser.parse_int32(data, _RESPONSE_PARAM_OFFSET, signed=False)
                elif parameter_length == _PARAM_LEN_TIMESTAMP:
                    response_parameter = IEEE11073Parser.parse_timestamp(data, _RESPONSE_PARAM_OFFSET)
                else:
                    # Unknown parameter format, store as bytes
                    response_parameter = data[_RESPONSE_PARAM_OFFSET:]

        # route_name is left at its default (None): a route name received in a
        # response is delivered through response_parameter instead.
        return LNControlPointData(
            op_code=op_code,
            cumulative_value=cumulative_value,
            content_mask=content_mask,
            navigation_control_value=navigation_control_value,
            route_number=route_number,
            fix_rate=fix_rate,
            elevation=elevation,
            request_op_code=request_op_code,
            response_value=response_value,
            response_parameter=response_parameter,
        )

    def _encode_value(self, data: LNControlPointData) -> bytearray:  # pylint: disable=too-many-branches  # One branch per op code
        """Encode LNControlPointData back to bytes.

        The op code byte is always written. The operand belonging to that op
        code is appended only when the corresponding field is not ``None``.

        Args:
            data: LNControlPointData instance to encode

        Returns:
            Encoded bytes representing the LN control point data

        """
        result = bytearray()
        result.append(data.op_code)
        op_code = data.op_code

        if op_code == LNControlPointOpCode.SET_CUMULATIVE_VALUE:
            if data.cumulative_value is not None:
                result.extend(DataParser.encode_int32(data.cumulative_value, signed=False))
        elif op_code == LNControlPointOpCode.MASK_LOCATION_AND_SPEED_CHARACTERISTIC_CONTENT:
            if data.content_mask is not None:
                result.extend(DataParser.encode_int16(data.content_mask, signed=False))
        elif op_code == LNControlPointOpCode.NAVIGATION_CONTROL:
            if data.navigation_control_value is not None:
                result.append(data.navigation_control_value)
        elif op_code in (LNControlPointOpCode.REQUEST_NAME_OF_ROUTE, LNControlPointOpCode.SELECT_ROUTE):
            if data.route_number is not None:
                result.append(data.route_number)
        elif op_code == LNControlPointOpCode.SET_FIX_RATE:
            if data.fix_rate is not None:
                result.append(data.fix_rate)
        elif op_code == LNControlPointOpCode.SET_ELEVATION:
            if data.elevation is not None:
                # round() rather than int(): int() truncates, so a value such
                # as 1.15 m (1.15 * 100 == 114.99999999999999) would encode as
                # 114 and no longer round-trip through _decode_value.
                result.extend(DataParser.encode_int32(round(data.elevation * 100), signed=True))
        elif op_code == LNControlPointOpCode.RESPONSE_CODE:
            if data.request_op_code is not None:
                result.append(data.request_op_code)
            if data.response_value is not None:
                result.append(data.response_value)
            self._append_response_parameter(result, data)

        return result

    @staticmethod
    def _append_response_parameter(result: bytearray, data: LNControlPointData) -> None:
        """Append the encoded response parameter of ``data`` to ``result``, if any."""
        parameter = data.response_parameter
        if parameter is None:
            return

        if isinstance(parameter, int):
            if data.request_op_code == LNControlPointOpCode.REQUEST_NUMBER_OF_ROUTES:
                # _decode_value always reads this response as a uint16, so it
                # must be written as one even when the count fits in a byte.
                # Range enforcement is left to DataParser.encode_int16.
                result.extend(DataParser.encode_int16(parameter, signed=False))
            elif parameter <= UINT8_MAX:
                result.append(parameter)
            elif parameter <= UINT16_MAX:
                result.extend(DataParser.encode_int16(parameter, signed=False))
            else:
                # Counterpart of the 4-byte branch in _decode_value; larger
                # values used to be dropped silently.
                result.extend(DataParser.encode_int32(parameter, signed=False))
        elif isinstance(parameter, str):
            result.extend(parameter.encode("utf-8"))
        elif isinstance(parameter, datetime):
            result.extend(IEEE11073Parser.encode_timestamp(parameter))
        elif isinstance(parameter, (bytes, bytearray)):
            result.extend(parameter)

271 

272 return result