import argparse
import os
import platform
import sys
from pathlib import Path

import cv2
import numpy as np
import time
import torchvision
import torch

from read_data import LoadImages, LoadStreams
# from models.common import DetectMultiBackend
from PIL import Image, ImageDraw, ImageFont


class LaneDetection:
    counter_frame = 0
    processed_fps = 0

    def __init__(self, video_path=None):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = torch.hub.load(os.getcwd() + "/algorithm/yolov5", 'custom', source='local',
                                    path='./weight/traffic/lane.pt', force_reload=True)
        # self.model = torch.load('weight/traffic/lane.pt', map_location=self.device)['model'].float().fuse()
        self.classes = self.model.names
        self.frame = [None]
        self.imgsz = (640, 640)
        if video_path is not None:
            self.video_name = video_path
        else:
            self.video_name = 'vid2.mp4'  # a default video file
        self.dataset = LoadImages(self.video_name, img_size=self.imgsz)

    def use_webcam(self, source):
        # self.dataset.release()  # Release any existing video capture
        # self.cap = cv2.VideoCapture(0)  # Open default webcam
        # print('use_webcam')
        self.dataset = LoadStreams(source, img_size=self.imgsz)
        self.flag = 1
        # return model

    def class_to_label(self, x):
        return self.classes[int(x)]

    def get_frame(self):
        red_thres = 120
        green_thres = 160
        blue_thres = 120
        scale = 0.6
        for im0s in self.dataset:
            # print(self.dataset.mode)
            # print(self.dataset)
            if self.dataset.mode == 'stream':
                image = im0s[0].copy()
            else:
                image = im0s.copy()
            img = image[:, :, ::-1].transpose(2, 0, 1)  # BGR to RGB, HWC to CHW
            img0 = img.copy()
            img = torch.tensor(img0)
            img = img.float()  # uint8 to fp16/32
            img /= 255.0  # 0 - 255 to 0.0 - 1.0
            if img.ndimension() == 3:
                img = img.unsqueeze(0)
            img = img.to(self.device)
            pred = self.model(img)
            pred = non_max_suppression(pred, 0.25, 0.45, None, False, max_det=1000)
            # print(pred)
            for i, det in enumerate(pred):  # per image
                im0 = image.copy()  # use the already-selected frame (works for both stream and file modes)
                annotator = Annotator(im0, line_width=3, example=str(self.classes))
                if len(det):
                    # Rescale boxes from img_size to im0 size
                    det[:, :4] = scale_boxes(img.shape[2:], det[:, :4], im0.shape).round()
                im0 = annotator.result()
                color_im0 = color_select(im0, red_thres, green_thres, blue_thres)
                edg_im0 = canny_edg_(color_im0)
                im0 = Hough_transform(edg_im0, im0, scale)
                ret, jpeg = cv2.imencode(".jpg", im0)
                accuracy = 0
                num_people = 0
                return jpeg.tobytes(), ''
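
# Illustrative sketch (not part of the original pipeline): reproduces the
# preprocessing that get_frame() applies to each frame, on a synthetic BGR image,
# so the expected (1, 3, H, W) input tensor shape can be verified in isolation.
def _demo_preprocess():
    frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)  # fake BGR frame
    img = frame[:, :, ::-1].transpose(2, 0, 1)      # BGR -> RGB, HWC -> CHW
    img = torch.tensor(img.copy()).float() / 255.0  # uint8 -> float32 in [0, 1]
    img = img.unsqueeze(0)                          # add the batch dimension
    print(img.shape)  # torch.Size([1, 3, 480, 640])
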
class Annotator:
    # YOLOv5 Annotator for train/val mosaics and jpgs and detect/hub inference annotations
    def __init__(self, im, line_width=None, font_size=None, font='Arial.ttf', pil=False, example='abc'):
        assert im.data.contiguous, 'Image not contiguous. Apply np.ascontiguousarray(im) to Annotator() input images.'
        non_ascii = not is_ascii(example)  # non-latin labels, i.e. asian, arabic, cyrillic
        self.pil = pil or non_ascii
        if self.pil:  # use PIL
            self.im = im if isinstance(im, Image.Image) else Image.fromarray(im)
            self.draw = ImageDraw.Draw(self.im)
            try:
                self.font = ImageFont.truetype('Arial.Unicode.ttf', font_size or 12)
            except OSError:
                self.font = ImageFont.load_default()
        else:  # use cv2
            self.im = im
            self.lw = line_width or max(round(sum(im.shape) / 2 * 0.003), 2)  # line width

    def masks(self, masks, colors, im_gpu, alpha=0.5):
        """Plot masks at once.
        Args:
            masks (tensor): predicted masks on cuda, shape: [n, h, w]
            colors (List[List[Int]]): colors for predicted masks, [[r, g, b] * n]
            im_gpu (tensor): image on cuda, shape: [3, h, w], range: [0, 1]
            alpha (float): mask transparency: 0.0 fully transparent, 1.0 opaque
        """
        if self.pil:
            # convert to numpy first
            self.im = np.asarray(self.im).copy()
        if im_gpu is None:
            # Add multiple masks of shape(h,w,n) with colors list([r,g,b], [r,g,b], ...)
            if len(masks) == 0:
                return
            if isinstance(masks, torch.Tensor):
                masks = torch.as_tensor(masks, dtype=torch.uint8)
                masks = masks.permute(1, 2, 0).contiguous()
                masks = masks.cpu().numpy()
            # masks = np.ascontiguousarray(masks.transpose(1, 2, 0))
            masks = scale_image(masks.shape[:2], masks, self.im.shape)
            masks = np.asarray(masks, dtype=np.float32)
            colors = np.asarray(colors, dtype=np.float32)  # shape(n,3)
            s = masks.sum(2, keepdims=True).clip(0, 1)  # add all masks together
            masks = (masks @ colors).clip(0, 255)  # (h,w,n) @ (n,3) = (h,w,3)
            self.im[:] = masks * alpha + self.im * (1 - s * alpha)
        else:
            if len(masks) == 0:
                self.im[:] = im_gpu.permute(1, 2, 0).contiguous().cpu().numpy() * 255
            colors = torch.tensor(colors, device=im_gpu.device, dtype=torch.float32) / 255.0
            colors = colors[:, None, None]  # shape(n,1,1,3)
            masks = masks.unsqueeze(3)  # shape(n,h,w,1)
            masks_color = masks * (colors * alpha)  # shape(n,h,w,3)
            inv_alph_masks = (1 - masks * alpha).cumprod(0)  # shape(n,h,w,1)
            mcs = (masks_color * inv_alph_masks).sum(0) * 2  # mask color summand shape(n,h,w,3)
            im_gpu = im_gpu.flip(dims=[0])  # flip channel
            im_gpu = im_gpu.permute(1, 2, 0).contiguous()  # shape(h,w,3)
            im_gpu = im_gpu * inv_alph_masks[-1] + mcs
            im_mask = (im_gpu * 255).byte().cpu().numpy()
            # print(type(im_gpu), type(im_mask), type(self.im.shape))
            self.im[:] = scale_image(im_gpu.shape, im_mask, self.im.shape)
        if self.pil:
            # convert im back to PIL and update draw
            self.fromarray(self.im)

    def rectangle(self, xy, fill=None, outline=None, width=1):
        # Add rectangle to image (PIL-only)
        self.draw.rectangle(xy, fill, outline, width)

    def text(self, xy, text, txt_color=(255, 255, 255), anchor='top'):
        # Add text to image (PIL-only)
        if anchor == 'bottom':  # start y from font bottom
            w, h = self.font.getsize(text)  # text width, height
            xy[1] += 1 - h
        self.draw.text(xy, text, fill=txt_color, font=self.font)

    def fromarray(self, im):
        # Update self.im from a numpy array
        self.im = im if isinstance(im, Image.Image) else Image.fromarray(im)
        self.draw = ImageDraw.Draw(self.im)

    def result(self):
        # Return annotated image as array
        return np.asarray(self.im)


def canny_edg_(img):
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)  # convert to grayscale
    kernel_size = 5
    blur_gray = cv2.GaussianBlur(gray, (kernel_size, kernel_size), 0)  # Gaussian blur
    low_thres = 160
    high_thres = 240
    edg_img = cv2.Canny(blur_gray, low_thres, high_thres)
    return edg_img


def color_select(img, red_thres=120, green_thres=160, blue_thres=120):
    # h, w = img.shape[:2]
    color_select = np.copy(img)
    bgr_thre = [blue_thres, green_thres, red_thres]
    thresholds = (img[:, :, 0] < bgr_thre[0]) | (img[:, :, 1] < bgr_thre[1]) | (img[:, :, 2] < bgr_thre[2])
    color_select[thresholds] = [0, 0, 0]  # zero out pixels below the thresholds
    return color_select


def Hough_transform(edg_img, img, mask_scale=0.6):
    # img is the original image
    mask_img = get_mask(edg_img, mask_scale)  # masked edge image
    # ----------------- Hough transform -----------------
    # Hough transform parameters
    rho = 1
    theta = np.pi / 180
    threshold = 2
    min_line_length = 4  # minimum number of pixels making up a line
    max_line_gap = 5  # maximum gap in pixels between connectable line segments
    lines = cv2.HoughLinesP(mask_img, rho, theta, threshold, np.array([]),
                            min_line_length, max_line_gap)
    left_line = []
    right_line = []
    if lines is not None:  # HoughLinesP returns None when no lines are found
        for line in lines:
            for x1, y1, x2, y2 in line:
                if x1 == x2:
                    pass
                else:
                    # slope of the line equation decides left vs. right lane
                    m = (y2 - y1) / (x2 - x1)
                    c = y1 - m * x1
                    if m < 0:  # left lane
                        left_line.append((m, c))
                    elif m >= 0:  # right lane
                        right_line.append((m, c))
                    cv2.line(img, (x1, y1), (x2, y2), (255, 0, 0), 5)
    return img
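
# Illustrative sketch (synthetic data, not part of the original project): runs the
# colour-threshold -> Canny -> region mask -> Hough pipeline defined above on a
# generated frame containing two bright diagonal "lane markings".
def _demo_lane_pipeline():
    frame = np.zeros((480, 640, 3), dtype=np.uint8)
    cv2.line(frame, (120, 470), (300, 300), (255, 255, 255), 8)  # left marking
    cv2.line(frame, (520, 470), (340, 300), (255, 255, 255), 8)  # right marking
    selected = color_select(frame)  # keep only bright, lane-coloured pixels
    edges = canny_edg_(selected)    # edge map of the colour selection
    annotated = Hough_transform(edges, frame.copy(), mask_scale=0.6)
    cv2.imwrite('lane_demo.jpg', annotated)  # hypothetical output path
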
def scale_image(im1_shape, masks, im0_shape, ratio_pad=None):
    """
    im1_shape: model input shape, [h, w]
    im0_shape: original image shape, [h, w, 3]
    masks: [h, w, num]
    """
    # Rescale coordinates (xyxy) from im1_shape to im0_shape
    if ratio_pad is None:  # calculate from im0_shape
        gain = min(im1_shape[0] / im0_shape[0], im1_shape[1] / im0_shape[1])  # gain = old / new
        pad = (im1_shape[1] - im0_shape[1] * gain) / 2, (im1_shape[0] - im0_shape[0] * gain) / 2  # wh padding
    else:
        pad = ratio_pad[1]
    top, left = int(pad[1]), int(pad[0])  # y, x
    bottom, right = int(im1_shape[0] - pad[1]), int(im1_shape[1] - pad[0])

    if len(masks.shape) < 2:
        raise ValueError(f'"len of masks shape" should be 2 or 3, but got {len(masks.shape)}')
    masks = masks[top:bottom, left:right]
    # masks = masks.permute(2, 0, 1).contiguous()
    # masks = F.interpolate(masks[None], im0_shape[:2], mode='bilinear', align_corners=False)[0]
    # masks = masks.permute(1, 2, 0).contiguous()
    masks = cv2.resize(masks, (im0_shape[1], im0_shape[0]))

    if len(masks.shape) == 2:
        masks = masks[:, :, None]
    return masks


def xywh2xyxy(x):
    # Convert nx4 boxes from [x, y, w, h] to [x1, y1, x2, y2] where xy1=top-left, xy2=bottom-right
    y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x)
    y[:, 0] = x[:, 0] - x[:, 2] / 2  # top left x
    y[:, 1] = x[:, 1] - x[:, 3] / 2  # top left y
    y[:, 2] = x[:, 0] + x[:, 2] / 2  # bottom right x
    y[:, 3] = x[:, 1] + x[:, 3] / 2  # bottom right y
    return y
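
# Illustrative sketch: xywh2xyxy() converts centre-format boxes to corner format,
# e.g. (50, 50, 20, 10) -> (40, 45, 60, 55); handy for checking the helper in isolation.
def _demo_xywh2xyxy():
    boxes = np.array([[50., 50., 20., 10.],
                      [10., 10., 4., 4.]])
    print(xywh2xyxy(boxes))
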
def non_max_suppression(
        prediction,
        conf_thres=0.25,
        iou_thres=0.45,
        classes=None,
        agnostic=False,
        multi_label=False,
        labels=(),
        max_det=300,
        nm=0,  # number of masks
):
    """Non-Maximum Suppression (NMS) on inference results to reject overlapping detections

    Returns:
         list of detections, one (n,6) tensor per image [xyxy, conf, cls]
    """
    if isinstance(prediction, (list, tuple)):  # YOLOv5 model in validation mode, output = (inference_out, loss_out)
        prediction = prediction[0]  # select only inference output

    device = prediction.device
    mps = 'mps' in device.type  # Apple MPS
    if mps:  # MPS not fully supported yet, convert tensors to CPU before NMS
        prediction = prediction.cpu()
    bs = prediction.shape[0]  # batch size
    nc = prediction.shape[2] - nm - 5  # number of classes
    xc = prediction[..., 4] > conf_thres  # candidates

    # Checks
    assert 0 <= conf_thres <= 1, f'Invalid Confidence threshold {conf_thres}, valid values are between 0.0 and 1.0'
    assert 0 <= iou_thres <= 1, f'Invalid IoU {iou_thres}, valid values are between 0.0 and 1.0'

    # Settings
    # min_wh = 2  # (pixels) minimum box width and height
    max_wh = 7680  # (pixels) maximum box width and height
    max_nms = 30000  # maximum number of boxes into torchvision.ops.nms()
    time_limit = 0.5 + 0.05 * bs  # seconds to quit after
    redundant = True  # require redundant detections
    multi_label &= nc > 1  # multiple labels per box (adds 0.5ms/img)
    merge = False  # use merge-NMS

    t = time.time()
    mi = 5 + nc  # mask start index
    output = [torch.zeros((0, 6 + nm), device=prediction.device)] * bs
    for xi, x in enumerate(prediction):  # image index, image inference
        # Apply constraints
        # x[((x[..., 2:4] < min_wh) | (x[..., 2:4] > max_wh)).any(1), 4] = 0  # width-height
        x = x[xc[xi]]  # confidence

        # Cat apriori labels if autolabelling
        if labels and len(labels[xi]):
            lb = labels[xi]
            v = torch.zeros((len(lb), nc + nm + 5), device=x.device)
            v[:, :4] = lb[:, 1:5]  # box
            v[:, 4] = 1.0  # conf
            v[range(len(lb)), lb[:, 0].long() + 5] = 1.0  # cls
            x = torch.cat((x, v), 0)

        # If none remain process next image
        if not x.shape[0]:
            continue

        # Compute conf
        x[:, 5:] *= x[:, 4:5]  # conf = obj_conf * cls_conf

        # Box/Mask
        box = xywh2xyxy(x[:, :4])  # (center_x, center_y, width, height) to (x1, y1, x2, y2)
        mask = x[:, mi:]  # zero columns if no masks

        # Detections matrix nx6 (xyxy, conf, cls)
        if multi_label:
            i, j = (x[:, 5:mi] > conf_thres).nonzero(as_tuple=False).T
            x = torch.cat((box[i], x[i, 5 + j, None], j[:, None].float(), mask[i]), 1)
        else:  # best class only
            conf, j = x[:, 5:mi].max(1, keepdim=True)
            x = torch.cat((box, conf, j.float(), mask), 1)[conf.view(-1) > conf_thres]

        # Filter by class
        if classes is not None:
            x = x[(x[:, 5:6] == torch.tensor(classes, device=x.device)).any(1)]

        # Apply finite constraint
        # if not torch.isfinite(x).all():
        #     x = x[torch.isfinite(x).all(1)]

        # Check shape
        n = x.shape[0]  # number of boxes
        if not n:  # no boxes
            continue
        elif n > max_nms:  # excess boxes
            x = x[x[:, 4].argsort(descending=True)[:max_nms]]  # sort by confidence
        else:
            x = x[x[:, 4].argsort(descending=True)]  # sort by confidence

        # Batched NMS
        c = x[:, 5:6] * (0 if agnostic else max_wh)  # classes
        boxes, scores = x[:, :4] + c, x[:, 4]  # boxes (offset by class), scores
        i = torchvision.ops.nms(boxes, scores, iou_thres)  # NMS
        if i.shape[0] > max_det:  # limit detections
            i = i[:max_det]
        if merge and (1 < n < 3E3):  # Merge NMS (boxes merged using weighted mean)
            # update boxes as boxes(i,4) = weights(i,n) * boxes(n,4)
            iou = box_iou(boxes[i], boxes) > iou_thres  # iou matrix
            weights = iou * scores[None]  # box weights
            x[i, :4] = torch.mm(weights, x[:, :4]).float() / weights.sum(1, keepdim=True)  # merged boxes
            if redundant:
                i = i[iou.sum(1) > 1]  # require redundancy

        output[xi] = x[i]
        if mps:
            output[xi] = output[xi].to(device)

    return output


def box_iou(box1, box2, eps=1e-7):
    # https://github.com/pytorch/vision/blob/master/torchvision/ops/boxes.py
    """
    Return intersection-over-union (Jaccard index) of boxes.
    Both sets of boxes are expected to be in (x1, y1, x2, y2) format.
    Arguments:
        box1 (Tensor[N, 4])
        box2 (Tensor[M, 4])
    Returns:
        iou (Tensor[N, M]): the NxM matrix containing the pairwise IoU values
        for every element in boxes1 and boxes2
    """
    # inter(N,M) = (rb(N,M,2) - lt(N,M,2)).clamp(0).prod(2)
    (a1, a2), (b1, b2) = box1.unsqueeze(1).chunk(2, 2), box2.unsqueeze(0).chunk(2, 2)
    inter = (torch.min(a2, b2) - torch.max(a1, b1)).clamp(0).prod(2)

    # IoU = inter / (area1 + area2 - inter)
    return inter / ((a2 - a1).prod(2) + (b2 - b1).prod(2) - inter + eps)
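
# Illustrative sketch (synthetic tensor, not real model output): non_max_suppression()
# applied to a (batch, num_boxes, 5 + num_classes) prediction in which two heavily
# overlapping boxes of the same class should collapse into a single detection.
def _demo_non_max_suppression():
    nc = 2
    pred = torch.zeros((1, 3, 5 + nc))
    pred[0, 0] = torch.tensor([100., 100., 50., 50., 0.9, 0.8, 0.1])  # xywh, obj conf, class scores
    pred[0, 1] = torch.tensor([102., 101., 50., 50., 0.8, 0.7, 0.2])  # overlaps box 0, same class
    pred[0, 2] = torch.tensor([300., 300., 40., 40., 0.9, 0.1, 0.9])  # separate object, other class
    out = non_max_suppression(pred, conf_thres=0.25, iou_thres=0.45)
    print(out[0])  # expected: two rows of [x1, y1, x2, y2, conf, cls]
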
def scale_boxes(img1_shape, boxes, img0_shape, ratio_pad=None):
    # Rescale boxes (xyxy) from img1_shape to img0_shape
    if ratio_pad is None:  # calculate from img0_shape
        gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1])  # gain = old / new
        pad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2  # wh padding
    else:
        gain = ratio_pad[0][0]
        pad = ratio_pad[1]

    boxes[..., [0, 2]] -= pad[0]  # x padding
    boxes[..., [1, 3]] -= pad[1]  # y padding
    boxes[..., :4] /= gain
    clip_boxes(boxes, img0_shape)
    return boxes


def is_ascii(s=''):
    # Is string composed of all ASCII (no UTF) characters? (note str().isascii() introduced in python 3.7)
    s = str(s)  # convert list, tuple, None, etc. to str
    return len(s.encode().decode('ascii', 'ignore')) == len(s)


def clip_boxes(boxes, shape):
    # Clip boxes (xyxy) to image shape (height, width)
    if isinstance(boxes, torch.Tensor):  # faster individually
        boxes[..., 0].clamp_(0, shape[1])  # x1
        boxes[..., 1].clamp_(0, shape[0])  # y1
        boxes[..., 2].clamp_(0, shape[1])  # x2
        boxes[..., 3].clamp_(0, shape[0])  # y2
    else:  # np.array (faster grouped)
        boxes[..., [0, 2]] = boxes[..., [0, 2]].clip(0, shape[1])  # x1, x2
        boxes[..., [1, 3]] = boxes[..., [1, 3]].clip(0, shape[0])  # y1, y2


def get_mask(edg_img, mask_scale=0.6):
    # ---------------- select the detection region ----------------
    mask = np.zeros_like(edg_img)  # all-black image
    ignore_mask_color = 255
    # get image size
    imgshape = edg_img.shape
    # polygon vertices, shape [1,4,2]; lane markings usually occupy roughly the lower third of the frame
    ret = np.array([[(1, imgshape[0]),
                     (1, int(imgshape[0] * mask_scale)),
                     (imgshape[1] - 1, int(imgshape[0] * mask_scale)),
                     (imgshape[1] - 1, imgshape[0] - 1)]], dtype=np.int32)
    # fill the polygon: mask is the image to fill, ret holds the vertices; the region to keep becomes white
    cv2.fillPoly(mask, ret, ignore_mask_color)  # the lower part of the mask turns white
    # bitwise AND keeps only the edges inside the region of interest
    mask_img = cv2.bitwise_and(edg_img, mask)
    # --------------------------------------------------------------
    return mask_img
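
# Minimal usage sketch (assumptions: the YOLOv5 repo checked out under
# ./algorithm/yolov5, lane weights at ./weight/traffic/lane.pt, and a local test
# video 'vid2.mp4' -- the defaults used by LaneDetection above). It grabs one
# processed frame and writes it to a hypothetical output file.
if __name__ == '__main__':
    detector = LaneDetection('vid2.mp4')
    frame_bytes, _ = detector.get_frame()
    with open('lane_frame.jpg', 'wb') as f:  # hypothetical output path
        f.write(frame_bytes)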