# tjy/demo/apis/bp/ddp_X.py

import os
import argparse
import torch
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
import torch.distributed as dist
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
from dataloader import BPDataLoader
from apis.lstm import LSTMModel
# TensorBoard writer (module-level, so each spawned process gets one; only rank 0 logs)
writer = SummaryWriter()

# Training hyperparameters
max_epochs = 100
batch_size = 1024
warmup_epochs = 10
lr = 0.0005
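
# The learning-rate schedule combines a linear warmup with cosine annealing.
# During warmup, train() sets the lr directly; plugging in the constants:
#   epoch 0: lr = 1e-6 + 1 * (5e-4 - 1e-6) / 10 ≈ 5.09e-5
#   epoch 9: lr = 1e-6 + 10 * (5e-4 - 1e-6) / 10 = 5e-4
# after which CosineAnnealingLR decays from 5e-4 toward eta_min=1e-6 over the
# remaining max_epochs - warmup_epochs epochs.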


def train(gpu, args):
    # Single-machine rendezvous; with MASTER_ADDR/MASTER_PORT set,
    # init_process_group falls back to the default "env://" init method.
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '12355'
    # Global rank = node rank * GPUs per node + local GPU index
    rank = args.nr * args.gpus + gpu
    dist.init_process_group(
        backend="nccl",
        world_size=args.world_size,
        rank=rank,
    )
    # Bind this process to its GPU
    torch.cuda.set_device(gpu)
    # Create the model and move it to the local GPU
    model = LSTMModel().to(gpu)
    w = torch.load(r'weights/UKL/best_99_lstm_model_sbp90.9980_dbp51.0640.pth',
                   map_location=torch.device(f'cuda:{gpu}'))
    # The checkpoint stores a whole model object, so load via its state_dict
    model.load_state_dict(w.state_dict())
    model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu])
    # Loss function
    criterion = nn.MSELoss().to(gpu)
    # Optimizer
    optimizer = optim.Adam(model.parameters(), lr=lr)
    # Cosine annealing over the epochs that remain after warmup
    scheduler = optim.lr_scheduler.CosineAnnealingLR(
        optimizer, T_max=max_epochs - warmup_epochs, eta_min=1e-6)
    # Prepare the distributed data loaders (sharded by global rank)
    data_type = 'X'
    data_loader = BPDataLoader(data_dir='data', val_split=0.2,
                               batch_size=batch_size, data_type=data_type)
    train_loader, val_loader, train_sampler, val_sampler = \
        data_loader.get_distributed_dataloaders(rank=rank, world_size=args.world_size)
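    # BPDataLoader and get_distributed_dataloaders are project-specific helpers;
    # this script assumes they return DataLoaders backed by DistributedSampler,
    # so each rank sees a distinct shard and train_sampler.set_epoch() below
    # reshuffles the shards every epoch.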
    best_val_loss_sbp = float('inf')
    best_val_loss_dbp = float('inf')
    for epoch in range(max_epochs):
        if epoch < warmup_epochs:
            # Linear warmup from ~1e-6 up to the base lr
            warmup_lr = 1e-6 + (epoch + 1) * (5e-4 - 1e-6) / warmup_epochs
            for param_group in optimizer.param_groups:
                param_group['lr'] = warmup_lr
        # Give each epoch a different shuffling of the training shards
        train_sampler.set_epoch(epoch)
        train_loss = run_train(model, train_loader, optimizer, criterion, epoch, gpu)
        val_loss_sbp, val_loss_dbp = run_evaluate(model, val_loader, criterion, gpu)
        if gpu == 0:
            writer.add_scalar("Loss/train", train_loss, epoch)
            writer.add_scalar("Loss/val_sbp", val_loss_sbp, epoch)
            writer.add_scalar("Loss/val_dbp", val_loss_dbp, epoch)
            print(f"Epoch {epoch+1}/{max_epochs}, Train Loss: {train_loss:.4f}, "
                  f"Val Loss SBP: {val_loss_sbp:.4f}, Val Loss DBP: {val_loss_dbp:.4f}")
            # Checkpoint when either head improves; both bests are refreshed together
            if val_loss_sbp < best_val_loss_sbp or val_loss_dbp < best_val_loss_dbp:
                best_val_loss_sbp = val_loss_sbp
                best_val_loss_dbp = val_loss_dbp
                torch.save(model.module,
                           f'weights/{data_type}/best_{epoch}_lstm_model_'
                           f'sbp{val_loss_sbp:.4f}_dbp{val_loss_dbp:.4f}.pth')
            torch.save(model.module, f'weights/{data_type}/last.pth')
        # Step the cosine schedule only after warmup, so that T_max matches
        # the number of epochs it actually covers
        if epoch >= warmup_epochs:
            scheduler.step()
    writer.close()
    dist.destroy_process_group()


def reduce_tensor(tensor):
    # Average a tensor across all ranks (used only for logging)
    rt = tensor.clone()
    dist.all_reduce(rt, op=dist.ReduceOp.SUM)
    rt /= dist.get_world_size()
    return rt
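
# Quick sanity check of the averaging (hypothetical values): with world_size 4
# and per-rank losses 1.0, 2.0, 3.0, 4.0, all_reduce(SUM) leaves 10.0 on every
# rank, and dividing by 4 gives each rank the global mean, 2.5.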


def run_train(model, dataloader, optimizer, criterion, epoch, gpu):
    model.train()
    running_loss = 0.0
    # Only rank 0 draws the progress bar
    pbar = tqdm(dataloader, total=len(dataloader), disable=(gpu != 0),
                desc=f"GPU{gpu} Epoch {epoch+1}/{max_epochs}")
    for i, (inputs, sbp_labels, dbp_labels) in enumerate(pbar):
        inputs = inputs.cuda(gpu, non_blocking=True)
        sbp_labels = sbp_labels.cuda(gpu, non_blocking=True)
        dbp_labels = dbp_labels.cuda(gpu, non_blocking=True)
        optimizer.zero_grad()
        sbp_outputs, dbp_outputs = model(inputs)
        sbp_outputs = sbp_outputs.squeeze(1)
        dbp_outputs = dbp_outputs.squeeze(1)
        loss_sbp = criterion(sbp_outputs, sbp_labels)
        loss_dbp = criterion(dbp_outputs, dbp_labels)
        loss = loss_sbp + loss_dbp
        # Backprop the local loss; DDP already averages gradients across ranks.
        # The all-reduced copy is only for logging a globally consistent loss.
        loss.backward()
        optimizer.step()
        reduced_loss = reduce_tensor(loss.detach())
        running_loss += reduced_loss.item()
        pbar.set_postfix(loss=running_loss / (i + 1))
    return running_loss / len(dataloader)


def run_evaluate(model, dataloader, criterion, gpu):
    model.eval()
    running_loss_sbp = 0.0
    running_loss_dbp = 0.0
    with torch.no_grad():
        for inputs, sbp_labels, dbp_labels in dataloader:
            inputs = inputs.cuda(gpu, non_blocking=True)
            sbp_labels = sbp_labels.cuda(gpu, non_blocking=True)
            dbp_labels = dbp_labels.cuda(gpu, non_blocking=True)
            sbp_outputs, dbp_outputs = model(inputs)
            sbp_outputs = sbp_outputs.squeeze(1)
            dbp_outputs = dbp_outputs.squeeze(1)
            loss_sbp = criterion(sbp_outputs, sbp_labels)
            loss_dbp = criterion(dbp_outputs, dbp_labels)
            reduced_loss_sbp = reduce_tensor(loss_sbp)
            reduced_loss_dbp = reduce_tensor(loss_dbp)
            running_loss_sbp += reduced_loss_sbp.item()
            running_loss_dbp += reduced_loss_dbp.item()
    eval_loss_sbp = running_loss_sbp / len(dataloader)
    eval_loss_dbp = running_loss_dbp / len(dataloader)
    return eval_loss_sbp, eval_loss_dbp
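
# Note: assuming DistributedSampler-backed validation loaders (see above), each
# rank evaluates its own shard and the all-reduced batch losses average to the
# full-set loss; padding shards to equal length makes this a close approximation
# rather than an exact figure.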


def parse_args():
    parser = argparse.ArgumentParser()
    # Node rank (kept at the default 0 for single-machine training)
    parser.add_argument("--nr", type=int, default=0)
    return parser.parse_args()


def main():
    args = parse_args()
    # Use at most 4 of this node's GPUs; world_size must equal the total
    # number of spawned processes for the rank arithmetic in train()
    ngpus_per_node = min(torch.cuda.device_count(), 4)
    args.world_size = ngpus_per_node
    args.gpus = ngpus_per_node
    # One training process per GPU
    mp.spawn(train, nprocs=args.gpus, args=(args,))


def check_path(data_type):
    # Make sure the checkpoint directory exists before training starts
    os.makedirs(f'weights/{data_type}', exist_ok=True)


if __name__ == "__main__":
    check_path('X')
    main()
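
# Example launch (single node; assumes this file is saved as ddp_X.py):
#   python ddp_X.py --nr 0
# TensorBoard events are written under ./runs by default:
#   tensorboard --logdir runs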