import dlib import numpy as np import scipy.fftpack as fftpack from sklearn.decomposition import FastICA import cv2 from scipy import signal class HeartRateMonitor: def __init__(self, fps, freqs_min, freqs_max): self.fps = fps self.freqs_min = freqs_min self.freqs_max = freqs_max self.all_hr_values = [] def get_channel_signal(self, ROI): blue = [] green = [] red = [] for roi in ROI: b, g, r = cv2.split(roi) b = np.mean(np.sum(b)) / np.std(b) g = np.mean(np.sum(g)) / np.std(g) r = np.mean(np.sum(r)) / np.std(r) blue.append(b) green.append(g) red.append(r) return blue, green, red def ICA(self, matrix, n_component, max_iter=200): matrix = matrix.T ica = FastICA(n_components=n_component, max_iter=max_iter) u = ica.fit_transform(matrix) return u.T def fft_filter(self, signal): fft = fftpack.fft(signal, axis=0) frequencies = fftpack.fftfreq(signal.shape[0], d=1.0 / self.fps) bound_low = (np.abs(frequencies - self.freqs_min)).argmin() bound_high = (np.abs(frequencies - self.freqs_max)).argmin() fft[:bound_low] = 0 fft[bound_high:-bound_high] = 0 fft[-bound_low:] = 0 return fft, frequencies def find_heart_rate(self, fft, freqs): fft_maximums = [] for i in range(fft.shape[0]): if self.freqs_min <= freqs[i] <= self.freqs_max: fftMap = abs(fft[i]) fft_maximums.append(fftMap.max()) else: fft_maximums.append(0) peaks, properties = signal.find_peaks(fft_maximums) max_peak = -1 max_freq = 0 for peak in peaks: if fft_maximums[peak] > max_freq: max_freq = fft_maximums[peak] max_peak = peak return freqs[max_peak] * 60 def fourier_transform(self, signal, N, fs): result = fftpack.fft(signal, N) result = np.abs(result) freqs = np.arange(N) / N freqs = freqs * fs return result[:N // 2], freqs[:N // 2] def calculate_hrv(self, hr_values, window_size=5): num_values = int(window_size * self.fps) start_idx = max(0, len(hr_values) - num_values) recent_hr_values = hr_values[start_idx:] rr_intervals = np.array(recent_hr_values) # 计算SDNN sdnn = np.std(rr_intervals) # 计算RMSSD nn_diffs = np.diff(rr_intervals) rmssd = np.sqrt(np.mean(nn_diffs ** 2)) # 计算CV R-R mean_rr = np.mean(rr_intervals) cv_rr = sdnn / mean_rr if mean_rr != 0 else 0 return sdnn, rmssd, cv_rr def process_roi(self, ROI): blue, green, red = self.get_channel_signal(ROI) matrix = np.array([blue, green, red]) component = self.ICA(matrix, 3) hr_values = [] for i in range(3): fft, freqs = self.fft_filter(component[i]) heartrate = self.find_heart_rate(fft, freqs) hr_values.append(heartrate) avg_hr = sum(hr_values) / 3 self.all_hr_values.append(avg_hr) sdnn, rmssd, cv_rr = self.calculate_hrv(self.all_hr_values, window_size=5) return avg_hr, sdnn, rmssd, cv_rr if __name__ == '__main__': ROI = [] freqs_min = 0.8 freqs_max = 1.8 heartrate = 0 sdnn, rmssd, cv_rr = 0, 0, 0 camera_code = 0 capture = cv2.VideoCapture(camera_code) fps = capture.get(cv2.CAP_PROP_FPS) hr_monitor = HeartRateMonitor(fps, freqs_min, freqs_max) detector = dlib.get_frontal_face_detector() while capture.isOpened(): ret, frame = capture.read() if not ret: continue dects = detector(frame) for face in dects: left = face.left() right = face.right() top = face.top() bottom = face.bottom() h = bottom - top w = right - left roi = frame[top + h // 10 * 2:top + h // 10 * 7, left + w // 9 * 2:left + w // 9 * 8] cv2.rectangle(frame, (left + w // 9 * 2, top + h // 10 * 2), (left + w // 9 * 8, top + h // 10 * 7), color=(0, 0, 255)) cv2.rectangle(frame, (left, top), (left + w, top + h), color=(0, 0, 255)) ROI.append(roi) if len(ROI) == 300: heartrate, sdnn, rmssd, cv_rr = hr_monitor.process_roi(ROI) for i in range(30): ROI.pop(0) cv2.putText(frame, '{:.1f}bps, CV R-R: {:.2f}'.format(heartrate, cv_rr), (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (255, 0, 255), 2) cv2.putText(frame, 'SDNN: {:.2f}, RMSSD: {:.2f}'.format(sdnn, rmssd), (50, 80), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 255), 2) cv2.imshow('frame', frame) if cv2.waitKey(1) & 0xFF == ord('q'): break cv2.destroyAllWindows() capture.release()