"""
Training code for adversarial patch training.
"""

import PIL
from torch.utils.tensorboard import SummaryWriter

# import load_data
from tqdm import tqdm

from load_data import *  # wildcard import; could this cause modules to be imported more than once?
import gc
import matplotlib.pyplot as plt
from torch import autograd
from torchvision import transforms

import subprocess

import patch_config
import sys
import time

from yolo import YOLO
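
# NOTE: besides the dataset, `from load_data import *` above is assumed (based on how
# the names are used below) to also re-export torch, optim, F (torch.nn.functional),
# PIL's Image, and the helpers InriaDataset, PatchApplier, PatchTransformer,
# MaxProbExtractor, NPSCalculator and TotalVariation; none of them are imported
# explicitly in this file.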


class PatchTrainer(object):
    def __init__(self, mode):
        self.config = patch_config.patch_configs[mode]()  # look up the configuration class for this mode

        # self.darknet_model = Darknet(self.config.cfgfile)  # load the YOLO model
        # self.darknet_model.load_weights(self.config.weightfile)  # default YOLOv2 MS COCO weights; 'person' is class 0
        self.darknet_model = YOLO().net
        self.darknet_model = self.darknet_model.eval().cuda()  # TODO: Why eval?
        self.patch_applier = PatchApplier().cuda()  # applies the adversarial patch to the images
        self.patch_transformer = PatchTransformer().cuda()  # resizes the patch to the target size and adds jitter
        # self.prob_extractor = MaxProbExtractor(0, 80, self.config).cuda()  # extracts the maximum class probability
        self.prob_extractor = MaxProbExtractor(0, 1, self.config).cuda()  # extracts the maximum 'person' probability
        self.nps_calculator = NPSCalculator(self.config.printfile, self.config.patch_size).cuda()  # non-printability score
        self.total_variation = TotalVariation().cuda()  # total variation of the patch

        self.writer = self.init_tensorboard(mode)
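
    # init_tensorboard() below also launches a TensorBoard server as a background
    # process pointed at ./runs, so the scalars and patch images logged during
    # training can be watched while the run is in progress.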

    def init_tensorboard(self, name=None):
        subprocess.Popen(['tensorboard', '--logdir=runs'])
        if name is not None:
            time_str = time.strftime("%Y%m%d-%H%M%S")
            return SummaryWriter(f'runs/{time_str}_{name}')
        else:
            return SummaryWriter()

    def train(self):
        """
        Optimize a patch to generate an adversarial example.

        :return: Nothing
        """
        img_size = self.darknet_model.height  # 416
        # print('batch_size:', batch_size)
        batch_size = self.config.batch_size  # 8
        n_epochs = 200
        # n_epochs = 5
        # max_lab = 20  # maximum number of labels per image
        max_lab = 8

        time_str = time.strftime("%Y%m%d-%H%M%S")

        # Generate starting point
        # adv_patch_cpu = self.generate_patch("gray")  # start from a gray patch with every pixel set to 0.5
        adv_patch_cpu = self.read_image("saved_patches/patchnew0.jpg")

        adv_patch_cpu.requires_grad_(True)
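
        # NOTE: optimization resumes here from a previously saved patch image; switch
        # back to generate_patch("gray") (or "random") above to start from scratch.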

        train_loader = torch.utils.data.DataLoader(
            InriaDataset(self.config.img_dir, self.config.lab_dir, max_lab, img_size,
                         shuffle=True),
            batch_size=batch_size,
            shuffle=True,
            num_workers=0)  # does this, combined with `from load_data import *`, cause repeated imports?
        self.epoch_length = len(train_loader)
        print(f'One epoch is {len(train_loader)}')

        optimizer = optim.Adam([adv_patch_cpu], lr=self.config.start_learning_rate, amsgrad=True)  # only the patch is optimized; AMSGrad is the Adam variant from the ICLR 2018 best paper
        scheduler = self.config.scheduler_factory(optimizer)

        et0 = time.time()
        for epoch in range(n_epochs):
            ep_det_loss = 0
            ep_nps_loss = 0
            ep_tv_loss = 0
            ep_loss = 0
            bt0 = time.time()
            for i_batch, (img_batch, lab_batch) in tqdm(enumerate(train_loader), desc=f'Running epoch {epoch}',
                                                        total=self.epoch_length):
                with autograd.detect_anomaly():  # print the forward trace of the op that breaks the backward pass, and raise if the backward pass produces NaN
                    img_batch = img_batch.cuda()  # 8, 3, 416, 416
                    lab_batch = lab_batch.cuda()  # 8, 14, 5 -- why are the person labels padded to 14?
                    # print('TRAINING EPOCH %i, BATCH %i' % (epoch, i_batch))
                    adv_patch = adv_patch_cpu.cuda()  # 3, 300, 300
                    adv_batch_t = self.patch_transformer(adv_patch, lab_batch, img_size, do_rotate=True, rand_loc=False)
                    p_img_batch = self.patch_applier(img_batch, adv_batch_t)
                    p_img_batch = F.interpolate(p_img_batch,
                                                (self.darknet_model.height, self.darknet_model.width))  # make sure the patched batch matches the network input size

                    # print('++++++++++++p_img_batch:+++++++++++++', p_img_batch.shape)
                    img = p_img_batch[1, :, :, :]  # grab one patched image for optional inspection
                    img = transforms.ToPILImage()(img.detach().cpu())
                    # img.show()

                    outputs = self.darknet_model(p_img_batch)  # input 8, 3, 416, 416; output 8, 425, 13, 13, where 425 = 5 * (5 + 80)
                    max_prob = 0
                    nps = 0
                    tv = 0
                    for l in range(len(outputs)):  # accumulate over the three output feature maps (one per detection resolution)
                        output = outputs[l]
                        max_prob += self.prob_extractor(output)
                        nps += self.nps_calculator(adv_patch)
                        tv += self.total_variation(adv_patch)
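
                    # Loss terms (weights as used below): the detection loss is the mean
                    # maximum 'person' confidence, the non-printability score (NPS) keeps
                    # the patch close to printable colours, and total variation (TV) keeps
                    # it smooth. The TV term is floored at 0.1, so once the patch is smooth
                    # enough it becomes a constant and stops contributing gradient.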
                    nps_loss = nps * 0.01
                    tv_loss = tv * 2.5
                    det_loss = torch.mean(max_prob)  # use the detector's 'person' confidence as the loss
                    loss = det_loss + nps_loss + torch.max(tv_loss, torch.tensor(0.1).cuda())

                    ep_det_loss += det_loss.detach().cpu().numpy()
                    ep_nps_loss += nps_loss.detach().cpu().numpy()
                    ep_tv_loss += tv_loss.detach().cpu().numpy()
                    ep_loss += loss  # unlike the accumulators above, this keeps a CUDA tensor rather than a plain float

                    loss.backward()
                    optimizer.step()
                    optimizer.zero_grad()
                    adv_patch_cpu.data.clamp_(0, 1)  # keep patch in image range

                    bt1 = time.time()
                    if i_batch % 5 == 0:
                        iteration = self.epoch_length * epoch + i_batch

                        self.writer.add_scalar('total_loss', loss.detach().cpu().numpy(), iteration)
                        self.writer.add_scalar('loss/det_loss', det_loss.detach().cpu().numpy(), iteration)
                        self.writer.add_scalar('loss/nps_loss', nps_loss.detach().cpu().numpy(), iteration)
                        self.writer.add_scalar('loss/tv_loss', tv_loss.detach().cpu().numpy(), iteration)
                        self.writer.add_scalar('misc/epoch', epoch, iteration)
                        self.writer.add_scalar('misc/learning_rate', optimizer.param_groups[0]["lr"], iteration)

                        self.writer.add_image('patch', adv_patch_cpu, iteration)
                    if i_batch + 1 >= len(train_loader):
                        print('\n')
                    else:
                        del adv_batch_t, output, max_prob, det_loss, p_img_batch, nps_loss, tv_loss, loss
                        torch.cuda.empty_cache()
                    bt0 = time.time()
            et1 = time.time()
            ep_det_loss = ep_det_loss / len(train_loader)
            ep_nps_loss = ep_nps_loss / len(train_loader)
            ep_tv_loss = ep_tv_loss / len(train_loader)
            ep_loss = ep_loss / len(train_loader)

            # im = transforms.ToPILImage('RGB')(adv_patch_cpu)
            # plt.imshow(im)
            # plt.savefig(f'pics/{time_str}_{self.config.patch_name}_{epoch}.png')

            scheduler.step(ep_loss)
            if True:
                print(' EPOCH NR: ', epoch)
                print('EPOCH LOSS: ', ep_loss)
                print(' DET LOSS: ', ep_det_loss)
                print(' NPS LOSS: ', ep_nps_loss)
                print('  TV LOSS: ', ep_tv_loss)
                print('EPOCH TIME: ', et1 - et0)
                # im = transforms.ToPILImage('RGB')(adv_patch_cpu)
                # plt.imshow(im)
                # plt.show()
                # im.save("saved_patches/patchnew1.jpg")
                im = transforms.ToPILImage('RGB')(adv_patch_cpu)
                if epoch >= 3:
                    im.save(f"saved_patches/patchnew1_t1_{epoch}_{time_str}.jpg")
                del adv_batch_t, output, max_prob, det_loss, p_img_batch, nps_loss, tv_loss, loss
                torch.cuda.empty_cache()
            et0 = time.time()

    def generate_patch(self, type):
        """
        Generate a random patch as a starting point for optimization.

        :param type: Can be 'gray' or 'random'. Whether to generate a gray or a random patch.
        :return: The generated patch as a (3, patch_size, patch_size) tensor.
        """
        if type == 'gray':
            adv_patch_cpu = torch.full((3, self.config.patch_size, self.config.patch_size), 0.5)
        elif type == 'random':
            adv_patch_cpu = torch.rand((3, self.config.patch_size, self.config.patch_size))

        return adv_patch_cpu

    def read_image(self, path):
        """
        Read an input image to be used as a patch.

        :param path: Path to the image to be read.
        :return: Returns the resized patch as a PyTorch tensor.
        """
        patch_img = Image.open(path).convert('RGB')
        tf = transforms.Resize((self.config.patch_size, self.config.patch_size))
        patch_img = tf(patch_img)
        tf = transforms.ToTensor()

        adv_patch_cpu = tf(patch_img)
        return adv_patch_cpu


def main():
    if len(sys.argv) != 2:
        print('You need to supply (only) a configuration mode.')
        print('Possible modes are:')
        print(patch_config.patch_configs)  # usually 'paper_obj' is passed
        sys.exit(1)

    # print('sys.argv:', sys.argv)
    trainer = PatchTrainer(sys.argv[1])
    trainer.train()


if __name__ == '__main__':
    main()
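
# Example usage (assuming this script is saved as train_patch.py and that
# patch_config.patch_configs contains a 'paper_obj' entry, as the comment in
# main() suggests):
#
#     python train_patch.py paper_obj
#
# or, from another module:
#
#     from train_patch import PatchTrainer
#     PatchTrainer('paper_obj').train()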