Coverage for src / bluetooth_sig / gatt / characteristics / ln_control_point.py: 79%
134 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""LN Control Point characteristic implementation."""
3from __future__ import annotations
5from datetime import datetime
6from enum import IntEnum
8import msgspec
10from ..constants import UINT8_MAX, UINT16_MAX
11from ..context import CharacteristicContext
12from .base import BaseCharacteristic
13from .utils import DataParser, IEEE11073Parser
15# Minimum data lengths for each operation
16_MIN_LEN_CUMULATIVE_VALUE = 5 # 1 byte op code + 4 bytes uint32
17_MIN_LEN_MASK_CONTENT = 3 # 1 byte op code + 2 bytes uint16
18_MIN_LEN_NAV_CONTROL = 2 # 1 byte op code + 1 byte value
19_MIN_LEN_ROUTE_NUMBER = 2 # 1 byte op code + 1 byte route number
20_MIN_LEN_FIX_RATE = 2 # 1 byte op code + 1 byte fix rate
21_MIN_LEN_ELEVATION = 5 # 1 byte op code + 4 bytes int32
22_MIN_LEN_RESPONSE = 3 # 1 byte op code + 1 byte request + 1 byte response
24# Response parameter lengths
25_RESPONSE_PARAM_OFFSET = 3 # Offset where response parameter starts
26_PARAM_LEN_UINT8 = 1
27_PARAM_LEN_UINT16 = 2
28_PARAM_LEN_UINT32 = 4
29_PARAM_LEN_TIMESTAMP = 7
32class LNControlPointOpCode(IntEnum):
33 """LN Control Point operation codes as per Bluetooth SIG specification."""
35 # Value 0x00 is Reserved for Future Use
36 SET_CUMULATIVE_VALUE = 0x01
37 MASK_LOCATION_AND_SPEED_CHARACTERISTIC_CONTENT = 0x02
38 NAVIGATION_CONTROL = 0x03
39 REQUEST_NUMBER_OF_ROUTES = 0x04
40 REQUEST_NAME_OF_ROUTE = 0x05
41 SELECT_ROUTE = 0x06
42 SET_FIX_RATE = 0x07
43 SET_ELEVATION = 0x08
44 # Values 0x09-0x1F are Reserved for Future Use
45 RESPONSE_CODE = 0x20
46 # Values 0x21-0xFF are Reserved for Future Use
48 def __str__(self) -> str:
49 """Return human-readable operation code name."""
50 names = {
51 self.SET_CUMULATIVE_VALUE: "Set Cumulative Value",
52 self.MASK_LOCATION_AND_SPEED_CHARACTERISTIC_CONTENT: "Mask Location and Speed Characteristic Content",
53 self.NAVIGATION_CONTROL: "Navigation Control",
54 self.REQUEST_NUMBER_OF_ROUTES: "Request Number of Routes",
55 self.REQUEST_NAME_OF_ROUTE: "Request Name of Route",
56 self.SELECT_ROUTE: "Select Route",
57 self.SET_FIX_RATE: "Set Fix Rate",
58 self.SET_ELEVATION: "Set Elevation",
59 self.RESPONSE_CODE: "Response Code",
60 }
61 return names[self]
64class LNControlPointResponseValue(IntEnum):
65 """LN Control Point response values as per Bluetooth SIG specification."""
67 # Value 0x00 is Reserved for Future Use
68 SUCCESS = 0x01
69 OP_CODE_NOT_SUPPORTED = 0x02
70 INVALID_OPERAND = 0x03
71 OPERATION_FAILED = 0x04
72 # Values 0x05-0xFF are Reserved for Future Use
74 def __str__(self) -> str:
75 """Return human-readable response value name."""
76 names = {
77 self.SUCCESS: "Success",
78 self.OP_CODE_NOT_SUPPORTED: "Op Code not supported",
79 self.INVALID_OPERAND: "Invalid Operand",
80 self.OPERATION_FAILED: "Operation Failed",
81 }
82 return names[self]
85class LNControlPointData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods,too-many-instance-attributes
86 """Parsed data from LN Control Point characteristic."""
88 op_code: LNControlPointOpCode
89 # Parameters based on op_code
90 cumulative_value: int | None = None
91 content_mask: int | None = None
92 navigation_control_value: int | None = None
93 route_number: int | None = None
94 route_name: str | None = None
95 fix_rate: int | None = None
96 elevation: float | None = None
97 # Response fields
98 request_op_code: LNControlPointOpCode | None = None
99 response_value: LNControlPointResponseValue | None = None
100 response_parameter: int | str | datetime | bytearray | None = None
102 def __post_init__(self) -> None:
103 """Validate LN control point data."""
104 if not 0 <= self.op_code <= UINT8_MAX:
105 raise ValueError("Op code must be a uint8 value (0-UINT8_MAX)")
108class LNControlPointCharacteristic(BaseCharacteristic[LNControlPointData]):
109 """LN Control Point characteristic.
111 Used to enable device-specific procedures related to the exchange of location and navigation information.
112 """
114 min_length = 1 # Op Code(1) minimum
115 max_length = 18 # Op Code(1) + Parameter(max 17) maximum
116 allow_variable_length: bool = True # Variable parameter length
118 def _decode_value( # pylint: disable=too-many-locals,too-many-branches,too-many-statements # Control point with many op codes
119 self, data: bytearray, ctx: CharacteristicContext | None = None, *, validate: bool = True
120 ) -> LNControlPointData:
121 """Parse LN control point data according to Bluetooth specification.
123 Format: Op Code(1) + Parameter(0-17).
125 Args:
126 data: Raw bytearray from BLE characteristic
127 ctx: Optional context providing surrounding context (may be None)
128 validate: Whether to validate ranges (default True)
130 Returns:
131 LNControlPointData containing parsed control point data
133 """
134 op_code = LNControlPointOpCode(data[0])
136 # Initialize optional fields
137 cumulative_value: int | None = None
138 content_mask: int | None = None
139 navigation_control_value: int | None = None
140 route_number: int | None = None
141 route_name: str | None = None
142 fix_rate: int | None = None
143 elevation: float | None = None
144 request_op_code: LNControlPointOpCode | None = None
145 response_value: LNControlPointResponseValue | None = None
146 response_parameter: int | str | datetime | bytearray | None = None
148 if op_code == LNControlPointOpCode.SET_CUMULATIVE_VALUE:
149 if len(data) >= _MIN_LEN_CUMULATIVE_VALUE:
150 cumulative_value = DataParser.parse_int32(data, 1, signed=False)
151 elif op_code == LNControlPointOpCode.MASK_LOCATION_AND_SPEED_CHARACTERISTIC_CONTENT:
152 if len(data) >= _MIN_LEN_MASK_CONTENT:
153 content_mask = DataParser.parse_int16(data, 1, signed=False)
154 elif op_code == LNControlPointOpCode.NAVIGATION_CONTROL:
155 if len(data) >= _MIN_LEN_NAV_CONTROL:
156 navigation_control_value = data[1]
157 elif op_code in (LNControlPointOpCode.REQUEST_NAME_OF_ROUTE, LNControlPointOpCode.SELECT_ROUTE):
158 if len(data) >= _MIN_LEN_ROUTE_NUMBER:
159 route_number = data[1]
160 elif op_code == LNControlPointOpCode.SET_FIX_RATE:
161 if len(data) >= _MIN_LEN_FIX_RATE:
162 fix_rate = data[1]
163 elif op_code == LNControlPointOpCode.SET_ELEVATION:
164 if len(data) >= _MIN_LEN_ELEVATION:
165 # Unit is 1/100 m
166 elevation = DataParser.parse_int32(data, 1, signed=True) / 100.0
167 elif op_code == LNControlPointOpCode.RESPONSE_CODE and len(data) >= _MIN_LEN_RESPONSE:
168 request_op_code = LNControlPointOpCode(data[1])
169 response_value = LNControlPointResponseValue(data[2])
170 # Parse response parameter based on request op code
171 if len(data) > _RESPONSE_PARAM_OFFSET:
172 parameter_length = len(data) - _RESPONSE_PARAM_OFFSET
173 if request_op_code == LNControlPointOpCode.REQUEST_NUMBER_OF_ROUTES:
174 response_parameter = DataParser.parse_int16(data, _RESPONSE_PARAM_OFFSET, signed=False)
175 elif request_op_code == LNControlPointOpCode.REQUEST_NAME_OF_ROUTE:
176 response_parameter = data[_RESPONSE_PARAM_OFFSET:].decode("utf-8", errors="ignore")
177 # For other responses, parse based on parameter length
178 elif parameter_length == _PARAM_LEN_UINT8:
179 response_parameter = data[_RESPONSE_PARAM_OFFSET]
180 elif parameter_length == _PARAM_LEN_UINT16:
181 response_parameter = DataParser.parse_int16(data, _RESPONSE_PARAM_OFFSET, signed=False)
182 elif parameter_length == _PARAM_LEN_UINT32:
183 response_parameter = DataParser.parse_int32(data, _RESPONSE_PARAM_OFFSET, signed=False)
184 elif parameter_length == _PARAM_LEN_TIMESTAMP:
185 response_parameter = IEEE11073Parser.parse_timestamp(data, _RESPONSE_PARAM_OFFSET)
186 else:
187 # Unknown parameter format, store as bytes
188 response_parameter = data[_RESPONSE_PARAM_OFFSET:]
190 return LNControlPointData(
191 op_code=op_code,
192 cumulative_value=cumulative_value,
193 content_mask=content_mask,
194 navigation_control_value=navigation_control_value,
195 route_number=route_number,
196 route_name=route_name,
197 fix_rate=fix_rate,
198 elevation=elevation,
199 request_op_code=request_op_code,
200 response_value=response_value,
201 response_parameter=response_parameter,
202 )
204 def _encode_value(self, data: LNControlPointData) -> bytearray:
205 """Encode LNControlPointData back to bytes.
207 Args:
208 data: LNControlPointData instance to encode
210 Returns:
211 Encoded bytes representing the LN control point data
213 """
214 result = bytearray()
215 result.append(data.op_code)
217 # Handle parameter encoding based on op code
218 op_code_handlers = {
219 LNControlPointOpCode.SET_CUMULATIVE_VALUE: lambda: (
220 result.extend(DataParser.encode_int32(data.cumulative_value, signed=False))
221 if data.cumulative_value is not None
222 else None
223 ),
224 LNControlPointOpCode.MASK_LOCATION_AND_SPEED_CHARACTERISTIC_CONTENT: lambda: (
225 result.extend(DataParser.encode_int16(data.content_mask, signed=False))
226 if data.content_mask is not None
227 else None
228 ),
229 LNControlPointOpCode.NAVIGATION_CONTROL: lambda: (
230 result.append(data.navigation_control_value) if data.navigation_control_value is not None else None
231 ),
232 LNControlPointOpCode.REQUEST_NAME_OF_ROUTE: lambda: (
233 result.append(data.route_number) if data.route_number is not None else None
234 ),
235 LNControlPointOpCode.SELECT_ROUTE: lambda: (
236 result.append(data.route_number) if data.route_number is not None else None
237 ),
238 LNControlPointOpCode.SET_FIX_RATE: lambda: (
239 result.append(data.fix_rate) if data.fix_rate is not None else None
240 ),
241 LNControlPointOpCode.SET_ELEVATION: lambda: (
242 result.extend(DataParser.encode_int32(int(data.elevation * 100), signed=True))
243 if data.elevation is not None
244 else None
245 ),
246 }
248 # Execute handler if op code is supported
249 handler = op_code_handlers.get(data.op_code)
250 if handler:
251 handler()
253 # Special handling for response code
254 if data.op_code == LNControlPointOpCode.RESPONSE_CODE:
255 if data.request_op_code is not None:
256 result.append(data.request_op_code)
257 if data.response_value is not None:
258 result.append(data.response_value)
259 if data.response_parameter is not None:
260 if isinstance(data.response_parameter, int):
261 if data.response_parameter <= UINT8_MAX:
262 result.append(data.response_parameter)
263 elif data.response_parameter <= UINT16_MAX:
264 result.extend(DataParser.encode_int16(data.response_parameter, signed=False))
265 elif isinstance(data.response_parameter, str):
266 result.extend(data.response_parameter.encode("utf-8"))
267 elif isinstance(data.response_parameter, datetime):
268 result.extend(IEEE11073Parser.encode_timestamp(data.response_parameter))
269 elif isinstance(data.response_parameter, bytearray):
270 result.extend(data.response_parameter)
272 return result