Artificial intelligent assistant

. 你知道血氧传感器参考案例吗?

在Thonny中使用MicroPython编写程序,通过RP2040控制MAX30102读取心率数据
Max30102芯片介绍
MAX30102是一个集成的脉搏血氧计和心率监测器模块。它包括内部LED、光电探测器、光学元件和具有环境光抑制功能的低噪声电子器件。
MAX30102使用一个1.8V电源和一个用于内部LED的独立3.3V电源,通过标准I2C兼容接口进行通信。该模块可以通过软件关断,关断时待机电流为零,因此电源轨可以始终保持通电状态。
电路连接
程序代码
程序需要以下文件:
__init__.py
circular_buffer.py
spo2cal.py
HR_SpO2.py
前面两个文件可在github上面下载
https://github.com/n-elia/MAX30102-MicroPython-driver/tree/main/max30102
spo2cal.py程序如下
# -*- coding: utf-8 -*-

# Sampling rate the algorithm is written for: 25 samples per second
# (value taken from algorithm.h).
SAMPLE_FREQ = 25

# Length of the moving average applied before heart-rate peak detection.
# algorithm.h attaches a "DONOT CHANGE" comment to this value.
MA_SIZE = 4

# Number of samples analysed per call: sampling frequency * 4
# (value taken from algorithm.h).
BUFFER_SIZE = 100


# NOTE: the routine that follows was originally written assuming ir_data and
# red_data are passed as np.array.
def calc_hr_and_spo2(ir_data, red_data):
    """Estimate heart rate and SpO2 from one window of IR and red samples.

    The IR signal is mean-removed and inverted so the valleys of the pulse
    wave become peaks, smoothed with an MA_SIZE-point moving average and
    handed to find_peaks(). Heart rate is derived from the mean distance
    between consecutive valleys. SpO2 is derived from the median of the
    red/IR AC-over-DC ratios measured between consecutive valleys.

    Args:
        ir_data: indexable sequence of integer IR samples.
        red_data: indexable sequence of integer red samples; it is indexed
            with positions found in ir_data, so it must be at least as long.

    Returns:
        Tuple (hr, hr_valid, spo2, spo2_valid). hr / spo2 are -999 whenever
        the matching flag is False.
    """
    invalid = -999

    # An empty window would otherwise divide by zero when taking the mean.
    if len(ir_data) == 0:
        return invalid, False, invalid, False

    # DC level of the IR channel.
    ir_mean = int(sum(ir_data) / len(ir_data))

    # Remove the DC level and invert, so the peak detector finds valleys.
    x = [ir_mean - sample for sample in ir_data]

    # MA_SIZE-point moving average. The routine was written for integer
    # arrays where the assignment truncates; int() restores that behaviour
    # for plain lists, which would otherwise fill up with floats.
    for i in range(len(x) - MA_SIZE):
        x[i] = int(sum(x[i:i + MA_SIZE]) / MA_SIZE)

    # Detection threshold: mean of the smoothed signal clamped to [30, 60].
    n_th = int(sum(x) / len(x))
    n_th = max(30, min(60, n_th))

    # Never scan past the data actually supplied (a window shorter than
    # BUFFER_SIZE used to raise IndexError inside the peak finder).
    scan_size = min(len(x), BUFFER_SIZE)
    ir_valley_locs, n_peaks = find_peaks(x, scan_size, n_th, 4, 15)

    # ---------- heart rate ----------
    hr = invalid
    hr_valid = False
    if n_peaks >= 2:
        interval_total = 0
        for i in range(1, n_peaks):
            interval_total += ir_valley_locs[i] - ir_valley_locs[i - 1]
        mean_interval = int(interval_total / (n_peaks - 1))
        if mean_interval > 0:
            # The listing subtracts a fixed 20 bpm from the result. The
            # offset is applied to valid readings only, so the -999 sentinel
            # is no longer turned into -1019.
            # NOTE(review): the origin of the 20 bpm offset is undocumented -
            # confirm it against the sensor configuration.
            hr = int(SAMPLE_FREQ * 60 / mean_interval) - 20
            hr_valid = True

    # ---------- SpO2 ----------
    # A valley location outside the buffer makes the SpO2 estimate unusable.
    for i in range(n_peaks):
        if ir_valley_locs[i] > BUFFER_SIZE:
            return hr, hr_valid, invalid, False

    ratio = []
    red_dc_max_index = -1
    ir_dc_max_index = -1

    # Between each pair of neighbouring valleys, locate the raw maximum of
    # both channels and compare the AC parts against the DC parts.
    for k in range(n_peaks - 1):
        left = ir_valley_locs[k]
        right = ir_valley_locs[k + 1]
        if right - left <= 3:
            continue

        red_dc_max = -16777216
        ir_dc_max = -16777216
        for i in range(left, right):
            if ir_data[i] > ir_dc_max:
                ir_dc_max = ir_data[i]
                ir_dc_max_index = i
            if red_data[i] > red_dc_max:
                red_dc_max = red_data[i]
                red_dc_max_index = i

        # Subtract the linearly interpolated DC component from the raw max.
        red_ac = int((red_data[right] - red_data[left])
                     * (red_dc_max_index - left))
        red_ac = red_data[left] + int(red_ac / (right - left))
        red_ac = red_data[red_dc_max_index] - red_ac

        ir_ac = int((ir_data[right] - ir_data[left])
                    * (ir_dc_max_index - left))
        ir_ac = ir_data[left] + int(ir_ac / (right - left))
        ir_ac = ir_data[ir_dc_max_index] - ir_ac

        nume = red_ac * ir_dc_max
        denom = ir_ac * red_dc_max
        # At most five ratios are collected.
        if denom > 0 and len(ratio) < 5 and nume != 0:
            # The C++ implementation relies on 32-bit overflow here; Python
            # integers do not overflow, so the mask reproduces it.
            ratio.append(int(((nume * 100) & 0xffffffff) / denom))

    # Use the median, since the PPG signal varies from beat to beat.
    ratio.sort()
    mid_index = len(ratio) // 2

    ratio_ave = 0
    if mid_index > 1:
        ratio_ave = int((ratio[mid_index - 1] + ratio[mid_index]) / 2)
    elif ratio:
        ratio_ave = ratio[mid_index]

    if 2 < ratio_ave < 184:
        # -45.060 * r * r / 10000 + 30.354 * r / 100 + 94.845
        # (the listing used 30.054, contradicting its own formula comment).
        spo2 = (-45.060 * (ratio_ave ** 2) / 10000.0
                + 30.354 * ratio_ave / 100.0
                + 94.845)
        spo2_valid = True
    else:
        spo2 = invalid
        spo2_valid = False

    return hr, hr_valid, spo2, spo2_valid
 
 
def find_peaks(x, size, min_height, min_dist, max_num):
    """Locate at most max_num peaks of x that exceed min_height and are
    separated by more than min_dist samples.

    Returns a tuple (locations, count); only the first `count` entries of
    `locations` are meaningful.
    """
    candidates, count = find_peaks_above_min_height(x, size, min_height, max_num)
    locations, count = remove_close_peaks(count, candidates, x, min_dist)
    if count > max_num:
        count = max_num
    return locations, count
 
 
def find_peaks_above_min_height(x, size, min_height, max_num):
    """Find up to max_num peaks of x[0:size] that are higher than min_height.

    A peak is a sample (or the left edge of a flat run of equal samples)
    that is greater than its left neighbour and greater than the first
    sample after the run.

    Args:
        x: indexable sequence of samples.
        size: number of leading samples of x to scan.
        min_height: a sample must be strictly above this to be considered.
        max_num: maximum number of peaks to report.

    Returns:
        Tuple (locations, count): ascending peak indices and their number.
    """
    peak_locs = []
    last = size - 1

    # Start at 1: index 0 has no left neighbour, and x[-1] would silently
    # compare against the LAST sample and could report a bogus peak at 0.
    i = 1
    while i < last:
        if x[i] > min_height and x[i] > x[i - 1]:
            # Left edge of a potential peak; measure the width of a flat top.
            # The bound is last (size - 1) so that x[i + width] stays in range.
            width = 1
            while i + width < last and x[i] == x[i + width]:
                width += 1
            if x[i] > x[i + width] and len(peak_locs) < max_num:
                # Right edge drops: this is a peak.
                peak_locs.append(i)
                i += width + 1
            else:
                i += width
        else:
            i += 1

    return peak_locs, len(peak_locs)
 
 
def remove_close_peaks(n_peaks, ir_valley_locs, x, min_dist):
    """Drop peaks that lie within min_dist samples of a higher peak.

    Peaks are visited from highest to lowest; every lower peak whose
    distance to the current one is not greater than min_dist is discarded.
    The first pass measures distances from index -1 instead of from a peak.

    Returns a tuple (locations, count). The list keeps the length of the
    input; only its first `count` entries are meaningful, sorted ascending.
    """
    # Order the peak locations from the largest sample value to the smallest.
    by_height = sorted(ir_valley_locs, key=lambda loc: x[loc])[::-1]

    # n_peaks shrinks while the loop runs, so the bound is re-evaluated on
    # every pass.
    ref = -1
    while ref < n_peaks:
        scan_end = n_peaks
        n_peaks = ref + 1
        for j in range(ref + 1, scan_end):
            if ref == -1:
                # lag-zero peak of autocorr is at index -1
                n_dist = by_height[j] + 1
            else:
                n_dist = by_height[j] - by_height[ref]
            if abs(n_dist) > min_dist:
                # Far enough away: compact the survivor towards the front.
                by_height[n_peaks] = by_height[j]
                n_peaks += 1
        ref += 1

    # Put the surviving locations back into ascending order.
    by_height[:n_peaks] = sorted(by_height[:n_peaks])

    return by_height, n_peaks
if __name__ == "__main__":
    # Run the estimator on a built-in window of 100 IR and 100 red readings
    # and print what it reports.
    sample_ir = [
        12853, 15573, 15580, 15586, 15587, 15567, 15520, 15480, 15464, 15460,
        15462, 15466, 15473, 15479, 15485, 15490, 15495, 15503, 15512, 15518,
        15521, 15521, 15518, 15517, 15522, 15527, 15536, 15547, 15558, 15568,
        15577, 15587, 15594, 15604, 15610, 15616, 15620, 15624, 15625, 15615,
        15576, 15531, 15508, 15500, 15502, 15509, 15516, 15523, 15528, 15533,
        15538, 15547, 15556, 15564, 15564, 15560, 15556, 15556, 15559, 15564,
        15570, 15579, 15588, 15599, 15610, 15619, 15628, 15635, 15642, 15649,
        15655, 15662, 15669, 15672, 15661, 15621, 15571, 15546, 15537, 15538,
        15545, 15553, 15560, 15565, 15570, 15577, 15585, 15593, 15600, 15601,
        15597, 15592, 15591, 15594, 15600, 15608, 15617, 15626, 15633, 15640,
    ]
    sample_red = [
        12258, 14318, 14322, 14324, 14326, 14317, 14299, 14284, 14280, 14279,
        14280, 14283, 14285, 14288, 14292, 14294, 14297, 14299, 14302, 14304,
        14305, 14305, 14304, 14304, 14306, 14308, 14311, 14316, 14321, 14325,
        14329, 14333, 14329, 14329, 14332, 14335, 14336, 14338, 14338, 14333,
        14315, 14295, 14286, 14283, 14285, 14288, 14292, 14295, 14297, 14298,
        14301, 14305, 14309, 14312, 14312, 14310, 14308, 14308, 14309, 14312,
        14315, 14318, 14322, 14327, 14332, 14336, 14341, 14344, 14347, 14350,
        14351, 14354, 14357, 14359, 14353, 14335, 14313, 14304, 14300, 14302,
        14305, 14309, 14312, 14314, 14316, 14319, 14323, 14326, 14329, 14329,
        14326, 14325, 14324, 14326, 14328, 14332, 14336, 14341, 14345, 14349,
    ]

    hr, hrb, sp, spb = calc_hr_and_spo2(sample_ir, sample_red)

    # Validity flags first, then each value only when it is usable.
    print("hr detected:", hrb)
    print("sp detected:", spb)

    if hrb and hr != -999:
        hr2 = int(hr)
        print("Heart Rate : ", hr2)
    if spb and sp != -999:
        sp2 = int(sp)
        print("SPO2 : ", sp2)
HR_SpO2.py程序如下:
from machine
import
SoftI2C
,
Pin
,
Timer
from utime
import
ticks_diff
,
ticks_us
from max30102
import
MAX30102
,
MAX30105_PULSE_AMP_MEDIUM
from spo2cal
import
calc_hr_and_spo2
 
 
# Latest averaged heart rate.
BEATS = 0
# True while a finger is detected on the sensor; starts as "no finger".
FINGER_FLAG = False

# Latest blood-oxygen (SpO2) value.
SPO2 = 0
# Latest temperature reading.
TEMPERATURE = 0
 
 
def display_info(t):
    """Timer callback: print the latest heart rate, SpO2 and temperature.

    Nothing is printed while FINGER_FLAG is False. The argument `t` passed
    by the timer is not used.
    """
    if FINGER_FLAG is not False:
        print('Heart Rate: ', BEATS, " SpO2:", SPO2, " Temperture:", TEMPERATURE)
def main():
    """Poll the MAX30102 forever and keep the module-level readings updated.

    Sets up the sensor on a software I2C bus, then loops: each available
    sample updates FINGER_FLAG, the averaged heart rate in BEATS (threshold
    crossing on the red channel), the SpO2 value in SPO2 (via
    calc_hr_and_spo2 on the last 100 samples) and TEMPERATURE.
    Returns early, after printing a message, if the sensor is not found or
    is not recognised.
    """
    # These names are assigned below, so they must be declared global.
    global BEATS, FINGER_FLAG, SPO2, TEMPERATURE

    # I2C bus used to talk to the MAX30102.
    i2c = SoftI2C(sda=Pin(16), scl=Pin(17), freq=400000)  # Fast: 400kHz, slow: 100kHz

    sensor = MAX30102(i2c=i2c)

    # Make sure a sensor is present and is a supported part.
    if sensor.i2c_address not in i2c.scan():
        print("没有找到传感器")
        return
    if not sensor.check_part_id():
        print("检测到的I2C设备不是MAX30102或者MAX30105")
        return
    print("传感器已识别到")

    # Sensor configuration.
    # NOTE(review): presumably 400 samples/s averaged over 8 samples gives
    # 50 readings/s, while spo2cal assumes 25 samples/s - confirm.
    sensor.setup_sensor()
    sensor.set_sample_rate(400)
    sensor.set_fifo_average(8)
    sensor.set_active_leds_amplitude(MAX30105_PULSE_AMP_MEDIUM)

    t_start = ticks_us()  # time of the previous detected beat

    MAX_HISTORY = 32
    history = []        # recent red readings used for the beat thresholds
    beats_history = []  # recent instantaneous bpm values
    beat = False        # True while the signal is above the "on" threshold

    red_list = []
    ir_list = []

    while True:
        sensor.check()
        if not sensor.available():
            continue

        # Pop the oldest sample of each channel from the driver's FIFO
        # (integer values).
        red_reading = sensor.pop_red_from_storage()
        ir_reading = sensor.pop_ir_from_storage()

        if red_reading < 1000:
            print('No finger')
            FINGER_FLAG = False
            continue
        FINGER_FLAG = True

        # ---- heart rate: threshold crossing on the red channel ----
        history.append(red_reading)
        # Keep only the newest MAX_HISTORY readings so the list stays small.
        history = history[-MAX_HISTORY:]

        minima, maxima = min(history), max(history)
        threshold_on = (minima + maxima * 3) // 4   # 3/4
        threshold_off = (minima + maxima) // 2      # 1/2

        if not beat and red_reading > threshold_on:
            beat = True
            t_us = ticks_diff(ticks_us(), t_start)
            # Guard against a zero interval, which used to divide by zero.
            if t_us > 0:
                t_s = t_us / 1000000
                bpm = 60 / t_s
                if bpm < 500:
                    t_start = ticks_us()
                    beats_history.append(bpm)
                    # Keep at most MAX_HISTORY beat values.
                    beats_history = beats_history[-MAX_HISTORY:]
                    BEATS = round(sum(beats_history) / len(beats_history), 2)
        if beat and red_reading < threshold_off:
            beat = False

        # ---- SpO2: run the algorithm on the newest 100 samples ----
        red_list.append(red_reading)
        ir_list.append(ir_reading)
        red_list = red_list[-100:]
        ir_list = ir_list[-100:]
        if len(red_list) == 100 and len(ir_list) == 100:
            # calc_hr_and_spo2 takes (ir_data, red_data); the listing passed
            # the two lists the other way round.
            hr, hrb, sp, spb = calc_hr_and_spo2(ir_list, red_list)
            if hrb is True and spb is True:
                if sp != -999:
                    SPO2 = int(sp)

        # ---- temperature ----
        TEMPERATURE = sensor.read_temperature()
if __name__ == '__main__':
    # Periodic timer that calls display_info, then the endless acquisition
    # loop. NOTE(review): period=1000 is presumably milliseconds - confirm
    # against the MicroPython Timer documentation.
    tim = Timer(
        period=1000,
        mode=Timer.PERIODIC,
        callback=display_info,
    )
    main()
运行效果

mx6-aJIcxzdXS5dXLMu_lkUnPT2gXNErQouQ5AxEulg e0dd354bbee2617c1acda291ab4c4600