tjy/HeartRate/HeartRateMonitor.py

158 lines
5.0 KiB
Python

import dlib
import numpy as np
import scipy.fftpack as fftpack
from sklearn.decomposition import FastICA
import cv2
from scipy import signal
class HeartRateMonitor:
def __init__(self, fps, freqs_min, freqs_max):
self.fps = fps
self.freqs_min = freqs_min
self.freqs_max = freqs_max
self.all_hr_values = []
def get_channel_signal(self, ROI):
blue = []
green = []
red = []
for roi in ROI:
b, g, r = cv2.split(roi)
b = np.mean(np.sum(b)) / np.std(b)
g = np.mean(np.sum(g)) / np.std(g)
r = np.mean(np.sum(r)) / np.std(r)
blue.append(b)
green.append(g)
red.append(r)
return blue, green, red
def ICA(self, matrix, n_component, max_iter=200):
matrix = matrix.T
ica = FastICA(n_components=n_component, max_iter=max_iter)
u = ica.fit_transform(matrix)
return u.T
def fft_filter(self, signal):
fft = fftpack.fft(signal, axis=0)
frequencies = fftpack.fftfreq(signal.shape[0], d=1.0 / self.fps)
bound_low = (np.abs(frequencies - self.freqs_min)).argmin()
bound_high = (np.abs(frequencies - self.freqs_max)).argmin()
fft[:bound_low] = 0
fft[bound_high:-bound_high] = 0
fft[-bound_low:] = 0
return fft, frequencies
def find_heart_rate(self, fft, freqs):
fft_maximums = []
for i in range(fft.shape[0]):
if self.freqs_min <= freqs[i] <= self.freqs_max:
fftMap = abs(fft[i])
fft_maximums.append(fftMap.max())
else:
fft_maximums.append(0)
peaks, properties = signal.find_peaks(fft_maximums)
max_peak = -1
max_freq = 0
for peak in peaks:
if fft_maximums[peak] > max_freq:
max_freq = fft_maximums[peak]
max_peak = peak
return freqs[max_peak] * 60
def fourier_transform(self, signal, N, fs):
result = fftpack.fft(signal, N)
result = np.abs(result)
freqs = np.arange(N) / N
freqs = freqs * fs
return result[:N // 2], freqs[:N // 2]
def calculate_hrv(self, hr_values, window_size=5):
num_values = int(window_size * self.fps)
start_idx = max(0, len(hr_values) - num_values)
recent_hr_values = hr_values[start_idx:]
rr_intervals = np.array(recent_hr_values)
# 计算SDNN
sdnn = np.std(rr_intervals)
# 计算RMSSD
nn_diffs = np.diff(rr_intervals)
rmssd = np.sqrt(np.mean(nn_diffs ** 2))
# 计算CV R-R
mean_rr = np.mean(rr_intervals)
cv_rr = sdnn / mean_rr if mean_rr != 0 else 0
return sdnn, rmssd, cv_rr
def process_roi(self, ROI):
blue, green, red = self.get_channel_signal(ROI)
matrix = np.array([blue, green, red])
component = self.ICA(matrix, 3)
hr_values = []
for i in range(3):
fft, freqs = self.fft_filter(component[i])
heartrate = self.find_heart_rate(fft, freqs)
hr_values.append(heartrate)
avg_hr = sum(hr_values) / 3
self.all_hr_values.append(avg_hr)
sdnn, rmssd, cv_rr = self.calculate_hrv(self.all_hr_values, window_size=5)
return avg_hr, sdnn, rmssd, cv_rr
if __name__ == '__main__':
ROI = []
freqs_min = 0.8
freqs_max = 1.8
heartrate = 0
sdnn, rmssd, cv_rr = 0, 0, 0
camera_code = 0
capture = cv2.VideoCapture(camera_code)
fps = capture.get(cv2.CAP_PROP_FPS)
hr_monitor = HeartRateMonitor(fps, freqs_min, freqs_max)
detector = dlib.get_frontal_face_detector()
while capture.isOpened():
ret, frame = capture.read()
if not ret:
continue
dects = detector(frame)
for face in dects:
left = face.left()
right = face.right()
top = face.top()
bottom = face.bottom()
h = bottom - top
w = right - left
roi = frame[top + h // 10 * 2:top + h // 10 * 7, left + w // 9 * 2:left + w // 9 * 8]
cv2.rectangle(frame, (left + w // 9 * 2, top + h // 10 * 2), (left + w // 9 * 8, top + h // 10 * 7),
color=(0, 0, 255))
cv2.rectangle(frame, (left, top), (left + w, top + h), color=(0, 0, 255))
ROI.append(roi)
if len(ROI) == 300:
heartrate, sdnn, rmssd, cv_rr = hr_monitor.process_roi(ROI)
for i in range(30):
ROI.pop(0)
cv2.putText(frame, '{:.1f}bps, CV R-R: {:.2f}'.format(heartrate, cv_rr), (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.2,
(255, 0, 255), 2)
cv2.putText(frame, 'SDNN: {:.2f}, RMSSD: {:.2f}'.format(sdnn, rmssd), (50, 80),
cv2.FONT_HERSHEY_SIMPLEX, 1,
(255, 0, 255), 2)
cv2.imshow('frame', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cv2.destroyAllWindows()
capture.release()