algorithm_system_server/app copy.py

327 lines
12 KiB
Python
Raw Permalink Normal View History

2024-06-21 10:06:54 +08:00
from flask import Flask, render_template, request, jsonify, Response, session,g
from werkzeug.utils import secure_filename
from algorithm.people_detection import VideoPeopleDetection
from algorithm.fire_detection import FireDetection
from algorithm.smog_detection import SmogDetection
from algorithm.helmet_detection import HelmetDetection
from algorithm.mask_detection import MaskDetection
from algorithm.electromobile_detection import ElectromobileDetection
from algorithm.glove_detection import GloveDetection
from algorithm.reflective_detection import ReflectiveDetection
from algorithm.phone_detection import PhoneDetection
from algorithm.pose_detection import PoseDetection
from algorithm.image_segmentation import ImageSegmentation
from algorithm.drowsy_detection import DrowsyDetection
from algorithm.lane_detection import LaneDetection
from algorithm.easyocr import OCR
from algorithm.detect_emotion.emotion_detection import Emotion_Detection
from algorithm.face_recognition.face_recognition import Face_Recognizer
from algorithm.Car_recognition.car_detection import CarDetection
from algorithm.pcb_detection import PCBDetection
from algorithm.Remote_sense.remote_sense import Remote_Sense
from algorithm.yolo_segment import YOLO_Segment
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '1'
import multiprocessing
from MyThreadFunc import MyThreadFunc
import json
import pymysql
import threading
import torch
from flask_cors import CORS
app = Flask(__name__)
app.secret_key = 'super_secret_key' # Needed to use sessions
CORS(app, resources={r"/*": {"origins": "http://127.0.0.1:5173"}})
# Configure the upload folder and allowed extensions
UPLOAD_FOLDER = 'uploads'
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'mp4'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
global cameraLocation
cameraLocation = 'rtsp://192.168.30.18:8557/h264'
#常用模型率先开启
model = torch.hub.load((os.getcwd()) + "/algorithm/yolov5", 'custom', source='local', path='./weight/best_person_29.05.2023-yolov5s-1.pt', force_reload=True)
fire_model = torch.hub.load((os.getcwd()) + "/algorithm/yolov5", 'custom', source='local', path='./weight/fire2.pt', force_reload=True)
face_model = torch.hub.load((os.getcwd()) + "/algorithm/yolov5", 'custom', source='local', path='./weight/face.pt', force_reload=True)
somg_model = torch.hub.load((os.getcwd()) + "/algorithm/yolov5", 'custom', source='local', path='./weight/smog.pt', force_reload=True)
drowsy_model = torch.hub.load((os.getcwd()) + "/algorithm/yolov5", 'custom', source='local', path='./weight/traffic/drowsy.pt', force_reload=True)
global algorithm_name
global algNameNew
algNameNew = '行人检测'
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def readFileTojson(input):
with open(input, "r", encoding="utf-8") as f:
parse_data = json.load(f)
return parse_data
@app.route("/")
def home():
return render_template("index.html")
# 获取数据库摄像头信息,传给前端
@app.route('/getSqlCamera', methods=['GET','POST'])
def getSqlCamera():
conn = pymysql.connect(host='10.51.10.122', user='root', password='fate_dev', port=3306, database="camera_rtsp", charset='utf8')
cursor = conn.cursor()
cursor.execute("select * from camera")
res = cursor.fetchall()
cursor.close()
conn.close()
#print(res)
return jsonify(res)
# 获取选择的算法
@app.route('/api/algname', methods=['POST','GET'])
def get_alg_name():
# 从 JSON 数据中提取 algName
alg_name = request.json.get('algName')
global algNameNew
algNameNew = alg_name
# 检查是否成功提取 algName
if alg_name is None:
return jsonify({"error": "algName not provided in JSON"}), 400
# 在这里处理 algName例如保存到数据库、执行算法等
# ...
# 返回一个成功的响应
return jsonify(alg_name), 200
@app.route('/getSelectAlg', methods=['GET','POST'])
def getSelectAlg():
res = algNameNew
#print(res)
return jsonify(res)
# 获取前端用户选择的摄像头信息
@app.route('/getFrontCamera', methods=['GET','POST'])
def getFrontCamera():
cameraId= request.form['camID']
# print(cameraId)
conn = pymysql.connect(host='10.51.10.122', user='root', password='fate_dev', port=3306, database="camera_rtsp", charset='utf8')
cursor = conn.cursor()
cursor.execute("select * from camera where cameraID =" +cameraId)
chsCamera = cursor.fetchall()
# 摄像头IP
global cameraLocation
cameraLocation = chsCamera[0][1]
print(cameraLocation)
cursor.close()
conn.close()
#print(chsCamera[0][1])
return jsonify(message='successfully'), 200
# 获取数据库算法信息,传给前端
@app.route('/getSqlAlg', methods=['GET','POST'])
def getSqlAlg():
conn = pymysql.connect(host='10.51.10.122', user='root', password='fate_dev', port=3306, database="camera_rtsp", charset='utf8')
cursor = conn.cursor()
cursor.execute("select * from algorithm")
res = cursor.fetchall()
cursor.close()
conn.close()
#print(res)
return jsonify(res)
# 获取前端选择的算法信息
@app.route('/getSelectAlgorithm', methods=['GET','POST'])
def getSelectAlgorithm():
global algorithm_name
algorithm_name = request.form['selectAlgorithm']
print(algorithm_name+"-----")
return jsonify(message='successfully'), 200
last_uploaded_filename = None
@app.route('/upload', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return jsonify(message='No file part'), 400
file = request.files['file']
if file.filename == '':
return jsonify(message='No selected file'), 400
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(file_path)
session['uploaded_file_path'] = file_path # Save the file path to the session
return jsonify(message=f'File {filename} uploaded successfully'), 200
else:
return jsonify(message='File type not allowed'), 400
def gen(camera):
while True:
frame, resText = camera.get_frame()
# 将 JSON 字符串发送给前端
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame
+ b'\r\n\r\n' + b'--resText\r\n'
b'Content-Type: application/json\r\n\r\n'
+ json.dumps({'resText': resText}).encode('utf-8')
+ b'\r\n\r\n')
@app.route("/use_webcam")
def use_webcam():
global cameraLocation
source = cameraLocation
print("使用RTSP", source)
if algorithm_name == '行人检测':
camera = VideoPeopleDetection(model=model)
elif algorithm_name == '火焰检测':
camera = FireDetection(model=fire_model)
elif algorithm_name == '人脸识别':
camera = Face_Recognizer(model=face_model)
elif algorithm_name == '表情识别':
camera = Emotion_Detection(model=face_model)
elif algorithm_name == 'OCR':
camera = OCR()
elif algorithm_name == '车辆检测':
camera = CarDetection()
elif algorithm_name == '实例分割':
camera = YOLO_Segment()
elif algorithm_name == '疲劳检测':
camera = DrowsyDetection(model=drowsy_model)
elif algorithm_name == '车道线检测':
camera = LaneDetection()
elif algorithm_name == 'PCB缺陷检测':
camera = PCBDetection()
elif algorithm_name == '遥感目标检测':
camera = Remote_Sense()
elif algorithm_name == '未佩戴头盔检测':
camera = HelmetDetection()
elif algorithm_name == '口罩检测':
camera = MaskDetection()
elif algorithm_name == '电动车检测':
camera = ElectromobileDetection()
elif algorithm_name == '反光衣检测':
camera = ReflectiveDetection()
elif algorithm_name == '使用手机检测':
camera = PhoneDetection()
elif algorithm_name == '姿态检测':
camera = PoseDetection()
camera.use_webcam(source)
return Response(gen(camera),
mimetype="multipart/x-mixed-replace; boundary=frame")
# @app.route("/getDetectResult")
# def getDetectResult():
# camera = VideoPeopleDetection()
# numPeople, accuracy=camera.getDetectResult()
# return jsonify(numPeople,accuracy)
@app.route("/video_feed")
def video_feed():
# Check if a file has been uploaded and saved in the session
if 'uploaded_file_path' in session:
# Initialize VideoPeopleDetection with the uploaded file path
if algorithm_name == '行人检测':
camera = VideoPeopleDetection(video_path = session['uploaded_file_path'], model=model)
elif algorithm_name == '火焰检测':
camera = FireDetection(video_path = session['uploaded_file_path'], model=fire_model)
elif algorithm_name == '烟雾检测':
camera = SmogDetection(video_path = session['uploaded_file_path'], model=fire_model)
elif algorithm_name == '人脸识别':
camera = Face_Recognizer(video_path = session['uploaded_file_path'], model=face_model)
elif algorithm_name == '表情识别':
camera = Emotion_Detection(video_path = session['uploaded_file_path'], model=face_model)
elif algorithm_name == 'OCR':
camera = OCR(video_path = session['uploaded_file_path'])
elif algorithm_name == '车辆检测':
camera = CarDetection(video_path = session['uploaded_file_path'])
elif algorithm_name == '实例分割':
camera = YOLO_Segment(video_path = session['uploaded_file_path'])
elif algorithm_name == '疲劳检测':
camera = DrowsyDetection(video_path = session['uploaded_file_path'], model=drowsy_model)
elif algorithm_name == '车道线检测':
camera = LaneDetection(video_path=session['uploaded_file_path'])
elif algorithm_name == 'PCB缺陷检测':
camera = PCBDetection(video_path=session['uploaded_file_path'])
elif algorithm_name == '遥感目标检测':
camera = Remote_Sense(video_path=session['uploaded_file_path'])
elif algorithm_name == '未佩戴头盔检测':
camera = HelmetDetection(video_path=session['uploaded_file_path'])
elif algorithm_name == '口罩检测':
camera = MaskDetection(video_path=session['uploaded_file_path'])
elif algorithm_name == '电动车检测':
camera = ElectromobileDetection(video_path=session['uploaded_file_path'])
elif algorithm_name == '手套检测':
camera = GloveDetection(video_path=session['uploaded_file_path'])
elif algorithm_name == '反光衣检测':
camera = ReflectiveDetection(video_path=session['uploaded_file_path'])
elif algorithm_name == '使用手机检测':
camera = PhoneDetection(video_path=session['uploaded_file_path'])
elif algorithm_name == '姿态检测':
camera = PoseDetection(video_path=session['uploaded_file_path'])
elif algorithm_name == '肝脏图像分割':
camera = ImageSegmentation(video_path = session['uploaded_file_path'])
return Response(gen(camera),
mimetype="multipart/x-mixed-replace; boundary=frame")
else:
# If no file has been uploaded, return a default message or empty feed
return "No video uploaded yet", 200
if __name__ == "__main__":
app.run(host='10.51.10.122',debug=True, port=5001)