Coverage for src / bluetooth_sig / gatt / characteristics / pulse_oximetry_measurement.py: 84%
81 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 20:14 +0000
1"""Pulse Oximetry Measurement characteristic implementation."""
3from __future__ import annotations
5import logging
6from datetime import datetime
7from enum import IntFlag
9import msgspec
11from ...types.gatt_enums import CharacteristicName
12from ..context import CharacteristicContext
13from .base import BaseCharacteristic
14from .plx_features import PLXFeatureFlags, PLXFeaturesCharacteristic
15from .utils import DataParser, IEEE11073Parser
17logger = logging.getLogger(__name__)
20class PulseOximetryFlags(IntFlag):
21 """Pulse Oximetry measurement flags."""
23 TIMESTAMP_PRESENT = 0x01
24 MEASUREMENT_STATUS_PRESENT = 0x02
25 DEVICE_STATUS_PRESENT = 0x04
26 PULSE_AMPLITUDE_INDEX_PRESENT = 0x08
29class PulseOximetryData(msgspec.Struct, frozen=True, kw_only=True): # pylint: disable=too-few-public-methods
30 """Parsed pulse oximetry measurement data.
32 Attributes:
33 spo2: Blood oxygen saturation percentage (SpO2)
34 pulse_rate: Pulse rate in beats per minute
35 timestamp: Optional timestamp of the measurement
36 measurement_status: Optional measurement status flags
37 device_status: Optional device status flags
38 pulse_amplitude_index: Optional pulse amplitude index value
39 supported_features: Optional PLX features from context (PLXFeatureFlags enum)
40 """
42 spo2: float
43 pulse_rate: float
44 timestamp: datetime | None = None
45 measurement_status: int | None = None
46 device_status: int | None = None
47 pulse_amplitude_index: float | None = None
48 supported_features: PLXFeatureFlags | None = None # PLX Features from context (0x2A60)
51class PulseOximetryMeasurementCharacteristic(BaseCharacteristic[PulseOximetryData]):
52 """PLX Continuous Measurement characteristic (0x2A5F).
54 Used to transmit SpO2 (blood oxygen saturation) and pulse rate
55 measurements.
56 """
58 _characteristic_name: str = "PLX Continuous Measurement"
60 _optional_dependencies = [PLXFeaturesCharacteristic]
62 # Declarative validation (automatic)
63 min_length: int | None = 5 # Flags(1) + SpO2(2) + PulseRate(2) minimum
64 max_length: int | None = 16 # + Timestamp(7) + MeasurementStatus(2) + DeviceStatus(3) maximum
65 allow_variable_length: bool = True # Variable optional fields
67 def _decode_value( # pylint: disable=too-many-locals,too-many-branches # Complexity needed for spec parsing
68 self, data: bytearray, ctx: CharacteristicContext | None = None
69 ) -> PulseOximetryData:
70 """Parse pulse oximetry measurement data according to Bluetooth specification.
72 Format: Flags(1) + SpO2(2) + Pulse Rate(2) + [Timestamp(7)] +
73 [Measurement Status(2)] + [Device Status(3)] + [Pulse Amplitude Index(2)]
74 SpO2 and Pulse Rate are IEEE-11073 16-bit SFLOAT.
76 Context Enhancement:
77 If ctx is provided, this method will attempt to enhance the parsed data with:
78 - PLX Features (0x2A60): Device capabilities and supported measurement types
80 Args:
81 data: Raw bytearray from BLE characteristic.
82 ctx: Optional CharacteristicContext providing surrounding context (may be None).
84 Returns:
85 PulseOximetryData containing parsed pulse oximetry data with optional
86 context-enhanced information.
88 """
89 if len(data) < 5:
90 raise ValueError("Pulse Oximetry Measurement data must be at least 5 bytes")
92 flags = PulseOximetryFlags(data[0])
94 # Parse SpO2 and pulse rate using IEEE-11073 SFLOAT format
95 spo2 = IEEE11073Parser.parse_sfloat(data, 1)
96 pulse_rate = IEEE11073Parser.parse_sfloat(data, 3)
98 # Parse optional fields
99 timestamp: datetime | None = None
100 measurement_status: int | None = None
101 device_status: int | None = None
102 pulse_amplitude_index: float | None = None
103 offset = 5
105 if PulseOximetryFlags.TIMESTAMP_PRESENT in flags and len(data) >= offset + 7:
106 timestamp = IEEE11073Parser.parse_timestamp(data, offset)
107 offset += 7
109 if PulseOximetryFlags.MEASUREMENT_STATUS_PRESENT in flags and len(data) >= offset + 2:
110 measurement_status = DataParser.parse_int16(data, offset, signed=False)
111 offset += 2
113 if PulseOximetryFlags.DEVICE_STATUS_PRESENT in flags and len(data) >= offset + 3:
114 device_status = DataParser.parse_int32(
115 data[offset : offset + 3] + b"\x00", 0, signed=False
116 ) # Pad to 4 bytes
117 offset += 3
119 if PulseOximetryFlags.PULSE_AMPLITUDE_INDEX_PRESENT in flags and len(data) >= offset + 2:
120 pulse_amplitude_index = IEEE11073Parser.parse_sfloat(data, offset)
122 # Context enhancement: PLX Features
123 supported_features: PLXFeatureFlags | None = None
124 if ctx:
125 plx_features_value = self.get_context_characteristic(ctx, CharacteristicName.PLX_FEATURES)
126 if plx_features_value is not None:
127 # PLX Features returns PLXFeatureFlags enum
128 supported_features = plx_features_value
130 # Create immutable struct with all values
131 return PulseOximetryData(
132 spo2=spo2,
133 pulse_rate=pulse_rate,
134 timestamp=timestamp,
135 measurement_status=measurement_status,
136 device_status=device_status,
137 pulse_amplitude_index=pulse_amplitude_index,
138 supported_features=supported_features,
139 )
141 def _encode_value(self, data: PulseOximetryData) -> bytearray:
142 """Encode pulse oximetry measurement value back to bytes.
144 Args:
145 data: PulseOximetryData instance to encode
147 Returns:
148 Encoded bytes representing the measurement
150 """
151 # Build flags
152 flags = PulseOximetryFlags(0)
153 if data.timestamp is not None:
154 flags |= PulseOximetryFlags.TIMESTAMP_PRESENT
155 if data.measurement_status is not None:
156 flags |= PulseOximetryFlags.MEASUREMENT_STATUS_PRESENT
157 if data.device_status is not None:
158 flags |= PulseOximetryFlags.DEVICE_STATUS_PRESENT
159 if data.pulse_amplitude_index is not None:
160 flags |= PulseOximetryFlags.PULSE_AMPLITUDE_INDEX_PRESENT
162 # Build result
163 result = bytearray([int(flags)])
164 result.extend(IEEE11073Parser.encode_sfloat(data.spo2))
165 result.extend(IEEE11073Parser.encode_sfloat(data.pulse_rate))
167 # Encode optional timestamp
168 if data.timestamp is not None:
169 result.extend(IEEE11073Parser.encode_timestamp(data.timestamp))
171 # Encode optional measurement status
172 if data.measurement_status is not None:
173 result.extend(DataParser.encode_int16(data.measurement_status, signed=False))
175 # Encode optional device status
176 if data.device_status is not None:
177 # Device status is 3 bytes (24-bit value)
178 device_status_bytes = DataParser.encode_int32(data.device_status, signed=False)[:3]
179 result.extend(device_status_bytes)
181 # Encode optional pulse amplitude index
182 if data.pulse_amplitude_index is not None:
183 result.extend(IEEE11073Parser.encode_sfloat(data.pulse_amplitude_index))
185 return result