zuobijiancedaima
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

479 lines
16 KiB

from flask import Flask, request, jsonify
from pymongo import MongoClient
from ultralytics import YOLO
from PIL import Image
import base64
from io import BytesIO
from bson import ObjectId
import hashlib
import json
import os
from flask import Flask, request
from flask_socketio import SocketIO, emit, join_room, leave_room
import time
# Flask application plus the MongoDB handles shared by every handler below.
app = Flask(__name__)
client = MongoClient("mongodb://localhost:27019/")
db = client["back"]
usercollection = db["users"]
zuobicollection = db["zuobi"]
# Socket.IO setup; cors_allowed_origins="*" accepts connections from any origin.
app.config["SECRET_KEY"] = "secret!"
socketio = SocketIO(app, cors_allowed_origins="*")
# Registry of connected Socket.IO clients: one {"id", "sid", "isAdmin"} dict
# per connection, appended by the "connect" handler.
clients = []
# Session id of the most recently connected admin client ("" until one connects).
adminSid = ""
# Per-student status flags keyed by the string form of the user's _id.
studentStatus = {}
# The YOLO weights file "exam.pt" is expected next to this source file.
current_dir = os.path.dirname(os.path.abspath(__file__))
weights = os.path.join(current_dir, "exam.pt")
print(89744, weights)
model = YOLO(weights)
frame_rate_divider = 300  # detection runs when frame_count % frame_rate_divider == 0
frame_count = 0  # frame counter, shared by all senders
def dealStudentStatus():
    """Register every active, non-admin user in ``studentStatus``.

    Queries ``usercollection`` for documents with ``isExit == 1`` and
    ``isAdmin == 0`` and ensures each has an entry keyed by ``str(_id)``.
    New entries start as ``False``. Entries that already exist keep their
    current value, so calling this again (the ``/addUser`` route does) does
    not reset students whose status was already switched to ``True``.
    """
    students = usercollection.find({"isExit": 1, "isAdmin": 0})
    for student in students:
        studentStatus.setdefault(str(student["_id"]), False)


# Populate the status table once when the module is loaded.
dealStudentStatus()
@socketio.on("connect", namespace="/ws/video")
def on_connect():
    """Register a client that just connected to the ``/ws/video`` namespace.

    The client describes itself through the ``id`` and ``isAdmin`` query
    parameters of the connection request. When ``isAdmin`` is ``"1"`` the
    connection's session id becomes the new ``adminSid``. Every connection is
    recorded in ``clients`` as ``{"id", "sid", "isAdmin"}``.
    """
    global adminSid
    sid = request.sid
    client_id = request.args.get("id")
    is_admin = request.args.get("isAdmin")
    if is_admin == "1":
        adminSid = sid
        print(87874, adminSid)
    if client_id is not None:
        # A reconnecting client gets a fresh sid; drop its earlier
        # registrations so the registry does not grow without bound.
        clients[:] = [entry for entry in clients if entry["id"] != client_id]
    clients.append({"id": client_id, "sid": sid, "isAdmin": is_admin})
@socketio.on("disconnect", namespace="/ws/video")
def on_disconnect():
    """Remove the disconnecting client from the ``clients`` registry.

    Entries are matched on their ``"sid"`` field, which is where the
    connection's session id is stored; the ``"id"`` field holds the
    client-supplied user id and never equals ``request.sid``. The registry is
    rebuilt in place rather than mutated while being iterated.
    """
    sid = request.sid
    departed = [entry for entry in clients if entry["sid"] == sid]
    clients[:] = [entry for entry in clients if entry["sid"] != sid]
    for entry in departed:
        print(f"Client disconnected with id {entry['id']}")
@socketio.on("video", namespace="/ws/video")
def handle_video_frame(message):
    """Relay a student's video frame to the admin and periodically analyse it.

    ``message`` must contain ``"userId"`` (used to build the event name) and
    ``"data"`` (a base64 image, optionally prefixed with a ``data:...;base64,``
    header). The frame is forwarded as ``"teacherVideo<userId>"``. Whenever the
    shared frame counter is at zero, the frame is decoded, run through the
    YOLO model and the first result is handed to ``checkVedioResult``.

    The counter is global, i.e. shared by all senders, and wraps at
    ``frame_rate_divider``.
    """
    global frame_count

    # NOTE(review): Flask-SocketIO appears to treat a falsy ``to`` as "no
    # recipient filter" (broadcast to the namespace) - confirm. Skipping the
    # emit while no admin has connected avoids sending frames to everyone.
    if adminSid:
        socketio.emit(
            "teacherVideo" + message["userId"],
            message,
            to=adminSid,
            namespace="/ws/video",
        )

    if frame_count % frame_rate_divider == 0:
        # Keep only what follows the first comma so both a data URL and a bare
        # base64 payload are accepted.
        encoded = message["data"].split(",", 1)[-1]
        try:
            image = Image.open(BytesIO(base64.b64decode(encoded)))
        except (ValueError, OSError) as exc:
            # Invalid base64 raises a ValueError subclass; an unreadable image
            # raises an OSError subclass. Skip detection for this frame only.
            print(f"Skipping undecodable video frame from {message['userId']}: {exc}")
        else:
            results = model.predict(source=image, iou=0.5, conf=0.25)
            checkVedioResult(message, results[0])

    # Advance and wrap the counter even when this frame could not be analysed.
    frame_count = (frame_count + 1) % frame_rate_divider
def checkVedioResult(message, det):
    """Store one record per object detected in a student's video frame.

    Args:
        message: The incoming socket message; only ``message["userId"]`` is read.
        det: A single detection result exposing ``boxes`` and ``len()``, or
            ``None``. Nothing is stored when it is ``None`` or empty.

    Each stored record carries the detected class id in both ``"msg"`` and
    ``"type"``. The original note beside this code lists the labels
    ``cheating`` / ``good`` / ``normal``; which numeric id maps to which label
    depends on the model weights and is not visible here.

    NOTE(review): the source's indentation was lost, so it is assumed that a
    record is written for every detected box rather than only the last one -
    confirm against the intended behaviour.
    """
    if det is None or not len(det):
        return

    # One timestamp per frame so all records of a frame agree and
    # create_at == update_at.
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    for box in det.boxes:
        class_id = int(box.cls.cpu())
        addzuobi(
            {
                "userId": message["userId"],
                "msg": class_id,
                "type": class_id,
                "create_at": timestamp,
                "update_at": timestamp,
                "isExit": 1,
            }
        )
@socketio.on("talk", namespace="/ws/video")
def handle_talk(message):
    """Forward a student's speech event to the admin and log it.

    ``message`` must contain ``"userId"`` and ``"data"``. The message is
    emitted as ``"teacherTalk<userId>"`` and a record of type ``"talk"`` is
    stored through ``addzuobi`` regardless of whether an admin is connected.
    """
    # NOTE(review): Flask-SocketIO appears to broadcast when ``to`` is falsy -
    # confirm. Only emit once an admin session id is known.
    if adminSid:
        socketio.emit(
            "teacherTalk" + message["userId"],
            message,
            to=adminSid,
            namespace="/ws/video",
        )

    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    addzuobi(
        {
            "userId": message["userId"],
            "msg": message["data"],
            "type": "talk",
            "create_at": timestamp,
            "update_at": timestamp,
            "isExit": 1,
        }
    )
@socketio.on("sendMsg", namespace="/ws/video")
def handle_msg(message):
    """Forward a student's text message to the admin and log it.

    ``message`` must contain ``"userId"`` and ``"data"``. The message is
    emitted as ``"teacherMsg<userId>"`` and a record is stored through
    ``addzuobi`` regardless of whether an admin is connected.
    """
    # NOTE(review): Flask-SocketIO appears to broadcast when ``to`` is falsy -
    # confirm. Only emit once an admin session id is known.
    if adminSid:
        socketio.emit(
            "teacherMsg" + message["userId"],
            message,
            to=adminSid,
            namespace="/ws/video",
        )

    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    addzuobi(
        {
            "userId": message["userId"],
            "msg": message["data"],
            # The spelling "qustion" is kept on purpose: existing records and
            # any consumer filtering on this value already use it.
            "type": "qustion",
            "create_at": timestamp,
            "update_at": timestamp,
            "isExit": 1,
        }
    )
@socketio.on("answerMsg", namespace="/ws/video")
def handle_answer_msg(message):
    """Deliver an answer to one student and log it.

    ``message`` must contain ``"userId"`` (the recipient) and ``"data"``. The
    recipient's session id is looked up in ``clients``; if several entries
    share the id, the most recently registered one is used. The message is
    emitted as ``"studentMsg<userId>"`` and a record of type ``"answer"`` is
    stored through ``addzuobi`` even when the student is not connected.
    """
    user_id = message["userId"]
    # Scan newest-first so the latest registration wins.
    sid = next(
        (entry["sid"] for entry in reversed(clients) if entry["id"] == user_id),
        "",
    )

    # NOTE(review): Flask-SocketIO appears to broadcast when ``to`` is falsy -
    # confirm. Do not emit when the recipient is not connected.
    if sid:
        socketio.emit(
            "studentMsg" + user_id, message, to=sid, namespace="/ws/video"
        )

    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    addzuobi(
        {
            "userId": user_id,
            "msg": message["data"],
            "type": "answer",
            "create_at": timestamp,
            "update_at": timestamp,
            "isExit": 1,
        }
    )
# MD5 hashing
def md5_encrypt(data):
    """Return the hexadecimal MD5 digest of ``data`` encoded as UTF-8.

    Args:
        data: The text to hash.

    Returns:
        The 32-character lowercase hex digest.
    """
    encoded = data.encode("utf-8")
    return hashlib.md5(encoded).hexdigest()
# Seed data
def initData():
    """Create the default administrator account if none exists yet.

    Looks for a user document with ``isAdmin == 1`` and ``isExit == 1``. When
    one is found only a message is printed; otherwise an account with student
    number ``"0000"`` and the MD5 digest of ``"0000"`` as password is inserted.

    User document fields (pinyin keys): school name, school code, major name,
    major code, grade, class, student name, student number, student photo,
    password, score, exam type, exam subject, exam time slot, exam link,
    isAdmin, isExit, creation time, update time.
    """
    existing_admin = usercollection.find_one({"isAdmin": 1, "isExit": 1})
    if existing_admin is not None:
        print(f"管理账号'{existing_admin['xueshengxingming']}'已存在")
        return

    timestamp_format = "%Y-%m-%d %H:%M:%S"
    admin_account = {
        "xuexiaomingcheng": "",
        "xuexiaodaihao": "",
        "zhuanyemingcheng": "",
        "zhuanyedaihao": "",
        "nianji": "",
        "banji": "",
        "xueshengxingming": "老师",
        "xuehao": "0000",
        "kaoshengtupian": "",
        "mima": md5_encrypt("0000"),
        "chengji": "",
        "kaoshileixing": "",
        "kaoshikemu": "",
        "kaoshishijianduan": "",
        "kaoshilianjie": "",
        "isAdmin": 1,
        "isExit": 1,
        # Without an explicit time tuple, strftime formats the current local time.
        "chuangjianshijian": time.strftime(timestamp_format),
        "gengxinshijian": time.strftime(timestamp_format),
    }
    usercollection.insert_one(admin_account)
    print("管理账号'老师'已被初始化新建")
# Administrator check
def isAdmin(id):
    """Return whether ``id`` identifies an active administrator.

    Args:
        id: The user's ``_id`` as received from the client (normally a
            24-character hex string).

    Returns:
        ``True`` only when a user document with that ``_id`` and
        ``isExit == 1`` exists and its ``isAdmin`` field is truthy. A
        malformed or missing id, an unknown user, or a document without an
        ``isAdmin`` field all yield ``False``.
    """
    # Reject malformed ids up front instead of letting ObjectId() raise,
    # which would turn a bad request into a server error.
    if not ObjectId.is_valid(id):
        return False
    user = usercollection.find_one({"_id": ObjectId(id), "isExit": 1})
    if user is None:
        return False
    return bool(user.get("isAdmin", False))
# Start detection
@app.route("/startCheck", methods=["post"])
def startCheck():
    """Mark a student as being checked.

    Expects a JSON body with ``"_id"`` (the student's id string). Sets
    ``studentStatus[_id]`` to ``True`` and prints the status before and after.
    An id that is not yet present in ``studentStatus`` is treated as
    previously ``False`` instead of raising ``KeyError``.
    """
    userJson = json.loads(request.data)
    student_id = userJson["_id"]
    print(f"学生{student_id}开始检测", studentStatus.get(student_id, False))
    studentStatus[student_id] = True
    print(f"学生{student_id}开始检测", studentStatus[student_id])
    return {"code": 200, "msg": "开始检测", "list": [], "hasError": False}
# Stop detection
@app.route("/stopCheck", methods=["post"])
def stopCheck():
    """Mark a student as no longer being checked.

    Expects a JSON body with ``"_id"`` and sets ``studentStatus[_id]`` to
    ``False``.
    """
    payload = json.loads(request.data)
    student_id = payload["_id"]
    studentStatus[student_id] = False
    response = {"code": 200, "msg": "停止检测", "list": [], "hasError": False}
    return response
# User login
@app.route("/userLogin", methods=["post"])
def login():
    """Authenticate a user by student number and password.

    Expects a JSON body with ``"xuehao"`` (student number) and ``"mima"``
    (password). Returns the app's usual envelope: ``hasError`` is ``True``
    with an explanatory ``msg`` on failure; on success ``list`` holds the
    matching user document with ``_id`` converted to a string.

    NOTE(review): passwords are stored as MD5 hex digests elsewhere in this
    file, while this lookup compares ``mima`` exactly as received, so the
    client presumably sends the digest - confirm.
    NOTE(review): the returned document still contains the ``mima`` field;
    consider removing it once it is confirmed that no client relies on it.
    """
    missing_input = {
        "code": 200,
        "msg": "请填写信息后登录",
        "list": [],
        "hasError": True,
    }

    resData = request.data
    if not resData:
        return missing_input
    try:
        userJson = json.loads(resData)
    except ValueError:
        # Malformed JSON is treated like an empty form.
        return missing_input
    if not isinstance(userJson, dict):
        return missing_input

    xuehao = userJson.get("xuehao")
    mima = userJson.get("mima")
    # Only scalar credentials are accepted. A dict such as {"$ne": null} would
    # be interpreted by MongoDB as a query operator and match arbitrary users.
    scalar_types = (str, int, float)
    if not isinstance(xuehao, scalar_types) or not isinstance(mima, scalar_types):
        return missing_input

    res = usercollection.find_one({"xuehao": xuehao, "mima": mima})
    if res is None:
        return {"code": 200, "msg": "暂无此用户", "list": [], "hasError": True}
    res["_id"] = str(res["_id"])
    return {"code": 200, "msg": "登陆成功", "list": [res], "hasError": False}
# Add user
@app.route("/addUser", methods=["post"])
def insert_data():
    """Create a student account on behalf of an administrator.

    Expects a JSON body with ``"id"`` (the caller's user id) plus every
    profile and exam field copied below, and ``"mima"`` (plain password,
    stored as its MD5 digest). The new document gets a generated ``_id``, an
    exam link ``/student/<_id>``, ``isAdmin = 0`` and ``isExit = 1``.

    Returns:
        ``"新增成功"`` after the insert, or a 403 response when the caller is
        not an administrator (previously the success text was returned even
        though nothing had been inserted).
    """
    userJson = json.loads(request.data)
    if not isAdmin(userJson["id"]):
        return "无权限操作", 403

    profile_fields = (
        "xuexiaomingcheng",
        "xuexiaodaihao",
        "zhuanyemingcheng",
        "zhuanyedaihao",
        "nianji",
        "banji",
        "xueshengxingming",
        "xuehao",
        "kaoshengtupian",
    )
    exam_fields = (
        "chengji",
        "zuobiqingkuang",
        "kaoshileixing",
        "kaoshikemu",
        "kaoshishijianduan",
    )

    _id = ObjectId()
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")

    # Built step by step to keep the stored field order unchanged.
    userItem = {"_id": _id}
    userItem.update((field, userJson[field]) for field in profile_fields)
    userItem["mima"] = md5_encrypt(userJson["mima"])
    userItem.update((field, userJson[field]) for field in exam_fields)
    userItem.update(
        {
            "kaoshilianjie": "/student/" + str(_id),  # link derived from the new id
            "isAdmin": 0,
            "isExit": 1,
            "chuangjianshijian": timestamp,
            "gengxinshijian": timestamp,
        }
    )

    usercollection.insert_one(userItem)
    # Make the new student known to the in-memory status table.
    dealStudentStatus()
    return "新增成功"
# Query users
@app.route("/getUser", methods=["post"])
def query_data():
    """Return user documents visible to the caller.

    Expects a JSON body with ``"id"`` (the caller's user id). An
    administrator receives every user document; any other caller receives
    only their own active (``isExit == 1``) document. ``_id`` values are
    converted to strings. When a non-admin caller's document cannot be found
    (unknown or malformed id) an error envelope is returned instead of
    failing on a ``None`` result.

    NOTE(review): returned documents include the ``mima`` field - confirm
    whether clients need it.
    """
    userJson = json.loads(request.data)
    requester_id = userJson["id"]

    if isAdmin(requester_id):
        found = [
            {**document, "_id": str(document["_id"])}
            for document in usercollection.find()
        ]
    else:
        document = None
        if ObjectId.is_valid(requester_id):
            document = usercollection.find_one(
                {"_id": ObjectId(requester_id), "isExit": 1}
            )
        if document is None:
            return {"code": 200, "msg": "暂无此用户", "list": [], "hasError": True}
        document["_id"] = str(document["_id"])
        found = [document]

    return {
        "code": 200,
        "msg": "查询成功",
        "list": found,
        "hasError": False,
    }
# Update user
@app.route("/updateUser", methods=["post"])
def update_data():
    """Overwrite a user's profile and exam fields on behalf of an administrator.

    Expects a JSON body with ``"id"`` (the caller's user id), ``"userid"``
    (the ``_id`` of the user to modify) and every field listed below. The
    password, exam link and flags are not touched; ``gengxinshijian`` is set
    to the current local time.

    Returns:
        ``"修改成功"`` after the update, or a 403 response when the caller is
        not an administrator (previously the success text was returned even
        though nothing had been changed).
    """
    userJson = json.loads(request.data)
    if not isAdmin(userJson["id"]):
        return "无权限操作", 403

    editable_fields = (
        "xuexiaomingcheng",
        "xuexiaodaihao",
        "zhuanyemingcheng",
        "zhuanyedaihao",
        "nianji",
        "banji",
        "xueshengxingming",
        "xuehao",
        "kaoshengtupian",
        "chengji",
        "zuobiqingkuang",
        "kaoshileixing",
        "kaoshikemu",
        "kaoshishijianduan",
    )
    userItem = {field: userJson[field] for field in editable_fields}
    userItem["gengxinshijian"] = time.strftime("%Y-%m-%d %H:%M:%S")

    usercollection.update_one(
        {"_id": ObjectId(userJson["userid"])},
        {"$set": userItem},
    )
    return "修改成功"
# Delete users
@app.route("/delUser", methods=["post"])
def delete_data():
    """Delete user documents on behalf of an administrator.

    Expects a JSON body with ``"id"`` (the caller's user id) and ``"ids"``
    (a list of ``_id`` strings to delete). The documents are removed from the
    collection, not merely flagged.

    Returns:
        ``"删除成功"`` after the deletion, or a 403 response when the caller
        is not an administrator (previously the success text was returned
        even though nothing had been deleted).
    """
    userJson = json.loads(request.data)
    if not isAdmin(userJson["id"]):
        return "无权限操作", 403

    # Convert the id strings into ObjectId values for the $in filter.
    object_ids = [ObjectId(item) for item in userJson["ids"]]
    usercollection.delete_many({"_id": {"$in": object_ids}})
    return "删除成功"
# Change password
@app.route("/updatePassword", methods=["post"])
def updatePassword():
    """Set a user's password on behalf of an administrator.

    Expects a JSON body with ``"id"`` (the caller's user id), ``"userid"``
    (the ``_id`` of the user to modify) and ``"mima"`` (the new plain
    password, stored as its MD5 digest).

    Returns:
        The usual envelope with ``hasError`` ``False`` after the update, or
        with ``hasError`` ``True`` when the caller is not an administrator
        (previously success was reported even though nothing had changed).
    """
    userJson = json.loads(request.data)
    if not isAdmin(userJson["id"]):
        return {"code": 200, "msg": "无权限操作", "list": [], "hasError": True}

    usercollection.update_one(
        {"_id": ObjectId(userJson["userid"])},
        {"$set": {"mima": md5_encrypt(userJson["mima"])}},
    )
    return {"code": 200, "msg": "修改成功", "list": [], "hasError": False}
def addzuobi(zuobiItem):
    """Store one cheating/communication record.

    Args:
        zuobiItem: The record dict to insert into ``zuobicollection``.
    """
    record = zuobiItem
    zuobicollection.insert_one(record)
# Delete cheating records
@app.route("/delzuobi", methods=["post"])
def delzuobi():
    """Delete cheating records on behalf of an administrator.

    Expects a JSON body with ``"id"`` (the caller's user id) and ``"ids"``
    (a list of record ``_id`` strings to delete).

    Returns:
        ``"删除成功"`` after the deletion, or a 403 response when the caller
        is not an administrator (previously the success text was returned
        even though nothing had been deleted).
    """
    userJson = json.loads(request.data)
    if not isAdmin(userJson["id"]):
        return "无权限操作", 403

    # Convert the id strings into ObjectId values for the $in filter.
    object_ids = [ObjectId(item) for item in userJson["ids"]]
    zuobicollection.delete_many({"_id": {"$in": object_ids}})
    return "删除成功"
# Query cheating records
@app.route("/getzuobi", methods=["post"])
def getzuobi():
    """Return the cheating records visible to the caller.

    Expects a JSON body with ``"id"`` (the caller's user id). An
    administrator receives every record; any other caller receives only the
    records whose ``userId`` equals their id and whose ``isExit`` is 1.
    ``_id`` values are converted to strings.
    """
    payload = json.loads(request.data)
    caller_id = payload["id"]

    # Administrators query without a filter; everyone else is scoped to
    # their own active records.
    if isAdmin(caller_id):
        cursor = zuobicollection.find()
    else:
        cursor = zuobicollection.find({"userId": caller_id, "isExit": 1})

    records = []
    for record in cursor:
        record["_id"] = str(record["_id"])
        records.append(record)

    return {
        "code": 200,
        "msg": "查询成功",
        "list": records,
        "hasError": False,
    }
if __name__ == "__main__":
    # Make sure the default administrator account exists before serving.
    initData()
    # Serve through the Socket.IO runner rather than the plain Flask
    # app.run(debug=True) so the websocket namespace is available.
    # NOTE(review): debug=True is enabled here - confirm it is not used in
    # production.
    socketio.run(app, debug=True)