algorithm_system_server/app.py

351 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from flask import Flask, render_template, request, jsonify, Response, session,g
from werkzeug.utils import secure_filename
from algorithm.people_detection import VideoPeopleDetection
from algorithm.fire_detection import FireDetection
from algorithm.smog_detection import SmogDetection
from algorithm.helmet_detection import HelmetDetection
from algorithm.mask_detection import MaskDetection
from algorithm.electromobile_detection import ElectromobileDetection
from algorithm.glove_detection import GloveDetection
from algorithm.reflective_detection import ReflectiveDetection
from algorithm.phone_detection import PhoneDetection
from algorithm.pose_detection import PoseDetection
from algorithm.image_segmentation import ImageSegmentation
from algorithm.drowsy_detection import DrowsyDetection
from algorithm.lane_detection import LaneDetection
from algorithm.trafiic_lights import TrafficLightsDetection
from algorithm.easyocr import OCR
from algorithm.detect_emotion.emotion_detection import Emotion_Detection
from algorithm.face_recognition.face_recognition import Face_Recognizer
from algorithm.Car_recognition.car_detection import CarDetection
from algorithm.pcb_detection import PCBDetection
from algorithm.Remote_sense.remote_sense import Remote_Sense
from algorithm.safe_detection import SafeDetection
from algorithm.traffic_logo_detection import TrafficLogoDetection
from algorithm.yolo_segment import YOLO_Segment
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '1'
import multiprocessing
from MyThreadFunc import MyThreadFunc
import json
import pymysql
import threading
import torch
from flask_cors import CORS
app = Flask(__name__)
app.secret_key = 'super_secret_key' # Needed to use sessions
CORS(app, resources={r"/*": {"origins": "http://127.0.0.1:5173"}})
# Configure the upload folder and allowed extensions
UPLOAD_FOLDER = 'uploads'
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'mp4'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
global cameraLocation
cameraLocation = 'rtsp://192.168.30.18:8557/h264'
#常用模型率先开启
model = torch.hub.load((os.getcwd()) + "/algorithm/yolov5", 'custom', source='local', path='./weight/best_person_29.05.2023-yolov5s-1.pt', force_reload=True)
fire_model = torch.hub.load((os.getcwd()) + "/algorithm/yolov5", 'custom', source='local', path='./weight/fire2.pt', force_reload=True)
face_model = torch.hub.load((os.getcwd()) + "/algorithm/yolov5", 'custom', source='local', path='./weight/face.pt', force_reload=True)
somg_model = torch.hub.load((os.getcwd()) + "/algorithm/yolov5", 'custom', source='local', path='./weight/smog.pt', force_reload=True)
drowsy_model = torch.hub.load((os.getcwd()) + "/algorithm/yolov5", 'custom', source='local', path='./weight/traffic/drowsy.pt', force_reload=True)
# traffic_lights_model = torch.hub.load((os.getcwd()) + "/algorithm/yolov5", 'custom', source='local', path='./weight/traffic/traffic.pt', force_reload=True)
global algTextNew
algTextNew = '对生产环境的行人进行检测'
global algorithm_name
global algNameNew
algNameNew = '行人检测'
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def readFileTojson(input):
with open(input, "r", encoding="utf-8") as f:
parse_data = json.load(f)
return parse_data
@app.route("/")
def home():
return render_template("index.html")
# 获取数据库摄像头信息,传给前端
@app.route('/getSqlCamera', methods=['GET','POST'])
def getSqlCamera():
conn = pymysql.connect(host='10.51.10.122', user='root', password='fate_dev', port=3306, database="camera_rtsp", charset='utf8')
cursor = conn.cursor()
cursor.execute("select * from camera")
res = cursor.fetchall()
cursor.close()
conn.close()
#print(res)
return jsonify(res)
# 获取选择的算法
@app.route('/api/algname', methods=['POST','GET'])
def get_alg_name():
# 从 JSON 数据中提取 algName
alg_name = request.json.get('algName')
alg_descrip = request.json.get('algText')
global algNameNew
algNameNew = alg_name
global algTextNew
algTextNew = alg_descrip
# 检查是否成功提取 algName
if alg_name is None:
return jsonify({"error": "algName not provided in JSON"}), 400
if alg_descrip is None:
return jsonify({"error": "alg_descrip not provided in JSON"}), 400
# 在这里处理 algName例如保存到数据库、执行算法等
# ...
# 返回一个成功的响应
return jsonify(alg_name,alg_descrip ), 200
@app.route('/getSelectAlg', methods=['GET','POST'])
def getSelectAlg():
res = [algNameNew, algTextNew]
#print(res)
return jsonify(res)
# 获取前端用户选择的摄像头信息
@app.route('/getFrontCamera', methods=['GET','POST'])
def getFrontCamera():
cameraId= request.form['camID']
# print(cameraId)
conn = pymysql.connect(host='10.51.10.122', user='root', password='fate_dev', port=3306, database="camera_rtsp", charset='utf8')
cursor = conn.cursor()
cursor.execute("select * from camera where cameraID =" +cameraId)
chsCamera = cursor.fetchall()
# 摄像头IP
global cameraLocation
cameraLocation = chsCamera[0][1]
print(cameraLocation)
cursor.close()
conn.close()
#print(chsCamera[0][1])
return jsonify(message='successfully'), 200
# 获取数据库算法信息,传给前端
@app.route('/getSqlAlg', methods=['GET','POST'])
def getSqlAlg():
conn = pymysql.connect(host='10.51.10.122', user='root', password='fate_dev', port=3306, database="camera_rtsp", charset='utf8')
cursor = conn.cursor()
cursor.execute("select * from algorithm")
res = cursor.fetchall()
cursor.close()
conn.close()
#print(res)
return jsonify(res)
# 获取前端选择的算法信息
@app.route('/getSelectAlgorithm', methods=['GET','POST'])
def getSelectAlgorithm():
global algorithm_name
algorithm_name = request.form['selectAlgorithm']
print(algorithm_name+"-----")
return jsonify(message='successfully'), 200
last_uploaded_filename = None
@app.route('/upload', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return jsonify(message='No file part'), 400
file = request.files['file']
if file.filename == '':
return jsonify(message='No selected file'), 400
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(file_path)
session['uploaded_file_path'] = file_path # Save the file path to the session
return jsonify(message=f'File {filename} uploaded successfully'), 200
else:
return jsonify(message='File type not allowed'), 400
def gen(camera):
while True:
frame, resText = camera.get_frame()
# 将 JSON 字符串发送给前端
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame
+ b'\r\n\r\n' + b'--resText\r\n'
b'Content-Type: application/json\r\n\r\n'
+ json.dumps({'resText': resText}).encode('utf-8')
+ b'\r\n\r\n')
@app.route("/use_webcam")
def use_webcam():
global cameraLocation
source = cameraLocation
print("使用RTSP", source)
if algNameNew == '行人检测':
camera = VideoPeopleDetection(model=model)
elif algNameNew == '火焰识别':
camera = FireDetection(model=fire_model)
elif algNameNew == '人脸识别':
camera = Face_Recognizer(model=face_model)
elif algNameNew == '表情识别':
camera = Emotion_Detection(model=face_model)
elif algNameNew == 'OCR':
camera = OCR()
elif algNameNew == '车辆检测':
camera = CarDetection()
elif algNameNew == '实例分割':
camera = YOLO_Segment()
elif algNameNew == '疲劳检测':
camera = DrowsyDetection(model=drowsy_model)
elif algNameNew == '车道线检测':
camera = LaneDetection()
elif algNameNew == '红绿灯检测':
camera = TrafficLightsDetection()
elif algNameNew == 'PCB缺陷检测':
camera = PCBDetection()
elif algNameNew == '遥感目标检测':
camera = Remote_Sense()
elif algNameNew == '电动车头盔识别':
camera = HelmetDetection()
elif algNameNew == '戴口罩识别':
camera = MaskDetection()
elif algNameNew == '电动车检测':
camera = ElectromobileDetection()
#
elif algNameNew == '反光衣识别':
camera = ReflectiveDetection()
elif algNameNew == '玩手机识别':
camera = PhoneDetection()
elif algNameNew == '姿态检测':
camera = PoseDetection()
elif algNameNew == '安全检测':
camera = SafeDetection()
elif algNameNew == '交通标志检测':
camera = TrafficLogoDetection()
camera.use_webcam(source)
return Response(gen(camera),
mimetype="multipart/x-mixed-replace; boundary=frame")
# @app.route("/getDetectResult")
# def getDetectResult():
# camera = VideoPeopleDetection()
# numPeople, accuracy=camera.getDetectResult()
# return jsonify(numPeople,accuracy)
@app.route("/video_feed")
def video_feed():
# Check if a file has been uploaded and saved in the session
if 'uploaded_file_path' in session:
# Initialize VideoPeopleDetection with the uploaded file path
if algNameNew == '行人检测':
camera = VideoPeopleDetection(video_path = session['uploaded_file_path'], model=model)
elif algNameNew == '火焰识别':
camera = FireDetection(video_path = session['uploaded_file_path'], model=fire_model)
elif algNameNew == '烟雾识别':
camera = SmogDetection(video_path = session['uploaded_file_path'], model=fire_model)
elif algNameNew == '人脸识别':
camera = Face_Recognizer(video_path = session['uploaded_file_path'], model=face_model)
elif algNameNew == '表情识别':
camera = Emotion_Detection(video_path = session['uploaded_file_path'], model=face_model)
elif algNameNew == 'OCR':
camera = OCR(video_path = session['uploaded_file_path'])
elif algNameNew == '车辆检测':
camera = CarDetection(video_path = session['uploaded_file_path'])
elif algNameNew == '实例分割':
camera = YOLO_Segment(video_path = session['uploaded_file_path'])
elif algNameNew == '疲劳检测':
camera = DrowsyDetection(video_path = session['uploaded_file_path'], model=drowsy_model)
elif algNameNew == '车道线检测':
camera = LaneDetection(video_path=session['uploaded_file_path'])
elif algNameNew == '红绿灯检测':
camera = TrafficLightsDetection(video_path=session['uploaded_file_path'])
elif algNameNew == 'PCB缺陷检测':
camera = PCBDetection(video_path=session['uploaded_file_path'])
elif algNameNew == '遥感目标检测':
camera = Remote_Sense(video_path=session['uploaded_file_path'])
elif algNameNew == '电动车头盔识别':
camera = HelmetDetection(video_path=session['uploaded_file_path'])
elif algNameNew == '戴口罩识别':
camera = MaskDetection(video_path=session['uploaded_file_path'])
elif algNameNew == '电动车检测':
camera = ElectromobileDetection(video_path=session['uploaded_file_path'])
elif algNameNew == '手套检测':
camera = GloveDetection(video_path=session['uploaded_file_path'])
elif algNameNew == '反光衣识别':
camera = ReflectiveDetection(video_path=session['uploaded_file_path'])
elif algNameNew == '玩手机识别':
camera = PhoneDetection(video_path=session['uploaded_file_path'])
elif algNameNew == '姿态检测':
camera = PoseDetection(video_path=session['uploaded_file_path'])
elif algNameNew == '肝脏图像分割':
camera = ImageSegmentation(video_path = session['uploaded_file_path'])
elif algNameNew == '安全检测':
camera = SafeDetection(video_path = session['uploaded_file_path'])
elif algNameNew == '交通标志检测':
camera = TrafficLogoDetection(video_path = session['uploaded_file_path'])
return Response(gen(camera),
mimetype="multipart/x-mixed-replace; boundary=frame")
else:
# If no file has been uploaded, return a default message or empty feed
return "No video uploaded yet", 200
if __name__ == "__main__":
app.run(host='10.51.10.122',debug=True, port=5001)