import os
import argparse

import torch
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
import torch.distributed as dist
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm

from dataloader import BPDataLoader
from models.lstm import LSTMModel

# Training hyperparameters
max_epochs = 100
batch_size = 1024
warmup_epochs = 10
lr = 5e-4


def train(gpu, args):
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '12355'
    # Global rank of this process: node rank * GPUs per node + local GPU index
    rank = args.nr * args.gpus + gpu
    dist.init_process_group(
        backend="nccl",
        world_size=args.world_size,
        rank=rank,
    )

    # Bind this process to its GPU
    torch.cuda.set_device(gpu)

    # Build the model and move it to this process's GPU
    model = LSTMModel().to(gpu)
    # Load pretrained weights; the checkpoint stores the whole module, so copy its state dict
    w = torch.load(r'weights/UKL/best_99_lstm_model_sbp90.9980_dbp51.0640.pth',
                   map_location=torch.device(f'cuda:{gpu}'))
    model.load_state_dict(w.state_dict())
    model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu])

    # Loss function
    criterion = nn.MSELoss().to(gpu)
    # Optimizer
    optimizer = optim.Adam(model.parameters(), lr=lr)
    # Cosine learning-rate schedule over the post-warmup epochs
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer,
                                                     T_max=max_epochs - warmup_epochs,
                                                     eta_min=1e-6)

    # Data loaders (check_path() in __main__ has already created weights/<data_type>)
    data_type = 'X'
    data_loader = BPDataLoader(data_dir='data', val_split=0.2,
                               batch_size=batch_size, data_type=data_type)
    train_loader, val_loader, train_sampler, val_sampler = \
        data_loader.get_distributed_dataloaders(rank=rank, world_size=args.world_size)

    # TensorBoard writer only on the global rank-0 process, so a multi-process
    # run does not create one event file per worker
    writer = SummaryWriter() if rank == 0 else None

    best_val_loss_sbp = float('inf')
    best_val_loss_dbp = float('inf')
    for epoch in range(max_epochs):
        if epoch < warmup_epochs:
            # Linear warmup from 1e-6 up to the base learning rate
            warmup_lr = 1e-6 + (epoch + 1) * (lr - 1e-6) / warmup_epochs
            for param_group in optimizer.param_groups:
                param_group['lr'] = warmup_lr

        # Reshuffle this rank's shard each epoch
        train_sampler.set_epoch(epoch)

        train_loss = run_train(model, train_loader, optimizer, criterion, epoch, gpu)
        val_loss_sbp, val_loss_dbp = run_evaluate(model, val_loader, criterion, gpu)

        if rank == 0:
            writer.add_scalar("Loss/train", train_loss, epoch)
            writer.add_scalar("Loss/val_sbp", val_loss_sbp, epoch)
            writer.add_scalar("Loss/val_dbp", val_loss_dbp, epoch)
            print(f"Epoch {epoch+1}/{max_epochs}, Train Loss: {train_loss:.4f}, "
                  f"Val Loss SBP: {val_loss_sbp:.4f}, Val Loss DBP: {val_loss_dbp:.4f}")

            # Checkpoint whenever either head improves; track each best independently
            # so a regression in one head cannot loosen the other's threshold
            if val_loss_sbp < best_val_loss_sbp or val_loss_dbp < best_val_loss_dbp:
                best_val_loss_sbp = min(best_val_loss_sbp, val_loss_sbp)
                best_val_loss_dbp = min(best_val_loss_dbp, val_loss_dbp)
                torch.save(model.module,
                           f'weights/{data_type}/best_{epoch}_lstm_model_sbp{val_loss_sbp:.4f}_dbp{val_loss_dbp:.4f}.pth')
            torch.save(model.module, f'weights/{data_type}/last.pth')

        # Only advance the cosine schedule after warmup; stepping during warmup
        # would burn through part of the T_max = max_epochs - warmup_epochs schedule
        if epoch >= warmup_epochs:
            scheduler.step()

    if rank == 0:
        writer.close()
    dist.destroy_process_group()
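
# --- Hypothetical sketch, not called anywhere in this script ---
# BPDataLoader lives in dataloader.py and is not shown here. train() above
# assumes get_distributed_dataloaders returns (train_loader, val_loader,
# train_sampler, val_sampler), with both loaders sharded by DistributedSampler,
# roughly like the helper below. Names and arguments are assumptions for
# illustration, not the repo's actual implementation.
def _sketch_distributed_loaders(train_dataset, val_dataset, batch_size, rank, world_size):
    from torch.utils.data import DataLoader, DistributedSampler

    # Each rank sees a disjoint shard; shuffling is re-seeded via sampler.set_epoch()
    train_sampler = DistributedSampler(train_dataset, num_replicas=world_size,
                                       rank=rank, shuffle=True)
    val_sampler = DistributedSampler(val_dataset, num_replicas=world_size,
                                     rank=rank, shuffle=False)
    train_loader = DataLoader(train_dataset, batch_size=batch_size,
                              sampler=train_sampler, pin_memory=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size,
                            sampler=val_sampler, pin_memory=True)
    return train_loader, val_loader, train_sampler, val_sampler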

def reduce_tensor(tensor):
    # Average a scalar tensor across all processes (used for logging only)
    rt = tensor.clone()
    dist.all_reduce(rt, op=dist.ReduceOp.SUM)
    rt /= dist.get_world_size()
    return rt


def run_train(model, dataloader, optimizer, criterion, epoch, gpu):
    model.train()
    running_loss = 0.0
    pbar = tqdm(dataloader, total=len(dataloader), disable=(gpu != 0),
                desc=f"GPU{gpu} Epoch {epoch+1}/{max_epochs}")
    for i, (inputs, sbp_labels, dbp_labels) in enumerate(pbar):
        inputs = inputs.cuda(gpu, non_blocking=True)
        sbp_labels = sbp_labels.cuda(gpu, non_blocking=True)
        dbp_labels = dbp_labels.cuda(gpu, non_blocking=True)

        optimizer.zero_grad()
        sbp_outputs, dbp_outputs = model(inputs)
        sbp_outputs = sbp_outputs.squeeze(1)
        dbp_outputs = dbp_outputs.squeeze(1)
        loss_sbp = criterion(sbp_outputs, sbp_labels)
        loss_dbp = criterion(dbp_outputs, dbp_labels)
        loss = loss_sbp + loss_dbp

        # Backpropagate the local loss; DDP already averages gradients across
        # ranks. The all-reduced copy is detached and used only for logging.
        loss.backward()
        optimizer.step()
        reduced_loss = reduce_tensor(loss.detach())
        running_loss += reduced_loss.item()
        pbar.set_postfix(loss=running_loss / (i + 1))
    return running_loss / len(dataloader)


def run_evaluate(model, dataloader, criterion, gpu):
    model.eval()
    running_loss_sbp = 0.0
    running_loss_dbp = 0.0
    with torch.no_grad():
        for inputs, sbp_labels, dbp_labels in dataloader:
            inputs = inputs.cuda(gpu, non_blocking=True)
            sbp_labels = sbp_labels.cuda(gpu, non_blocking=True)
            dbp_labels = dbp_labels.cuda(gpu, non_blocking=True)

            sbp_outputs, dbp_outputs = model(inputs)
            sbp_outputs = sbp_outputs.squeeze(1)
            dbp_outputs = dbp_outputs.squeeze(1)
            loss_sbp = criterion(sbp_outputs, sbp_labels)
            loss_dbp = criterion(dbp_outputs, dbp_labels)

            # Average the per-head losses across ranks so every process reports the same value
            running_loss_sbp += reduce_tensor(loss_sbp).item()
            running_loss_dbp += reduce_tensor(loss_dbp).item()
    eval_loss_sbp = running_loss_sbp / len(dataloader)
    eval_loss_dbp = running_loss_dbp / len(dataloader)
    return eval_loss_sbp, eval_loss_dbp


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--nr", type=int, default=0, help="rank of this node")
    parser.add_argument("--nodes", type=int, default=1, help="total number of nodes")
    return parser.parse_args()


def main():
    args = parse_args()
    # Use at most 4 GPUs per node
    ngpus_per_node = min(torch.cuda.device_count(), 4)
    args.gpus = max(ngpus_per_node, 1)
    # World size must count every process across all nodes
    args.world_size = args.gpus * args.nodes
    mp.spawn(train, nprocs=args.gpus, args=(args,))


def check_path(data_type):
    # Make sure the checkpoint directory exists before any rank tries to save
    os.makedirs(f'weights/{data_type}', exist_ok=True)


if __name__ == "__main__":
    check_path('X')
    main()
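
# Usage sketch (script filename assumed). Single node, up to 4 GPUs:
#   python train.py
# Two nodes (relies on the --nodes/--nr flags above; note MASTER_ADDR is
# hard-coded to 127.0.0.1 and would need to point at node 0 instead):
#   node 0: python train.py --nodes 2 --nr 0
#   node 1: python train.py --nodes 2 --nr 1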