from flask import Flask, render_template, request, jsonify, Response, session,g from werkzeug.utils import secure_filename from algorithm.people_detection import VideoPeopleDetection from algorithm.fire_detection import FireDetection from algorithm.smog_detection import SmogDetection from algorithm.helmet_detection import HelmetDetection from algorithm.mask_detection import MaskDetection from algorithm.electromobile_detection import ElectromobileDetection from algorithm.glove_detection import GloveDetection from algorithm.reflective_detection import ReflectiveDetection from algorithm.phone_detection import PhoneDetection from algorithm.pose_detection import PoseDetection from algorithm.image_segmentation import ImageSegmentation from algorithm.drowsy_detection import DrowsyDetection from algorithm.lane_detection import LaneDetection from algorithm.easyocr import OCR from algorithm.detect_emotion.emotion_detection import Emotion_Detection from algorithm.face_recognition.face_recognition import Face_Recognizer from algorithm.Car_recognition.car_detection import CarDetection from algorithm.pcb_detection import PCBDetection from algorithm.Remote_sense.remote_sense import Remote_Sense from algorithm.yolo_segment import YOLO_Segment import os os.environ['CUDA_VISIBLE_DEVICES'] = '1' import multiprocessing from MyThreadFunc import MyThreadFunc import json import pymysql import threading import torch from flask_cors import CORS app = Flask(__name__) app.secret_key = 'super_secret_key' # Needed to use sessions CORS(app, resources={r"/*": {"origins": "http://127.0.0.1:5173"}}) # Configure the upload folder and allowed extensions UPLOAD_FOLDER = 'uploads' ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'mp4'} app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER global cameraLocation cameraLocation = 'rtsp://192.168.30.18:8557/h264' #常用模型率先开启 model = torch.hub.load((os.getcwd()) + "/algorithm/yolov5", 'custom', source='local', path='./weight/best_person_29.05.2023-yolov5s-1.pt', force_reload=True) fire_model = torch.hub.load((os.getcwd()) + "/algorithm/yolov5", 'custom', source='local', path='./weight/fire2.pt', force_reload=True) face_model = torch.hub.load((os.getcwd()) + "/algorithm/yolov5", 'custom', source='local', path='./weight/face.pt', force_reload=True) somg_model = torch.hub.load((os.getcwd()) + "/algorithm/yolov5", 'custom', source='local', path='./weight/smog.pt', force_reload=True) drowsy_model = torch.hub.load((os.getcwd()) + "/algorithm/yolov5", 'custom', source='local', path='./weight/traffic/drowsy.pt', force_reload=True) global algorithm_name global algNameNew algNameNew = '行人检测' if not os.path.exists(UPLOAD_FOLDER): os.makedirs(UPLOAD_FOLDER) def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def readFileTojson(input): with open(input, "r", encoding="utf-8") as f: parse_data = json.load(f) return parse_data @app.route("/") def home(): return render_template("index.html") # 获取数据库摄像头信息,传给前端 @app.route('/getSqlCamera', methods=['GET','POST']) def getSqlCamera(): conn = pymysql.connect(host='10.51.10.122', user='root', password='fate_dev', port=3306, database="camera_rtsp", charset='utf8') cursor = conn.cursor() cursor.execute("select * from camera") res = cursor.fetchall() cursor.close() conn.close() #print(res) return jsonify(res) # 获取选择的算法 @app.route('/api/algname', methods=['POST','GET']) def get_alg_name(): # 从 JSON 数据中提取 algName alg_name = request.json.get('algName') global algNameNew algNameNew = alg_name # 检查是否成功提取 algName if alg_name is None: return jsonify({"error": "algName not provided in JSON"}), 400 # 在这里处理 algName,例如保存到数据库、执行算法等 # ... # 返回一个成功的响应 return jsonify(alg_name), 200 @app.route('/getSelectAlg', methods=['GET','POST']) def getSelectAlg(): res = algNameNew #print(res) return jsonify(res) # 获取前端用户选择的摄像头信息 @app.route('/getFrontCamera', methods=['GET','POST']) def getFrontCamera(): cameraId= request.form['camID'] # print(cameraId) conn = pymysql.connect(host='10.51.10.122', user='root', password='fate_dev', port=3306, database="camera_rtsp", charset='utf8') cursor = conn.cursor() cursor.execute("select * from camera where cameraID =" +cameraId) chsCamera = cursor.fetchall() # 摄像头IP global cameraLocation cameraLocation = chsCamera[0][1] print(cameraLocation) cursor.close() conn.close() #print(chsCamera[0][1]) return jsonify(message='successfully'), 200 # 获取数据库算法信息,传给前端 @app.route('/getSqlAlg', methods=['GET','POST']) def getSqlAlg(): conn = pymysql.connect(host='10.51.10.122', user='root', password='fate_dev', port=3306, database="camera_rtsp", charset='utf8') cursor = conn.cursor() cursor.execute("select * from algorithm") res = cursor.fetchall() cursor.close() conn.close() #print(res) return jsonify(res) # 获取前端选择的算法信息 @app.route('/getSelectAlgorithm', methods=['GET','POST']) def getSelectAlgorithm(): global algorithm_name algorithm_name = request.form['selectAlgorithm'] print(algorithm_name+"-----") return jsonify(message='successfully'), 200 last_uploaded_filename = None @app.route('/upload', methods=['POST']) def upload_file(): if 'file' not in request.files: return jsonify(message='No file part'), 400 file = request.files['file'] if file.filename == '': return jsonify(message='No selected file'), 400 if file and allowed_file(file.filename): filename = secure_filename(file.filename) file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) file.save(file_path) session['uploaded_file_path'] = file_path # Save the file path to the session return jsonify(message=f'File {filename} uploaded successfully'), 200 else: return jsonify(message='File type not allowed'), 400 def gen(camera): while True: frame, resText = camera.get_frame() # 将 JSON 字符串发送给前端 yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n' + b'--resText\r\n' b'Content-Type: application/json\r\n\r\n' + json.dumps({'resText': resText}).encode('utf-8') + b'\r\n\r\n') @app.route("/use_webcam") def use_webcam(): global cameraLocation source = cameraLocation print("使用RTSP", source) if algorithm_name == '行人检测': camera = VideoPeopleDetection(model=model) elif algorithm_name == '火焰检测': camera = FireDetection(model=fire_model) elif algorithm_name == '人脸识别': camera = Face_Recognizer(model=face_model) elif algorithm_name == '表情识别': camera = Emotion_Detection(model=face_model) elif algorithm_name == 'OCR': camera = OCR() elif algorithm_name == '车辆检测': camera = CarDetection() elif algorithm_name == '实例分割': camera = YOLO_Segment() elif algorithm_name == '疲劳检测': camera = DrowsyDetection(model=drowsy_model) elif algorithm_name == '车道线检测': camera = LaneDetection() elif algorithm_name == 'PCB缺陷检测': camera = PCBDetection() elif algorithm_name == '遥感目标检测': camera = Remote_Sense() elif algorithm_name == '未佩戴头盔检测': camera = HelmetDetection() elif algorithm_name == '口罩检测': camera = MaskDetection() elif algorithm_name == '电动车检测': camera = ElectromobileDetection() elif algorithm_name == '反光衣检测': camera = ReflectiveDetection() elif algorithm_name == '使用手机检测': camera = PhoneDetection() elif algorithm_name == '姿态检测': camera = PoseDetection() camera.use_webcam(source) return Response(gen(camera), mimetype="multipart/x-mixed-replace; boundary=frame") # @app.route("/getDetectResult") # def getDetectResult(): # camera = VideoPeopleDetection() # numPeople, accuracy=camera.getDetectResult() # return jsonify(numPeople,accuracy) @app.route("/video_feed") def video_feed(): # Check if a file has been uploaded and saved in the session if 'uploaded_file_path' in session: # Initialize VideoPeopleDetection with the uploaded file path if algorithm_name == '行人检测': camera = VideoPeopleDetection(video_path = session['uploaded_file_path'], model=model) elif algorithm_name == '火焰检测': camera = FireDetection(video_path = session['uploaded_file_path'], model=fire_model) elif algorithm_name == '烟雾检测': camera = SmogDetection(video_path = session['uploaded_file_path'], model=fire_model) elif algorithm_name == '人脸识别': camera = Face_Recognizer(video_path = session['uploaded_file_path'], model=face_model) elif algorithm_name == '表情识别': camera = Emotion_Detection(video_path = session['uploaded_file_path'], model=face_model) elif algorithm_name == 'OCR': camera = OCR(video_path = session['uploaded_file_path']) elif algorithm_name == '车辆检测': camera = CarDetection(video_path = session['uploaded_file_path']) elif algorithm_name == '实例分割': camera = YOLO_Segment(video_path = session['uploaded_file_path']) elif algorithm_name == '疲劳检测': camera = DrowsyDetection(video_path = session['uploaded_file_path'], model=drowsy_model) elif algorithm_name == '车道线检测': camera = LaneDetection(video_path=session['uploaded_file_path']) elif algorithm_name == 'PCB缺陷检测': camera = PCBDetection(video_path=session['uploaded_file_path']) elif algorithm_name == '遥感目标检测': camera = Remote_Sense(video_path=session['uploaded_file_path']) elif algorithm_name == '未佩戴头盔检测': camera = HelmetDetection(video_path=session['uploaded_file_path']) elif algorithm_name == '口罩检测': camera = MaskDetection(video_path=session['uploaded_file_path']) elif algorithm_name == '电动车检测': camera = ElectromobileDetection(video_path=session['uploaded_file_path']) elif algorithm_name == '手套检测': camera = GloveDetection(video_path=session['uploaded_file_path']) elif algorithm_name == '反光衣检测': camera = ReflectiveDetection(video_path=session['uploaded_file_path']) elif algorithm_name == '使用手机检测': camera = PhoneDetection(video_path=session['uploaded_file_path']) elif algorithm_name == '姿态检测': camera = PoseDetection(video_path=session['uploaded_file_path']) elif algorithm_name == '肝脏图像分割': camera = ImageSegmentation(video_path = session['uploaded_file_path']) return Response(gen(camera), mimetype="multipart/x-mixed-replace; boundary=frame") else: # If no file has been uploaded, return a default message or empty feed return "No video uploaded yet", 200 if __name__ == "__main__": app.run(host='10.51.10.122',debug=True, port=5001)