Coverage for src / bluetooth_sig / gatt / characteristics / cgm_measurement.py: 100%
121 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 11:17 +0000
1"""CGM Measurement characteristic implementation.
3Implements the CGM Measurement characteristic (0x2AA7). The characteristic
4value contains one or more CGM Measurement Records concatenated together.
6Each record contains:
7 Size (uint8) -- total size of the record including this field
8 Flags (uint8) -- controls presence of optional fields
9 CGM Glucose Concentration (medfloat16)
10 Time Offset (uint16) -- minutes since session start
11 Sensor Status Annunciation (0-3 octets, flag-gated)
12 CGM Trend Information (medfloat16, optional)
13 CGM Quality (medfloat16, optional)
15Flag-bit assignments:
16 Bit 0: CGM Trend Information present
17 Bit 1: CGM Quality present
18 Bits 2-4: Reserved
19 Bit 5: Warning-Octet present (Sensor Status Annunciation)
20 Bit 6: Cal/Temp-Octet present (Sensor Status Annunciation)
21 Bit 7: Status-Octet present (Sensor Status Annunciation)
23References:
24 Bluetooth SIG Continuous Glucose Monitoring Service
25 org.bluetooth.characteristic.cgm_measurement (GSS YAML)
26"""
28from __future__ import annotations
30from enum import IntFlag
32import msgspec
34from ..context import CharacteristicContext
35from .base import BaseCharacteristic
36from .utils import DataParser, IEEE11073Parser
39class CGMMeasurementFlags(IntFlag):
40 """CGM Measurement record flags."""
42 TREND_INFORMATION_PRESENT = 0x01
43 QUALITY_PRESENT = 0x02
44 WARNING_OCTET_PRESENT = 0x20
45 CAL_TEMP_OCTET_PRESENT = 0x40
46 STATUS_OCTET_PRESENT = 0x80
49class CGMSensorStatusOctet(IntFlag):
50 """CGM Sensor Status Annunciation — Status octet (bits 0-7)."""
52 SESSION_STOPPED = 0x01
53 DEVICE_BATTERY_LOW = 0x02
54 SENSOR_TYPE_INCORRECT = 0x04
55 SENSOR_MALFUNCTION = 0x08
56 DEVICE_SPECIFIC_ALERT = 0x10
57 GENERAL_DEVICE_FAULT = 0x20
60class CGMCalTempOctet(IntFlag):
61 """CGM Sensor Status Annunciation — Cal/Temp octet (bits 8-15)."""
63 TIME_SYNC_REQUIRED = 0x01
64 CALIBRATION_NOT_ALLOWED = 0x02
65 CALIBRATION_RECOMMENDED = 0x04
66 CALIBRATION_REQUIRED = 0x08
67 SENSOR_TEMP_TOO_HIGH = 0x10
68 SENSOR_TEMP_TOO_LOW = 0x20
69 CALIBRATION_PENDING = 0x40
72class CGMWarningOctet(IntFlag):
73 """CGM Sensor Status Annunciation — Warning octet (bits 16-23)."""
75 RESULT_LOWER_THAN_PATIENT_LOW = 0x01
76 RESULT_HIGHER_THAN_PATIENT_HIGH = 0x02
77 RESULT_LOWER_THAN_HYPO = 0x04
78 RESULT_HIGHER_THAN_HYPER = 0x08
79 RATE_OF_DECREASE_EXCEEDED = 0x10
80 RATE_OF_INCREASE_EXCEEDED = 0x20
81 RESULT_LOWER_THAN_DEVICE_CAN_PROCESS = 0x40
82 RESULT_HIGHER_THAN_DEVICE_CAN_PROCESS = 0x80
85class CGMMeasurementRecord(msgspec.Struct, frozen=True, kw_only=True):
86 """A single CGM Measurement Record.
88 Attributes:
89 size: Total size of this record in bytes (including the size field).
90 flags: Raw 8-bit flags field.
91 glucose_concentration: Glucose concentration in mg/dL.
92 time_offset: Minutes since session start.
93 status_octet: Sensor status octet (8 bits). None if absent.
94 cal_temp_octet: Calibration/temperature octet (8 bits). None if absent.
95 warning_octet: Warning octet (8 bits). None if absent.
96 trend_information: Glucose trend rate (mg/dL/min). None if absent.
97 quality: CGM quality percentage. None if absent.
99 """
101 size: int
102 flags: CGMMeasurementFlags
103 glucose_concentration: float
104 time_offset: int
105 status_octet: CGMSensorStatusOctet | None = None
106 cal_temp_octet: CGMCalTempOctet | None = None
107 warning_octet: CGMWarningOctet | None = None
108 trend_information: float | None = None
109 quality: float | None = None
112class CGMMeasurementData(msgspec.Struct, frozen=True, kw_only=True):
113 """Parsed data from CGM Measurement characteristic.
115 Attributes:
116 records: List of CGM Measurement Records.
118 """
120 records: tuple[CGMMeasurementRecord, ...]
123def _decode_single_record(data: bytearray, start: int) -> tuple[CGMMeasurementRecord, int]:
124 """Decode a single CGM Measurement Record from data at the given offset.
126 Args:
127 data: Full characteristic data.
128 start: Byte offset where this record begins.
130 Returns:
131 Tuple of (decoded record, next offset after this record).
133 """
134 record_size = data[start]
135 flags = CGMMeasurementFlags(data[start + 1])
136 glucose_concentration = IEEE11073Parser.parse_sfloat(data, start + 2)
137 time_offset = DataParser.parse_int16(data, start + 4, signed=False)
138 offset = start + 6
140 # Sensor Status Annunciation: order is Status, Cal/Temp, Warning
141 # (per YAML spec structure order)
142 status_octet: CGMSensorStatusOctet | None = None
143 if flags & CGMMeasurementFlags.STATUS_OCTET_PRESENT:
144 status_octet = CGMSensorStatusOctet(data[offset])
145 offset += 1
147 cal_temp_octet: CGMCalTempOctet | None = None
148 if flags & CGMMeasurementFlags.CAL_TEMP_OCTET_PRESENT:
149 cal_temp_octet = CGMCalTempOctet(data[offset])
150 offset += 1
152 warning_octet: CGMWarningOctet | None = None
153 if flags & CGMMeasurementFlags.WARNING_OCTET_PRESENT:
154 warning_octet = CGMWarningOctet(data[offset])
155 offset += 1
157 trend_information: float | None = None
158 if flags & CGMMeasurementFlags.TREND_INFORMATION_PRESENT:
159 trend_information = IEEE11073Parser.parse_sfloat(data, offset)
160 offset += 2
162 quality: float | None = None
163 if flags & CGMMeasurementFlags.QUALITY_PRESENT:
164 quality = IEEE11073Parser.parse_sfloat(data, offset)
165 offset += 2
167 # Skip any remaining bytes in this record (e.g. E2E-CRC)
168 record_end = start + record_size
169 return (
170 CGMMeasurementRecord(
171 size=record_size,
172 flags=flags,
173 glucose_concentration=glucose_concentration,
174 time_offset=time_offset,
175 status_octet=status_octet,
176 cal_temp_octet=cal_temp_octet,
177 warning_octet=warning_octet,
178 trend_information=trend_information,
179 quality=quality,
180 ),
181 record_end,
182 )
185def _encode_single_record(record: CGMMeasurementRecord) -> bytearray:
186 """Encode a single CGM Measurement Record to bytes.
188 Args:
189 record: CGMMeasurementRecord instance.
191 Returns:
192 Encoded bytearray for this record.
194 """
195 flags = CGMMeasurementFlags(0)
196 if record.trend_information is not None:
197 flags |= CGMMeasurementFlags.TREND_INFORMATION_PRESENT
198 if record.quality is not None:
199 flags |= CGMMeasurementFlags.QUALITY_PRESENT
200 if record.status_octet is not None:
201 flags |= CGMMeasurementFlags.STATUS_OCTET_PRESENT
202 if record.cal_temp_octet is not None:
203 flags |= CGMMeasurementFlags.CAL_TEMP_OCTET_PRESENT
204 if record.warning_octet is not None:
205 flags |= CGMMeasurementFlags.WARNING_OCTET_PRESENT
207 body = bytearray()
208 # Placeholder for size byte — filled in at the end
209 body.append(0)
210 body.append(int(flags))
211 body.extend(IEEE11073Parser.encode_sfloat(record.glucose_concentration))
212 body.extend(DataParser.encode_int16(record.time_offset, signed=False))
214 if record.status_octet is not None:
215 body.append(int(record.status_octet))
216 if record.cal_temp_octet is not None:
217 body.append(int(record.cal_temp_octet))
218 if record.warning_octet is not None:
219 body.append(int(record.warning_octet))
220 if record.trend_information is not None:
221 body.extend(IEEE11073Parser.encode_sfloat(record.trend_information))
222 if record.quality is not None:
223 body.extend(IEEE11073Parser.encode_sfloat(record.quality))
225 body[0] = len(body)
226 return body
229class CGMMeasurementCharacteristic(BaseCharacteristic[CGMMeasurementData]):
230 """CGM Measurement characteristic (0x2AA7).
232 Contains one or more CGM Measurement Records concatenated together.
233 Each record is self-sized via its leading Size byte.
234 """
236 expected_type = CGMMeasurementData
237 min_length: int = 6 # At least one record: size(1)+flags(1)+glucose(2)+time(2)
238 allow_variable_length: bool = True
240 def _decode_value(
241 self,
242 data: bytearray,
243 ctx: CharacteristicContext | None = None,
244 *,
245 validate: bool = True,
246 ) -> CGMMeasurementData:
247 """Parse CGM Measurement records from raw BLE bytes.
249 Args:
250 data: Raw bytearray from BLE characteristic.
251 ctx: Optional context (unused).
252 validate: Whether to validate ranges.
254 Returns:
255 CGMMeasurementData containing all parsed records.
257 """
258 records: list[CGMMeasurementRecord] = []
259 offset = 0
260 while offset < len(data):
261 record, offset = _decode_single_record(data, offset)
262 records.append(record)
264 return CGMMeasurementData(records=tuple(records))
266 def _encode_value(self, data: CGMMeasurementData) -> bytearray:
267 """Encode CGMMeasurementData back to BLE bytes.
269 Args:
270 data: CGMMeasurementData instance.
272 Returns:
273 Encoded bytearray with all records concatenated.
275 """
276 result = bytearray()
277 for record in data.records:
278 result.extend(_encode_single_record(record))
279 return result