158 lines
5.0 KiB
Python
158 lines
5.0 KiB
Python
|
import dlib
|
||
|
import numpy as np
|
||
|
import scipy.fftpack as fftpack
|
||
|
from sklearn.decomposition import FastICA
|
||
|
import cv2
|
||
|
from scipy import signal
|
||
|
|
||
|
|
||
|
class HeartRateMonitor:
|
||
|
def __init__(self, fps, freqs_min, freqs_max):
|
||
|
self.fps = fps
|
||
|
self.freqs_min = freqs_min
|
||
|
self.freqs_max = freqs_max
|
||
|
self.all_hr_values = []
|
||
|
|
||
|
def get_channel_signal(self, ROI):
|
||
|
blue = []
|
||
|
green = []
|
||
|
red = []
|
||
|
for roi in ROI:
|
||
|
b, g, r = cv2.split(roi)
|
||
|
b = np.mean(np.sum(b)) / np.std(b)
|
||
|
g = np.mean(np.sum(g)) / np.std(g)
|
||
|
r = np.mean(np.sum(r)) / np.std(r)
|
||
|
blue.append(b)
|
||
|
green.append(g)
|
||
|
red.append(r)
|
||
|
return blue, green, red
|
||
|
|
||
|
def ICA(self, matrix, n_component, max_iter=200):
|
||
|
matrix = matrix.T
|
||
|
ica = FastICA(n_components=n_component, max_iter=max_iter)
|
||
|
u = ica.fit_transform(matrix)
|
||
|
return u.T
|
||
|
|
||
|
def fft_filter(self, signal):
|
||
|
fft = fftpack.fft(signal, axis=0)
|
||
|
frequencies = fftpack.fftfreq(signal.shape[0], d=1.0 / self.fps)
|
||
|
bound_low = (np.abs(frequencies - self.freqs_min)).argmin()
|
||
|
bound_high = (np.abs(frequencies - self.freqs_max)).argmin()
|
||
|
fft[:bound_low] = 0
|
||
|
fft[bound_high:-bound_high] = 0
|
||
|
fft[-bound_low:] = 0
|
||
|
return fft, frequencies
|
||
|
|
||
|
def find_heart_rate(self, fft, freqs):
|
||
|
fft_maximums = []
|
||
|
|
||
|
for i in range(fft.shape[0]):
|
||
|
if self.freqs_min <= freqs[i] <= self.freqs_max:
|
||
|
fftMap = abs(fft[i])
|
||
|
fft_maximums.append(fftMap.max())
|
||
|
else:
|
||
|
fft_maximums.append(0)
|
||
|
|
||
|
peaks, properties = signal.find_peaks(fft_maximums)
|
||
|
max_peak = -1
|
||
|
max_freq = 0
|
||
|
|
||
|
for peak in peaks:
|
||
|
if fft_maximums[peak] > max_freq:
|
||
|
max_freq = fft_maximums[peak]
|
||
|
max_peak = peak
|
||
|
|
||
|
return freqs[max_peak] * 60
|
||
|
|
||
|
def fourier_transform(self, signal, N, fs):
|
||
|
result = fftpack.fft(signal, N)
|
||
|
result = np.abs(result)
|
||
|
freqs = np.arange(N) / N
|
||
|
freqs = freqs * fs
|
||
|
return result[:N // 2], freqs[:N // 2]
|
||
|
|
||
|
def calculate_hrv(self, hr_values, window_size=5):
|
||
|
num_values = int(window_size * self.fps)
|
||
|
start_idx = max(0, len(hr_values) - num_values)
|
||
|
recent_hr_values = hr_values[start_idx:]
|
||
|
rr_intervals = np.array(recent_hr_values)
|
||
|
|
||
|
# 计算SDNN
|
||
|
sdnn = np.std(rr_intervals)
|
||
|
|
||
|
# 计算RMSSD
|
||
|
nn_diffs = np.diff(rr_intervals)
|
||
|
rmssd = np.sqrt(np.mean(nn_diffs ** 2))
|
||
|
|
||
|
# 计算CV R-R
|
||
|
mean_rr = np.mean(rr_intervals)
|
||
|
cv_rr = sdnn / mean_rr if mean_rr != 0 else 0
|
||
|
|
||
|
return sdnn, rmssd, cv_rr
|
||
|
|
||
|
def process_roi(self, ROI):
|
||
|
blue, green, red = self.get_channel_signal(ROI)
|
||
|
matrix = np.array([blue, green, red])
|
||
|
component = self.ICA(matrix, 3)
|
||
|
hr_values = []
|
||
|
for i in range(3):
|
||
|
fft, freqs = self.fft_filter(component[i])
|
||
|
heartrate = self.find_heart_rate(fft, freqs)
|
||
|
hr_values.append(heartrate)
|
||
|
avg_hr = sum(hr_values) / 3
|
||
|
self.all_hr_values.append(avg_hr)
|
||
|
sdnn, rmssd, cv_rr = self.calculate_hrv(self.all_hr_values, window_size=5)
|
||
|
return avg_hr, sdnn, rmssd, cv_rr
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
|
||
|
ROI = []
|
||
|
|
||
|
freqs_min = 0.8
|
||
|
freqs_max = 1.8
|
||
|
heartrate = 0
|
||
|
sdnn, rmssd, cv_rr = 0, 0, 0
|
||
|
camera_code = 0
|
||
|
capture = cv2.VideoCapture(camera_code)
|
||
|
fps = capture.get(cv2.CAP_PROP_FPS)
|
||
|
|
||
|
hr_monitor = HeartRateMonitor(fps, freqs_min, freqs_max)
|
||
|
|
||
|
detector = dlib.get_frontal_face_detector()
|
||
|
while capture.isOpened():
|
||
|
ret, frame = capture.read()
|
||
|
if not ret:
|
||
|
continue
|
||
|
dects = detector(frame)
|
||
|
for face in dects:
|
||
|
left = face.left()
|
||
|
right = face.right()
|
||
|
top = face.top()
|
||
|
bottom = face.bottom()
|
||
|
|
||
|
h = bottom - top
|
||
|
w = right - left
|
||
|
roi = frame[top + h // 10 * 2:top + h // 10 * 7, left + w // 9 * 2:left + w // 9 * 8]
|
||
|
|
||
|
cv2.rectangle(frame, (left + w // 9 * 2, top + h // 10 * 2), (left + w // 9 * 8, top + h // 10 * 7),
|
||
|
color=(0, 0, 255))
|
||
|
cv2.rectangle(frame, (left, top), (left + w, top + h), color=(0, 0, 255))
|
||
|
ROI.append(roi)
|
||
|
if len(ROI) == 300:
|
||
|
heartrate, sdnn, rmssd, cv_rr = hr_monitor.process_roi(ROI)
|
||
|
for i in range(30):
|
||
|
ROI.pop(0)
|
||
|
cv2.putText(frame, '{:.1f}bps, CV R-R: {:.2f}'.format(heartrate, cv_rr), (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.2,
|
||
|
(255, 0, 255), 2)
|
||
|
cv2.putText(frame, 'SDNN: {:.2f}, RMSSD: {:.2f}'.format(sdnn, rmssd), (50, 80),
|
||
|
cv2.FONT_HERSHEY_SIMPLEX, 1,
|
||
|
(255, 0, 255), 2)
|
||
|
cv2.imshow('frame', frame)
|
||
|
if cv2.waitKey(1) & 0xFF == ord('q'):
|
||
|
break
|
||
|
|
||
|
cv2.destroyAllWindows()
|
||
|
capture.release()
|
||
|
|