Coverage for src/bluetooth_sig/gatt/characteristics/pulse_oximetry_measurement.py: 83%
70 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-30 00:10 +0000
1"""Pulse Oximetry Measurement characteristic implementation."""
3from __future__ import annotations
5from datetime import datetime
6from enum import IntFlag
8import msgspec
10from ..context import CharacteristicContext
11from .base import BaseCharacteristic
12from .utils import DataParser, IEEE11073Parser
14# TODO: Implement CharacteristicContext support
15# This characteristic should access Pulse Oximetry Control Point (0x2A60) and Pulse Oximetry Features (0x2A61)
16# from ctx.other_characteristics to determine supported measurement types and calibration data
19class PulseOximetryFlags(IntFlag):
20 """Pulse Oximetry measurement flags."""
22 TIMESTAMP_PRESENT = 0x01
23 MEASUREMENT_STATUS_PRESENT = 0x02
24 DEVICE_STATUS_PRESENT = 0x04
25 PULSE_AMPLITUDE_INDEX_PRESENT = 0x08
28class PulseOximetryData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods
29 """Parsed pulse oximetry measurement data."""
31 spo2: float
32 pulse_rate: float
33 timestamp: datetime | None = None
34 measurement_status: int | None = None
35 device_status: int | None = None
36 pulse_amplitude_index: float | None = None
39class PulseOximetryMeasurementCharacteristic(BaseCharacteristic):
40 """PLX Continuous Measurement characteristic (0x2A5F).
42 Used to transmit SpO2 (blood oxygen saturation) and pulse rate
43 measurements.
44 """
46 _characteristic_name: str = "PLX Continuous Measurement"
48 # Declarative validation (automatic)
49 min_length: int | None = 5 # Flags(1) + SpO2(2) + PulseRate(2) minimum
50 max_length: int | None = 16 # + Timestamp(7) + MeasurementStatus(2) + DeviceStatus(3) maximum
51 allow_variable_length: bool = True # Variable optional fields
53 def decode_value(self, data: bytearray, ctx: CharacteristicContext | None = None) -> PulseOximetryData: # pylint: disable=too-many-locals
54 """Parse pulse oximetry measurement data according to Bluetooth specification.
56 Format: Flags(1) + SpO2(2) + Pulse Rate(2) + [Timestamp(7)] +
57 [Measurement Status(2)] + [Device Status(3)] + [Pulse Amplitude Index(2)]
58 SpO2 and Pulse Rate are IEEE-11073 16-bit SFLOAT.
60 Args:
61 data: Raw bytearray from BLE characteristic.
62 ctx: Optional CharacteristicContext providing surrounding context (may be None).
64 Returns:
65 PulseOximetryData containing parsed pulse oximetry data.
67 """
68 if len(data) < 5:
69 raise ValueError("Pulse Oximetry Measurement data must be at least 5 bytes")
71 flags = PulseOximetryFlags(data[0])
73 # Parse SpO2 and pulse rate using IEEE-11073 SFLOAT format
74 spo2 = IEEE11073Parser.parse_sfloat(data, 1)
75 pulse_rate = IEEE11073Parser.parse_sfloat(data, 3)
77 # Parse optional fields
78 timestamp: datetime | None = None
79 measurement_status: int | None = None
80 device_status: int | None = None
81 pulse_amplitude_index: float | None = None
82 offset = 5
84 if PulseOximetryFlags.TIMESTAMP_PRESENT in flags and len(data) >= offset + 7:
85 timestamp = IEEE11073Parser.parse_timestamp(data, offset)
86 offset += 7
88 if PulseOximetryFlags.MEASUREMENT_STATUS_PRESENT in flags and len(data) >= offset + 2:
89 measurement_status = DataParser.parse_int16(data, offset, signed=False)
90 offset += 2
92 if PulseOximetryFlags.DEVICE_STATUS_PRESENT in flags and len(data) >= offset + 3:
93 device_status = DataParser.parse_int32(
94 data[offset : offset + 3] + b"\x00", 0, signed=False
95 ) # Pad to 4 bytes
96 offset += 3
98 if PulseOximetryFlags.PULSE_AMPLITUDE_INDEX_PRESENT in flags and len(data) >= offset + 2:
99 pulse_amplitude_index = IEEE11073Parser.parse_sfloat(data, offset)
101 # Create immutable struct with all values
102 return PulseOximetryData(
103 spo2=spo2,
104 pulse_rate=pulse_rate,
105 timestamp=timestamp,
106 measurement_status=measurement_status,
107 device_status=device_status,
108 pulse_amplitude_index=pulse_amplitude_index,
109 )
111 def encode_value(self, data: PulseOximetryData) -> bytearray:
112 """Encode pulse oximetry measurement value back to bytes.
114 Args:
115 data: PulseOximetryData instance to encode
117 Returns:
118 Encoded bytes representing the measurement
120 """
121 # Build flags
122 flags = PulseOximetryFlags(0)
123 if data.timestamp is not None:
124 flags |= PulseOximetryFlags.TIMESTAMP_PRESENT
125 if data.measurement_status is not None:
126 flags |= PulseOximetryFlags.MEASUREMENT_STATUS_PRESENT
127 if data.device_status is not None:
128 flags |= PulseOximetryFlags.DEVICE_STATUS_PRESENT
129 if data.pulse_amplitude_index is not None:
130 flags |= PulseOximetryFlags.PULSE_AMPLITUDE_INDEX_PRESENT
132 # Build result
133 result = bytearray([int(flags)])
134 result.extend(IEEE11073Parser.encode_sfloat(data.spo2))
135 result.extend(IEEE11073Parser.encode_sfloat(data.pulse_rate))
137 # Encode optional timestamp
138 if data.timestamp is not None:
139 result.extend(IEEE11073Parser.encode_timestamp(data.timestamp))
141 # Encode optional measurement status
142 if data.measurement_status is not None:
143 result.extend(DataParser.encode_int16(data.measurement_status, signed=False))
145 # Encode optional device status
146 if data.device_status is not None:
147 # Device status is 3 bytes (24-bit value)
148 device_status_bytes = DataParser.encode_int32(data.device_status, signed=False)[:3]
149 result.extend(device_status_bytes)
151 # Encode optional pulse amplitude index
152 if data.pulse_amplitude_index is not None:
153 result.extend(IEEE11073Parser.encode_sfloat(data.pulse_amplitude_index))
155 return result