Coverage for src/bluetooth_sig/gatt/characteristics/ln_control_point.py: 78%

130 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-30 00:10 +0000

1"""LN Control Point characteristic implementation.""" 

2 

3from __future__ import annotations 

4 

5from datetime import datetime 

6from enum import IntEnum 

7 

8import msgspec 

9 

10from ...types.gatt_enums import ValueType 

11from ..constants import UINT8_MAX 

12from ..context import CharacteristicContext 

13from .base import BaseCharacteristic 

14from .utils import DataParser, IEEE11073Parser 

15 

16 

class LNControlPointOpCode(IntEnum):
    """Operation codes carried in the first byte of an LN Control Point value.

    Only the values listed here are defined; 0x00, 0x09-0x1F and 0x21-0xFF
    are Reserved for Future Use.
    """

    SET_CUMULATIVE_VALUE = 0x01
    MASK_LOCATION_AND_SPEED_CHARACTERISTIC_CONTENT = 0x02
    NAVIGATION_CONTROL = 0x03
    REQUEST_NUMBER_OF_ROUTES = 0x04
    REQUEST_NAME_OF_ROUTE = 0x05
    SELECT_ROUTE = 0x06
    SET_FIX_RATE = 0x07
    SET_ELEVATION = 0x08
    RESPONSE_CODE = 0x20

    def __str__(self) -> str:
        """Return the display label for this operation code."""
        cls = type(self)
        # Labels are spelled out explicitly because they are not a mechanical
        # transformation of the member names (note the lowercase "and").
        labels = {
            cls.SET_CUMULATIVE_VALUE: "Set Cumulative Value",
            cls.MASK_LOCATION_AND_SPEED_CHARACTERISTIC_CONTENT: "Mask Location and Speed Characteristic Content",
            cls.NAVIGATION_CONTROL: "Navigation Control",
            cls.REQUEST_NUMBER_OF_ROUTES: "Request Number of Routes",
            cls.REQUEST_NAME_OF_ROUTE: "Request Name of Route",
            cls.SELECT_ROUTE: "Select Route",
            cls.SET_FIX_RATE: "Set Fix Rate",
            cls.SET_ELEVATION: "Set Elevation",
            cls.RESPONSE_CODE: "Response Code",
        }
        return labels[self]

47 

48 

class LNControlPointResponseValue(IntEnum):
    """Result codes reported in an LN Control Point response.

    Only the values listed here are defined; 0x00 and 0x05-0xFF are
    Reserved for Future Use.
    """

    SUCCESS = 0x01
    OP_CODE_NOT_SUPPORTED = 0x02
    INVALID_OPERAND = 0x03
    OPERATION_FAILED = 0x04

    def __str__(self) -> str:
        """Return the display label for this response value."""
        cls = type(self)
        labels = {
            cls.SUCCESS: "Success",
            cls.OP_CODE_NOT_SUPPORTED: "Op Code not supported",
            cls.INVALID_OPERAND: "Invalid Operand",
            cls.OPERATION_FAILED: "Operation Failed",
        }
        return labels[self]

68 

69 

class LNControlPointData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods,too-many-instance-attributes
    """Immutable, keyword-only record of one LN Control Point value.

    ``op_code`` is mandatory. Every other field defaults to ``None`` and is
    only meaningful for the op code it belongs to.
    """

    op_code: LNControlPointOpCode

    # Request operands; which one is populated depends on ``op_code``.
    cumulative_value: int | None = None
    content_mask: int | None = None
    navigation_control_value: int | None = None
    route_number: int | None = None
    route_name: str | None = None
    fix_rate: int | None = None
    elevation: float | None = None

    # Populated only for RESPONSE_CODE values.
    request_op_code: LNControlPointOpCode | None = None
    response_value: LNControlPointResponseValue | None = None
    response_parameter: int | str | datetime | bytearray | None = None

    def __post_init__(self) -> None:
        """Reject an op code that does not fit in an unsigned byte.

        Raises:
            ValueError: If ``op_code`` is negative or greater than UINT8_MAX.

        """
        if self.op_code < 0 or self.op_code > UINT8_MAX:
            raise ValueError("Op code must be a uint8 value (0-UINT8_MAX)")

91 

92 

class LNControlPointCharacteristic(BaseCharacteristic):
    """LN Control Point characteristic.

    Used to enable device-specific procedures related to the exchange of location and navigation information.

    Wire format handled here: Op Code (1 byte) followed by 0-17 operand bytes.

    NOTE(review): operand widths (e.g. 32-bit cumulative value and elevation,
    8-bit route number) are kept exactly as originally implemented; confirm
    them against the Location and Navigation Service specification.
    """

    _manual_value_type: ValueType | str | None = ValueType.DICT  # Override since decode_value returns a struct

    min_length = 1  # Op Code(1) minimum
    max_length = 18  # Op Code(1) + Parameter(max 17) maximum
    allow_variable_length: bool = True  # Variable parameter length

    def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> LNControlPointData:  # pylint: disable=too-many-locals,too-many-branches,too-many-statements
        """Parse LN control point data according to Bluetooth specification.

        Format: Op Code(1) + Parameter(0-17).

        An operand that is shorter than the op code requires is not treated
        as an error: the corresponding field is simply left as ``None``.

        Args:
            data: Raw bytearray from BLE characteristic
            ctx: Optional context providing surrounding context (may be None);
                not consulted by this decoder

        Returns:
            LNControlPointData containing parsed control point data

        Raises:
            ValueError: If ``data`` is empty, or if the op code, request op
                code or response value byte is not a defined enum member.

        """
        if len(data) < 1:
            raise ValueError("LN Control Point data must be at least 1 byte")

        op_code = LNControlPointOpCode(data[0])

        # Initialize optional fields
        cumulative_value: int | None = None
        content_mask: int | None = None
        navigation_control_value: int | None = None
        route_number: int | None = None
        route_name: str | None = None  # never populated by this decoder
        fix_rate: int | None = None
        elevation: float | None = None
        request_op_code: LNControlPointOpCode | None = None
        response_value: LNControlPointResponseValue | None = None
        response_parameter: int | str | datetime | bytearray | None = None

        if op_code == LNControlPointOpCode.SET_CUMULATIVE_VALUE:
            if len(data) >= 5:
                cumulative_value = DataParser.parse_int32(data, 1, signed=False)
        elif op_code == LNControlPointOpCode.MASK_LOCATION_AND_SPEED_CHARACTERISTIC_CONTENT:
            if len(data) >= 3:
                content_mask = DataParser.parse_int16(data, 1, signed=False)
        elif op_code == LNControlPointOpCode.NAVIGATION_CONTROL:
            if len(data) >= 2:
                navigation_control_value = data[1]
        elif op_code in (LNControlPointOpCode.REQUEST_NAME_OF_ROUTE, LNControlPointOpCode.SELECT_ROUTE):
            # Both procedures carry a single route-number byte.
            if len(data) >= 2:
                route_number = data[1]
        elif op_code == LNControlPointOpCode.SET_FIX_RATE:
            if len(data) >= 2:
                fix_rate = data[1]
        elif op_code == LNControlPointOpCode.SET_ELEVATION:
            if len(data) >= 5:
                # Unit is 1/100 m
                elevation = DataParser.parse_int32(data, 1, signed=True) / 100.0
        elif op_code == LNControlPointOpCode.RESPONSE_CODE:
            if len(data) >= 3:
                request_op_code = LNControlPointOpCode(data[1])
                response_value = LNControlPointResponseValue(data[2])
                # Parse response parameter based on request op code
                if len(data) > 3:
                    parameter_length = len(data) - 3
                    if request_op_code == LNControlPointOpCode.REQUEST_NUMBER_OF_ROUTES:
                        response_parameter = DataParser.parse_int16(data, 3, signed=False)
                    elif request_op_code == LNControlPointOpCode.REQUEST_NAME_OF_ROUTE:
                        # Undecodable bytes are dropped rather than raising.
                        response_parameter = data[3:].decode("utf-8", errors="ignore")
                    else:
                        # For other responses, parse based on parameter length
                        if parameter_length == 1:
                            response_parameter = data[3]
                        elif parameter_length == 2:
                            response_parameter = DataParser.parse_int16(data, 3, signed=False)
                        elif parameter_length == 4:
                            response_parameter = DataParser.parse_int32(data, 3, signed=False)
                        elif parameter_length == 7:
                            response_parameter = IEEE11073Parser.parse_timestamp(data, 3)
                        else:
                            # Unknown parameter format, store as bytes
                            response_parameter = data[3:]

        return LNControlPointData(
            op_code=op_code,
            cumulative_value=cumulative_value,
            content_mask=content_mask,
            navigation_control_value=navigation_control_value,
            route_number=route_number,
            route_name=route_name,
            fix_rate=fix_rate,
            elevation=elevation,
            request_op_code=request_op_code,
            response_value=response_value,
            response_parameter=response_parameter,
        )

    def encode_value(self, data: LNControlPointData) -> bytearray:  # pylint: disable=too-many-branches
        """Encode LNControlPointData back to bytes.

        Only the operand that belongs to ``data.op_code`` is written; an
        operand left as ``None`` is omitted, so the result may consist of the
        op code byte alone.

        Args:
            data: LNControlPointData instance to encode

        Returns:
            Encoded bytes representing the LN control point data

        """
        result = bytearray()
        op_code = data.op_code
        result.append(op_code)

        if op_code == LNControlPointOpCode.SET_CUMULATIVE_VALUE:
            if data.cumulative_value is not None:
                result.extend(DataParser.encode_int32(data.cumulative_value, signed=False))
        elif op_code == LNControlPointOpCode.MASK_LOCATION_AND_SPEED_CHARACTERISTIC_CONTENT:
            if data.content_mask is not None:
                result.extend(DataParser.encode_int16(data.content_mask, signed=False))
        elif op_code == LNControlPointOpCode.NAVIGATION_CONTROL:
            if data.navigation_control_value is not None:
                result.append(data.navigation_control_value)
        elif op_code in (LNControlPointOpCode.REQUEST_NAME_OF_ROUTE, LNControlPointOpCode.SELECT_ROUTE):
            if data.route_number is not None:
                result.append(data.route_number)
        elif op_code == LNControlPointOpCode.SET_FIX_RATE:
            if data.fix_rate is not None:
                result.append(data.fix_rate)
        elif op_code == LNControlPointOpCode.SET_ELEVATION:
            if data.elevation is not None:
                # round(), not int(): truncation turns e.g. 1.15 m
                # (114.99999999999999 after scaling) into 114 instead of 115,
                # so a decoded value would not survive a re-encode.
                result.extend(DataParser.encode_int32(round(data.elevation * 100), signed=True))
        elif op_code == LNControlPointOpCode.RESPONSE_CODE:
            if data.request_op_code is not None:
                result.append(data.request_op_code)
            if data.response_value is not None:
                result.append(data.response_value)
            result.extend(self._encode_response_parameter(data.request_op_code, data.response_parameter))

        return result

    @staticmethod
    def _encode_response_parameter(
        request_op_code: LNControlPointOpCode | None,
        parameter: int | str | datetime | bytearray | None,
    ) -> bytes | bytearray:
        """Serialise the optional trailing parameter of a response.

        Mirrors the parameter handling in ``decode_value`` so that a decoded
        response can be encoded again.

        Args:
            request_op_code: Op code the response refers to (may be None)
            parameter: Response parameter to serialise (may be None)

        Returns:
            The encoded parameter bytes; empty when there is nothing to encode

        """
        if parameter is None:
            return bytearray()

        if isinstance(parameter, int):
            if request_op_code == LNControlPointOpCode.REQUEST_NUMBER_OF_ROUTES:
                # decode_value always reads this parameter as a uint16, so it
                # must be written as two bytes even when the count is small.
                return DataParser.encode_int16(parameter, signed=False)
            if parameter <= 0xFF:
                # bytearray([...]) raises ValueError for a negative value,
                # matching bytearray.append in the previous implementation.
                return bytearray([parameter])
            if parameter <= 0xFFFF:
                return DataParser.encode_int16(parameter, signed=False)
            # decode_value yields such values from 4-byte parameters;
            # previously they were silently dropped here.
            return DataParser.encode_int32(parameter, signed=False)

        if isinstance(parameter, str):
            return parameter.encode("utf-8")
        if isinstance(parameter, datetime):
            return IEEE11073Parser.encode_timestamp(parameter)
        if isinstance(parameter, bytearray):
            return parameter

        # Any other type is not encoded.
        return bytearray()