Coverage for src/bluetooth_sig/gatt/characteristics/ln_control_point.py: 78%
130 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
1"""LN Control Point characteristic implementation."""
3from __future__ import annotations
5from datetime import datetime
6from enum import IntEnum
8import msgspec
10from ...types.gatt_enums import ValueType
11from ..constants import UINT8_MAX
12from ..context import CharacteristicContext
13from .base import BaseCharacteristic
14from .utils import DataParser, IEEE11073Parser
class LNControlPointOpCode(IntEnum):
    """Operation codes accepted by the LN Control Point characteristic.

    Only the assigned values are listed. 0x00, 0x09-0x1F and 0x21-0xFF are
    Reserved for Future Use and therefore have no member.
    """

    SET_CUMULATIVE_VALUE = 0x01
    MASK_LOCATION_AND_SPEED_CHARACTERISTIC_CONTENT = 0x02
    NAVIGATION_CONTROL = 0x03
    REQUEST_NUMBER_OF_ROUTES = 0x04
    REQUEST_NAME_OF_ROUTE = 0x05
    SELECT_ROUTE = 0x06
    SET_FIX_RATE = 0x07
    SET_ELEVATION = 0x08
    RESPONSE_CODE = 0x20

    def __str__(self) -> str:
        """Return the display label for this operation code."""
        members = type(self)
        labels = {
            members.SET_CUMULATIVE_VALUE: "Set Cumulative Value",
            members.MASK_LOCATION_AND_SPEED_CHARACTERISTIC_CONTENT: "Mask Location and Speed Characteristic Content",
            members.NAVIGATION_CONTROL: "Navigation Control",
            members.REQUEST_NUMBER_OF_ROUTES: "Request Number of Routes",
            members.REQUEST_NAME_OF_ROUTE: "Request Name of Route",
            members.SELECT_ROUTE: "Select Route",
            members.SET_FIX_RATE: "Set Fix Rate",
            members.SET_ELEVATION: "Set Elevation",
            members.RESPONSE_CODE: "Response Code",
        }
        return labels[self]
class LNControlPointResponseValue(IntEnum):
    """Result codes carried in an LN Control Point response.

    Only the assigned values are listed. 0x00 and 0x05-0xFF are Reserved for
    Future Use and therefore have no member.
    """

    SUCCESS = 0x01
    OP_CODE_NOT_SUPPORTED = 0x02
    INVALID_OPERAND = 0x03
    OPERATION_FAILED = 0x04

    def __str__(self) -> str:
        """Return the display label for this response value."""
        members = type(self)
        labels = {
            members.SUCCESS: "Success",
            members.OP_CODE_NOT_SUPPORTED: "Op Code not supported",
            members.INVALID_OPERAND: "Invalid Operand",
            members.OPERATION_FAILED: "Operation Failed",
        }
        return labels[self]
class LNControlPointData(msgspec.Struct, frozen=True, kw_only=True):  # pylint: disable=too-few-public-methods,too-many-instance-attributes
    """Immutable, keyword-only record of a decoded LN Control Point value.

    ``op_code`` is always present. Every other field defaults to ``None`` and
    is only populated when it applies to that operation code.
    """

    op_code: LNControlPointOpCode

    # Request operands; which one is set depends on op_code.
    cumulative_value: int | None = None
    content_mask: int | None = None
    navigation_control_value: int | None = None
    route_number: int | None = None
    route_name: str | None = None
    fix_rate: int | None = None
    elevation: float | None = None

    # Populated for Response Code messages.
    request_op_code: LNControlPointOpCode | None = None
    response_value: LNControlPointResponseValue | None = None
    response_parameter: int | str | datetime | bytearray | None = None

    def __post_init__(self) -> None:
        """Reject an op code that does not fit in an unsigned byte.

        Raises:
            ValueError: If ``op_code`` is outside 0..UINT8_MAX.

        """
        fits_in_uint8 = 0 <= self.op_code <= UINT8_MAX
        if not fits_in_uint8:
            raise ValueError("Op code must be a uint8 value (0-UINT8_MAX)")
class LNControlPointCharacteristic(BaseCharacteristic):
    """LN Control Point characteristic.

    Used to enable device-specific procedures related to the exchange of location and navigation information.
    """

    _manual_value_type: ValueType | str | None = ValueType.DICT  # Override since decode_value returns dataclass

    min_length = 1  # Op Code(1) minimum
    max_length = 18  # Op Code(1) + Parameter(max 17) maximum
    allow_variable_length: bool = True  # Variable parameter length

    def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> LNControlPointData:  # pylint: disable=too-many-locals,too-many-branches,too-many-statements
        """Parse LN control point data according to Bluetooth specification.

        Format: Op Code(1) + Parameter(0-17).

        An operand is only decoded when enough bytes are present for it; a
        payload that is too short for its operand yields the op code alone.

        Args:
            data: Raw bytearray from BLE characteristic
            ctx: Optional context providing surrounding context (may be None)

        Returns:
            LNControlPointData containing parsed control point data

        Raises:
            ValueError: If ``data`` is empty, or an op code / response value
                byte is not a member of its enum.

        """
        if len(data) < 1:
            raise ValueError("LN Control Point data must be at least 1 byte")

        op_code = LNControlPointOpCode(data[0])

        # Initialize optional fields
        cumulative_value: int | None = None
        content_mask: int | None = None
        navigation_control_value: int | None = None
        route_number: int | None = None
        fix_rate: int | None = None
        elevation: float | None = None
        request_op_code: LNControlPointOpCode | None = None
        response_value: LNControlPointResponseValue | None = None
        response_parameter: int | str | datetime | bytearray | None = None

        if op_code == LNControlPointOpCode.SET_CUMULATIVE_VALUE:
            if len(data) >= 5:
                cumulative_value = DataParser.parse_int32(data, 1, signed=False)
        elif op_code == LNControlPointOpCode.MASK_LOCATION_AND_SPEED_CHARACTERISTIC_CONTENT:
            if len(data) >= 3:
                content_mask = DataParser.parse_int16(data, 1, signed=False)
        elif op_code == LNControlPointOpCode.NAVIGATION_CONTROL:
            if len(data) >= 2:
                navigation_control_value = data[1]
        elif op_code in (LNControlPointOpCode.REQUEST_NAME_OF_ROUTE, LNControlPointOpCode.SELECT_ROUTE):
            # Both procedures carry a single route-number byte.
            if len(data) >= 2:
                route_number = data[1]
        elif op_code == LNControlPointOpCode.SET_FIX_RATE:
            if len(data) >= 2:
                fix_rate = data[1]
        elif op_code == LNControlPointOpCode.SET_ELEVATION:
            if len(data) >= 5:
                # Unit is 1/100 m
                elevation = DataParser.parse_int32(data, 1, signed=True) / 100.0
        elif op_code == LNControlPointOpCode.RESPONSE_CODE:
            if len(data) >= 3:
                request_op_code = LNControlPointOpCode(data[1])
                response_value = LNControlPointResponseValue(data[2])
                if len(data) > 3:
                    response_parameter = self._decode_response_parameter(request_op_code, data)

        # route_name is never produced by decoding and keeps its None default.
        return LNControlPointData(
            op_code=op_code,
            cumulative_value=cumulative_value,
            content_mask=content_mask,
            navigation_control_value=navigation_control_value,
            route_number=route_number,
            fix_rate=fix_rate,
            elevation=elevation,
            request_op_code=request_op_code,
            response_value=response_value,
            response_parameter=response_parameter,
        )

    @staticmethod
    def _decode_response_parameter(
        request_op_code: LNControlPointOpCode, data: bytearray
    ) -> int | str | datetime | bytearray:
        """Decode the response parameter that starts at byte 3 of ``data``.

        The caller guarantees ``len(data) > 3``.
        """
        if request_op_code == LNControlPointOpCode.REQUEST_NUMBER_OF_ROUTES:
            return DataParser.parse_int16(data, 3, signed=False)
        if request_op_code == LNControlPointOpCode.REQUEST_NAME_OF_ROUTE:
            # Undecodable bytes are dropped rather than raising.
            return data[3:].decode("utf-8", errors="ignore")

        # For other responses the format is chosen from the parameter length.
        parameter_length = len(data) - 3
        if parameter_length == 1:
            return data[3]
        if parameter_length == 2:
            return DataParser.parse_int16(data, 3, signed=False)
        if parameter_length == 4:
            return DataParser.parse_int32(data, 3, signed=False)
        if parameter_length == 7:
            return IEEE11073Parser.parse_timestamp(data, 3)
        # Unknown parameter format, store as bytes
        return data[3:]

    def encode_value(self, data: LNControlPointData) -> bytearray:
        """Encode LNControlPointData back to bytes.

        The op code byte is always written. The operand that belongs to the op
        code follows only when the corresponding field is not ``None``.

        Args:
            data: LNControlPointData instance to encode

        Returns:
            Encoded bytes representing the LN control point data

        Raises:
            ValueError: If an integer response parameter is negative or too
                large for the field it has to be written to.

        """
        result = bytearray()
        result.append(data.op_code)

        op_code = data.op_code
        if op_code == LNControlPointOpCode.SET_CUMULATIVE_VALUE:
            if data.cumulative_value is not None:
                result.extend(DataParser.encode_int32(data.cumulative_value, signed=False))
        elif op_code == LNControlPointOpCode.MASK_LOCATION_AND_SPEED_CHARACTERISTIC_CONTENT:
            if data.content_mask is not None:
                result.extend(DataParser.encode_int16(data.content_mask, signed=False))
        elif op_code == LNControlPointOpCode.NAVIGATION_CONTROL:
            if data.navigation_control_value is not None:
                result.append(data.navigation_control_value)
        elif op_code in (LNControlPointOpCode.REQUEST_NAME_OF_ROUTE, LNControlPointOpCode.SELECT_ROUTE):
            if data.route_number is not None:
                result.append(data.route_number)
        elif op_code == LNControlPointOpCode.SET_FIX_RATE:
            if data.fix_rate is not None:
                result.append(data.fix_rate)
        elif op_code == LNControlPointOpCode.SET_ELEVATION:
            if data.elevation is not None:
                # round(), not int(): 1.15 * 100 is 114.999... in floating point
                # and truncation would encode 114 instead of 115.
                result.extend(DataParser.encode_int32(round(data.elevation * 100), signed=True))
        elif op_code == LNControlPointOpCode.RESPONSE_CODE:
            if data.request_op_code is not None:
                result.append(data.request_op_code)
            if data.response_value is not None:
                result.append(data.response_value)
            if data.response_parameter is not None:
                result.extend(self._encode_response_parameter(data.request_op_code, data.response_parameter))

        return result

    @staticmethod
    def _encode_response_parameter(
        request_op_code: LNControlPointOpCode | None,
        parameter: int | str | datetime | bytes | bytearray,
    ) -> bytes | bytearray:
        """Serialize a response parameter so that ``decode_value`` can read it back."""
        if isinstance(parameter, int):
            if request_op_code == LNControlPointOpCode.REQUEST_NUMBER_OF_ROUTES:
                # decode_value always reads this parameter as uint16, so it must
                # be written as two bytes even when the value fits in one.
                if not 0 <= parameter <= 0xFFFF:
                    raise ValueError("Number of routes must be a uint16 value (0-65535)")
                return DataParser.encode_int16(parameter, signed=False)
            if not 0 <= parameter <= 0xFFFFFFFF:
                raise ValueError("Integer response parameter must be a uint32 value (0-4294967295)")
            # Use the smallest width that holds the value.
            if parameter <= 0xFF:
                return bytearray([parameter])
            if parameter <= 0xFFFF:
                return DataParser.encode_int16(parameter, signed=False)
            return DataParser.encode_int32(parameter, signed=False)
        if isinstance(parameter, str):
            return parameter.encode("utf-8")
        if isinstance(parameter, datetime):
            return IEEE11073Parser.encode_timestamp(parameter)
        if isinstance(parameter, (bytes, bytearray)):
            return parameter
        # Any other type contributes no bytes.
        return bytearray()