tjy/demo/apis/bp/dataloader.py

286 lines
9.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import h5py
def custom_collate_fn(batch):
    """Collate ``(X, y_SBP, y_DBP)`` samples into three float32 tensors.

    Args:
        batch: Sequence of ``(X, y_SBP, y_DBP)`` triples, as returned by
            ``BPDataset.__getitem__``.

    Returns:
        Tuple ``(X, y_SBP, y_DBP)`` of ``torch.float32`` tensors whose first
        dimension is the batch size.
    """
    X, y_SBP, y_DBP = zip(*batch)
    # Stack through NumPy first: building a tensor straight from a Python
    # sequence of arrays/NumPy scalars converts element by element.
    X = torch.as_tensor(np.asarray(X), dtype=torch.float32)
    y_SBP = torch.as_tensor(np.asarray(y_SBP), dtype=torch.float32)
    y_DBP = torch.as_tensor(np.asarray(y_DBP), dtype=torch.float32)
    return X, y_SBP, y_DBP
class BPDataset(Dataset):
    """Map-style dataset pairing each input sample with its SBP and DBP label.

    Args:
        X_data: Indexable collection of input samples (one entry per sample).
        y_SBP: Indexable collection of systolic labels, aligned with ``X_data``.
        y_DBP: Indexable collection of diastolic labels, aligned with ``X_data``.

    Raises:
        ValueError: If the three collections do not have the same length.
    """

    def __init__(self, X_data, y_SBP, y_DBP):
        n_x, n_sbp, n_dbp = len(X_data), len(y_SBP), len(y_DBP)
        # Fail early: a mismatch would otherwise either silently drop samples
        # or surface as an IndexError in the middle of an epoch.
        if not (n_x == n_sbp == n_dbp):
            raise ValueError(
                "X_data, y_SBP and y_DBP must have the same length, "
                f"got {n_x}, {n_sbp} and {n_dbp}"
            )
        self.X_data = X_data
        self.y_SBP = y_SBP
        self.y_DBP = y_DBP

    def __len__(self):
        """Return the number of samples."""
        return len(self.y_SBP)

    def __getitem__(self, idx):
        """Return the ``(X, y_SBP, y_DBP)`` triple stored at position ``idx``."""
        return self.X_data[idx], self.y_SBP[idx], self.y_DBP[idx]
class BPDataLoader:
    """Load blood-pressure data from disk and wrap it in PyTorch data loaders.

    The source files are chosen by ``data_type``:

    * ``'UKL'``        -> ``rPPG-BP-UKL_rppg_7s.h5``
    * ``'MIMIC'``      -> ``MIMIC-III_ppg_dataset.h5`` (subsample stratified by SBP)
    * ``'MIMIC_full'`` -> ``MIMIC-III_ppg_dataset.h5`` (all samples)
    * anything else    -> ``X_BP.npy`` / ``Y_DBP.npy`` / ``Y_SBP.npy``

    All files are read from ``data_dir``. Every ``load_data*`` method returns
    ``(X, y_DBP, y_SBP)`` -- note that DBP comes before SBP.

    Args:
        data_dir: Directory holding the input files and the ``.npy`` caches.
        val_split: Fraction of samples assigned to the validation set.
        batch_size: Batch size of every data loader that is created.
        shuffle: Whether the (non-distributed) training loader shuffles.
        data_type: Dataset selector, see above.
    """

    # Fraction of every SBP bin kept by ``load_data_MIMIC_h5``.
    MIMIC_SAMPLE_FRACTION = 0.1
    # Width of the SBP bins used for the stratified subsample.
    SBP_BIN_WIDTH = 10

    def __init__(self, data_dir, val_split=0.2, batch_size=32, shuffle=True, data_type='npy'):
        self.data_dir = data_dir
        self.val_split = val_split
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.train_dataloader = None
        self.val_dataloader = None
        self.data_type = data_type

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------
    def _load_cached(self, names):
        """Return the arrays stored under ``names`` in ``data_dir``, or None if any is missing."""
        paths = [os.path.join(self.data_dir, name) for name in names]
        if not all(os.path.isfile(path) for path in paths):
            return None
        print('loading preprocessed data.....')
        return tuple(np.load(path) for path in paths)

    def _save_cached(self, names, arrays):
        """Write ``arrays`` under ``names`` into ``data_dir``, where ``_load_cached`` looks."""
        for name, array in zip(names, arrays):
            np.save(os.path.join(self.data_dir, name), array)

    @staticmethod
    def _stratified_subsample(ppg, BP, fraction, bin_width):
        """Keep a random ``fraction`` of the samples of every SBP bin of width ``bin_width``."""
        sbp = BP[:, 0]
        upper = bin_width - np.max(sbp) % bin_width + np.max(sbp)
        lower = np.min(sbp) - np.min(sbp) % bin_width
        # ``np.arange`` excludes its stop value, so go one step past ``upper``;
        # otherwise the last bin -- the one holding the maximum SBP -- has no
        # right-hand edge and is never sampled.
        bins = np.arange(lower, upper + bin_width, bin_width)
        print(bins)
        sampled_ppg_data = []
        sampled_bp_data = []
        for left, right in zip(bins[:-1], bins[1:]):
            in_bin = (sbp >= left) & (sbp < right)
            bin_bp = BP[in_bin]
            if len(bin_bp) == 0:
                continue
            bin_ppg = ppg[in_bin]
            num_samples = int(len(bin_bp) * fraction)
            # Uses NumPy's global RNG, so the draw is reproducible only if the
            # caller seeds ``np.random`` beforehand.
            indices = np.random.choice(len(bin_bp), num_samples, replace=False)
            sampled_bp_data.append(bin_bp[indices])
            sampled_ppg_data.append(bin_ppg[indices])
        return (
            np.concatenate(sampled_ppg_data, axis=0),
            np.concatenate(sampled_bp_data, axis=0),
        )

    def _load_by_type(self):
        """Call the loader selected by ``data_type`` and return ``(X, y_DBP, y_SBP)``."""
        loaders = {
            'UKL': self.load_data_UKL_h5,
            'MIMIC': self.load_data_MIMIC_h5,
            'MIMIC_full': self.load_data_MIMIC_h5_full,
        }
        return loaders.get(self.data_type, self.load_data)()

    # ------------------------------------------------------------------
    # loaders
    # ------------------------------------------------------------------
    def load_data(self):
        """Load ``X_BP.npy``, ``Y_DBP.npy`` and ``Y_SBP.npy`` from ``data_dir``.

        Returns:
            ``(X, y_DBP, y_SBP)`` where ``X`` has been reshaped to
            ``(-1, 250, 1)``.
        """
        X_BP = np.load(os.path.join(self.data_dir, 'X_BP.npy'))
        # One row per sample: 250 time steps, 1 channel.
        X_BP = X_BP.reshape(-1, 250, 1)
        y_DBP = np.load(os.path.join(self.data_dir, 'Y_DBP.npy'))
        y_SBP = np.load(os.path.join(self.data_dir, 'Y_SBP.npy'))
        return X_BP, y_DBP, y_SBP

    def load_data_UKL_h5(self):
        """Load the ``rppg`` and ``label`` datasets of ``rPPG-BP-UKL_rppg_7s.h5``.

        Returns:
            ``(X, y_DBP, y_SBP)`` where ``X`` has shape ``(-1, 875, 1)``,
            ``y_SBP`` is row 0 of ``label`` and ``y_DBP`` is row 1.

        Raises:
            KeyError: If the file lacks the ``rppg`` or ``label`` dataset.
        """
        X_BP_path = os.path.join(self.data_dir, 'rPPG-BP-UKL_rppg_7s.h5')
        with h5py.File(X_BP_path, 'r') as f:
            # Materialise inside the ``with`` block: the HDF5 datasets become
            # unusable once the file is closed.
            rppg = np.array(f['rppg'])
            BP = np.array(f['label'])
        # The signal is stored time-major (original author's note: (875, 7851)
        # -- TODO confirm); flip it to one row per sample.
        rppg = rppg.transpose(1, 0)
        X_BP = rppg.reshape(-1, 875, 1)
        y_DBP = BP[1]
        y_SBP = BP[0]
        return X_BP, y_DBP, y_SBP

    def load_data_MIMIC_h5(self):
        """Load a subsample of ``MIMIC-III_ppg_dataset.h5`` stratified by SBP.

        SBP (column 0 of ``label``) is split into bins of ``SBP_BIN_WIDTH``
        and ``MIMIC_SAMPLE_FRACTION`` of every bin is drawn at random without
        replacement. The result is cached in ``data_dir`` as
        ``X_MIMIC_BP.npy`` / ``Y_MIMIC_DBP.npy`` / ``Y_MIMIC_SBP.npy`` and
        reused on later calls.

        Returns:
            ``(X, y_DBP, y_SBP)``; a freshly computed ``X`` has shape
            ``(-1, 875, 1)``, cached arrays are returned as stored.

        Raises:
            KeyError: If the file lacks the ``ppg`` or ``label`` dataset.
        """
        cache_names = ('X_MIMIC_BP.npy', 'Y_MIMIC_DBP.npy', 'Y_MIMIC_SBP.npy')
        cached = self._load_cached(cache_names)
        if cached is not None:
            return cached

        X_BP_path = os.path.join(self.data_dir, 'MIMIC-III_ppg_dataset.h5')
        with h5py.File(X_BP_path, 'r') as f:
            ppg = np.array(f['ppg'])
            BP = np.array(f['label'])

        ppg, BP = self._stratified_subsample(
            ppg, BP, self.MIMIC_SAMPLE_FRACTION, self.SBP_BIN_WIDTH
        )
        print(ppg.shape, BP.shape)
        # One row per sample: 875 time steps, 1 channel.
        X_BP = ppg.reshape(-1, 875, 1)
        # Column 1 of the labels is DBP, column 0 is SBP.
        y_DBP = BP[:, 1]
        y_SBP = BP[:, 0]
        # Save next to the inputs so the cache lookup above finds the files
        # again (a hard-coded 'data/' only worked when data_dir was 'data').
        self._save_cached(cache_names, (X_BP, y_DBP, y_SBP))
        return X_BP, y_DBP, y_SBP

    def load_data_MIMIC_h5_full(self):
        """Load every sample of ``MIMIC-III_ppg_dataset.h5``.

        The result is cached in ``data_dir`` as ``X_MIMIC_BP_full.npy`` /
        ``Y_MIMIC_DBP_full.npy`` / ``Y_MIMIC_SBP_full.npy`` and reused on
        later calls.

        Returns:
            ``(X, y_DBP, y_SBP)``; a freshly computed ``X`` has shape
            ``(-1, 875, 1)``, cached arrays are returned as stored.

        Raises:
            KeyError: If the file lacks the ``ppg`` or ``label`` dataset.
        """
        cache_names = (
            'X_MIMIC_BP_full.npy',
            'Y_MIMIC_DBP_full.npy',
            'Y_MIMIC_SBP_full.npy',
        )
        cached = self._load_cached(cache_names)
        if cached is not None:
            return cached

        X_BP_path = os.path.join(self.data_dir, 'MIMIC-III_ppg_dataset.h5')
        with h5py.File(X_BP_path, 'r') as f:
            ppg = np.array(f['ppg'])
            BP = np.array(f['label'])

        # One row per sample: 875 time steps, 1 channel.
        X_BP = ppg.reshape(-1, 875, 1)
        # Column 1 of the labels is DBP, column 0 is SBP.
        y_DBP = BP[:, 1]
        y_SBP = BP[:, 0]
        print("data shape:", X_BP.shape, y_DBP.shape, y_SBP.shape)
        print("saving data.....")
        # Save next to the inputs so the cache lookup above finds the files
        # again (a hard-coded 'data/' only worked when data_dir was 'data').
        self._save_cached(cache_names, (X_BP, y_DBP, y_SBP))
        print("data saved.....")
        return X_BP, y_DBP, y_SBP

    # ------------------------------------------------------------------
    # datasets and data loaders
    # ------------------------------------------------------------------
    def create_dataset(self, X_data, y_SBP, y_DBP):
        """Wrap the three aligned arrays in a ``BPDataset``."""
        return BPDataset(X_data, y_SBP, y_DBP)

    def split_data(self, X_data, y_SBP, y_DBP):
        """Split the data into training and validation datasets.

        The split uses ``val_split`` as the validation fraction and a fixed
        ``random_state`` of 42, so it is identical across calls.

        Returns:
            ``(train_dataset, val_dataset)``.
        """
        X_train, X_val, y_train_SBP, y_val_SBP, y_train_DBP, y_val_DBP = train_test_split(
            X_data, y_SBP, y_DBP, test_size=self.val_split, random_state=42
        )
        train_dataset = self.create_dataset(X_train, y_train_SBP, y_train_DBP)
        val_dataset = self.create_dataset(X_val, y_val_SBP, y_val_DBP)
        return train_dataset, val_dataset

    def create_dataloaders(self):
        """Load the data and (re)build ``train_dataloader`` and ``val_dataloader``."""
        # Loaders return DBP before SBP, while ``split_data`` takes SBP first.
        X_data, y_DBP, y_SBP = self._load_by_type()
        train_dataset, val_dataset = self.split_data(X_data, y_SBP, y_DBP)
        self.train_dataloader = DataLoader(
            train_dataset, batch_size=self.batch_size, shuffle=self.shuffle, collate_fn=custom_collate_fn
        )
        self.val_dataloader = DataLoader(
            val_dataset, batch_size=self.batch_size, shuffle=False, collate_fn=custom_collate_fn
        )

    def get_dataloaders(self):
        """Return ``(train_dataloader, val_dataloader)``, building them on first use."""
        if self.train_dataloader is None or self.val_dataloader is None:
            self.create_dataloaders()
        return self.train_dataloader, self.val_dataloader

    def get_distributed_dataloaders(self, world_size, rank):
        """Build data loaders backed by ``DistributedSampler`` for one process.

        Unlike ``get_dataloaders`` the result is not stored on the instance;
        the data is loaded and split again on every call.

        Args:
            world_size: Passed to the samplers as ``num_replicas``.
            rank: Passed to the samplers as ``rank``.

        Returns:
            ``(train_dataloader, val_dataloader, train_sampler, val_sampler)``.
        """
        # Loaders return DBP before SBP, while ``split_data`` takes SBP first.
        X_data, y_DBP, y_SBP = self._load_by_type()
        train_dataset, val_dataset = self.split_data(X_data, y_SBP, y_DBP)
        # NOTE(review): the training sampler always shuffles; ``self.shuffle``
        # is not consulted here -- confirm whether that is intended.
        train_sampler = torch.utils.data.distributed.DistributedSampler(
            train_dataset, num_replicas=world_size, rank=rank, shuffle=True
        )
        val_sampler = torch.utils.data.distributed.DistributedSampler(
            val_dataset, num_replicas=world_size, rank=rank, shuffle=False
        )
        train_dataloader = DataLoader(
            train_dataset,
            batch_size=self.batch_size,
            sampler=train_sampler,
            collate_fn=custom_collate_fn,
        )
        val_dataloader = DataLoader(
            val_dataset,
            batch_size=self.batch_size,
            sampler=val_sampler,
            collate_fn=custom_collate_fn,
        )
        return train_dataloader, val_dataloader, train_sampler, val_sampler
# Usage example
#
# data_loader = BPDataLoader(data_dir='data', val_split=0.2, batch_size=32,data_type='MIMIC')
# train_dataloader, val_dataloader = data_loader.get_dataloaders()
#
# for i, (X, y_SBP, y_DBP) in enumerate(train_dataloader):
# print(f"Batch {i+1}: X.shape={X.shape }, y_SBP.shape={y_SBP.shape}, y_DBP.shape={y_DBP.shape}")
# if i == 2:
# break