"""Exam-proctoring backend: Flask HTTP API plus a Socket.IO namespace.

The ``/ws/video`` namespace relays student video frames, talk events and
chat messages to the connected admin client and stores records in the
``zuobi`` collection.  Sampled video frames are run through a YOLO model and
each detection is stored as a record.  The HTTP routes manage users, the
per-student detection flag and the stored records.
"""
import base64
import hashlib
import json
import os
import time
from io import BytesIO

from bson import ObjectId
from bson.errors import InvalidId
from flask import Flask, jsonify, request
from flask_socketio import SocketIO, emit, join_room, leave_room
from PIL import Image
from pymongo import MongoClient
from ultralytics import YOLO

app = Flask(__name__)
client = MongoClient("mongodb://localhost:27019/")
db = client["back"]
usercollection = db["users"]
zuobicollection = db["zuobi"]

# Socket.IO setup; cors_allowed_origins="*" accepts cross-origin clients.
# NOTE(review): the secret key is hard-coded - confirm this is dev-only.
app.config["SECRET_KEY"] = "secret!"
socketio = SocketIO(app, cors_allowed_origins="*")

# Connected Socket.IO clients, one dict per connection with the keys
# "id" (query-string id), "sid" (session id) and "isAdmin" (query-string flag).
clients = []
# Session id of the most recently connected admin; "" when none is connected.
adminSid = ""
# Detection flag per student, keyed by str(_id).
studentStatus = {}

current_dir = os.path.dirname(os.path.abspath(__file__))
weights = os.path.join(current_dir, "exam.pt")
print(89744, weights)
model = YOLO(weights)
# Only one frame out of every `frame_rate_divider` received frames is analysed.
frame_rate_divider = 300
# Frames received since the last analysed one (shared by all senders).
frame_count = 0

# Profile fields copied verbatim from the request body by /addUser and
# /updateUser.
_USER_PROFILE_FIELDS = (
    "xuexiaomingcheng",
    "xuexiaodaihao",
    "zhuanyemingcheng",
    "zhuanyedaihao",
    "nianji",
    "banji",
    "xueshengxingming",
    "xuehao",
    "kaoshengtupian",
    "chengji",
    "zuobiqingkuang",
    "kaoshileixing",
    "kaoshikemu",
    "kaoshishijianduan",
)


def _now_str():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return time.strftime("%Y-%m-%d %H:%M:%S")


def _permission_denied():
    """Return the error payload sent when a non-admin calls an admin route."""
    return {"code": 200, "msg": "无权限", "list": [], "hasError": True}


def _with_str_id(document):
    """Replace the document's ObjectId ``_id`` with its string form."""
    document["_id"] = str(document["_id"])
    return document


def _zuobi_record(user_id, msg, record_type):
    """Build a ``zuobi`` record with both timestamps set to now."""
    now = _now_str()
    return {
        "userId": user_id,
        "msg": msg,
        "type": record_type,
        "create_at": now,
        "update_at": now,
        "isExit": 1,
    }


def _emit_to(event, message, sid):
    """Emit ``event`` on /ws/video to exactly one session.

    Nothing is sent when ``sid`` is empty: Flask-SocketIO treats a falsy
    ``to`` as "no room", which would broadcast the payload to every client
    in the namespace instead of to the intended recipient.
    """
    if not sid:
        return
    socketio.emit(event, message, to=sid, namespace="/ws/video")


def _decode_frame(data_url):
    """Decode a base64 image payload into a PIL image.

    Everything up to the first comma is discarded (presumably a
    ``data:image/...;base64,`` prefix - confirm against the client).

    Returns:
        The opened image, or None when the payload cannot be decoded.
    """
    payload = data_url.split(",", 1)[-1]
    try:
        return Image.open(BytesIO(base64.b64decode(payload)))
    except (ValueError, OSError) as exc:
        # binascii.Error is a ValueError; PIL's UnidentifiedImageError is an
        # OSError.  A bad frame must not kill the handler.
        print(f"Skipping undecodable video frame: {exc}")
        return None


def dealStudentStatus():
    """Register every active non-admin user in ``studentStatus``.

    Existing entries are left untouched so that calling this again (it runs
    after every /addUser) does not reset students whose detection is on.
    """
    for studentItem in usercollection.find({"isExit": 1, "isAdmin": 0}):
        studentStatus.setdefault(str(studentItem["_id"]), False)


dealStudentStatus()


@socketio.on("connect", namespace="/ws/video")
def on_connect():
    """Record a new connection; remember the sid of an admin connection."""
    global adminSid
    sid = request.sid
    # The client passes its id and admin flag in the connection query string.
    client_id = request.args.get("id")
    admin_flag = request.args.get("isAdmin")
    if admin_flag == "1":
        adminSid = sid
        print(87874, adminSid)
    clients.append({"id": client_id, "sid": sid, "isAdmin": admin_flag})


@socketio.on("disconnect", namespace="/ws/video")
def on_disconnect():
    """Forget the disconnecting session and clear a stale admin sid."""
    global adminSid
    sid = request.sid
    # Match on the session id (not the user id) and iterate over a snapshot
    # so that removing entries does not skip elements.
    for item in [entry for entry in clients if entry["sid"] == sid]:
        clients.remove(item)
        print(f"Client disconnected with id {item['id']}")
    if sid == adminSid:
        adminSid = ""


@socketio.on("video", namespace="/ws/video")
def handle_video_frame(message):
    """Relay a student's video frame to the admin and sample it for detection.

    ``message`` must contain "userId" and "data" (base64 image payload).
    The frame counter is module-global, so sampling is across all senders.
    """
    global frame_count
    _emit_to("teacherVideo" + message["userId"], message, adminSid)
    if frame_count % frame_rate_divider == 0:
        image = _decode_frame(message["data"])
        if image is not None:
            results = model.predict(source=image, iou=0.5, conf=0.25)
            checkVedioResult(message, results[0])
    frame_count = (frame_count + 1) % frame_rate_divider


def checkVedioResult(message, det):
    """Store one ``zuobi`` record per detection in ``det``.

    Args:
        message: the incoming video message; its "userId" is stored.
        det: a single prediction result exposing ``boxes``; may be None.
    """
    # Label meanings noted by the original author:
    # {"cheating": "suspected cheating", "good": "good", "normal": "normal"}
    if det is None or not len(det):
        return
    # NOTE(review): the source file was whitespace-collapsed; one record per
    # detected box is the assumed original behavior - confirm.
    for res in det.boxes:
        for box in res:
            class_id = int(box.cls.cpu())
            addzuobi(_zuobi_record(message["userId"], class_id, class_id))


@socketio.on("talk", namespace="/ws/video")
def handle_talk(message):
    """Relay a student's talk event to the admin and store it."""
    _emit_to("teacherTalk" + message["userId"], message, adminSid)
    addzuobi(_zuobi_record(message["userId"], message["data"], "talk"))


@socketio.on("sendMsg", namespace="/ws/video")
def handle_msg(message):
    """Relay a student's message to the admin and store it."""
    _emit_to("teacherMsg" + message["userId"], message, adminSid)
    # The "qustion" spelling is the stored value; it is kept as-is so that
    # existing records and readers stay consistent.
    addzuobi(_zuobi_record(message["userId"], message["data"], "qustion"))


@socketio.on("answerMsg", namespace="/ws/video")
def handle_answer_msg(message):
    """Send the admin's answer to the addressed student and store it."""
    sid = ""
    # The last matching connection wins when a user is connected repeatedly.
    for item in clients:
        if item["id"] == message["userId"]:
            sid = item["sid"]
    _emit_to("studentMsg" + message["userId"], message, sid)
    addzuobi(_zuobi_record(message["userId"], message["data"], "answer"))


def md5_encrypt(data):
    """Return the hex MD5 digest of the UTF-8 encoded string ``data``.

    NOTE(review): unsalted MD5 is weak for password storage.  It is left
    unchanged because stored hashes depend on it; plan a migration.
    """
    return hashlib.md5(data.encode("utf-8")).hexdigest()


def initData():
    """Create the default admin account when no active admin exists."""
    admin = usercollection.find_one({"isAdmin": 1, "isExit": 1})
    if admin is not None:
        print(f"管理账号'{admin['xueshengxingming']}'已存在")
        return
    # Field meanings, in order: school name, school code, major name,
    # major code, grade, class, student name, student number, photo,
    # password, score, exam type, exam subject, exam time slot, exam link,
    # admin flag, active flag, created-at, updated-at.
    now = _now_str()
    user = {
        "xuexiaomingcheng": "",
        "xuexiaodaihao": "",
        "zhuanyemingcheng": "",
        "zhuanyedaihao": "",
        "nianji": "",
        "banji": "",
        "xueshengxingming": "老师",
        "xuehao": "0000",
        "kaoshengtupian": "",
        "mima": md5_encrypt("0000"),
        "chengji": "",
        "kaoshileixing": "",
        "kaoshikemu": "",
        "kaoshishijianduan": "",
        "kaoshilianjie": "",
        "isAdmin": 1,
        "isExit": 1,
        "chuangjianshijian": now,
        "gengxinshijian": now,
    }
    usercollection.insert_one(user)
    print("管理账号'老师'已被初始化新建")


def isAdmin(id):
    """Return True when ``id`` names an active user whose isAdmin is truthy.

    Malformed ids and unknown users yield False instead of raising.
    """
    try:
        mongoId = ObjectId(id)
    except (InvalidId, TypeError):
        return False
    userObj = usercollection.find_one({"_id": mongoId, "isExit": 1})
    if userObj is None:
        return False
    # A missing isAdmin field counts as "not admin".
    return bool(userObj.get("isAdmin", False))


@app.route("/startCheck", methods=["post"])
def startCheck():
    """Turn the detection flag on for the student named by "_id"."""
    userJson = json.loads(request.data)
    student_id = userJson["_id"]
    # .get: an id that is not registered yet must not fail the request.
    print(f"学生{student_id}开始检测", studentStatus.get(student_id))
    studentStatus[student_id] = True
    print(f"学生{student_id}开始检测", studentStatus[student_id])
    return {"code": 200, "msg": "开始检测", "list": [], "hasError": False}


@app.route("/stopCheck", methods=["post"])
def stopCheck():
    """Turn the detection flag off for the student named by "_id"."""
    userJson = json.loads(request.data)
    studentStatus[userJson["_id"]] = False
    return {"code": 200, "msg": "停止检测", "list": [], "hasError": False}


@app.route("/userLogin", methods=["post"])
def login():
    """Look up a user by "xuehao" and "mima" and return the document.

    "mima" is compared as sent; presumably the client submits the MD5 hex
    digest that /addUser stores - confirm against the front end.
    """
    resData = request.data
    if not resData:
        return {"code": 200, "msg": "请填写信息后登录", "list": [], "hasError": True}
    userJson = json.loads(resData)
    xuehao = userJson["xuehao"]
    mima = userJson["mima"]
    # Reject objects/arrays: a value such as {"$ne": ""} would otherwise be
    # interpreted by MongoDB as a query operator and bypass the check.
    if isinstance(xuehao, (dict, list)) or isinstance(mima, (dict, list)):
        return {"code": 200, "msg": "暂无此用户", "list": [], "hasError": True}
    res = usercollection.find_one({"xuehao": xuehao, "mima": mima})
    if res is None:
        return {"code": 200, "msg": "暂无此用户", "list": [], "hasError": True}
    return {
        "code": 200,
        "msg": "登陆成功",
        "list": [_with_str_id(res)],
        "hasError": False,
    }


@app.route("/addUser", methods=["post"])
def insert_data():
    """Create a student account; only admins (body field "id") may do so."""
    userJson = json.loads(request.data)
    if not isAdmin(userJson["id"]):
        return _permission_denied()
    _id = ObjectId()
    now = _now_str()
    userItem = {"_id": _id}
    userItem.update({field: userJson[field] for field in _USER_PROFILE_FIELDS})
    userItem.update(
        {
            "mima": md5_encrypt(userJson["mima"]),
            # The exam link is derived from the freshly generated id.
            "kaoshilianjie": "/student/" + str(_id),
            "isAdmin": 0,
            "isExit": 1,
            "chuangjianshijian": now,
            "gengxinshijian": now,
        }
    )
    usercollection.insert_one(userItem)
    dealStudentStatus()
    return "新增成功"


@app.route("/getUser", methods=["post"])
def query_data():
    """Return all users for an admin, otherwise only the caller's document."""
    userJson = json.loads(request.data)
    if isAdmin(userJson["id"]):
        found = [_with_str_id(item) for item in usercollection.find()]
    else:
        try:
            mongoId = ObjectId(userJson["id"])
        except (InvalidId, TypeError):
            userObj = None
        else:
            userObj = usercollection.find_one({"_id": mongoId, "isExit": 1})
        if userObj is None:
            return {"code": 200, "msg": "暂无此用户", "list": [], "hasError": True}
        found = [_with_str_id(userObj)]
    return {"code": 200, "msg": "查询成功", "list": found, "hasError": False}


@app.route("/updateUser", methods=["post"])
def update_data():
    """Update the profile of user "userid"; only admins may do so."""
    userJson = json.loads(request.data)
    if not isAdmin(userJson["id"]):
        return _permission_denied()
    userItem = {field: userJson[field] for field in _USER_PROFILE_FIELDS}
    userItem["gengxinshijian"] = _now_str()
    usercollection.update_one(
        {"_id": ObjectId(userJson["userid"])},
        {"$set": userItem},
    )
    return "修改成功"


@app.route("/delUser", methods=["post"])
def delete_data():
    """Delete the users listed in "ids"; only admins may do so."""
    userJson = json.loads(request.data)
    if not isAdmin(userJson["id"]):
        return _permission_denied()
    # The ids arrive as strings and must be converted to ObjectId to match.
    object_ids = [ObjectId(item) for item in userJson["ids"]]
    usercollection.delete_many({"_id": {"$in": object_ids}})
    return "删除成功"


@app.route("/updatePassword", methods=["post"])
def updatePassword():
    """Set a new password for user "userid"; only admins may do so."""
    userJson = json.loads(request.data)
    if not isAdmin(userJson["id"]):
        return _permission_denied()
    usercollection.update_one(
        {"_id": ObjectId(userJson["userid"])},
        {"$set": {"mima": md5_encrypt(userJson["mima"])}},
    )
    return {"code": 200, "msg": "修改成功", "list": [], "hasError": False}


def addzuobi(zuobiItem):
    """Insert one record into the ``zuobi`` collection."""
    zuobicollection.insert_one(zuobiItem)


@app.route("/delzuobi", methods=["post"])
def delzuobi():
    """Delete the ``zuobi`` records listed in "ids"; admins only."""
    userJson = json.loads(request.data)
    if not isAdmin(userJson["id"]):
        return _permission_denied()
    # The ids arrive as strings and must be converted to ObjectId to match.
    object_ids = [ObjectId(item) for item in userJson["ids"]]
    zuobicollection.delete_many({"_id": {"$in": object_ids}})
    return "删除成功"


@app.route("/getzuobi", methods=["post"])
def getzuobi():
    """Return all ``zuobi`` records for an admin, else the caller's own."""
    userJson = json.loads(request.data)
    if isAdmin(userJson["id"]):
        query = {}
    else:
        query = {"userId": userJson["id"], "isExit": 1}
    records = [_with_str_id(item) for item in zuobicollection.find(query)]
    return {"code": 200, "msg": "查询成功", "list": records, "hasError": False}


if __name__ == "__main__":
    initData()
    # NOTE(review): debug=True is unsafe outside development.
    socketio.run(app, debug=True)