Coverage for src / bluetooth_sig / gatt / characteristics / cgm_measurement.py: 100%

121 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 11:17 +0000

1"""CGM Measurement characteristic implementation. 

2 

3Implements the CGM Measurement characteristic (0x2AA7). The characteristic 

4value contains one or more CGM Measurement Records concatenated together. 

5 

6Each record contains: 

7 Size (uint8) -- total size of the record including this field 

8 Flags (uint8) -- controls presence of optional fields 

9 CGM Glucose Concentration (medfloat16) 

10 Time Offset (uint16) -- minutes since session start 

11 Sensor Status Annunciation (0-3 octets, flag-gated) 

12 CGM Trend Information (medfloat16, optional) 

13 CGM Quality (medfloat16, optional) 

14 

15Flag-bit assignments: 

16 Bit 0: CGM Trend Information present 

17 Bit 1: CGM Quality present 

18 Bits 2-4: Reserved 

19 Bit 5: Warning-Octet present (Sensor Status Annunciation) 

20 Bit 6: Cal/Temp-Octet present (Sensor Status Annunciation) 

21 Bit 7: Status-Octet present (Sensor Status Annunciation) 

22 

23References: 

24 Bluetooth SIG Continuous Glucose Monitoring Service 

25 org.bluetooth.characteristic.cgm_measurement (GSS YAML) 

26""" 

27 

28from __future__ import annotations 

29 

30from enum import IntFlag 

31 

32import msgspec 

33 

34from ..context import CharacteristicContext 

35from .base import BaseCharacteristic 

36from .utils import DataParser, IEEE11073Parser 

37 

38 

39class CGMMeasurementFlags(IntFlag): 

40 """CGM Measurement record flags.""" 

41 

42 TREND_INFORMATION_PRESENT = 0x01 

43 QUALITY_PRESENT = 0x02 

44 WARNING_OCTET_PRESENT = 0x20 

45 CAL_TEMP_OCTET_PRESENT = 0x40 

46 STATUS_OCTET_PRESENT = 0x80 

47 

48 

49class CGMSensorStatusOctet(IntFlag): 

50 """CGM Sensor Status Annunciation — Status octet (bits 0-7).""" 

51 

52 SESSION_STOPPED = 0x01 

53 DEVICE_BATTERY_LOW = 0x02 

54 SENSOR_TYPE_INCORRECT = 0x04 

55 SENSOR_MALFUNCTION = 0x08 

56 DEVICE_SPECIFIC_ALERT = 0x10 

57 GENERAL_DEVICE_FAULT = 0x20 

58 

59 

60class CGMCalTempOctet(IntFlag): 

61 """CGM Sensor Status Annunciation — Cal/Temp octet (bits 8-15).""" 

62 

63 TIME_SYNC_REQUIRED = 0x01 

64 CALIBRATION_NOT_ALLOWED = 0x02 

65 CALIBRATION_RECOMMENDED = 0x04 

66 CALIBRATION_REQUIRED = 0x08 

67 SENSOR_TEMP_TOO_HIGH = 0x10 

68 SENSOR_TEMP_TOO_LOW = 0x20 

69 CALIBRATION_PENDING = 0x40 

70 

71 

72class CGMWarningOctet(IntFlag): 

73 """CGM Sensor Status Annunciation — Warning octet (bits 16-23).""" 

74 

75 RESULT_LOWER_THAN_PATIENT_LOW = 0x01 

76 RESULT_HIGHER_THAN_PATIENT_HIGH = 0x02 

77 RESULT_LOWER_THAN_HYPO = 0x04 

78 RESULT_HIGHER_THAN_HYPER = 0x08 

79 RATE_OF_DECREASE_EXCEEDED = 0x10 

80 RATE_OF_INCREASE_EXCEEDED = 0x20 

81 RESULT_LOWER_THAN_DEVICE_CAN_PROCESS = 0x40 

82 RESULT_HIGHER_THAN_DEVICE_CAN_PROCESS = 0x80 

83 

84 

85class CGMMeasurementRecord(msgspec.Struct, frozen=True, kw_only=True): 

86 """A single CGM Measurement Record. 

87 

88 Attributes: 

89 size: Total size of this record in bytes (including the size field). 

90 flags: Raw 8-bit flags field. 

91 glucose_concentration: Glucose concentration in mg/dL. 

92 time_offset: Minutes since session start. 

93 status_octet: Sensor status octet (8 bits). None if absent. 

94 cal_temp_octet: Calibration/temperature octet (8 bits). None if absent. 

95 warning_octet: Warning octet (8 bits). None if absent. 

96 trend_information: Glucose trend rate (mg/dL/min). None if absent. 

97 quality: CGM quality percentage. None if absent. 

98 

99 """ 

100 

101 size: int 

102 flags: CGMMeasurementFlags 

103 glucose_concentration: float 

104 time_offset: int 

105 status_octet: CGMSensorStatusOctet | None = None 

106 cal_temp_octet: CGMCalTempOctet | None = None 

107 warning_octet: CGMWarningOctet | None = None 

108 trend_information: float | None = None 

109 quality: float | None = None 

110 

111 

112class CGMMeasurementData(msgspec.Struct, frozen=True, kw_only=True): 

113 """Parsed data from CGM Measurement characteristic. 

114 

115 Attributes: 

116 records: List of CGM Measurement Records. 

117 

118 """ 

119 

120 records: tuple[CGMMeasurementRecord, ...] 

121 

122 

123def _decode_single_record(data: bytearray, start: int) -> tuple[CGMMeasurementRecord, int]: 

124 """Decode a single CGM Measurement Record from data at the given offset. 

125 

126 Args: 

127 data: Full characteristic data. 

128 start: Byte offset where this record begins. 

129 

130 Returns: 

131 Tuple of (decoded record, next offset after this record). 

132 

133 """ 

134 record_size = data[start] 

135 flags = CGMMeasurementFlags(data[start + 1]) 

136 glucose_concentration = IEEE11073Parser.parse_sfloat(data, start + 2) 

137 time_offset = DataParser.parse_int16(data, start + 4, signed=False) 

138 offset = start + 6 

139 

140 # Sensor Status Annunciation: order is Status, Cal/Temp, Warning 

141 # (per YAML spec structure order) 

142 status_octet: CGMSensorStatusOctet | None = None 

143 if flags & CGMMeasurementFlags.STATUS_OCTET_PRESENT: 

144 status_octet = CGMSensorStatusOctet(data[offset]) 

145 offset += 1 

146 

147 cal_temp_octet: CGMCalTempOctet | None = None 

148 if flags & CGMMeasurementFlags.CAL_TEMP_OCTET_PRESENT: 

149 cal_temp_octet = CGMCalTempOctet(data[offset]) 

150 offset += 1 

151 

152 warning_octet: CGMWarningOctet | None = None 

153 if flags & CGMMeasurementFlags.WARNING_OCTET_PRESENT: 

154 warning_octet = CGMWarningOctet(data[offset]) 

155 offset += 1 

156 

157 trend_information: float | None = None 

158 if flags & CGMMeasurementFlags.TREND_INFORMATION_PRESENT: 

159 trend_information = IEEE11073Parser.parse_sfloat(data, offset) 

160 offset += 2 

161 

162 quality: float | None = None 

163 if flags & CGMMeasurementFlags.QUALITY_PRESENT: 

164 quality = IEEE11073Parser.parse_sfloat(data, offset) 

165 offset += 2 

166 

167 # Skip any remaining bytes in this record (e.g. E2E-CRC) 

168 record_end = start + record_size 

169 return ( 

170 CGMMeasurementRecord( 

171 size=record_size, 

172 flags=flags, 

173 glucose_concentration=glucose_concentration, 

174 time_offset=time_offset, 

175 status_octet=status_octet, 

176 cal_temp_octet=cal_temp_octet, 

177 warning_octet=warning_octet, 

178 trend_information=trend_information, 

179 quality=quality, 

180 ), 

181 record_end, 

182 ) 

183 

184 

185def _encode_single_record(record: CGMMeasurementRecord) -> bytearray: 

186 """Encode a single CGM Measurement Record to bytes. 

187 

188 Args: 

189 record: CGMMeasurementRecord instance. 

190 

191 Returns: 

192 Encoded bytearray for this record. 

193 

194 """ 

195 flags = CGMMeasurementFlags(0) 

196 if record.trend_information is not None: 

197 flags |= CGMMeasurementFlags.TREND_INFORMATION_PRESENT 

198 if record.quality is not None: 

199 flags |= CGMMeasurementFlags.QUALITY_PRESENT 

200 if record.status_octet is not None: 

201 flags |= CGMMeasurementFlags.STATUS_OCTET_PRESENT 

202 if record.cal_temp_octet is not None: 

203 flags |= CGMMeasurementFlags.CAL_TEMP_OCTET_PRESENT 

204 if record.warning_octet is not None: 

205 flags |= CGMMeasurementFlags.WARNING_OCTET_PRESENT 

206 

207 body = bytearray() 

208 # Placeholder for size byte — filled in at the end 

209 body.append(0) 

210 body.append(int(flags)) 

211 body.extend(IEEE11073Parser.encode_sfloat(record.glucose_concentration)) 

212 body.extend(DataParser.encode_int16(record.time_offset, signed=False)) 

213 

214 if record.status_octet is not None: 

215 body.append(int(record.status_octet)) 

216 if record.cal_temp_octet is not None: 

217 body.append(int(record.cal_temp_octet)) 

218 if record.warning_octet is not None: 

219 body.append(int(record.warning_octet)) 

220 if record.trend_information is not None: 

221 body.extend(IEEE11073Parser.encode_sfloat(record.trend_information)) 

222 if record.quality is not None: 

223 body.extend(IEEE11073Parser.encode_sfloat(record.quality)) 

224 

225 body[0] = len(body) 

226 return body 

227 

228 

229class CGMMeasurementCharacteristic(BaseCharacteristic[CGMMeasurementData]): 

230 """CGM Measurement characteristic (0x2AA7). 

231 

232 Contains one or more CGM Measurement Records concatenated together. 

233 Each record is self-sized via its leading Size byte. 

234 """ 

235 

236 expected_type = CGMMeasurementData 

237 min_length: int = 6 # At least one record: size(1)+flags(1)+glucose(2)+time(2) 

238 allow_variable_length: bool = True 

239 

240 def _decode_value( 

241 self, 

242 data: bytearray, 

243 ctx: CharacteristicContext | None = None, 

244 *, 

245 validate: bool = True, 

246 ) -> CGMMeasurementData: 

247 """Parse CGM Measurement records from raw BLE bytes. 

248 

249 Args: 

250 data: Raw bytearray from BLE characteristic. 

251 ctx: Optional context (unused). 

252 validate: Whether to validate ranges. 

253 

254 Returns: 

255 CGMMeasurementData containing all parsed records. 

256 

257 """ 

258 records: list[CGMMeasurementRecord] = [] 

259 offset = 0 

260 while offset < len(data): 

261 record, offset = _decode_single_record(data, offset) 

262 records.append(record) 

263 

264 return CGMMeasurementData(records=tuple(records)) 

265 

266 def _encode_value(self, data: CGMMeasurementData) -> bytearray: 

267 """Encode CGMMeasurementData back to BLE bytes. 

268 

269 Args: 

270 data: CGMMeasurementData instance. 

271 

272 Returns: 

273 Encoded bytearray with all records concatenated. 

274 

275 """ 

276 result = bytearray() 

277 for record in data.records: 

278 result.extend(_encode_single_record(record)) 

279 return result