import cv2 import numpy as np from scipy import signal from scipy.fftpack import fft from scipy.signal import find_peaks class RespirationRateDetector: def __init__(self, args): self.args = args def FeaturePointSelectionStrategy(self, Image, FPN=5, QualityLevel=0.3): Image_gray = Image feature_params = dict(maxCorners=self.args.FSS_maxCorners, qualityLevel=QualityLevel, minDistance=self.args.FSS_minDistance) p0 = cv2.goodFeaturesToTrack(Image_gray, mask=self.args.FSS_mask, **feature_params) """ Robust checking """ while (p0 is None): QualityLevel = QualityLevel - self.args.FSS_QualityLevelRV feature_params = dict(maxCorners=self.args.FSS_maxCorners, qualityLevel=QualityLevel, minDistance=self.args.FSS_minDistance) p0 = cv2.goodFeaturesToTrack(Image_gray, mask=None, **feature_params) if len(p0) < FPN: FPN = len(p0) h = Image_gray.shape[0] / 2 w = Image_gray.shape[1] / 2 p1 = p0.copy() p1[:, :, 0] -= w p1[:, :, 1] -= h p1_1 = np.multiply(p1, p1) p1_2 = np.sum(p1_1, 2) p1_3 = np.sqrt(p1_2) p1_4 = p1_3[:, 0] p1_5 = np.argsort(p1_4) FPMap = np.zeros((FPN, 1, 2), dtype=np.float32) for i in range(FPN): FPMap[i, :, :] = p0[p1_5[i], :, :] return FPMap def CorrelationGuidedOpticalFlowMethod(self, FeatureMtx_Amp, RespCurve): CGAmp_Mtx = FeatureMtx_Amp.T CGAmpAugmented_Mtx = np.zeros((CGAmp_Mtx.shape[0] + 1, CGAmp_Mtx.shape[1])) CGAmpAugmented_Mtx[0, :] = RespCurve CGAmpAugmented_Mtx[1:, :] = CGAmp_Mtx Correlation_Mtx = np.corrcoef(CGAmpAugmented_Mtx) CM_mean = np.mean(abs(Correlation_Mtx[0, 1:])) Quality_num = (abs(Correlation_Mtx[0, 1:]) >= CM_mean).sum() QualityFeaturePoint_arg = (abs(Correlation_Mtx[0, 1:]) >= CM_mean).argsort()[0 - Quality_num:] CGOF_Mtx = np.zeros((FeatureMtx_Amp.shape[0], Quality_num)) for i in range(Quality_num): CGOF_Mtx[:, i] = FeatureMtx_Amp[:, QualityFeaturePoint_arg[i]] CGOF_Mtx_RespCurve = np.sum(CGOF_Mtx, 1) / Quality_num return CGOF_Mtx_RespCurve def ImproveOpticalFlow(self, frames, fs): feature_params = dict(maxCorners=self.args.OFP_maxCorners, qualityLevel=self.args.OFP_qualityLevel, minDistance=self.args.OFP_minDistance) old_frame = frames[0] old_gray = cv2.cvtColor(old_frame, cv2.COLOR_BGR2GRAY) p0 = cv2.goodFeaturesToTrack(old_gray, mask=self.args.OFP_mask, **feature_params) """ Robust Checking """ while (p0 is None): self.args.OFP_qualityLevel = self.args.OFP_qualityLevel - self.args.OFP_QualityLevelRV feature_params = dict(maxCorners=self.args.OFP_maxCorners, qualityLevel=self.args.OFP_qualityLevel, minDistance=self.args.OFP_minDistance) p0 = cv2.goodFeaturesToTrack(old_gray, mask=None, **feature_params) """ FeaturePoint Selection Strategy """ if self.args.FSS: p0 = self.FeaturePointSelectionStrategy(Image=old_gray, FPN=self.args.FSS_FPN, QualityLevel=self.args.FSS_qualityLevel) else: p0 = cv2.goodFeaturesToTrack(old_gray, mask=None, **feature_params) lk_params = dict(winSize=self.args.OFP_winSize, maxLevel=self.args.OFP_maxLevel) total_frame = len(frames) FeatureMtx = np.zeros((total_frame, p0.shape[0], 2)) FeatureMtx[0, :, 0] = p0[:, 0, 0].T FeatureMtx[0, :, 1] = p0[:, 0, 1].T frame_num = 1 while (frame_num < total_frame): frame_num += 1 frame = frames[frame_num - 1] frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) pl, st, err = cv2.calcOpticalFlowPyrLK(old_gray, frame_gray, p0, None, **lk_params) old_gray = frame_gray.copy() p0 = pl.reshape(-1, 1, 2) FeatureMtx[frame_num - 1, :, 0] = p0[:, 0, 0].T FeatureMtx[frame_num - 1, :, 1] = p0[:, 0, 1].T FeatureMtx_Amp = np.sqrt(FeatureMtx[:, :, 0] ** 2 + FeatureMtx[:, :, 1] ** 2) RespCurve = np.sum(FeatureMtx_Amp, 1) / p0.shape[0] """ CCorrelation-Guided Optical Flow Method """ if self.args.CGOF: RespCurve = self.CorrelationGuidedOpticalFlowMethod(FeatureMtx_Amp, RespCurve) """" Filter """ if self.args.filter: original_signal = RespCurve # filter_order = self.args.Filter_order LowPass = self.args.Filter_LowPass / 60 HighPass = self.args.Filter_HighPass / 60 b, a = signal.butter(filter_order, [2 * LowPass / fs, 2 * HighPass / fs], self.args.Filter_type) filtedResp = signal.filtfilt(b, a, original_signal) else: filtedResp = RespCurve """ Normalization """ if self.args.Normalization: Resp_max = max(filtedResp) Resp_min = min(filtedResp) Resp_norm = (filtedResp - Resp_min) / (Resp_max - Resp_min) - 0.5 else: Resp_norm = filtedResp return 1 - Resp_norm def FFT(self, data, fs): fft_y = fft(data) maxFrequency = fs f = np.linspace(0, maxFrequency, len(data)) abs_y = np.abs(fft_y) normalization_y = abs_y / len(data) normalization_half_y = normalization_y[range(int(len(data) / 2))] sorted_indices = np.argsort(normalization_half_y) RR = f[sorted_indices[-2]] * 60 return RR def PeakCounting(self, data, fs, Height=0.1, Threshold=0.2, MaxRR=30): Distance = 60 / MaxRR * fs peaks, _ = find_peaks(data, height=Height, threshold=Threshold, distance=Distance) RR = len(peaks) / (len(data) / fs) * 60 return RR def CrossingPoint(self, data, fs): shfit_distance = int(fs / 2) data_shift = np.zeros(data.shape) - 1 data_shift[shfit_distance:] = data[:-shfit_distance] cross_curve = data - data_shift zero_number = 0 zero_index = [] for i in range(len(cross_curve) - 1): if cross_curve[i] == 0: zero_number += 1 zero_index.append(i) else: if cross_curve[i] * cross_curve[i + 1] < 0: zero_number += 1 zero_index.append(i) cw = zero_number N = len(data) RR1 = ((cw / 2) / (N / fs)) * 60 return RR1 def NegativeFeedbackCrossoverPointMethod(self, data, fs, QualityLevel=0.2): shfit_distance = int(fs / 2) data_shift = np.zeros(data.shape) - 1 data_shift[shfit_distance:] = data[:-shfit_distance] cross_curve = data - data_shift zero_number = 0 zero_index = [] for i in range(len(cross_curve) - 1): if cross_curve[i] == 0: zero_number += 1 zero_index.append(i) else: if cross_curve[i] * cross_curve[i + 1] < 0: zero_number += 1 zero_index.append(i) cw = zero_number N = len(data) RR1 = ((cw / 2) / (N / fs)) * 60 if (len(zero_index) <= 1): RR2 = RR1 else: time_span = 60 / RR1 / 2 * fs * QualityLevel zero_span = [] for i in range(len(zero_index) - 1): zero_span.append(zero_index[i + 1] - zero_index[i]) while (min(zero_span) < time_span): doubt_point = np.argmin(zero_span) zero_index.pop(doubt_point) zero_index.pop(doubt_point) if len(zero_index) <= 1: break zero_span = [] for i in range(len(zero_index) - 1): zero_span.append(zero_index[i + 1] - zero_index[i]) zero_number = len(zero_index) cw = zero_number RR2 = ((cw / 2) / (N / fs)) * 60 return RR2 def detect_respiration_rate(self, frames, fs): resp_curve = self.ImproveOpticalFlow(frames, fs) RR_FFT = self.FFT(resp_curve, fs) RR_PC = self.PeakCounting(resp_curve, fs) RR_CP = self.CrossingPoint(resp_curve, fs) RR_NFCP = self.NegativeFeedbackCrossoverPointMethod(resp_curve, fs) return resp_curve, RR_FFT, RR_PC, RR_CP, RR_NFCP