zuobijiancedaima
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

520 lines
17 KiB

from flask import Flask, request, jsonify
from flask_cors import CORS
from pymongo import MongoClient
from ultralytics import YOLO
import base64
from bson import ObjectId
import hashlib
import json
import os
from flask import Flask, request
from flask_socketio import SocketIO, emit, join_room, leave_room
import time
import numpy as np
import cv2
# Flask application plus the MongoDB collections used by every handler below.
app = Flask(__name__)
# NOTE(review): connection string is hard-coded — presumably a local dev instance; confirm before deploying.
client = MongoClient("mongodb://localhost:27019/")
db = client["back"]
usercollection = db["users"]
zuobicollection = db["zuobi"]
# Socket.IO setup, including the cross-origin (CORS) workaround
# NOTE(review): SECRET_KEY is a hard-coded literal — confirm it is overridden outside development.
app.config["SECRET_KEY"] = "secret!"
CORS(app, resources={r"/socket.io/*": {"origins": "*"}}) # allow requests from any origin
socketio = SocketIO(app, cors_allowed_origins="*")
# CORS(app)
# Connected clients: on_connect appends one {"id", "sid", "isAdmin"} dict per connection
clients = []
# Socket.IO session id of the most recently connected admin ("" until one connects)
adminSid = ""
# Exam/detection status per student, keyed by the string form of the user's _id
studentStatus = {}
# Both model weight files are loaded from the directory containing this file
current_dir = os.path.dirname(os.path.abspath(__file__))
det_model = YOLO(os.path.join(current_dir, "yolov8n-pose.pt"))
cls_model = YOLO(os.path.join(current_dir, "yolov8n-cls.pt"))
# Labels indexed by the classifier's top-1 class index (see checkVedioResult)
class_name = ["normal", "raise_hand", "speak", "stand", "turn_head", "use_phone"]
frame_rate_divider = 90 # frame-rate divisor: one frame out of this many is analysed
frame_count = 0 # frame counter, shared by all incoming "video" events
def dealStudentStatus():
    """Make sure every active, non-admin user has a ``studentStatus`` entry.

    Queries ``usercollection`` for documents with ``isExit == 1`` and
    ``isAdmin == 0`` and registers each one under the string form of its
    ``_id`` with an initial status of ``False``.

    The function is also re-run after a new user is added, so entries that
    already exist are left untouched: unconditionally assigning ``False``
    would reset students whose status had already been switched to ``True``.
    """
    active_students = usercollection.find({"isExit": 1, "isAdmin": 0})
    for student in active_students:
        # setdefault only inserts missing keys; known students keep their state.
        studentStatus.setdefault(str(student["_id"]), False)


# Populate the status table once when the module is loaded.
dealStudentStatus()
@socketio.on("connect")
def on_connect():
    """Register a newly connected Socket.IO client.

    Reads ``id`` and ``isAdmin`` from the connection's query string. When
    ``isAdmin`` is the string ``"1"`` the connection's session id becomes the
    current ``adminSid``. Every connection, admin or not, is appended to
    ``clients`` as ``{"id", "sid", "isAdmin"}``.
    """
    global adminSid

    session_id = request.sid
    query = request.args
    # The client is assumed to send its own id as a query parameter on connect.
    client_id = query.get("id")
    admin_flag = query.get("isAdmin")

    if admin_flag == "1":
        adminSid = session_id

    clients.append({"id": client_id, "sid": session_id, "isAdmin": admin_flag})
@socketio.on("disconnect")
def on_disconnect():
    """Drop the disconnecting session from ``clients``.

    Entries are matched on their ``"sid"`` field, which is where ``on_connect``
    stores ``request.sid``. (Comparing the ``"id"`` field against the session
    id, as was done before, never matches, so stale entries accumulated.)
    The list is rebuilt in place rather than calling ``remove`` while
    iterating, which would skip the element following each removal.
    """
    sid = request.sid
    departed = [entry for entry in clients if entry["sid"] == sid]
    if not departed:
        return

    # Slice assignment keeps the same list object that other handlers reference.
    clients[:] = [entry for entry in clients if entry["sid"] != sid]
    for entry in departed:
        print(f"Client disconnected with id {entry['id']}")
def _decode_frame(data_url):
    """Decode a base64 image payload into an OpenCV image, or return None.

    Accepts either a data URL (``"<header>,<base64>"``) or a bare base64
    string. Returns ``None`` when the payload is not a string, is empty, is
    not valid base64, or cannot be decoded as an image.
    """
    if not isinstance(data_url, str):
        return None
    # partition never raises, unlike unpacking the result of split(",").
    header, separator, payload = data_url.partition(",")
    if not separator:
        payload = header
    try:
        raw = base64.b64decode(payload)
    except ValueError:  # binascii.Error is a ValueError subclass
        return None
    if not raw:
        return None
    # Bytes -> uint8 array -> decoded colour image.
    buffer = np.frombuffer(raw, dtype=np.uint8)
    return cv2.imdecode(buffer, cv2.IMREAD_COLOR)


@socketio.on("video")
def handle_video_frame(message):
    """Forward a student's video frame to the admin and sample it for analysis.

    Every frame is emitted to the admin session as ``"teacherVideo<userId>"``.
    One frame out of every ``frame_rate_divider`` received is additionally
    decoded and handed to ``checkVedioResult``; frames that cannot be decoded
    are skipped for analysis but still forwarded.

    Args:
        message: event payload; ``message["data"]`` holds the base64 image and
            ``message["userId"]`` identifies the sender.
    """
    global frame_count

    # NOTE(review): the counter is shared by all senders, so which student's
    # frame gets analysed depends on how their frames interleave — confirm
    # whether a per-student counter is wanted.
    frame_count = (frame_count + 1) % frame_rate_divider
    if frame_count == 0:
        image = _decode_frame(message["data"])
        if image is not None:
            checkVedioResult(message, image)

    socketio.emit("teacherVideo" + message["userId"], message, to=adminSid)
def checkVedioResult(message, image):
    """Classify one video frame, notify teacher and student, and store it.

    Runs the detection model on ``image``. An empty result is labelled
    ``"leave"``; exactly one detected box is cropped and passed to the
    classification model, whose top-1 index selects a label from
    ``class_name``; any other number of boxes is labelled ``"many_humans"``.
    The label is written to ``message["type"]``, the message is emitted to
    the admin (``"teacherTalk<userId>"``) and to the student
    (``"studentMsg<userId>"``), and a record is stored through ``addzuobi``.

    Args:
        message: the original "video" event payload (mutated: ``"type"`` is set).
        image: decoded frame as an array indexable by ``[rows, cols]``.
    """
    detections = det_model(image, conf=0.5, iou=0.25)

    label = ""
    for result in detections:
        if len(result) == 0:
            label = "leave"
            break

        boxes = result.boxes.xyxy
        if len(boxes) != 1:
            label = "many_humans"
            continue

        # Box is (x1, y1, x2, y2); rows are indexed by y, columns by x.
        only_box = boxes[0]
        top, bottom = int(only_box[1]), int(only_box[3])
        left, right = int(only_box[0]), int(only_box[2])
        person = image[top:bottom, left:right]
        classified = cls_model(person)
        label = class_name[classified[0].probs.top1]

    message["type"] = label
    user_id = message["userId"]
    socketio.emit("teacherTalk" + user_id, message, to=adminSid)

    # The last matching client entry wins; "" when the student is not connected.
    matching_sids = [entry["sid"] for entry in clients if entry["id"] == user_id]
    student_sid = matching_sids[-1] if matching_sids else ""
    socketio.emit("studentMsg" + user_id, message, to=student_sid)

    record = dict(
        userId=user_id,
        msg=message["data"],
        type=label,
        create_at=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        update_at=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        isExit=1,
    )
    addzuobi(record)
@socketio.on("talk")
def handle_talk(message):
    """Relay a student's speech event to the admin in real time and log it.

    Tags the message with type ``"talk"``, emits it to the admin session as
    ``"teacherTalk<userId>"`` and stores a matching record via ``addzuobi``.
    """
    message["type"] = "talk"
    socketio.emit("teacherTalk" + message["userId"], message, to=adminSid)

    record = dict(
        userId=message["userId"],
        msg=message["data"],
        type="talk",
        create_at=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        update_at=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        isExit=1,
    )
    addzuobi(record)
@socketio.on("sendMsg")
def handle_msg(message):
    """Relay a student's text message to the admin and log it.

    Emits the message unchanged to the admin session as
    ``"teacherMsg<userId>"`` and stores a record of type ``"qustion"``
    (spelling kept as-is: it is a stored value) via ``addzuobi``.
    """
    socketio.emit("teacherMsg" + message["userId"], message, to=adminSid)

    record = dict(
        userId=message["userId"],
        msg=message["data"],
        type="qustion",
        create_at=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        update_at=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        isExit=1,
    )
    addzuobi(record)
@socketio.on("answerMsg")
def handle_answer_msg(message):
    """Deliver an answer to the student named in ``message["userId"]`` and log it.

    Looks up the student's session id in ``clients`` (the last matching entry
    wins, ``""`` when there is none), tags the message with type ``"answer"``,
    emits it as ``"studentMsg<userId>"`` and stores a record via ``addzuobi``.
    """
    matching_sids = [
        entry["sid"] for entry in clients if entry["id"] == message["userId"]
    ]
    student_sid = matching_sids[-1] if matching_sids else ""

    message["type"] = "answer"
    socketio.emit("studentMsg" + message["userId"], message, to=student_sid)

    record = dict(
        userId=message["userId"],
        msg=message["data"],
        type="answer",
        create_at=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        update_at=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        isExit=1,
    )
    addzuobi(record)
# MD5 hashing
def md5_encrypt(data):
    """Return the hexadecimal MD5 digest of ``data`` encoded as UTF-8.

    Args:
        data: text to hash.

    Returns:
        The 32-character lowercase hex digest.
    """
    encoded = data.encode("utf-8")
    return hashlib.md5(encoded).hexdigest()
# Seed initial data
def initData():
    """Create the default admin account if no active admin exists.

    Looks for a user with ``isAdmin == 1`` and ``isExit == 1``. If one is
    found its name is printed and nothing is written. Otherwise a user named
    "老师" with login ``admin`` and the MD5 of ``"admin"`` as password is
    inserted.

    User document fields (per the original field list): school name, school
    code, major name, major code, grade, class, student name, student number,
    student photo, password, score, exam type, exam subject, exam time slot,
    exam link, isAdmin, isExit, creation time, update time.
    """
    existing_admin = usercollection.find_one({"isAdmin": 1, "isExit": 1})
    if existing_admin is not None:
        print(f"管理账号'{existing_admin['xueshengxingming']}'已存在")
        return

    # Start with every profile field blank, in the stored field order ...
    admin_doc = {
        field: ""
        for field in (
            "xuexiaomingcheng",
            "xuexiaodaihao",
            "zhuanyemingcheng",
            "zhuanyedaihao",
            "nianji",
            "banji",
            "xueshengxingming",
            "xuehao",
            "kaoshengtupian",
            "mima",
            "chengji",
            "kaoshileixing",
            "kaoshikemu",
            "kaoshishijianduan",
            "kaoshilianjie",
        )
    }
    # ... then fill in the non-empty ones (updating a key keeps its position).
    admin_doc["xueshengxingming"] = "老师"
    admin_doc["xuehao"] = "admin"
    admin_doc["mima"] = md5_encrypt("admin")
    admin_doc["isAdmin"] = 1
    admin_doc["isExit"] = 1
    admin_doc["chuangjianshijian"] = time.strftime(
        "%Y-%m-%d %H:%M:%S", time.localtime()
    )
    admin_doc["gengxinshijian"] = time.strftime(
        "%Y-%m-%d %H:%M:%S", time.localtime()
    )

    usercollection.insert_one(admin_doc)
    print("管理账号'老师'已被初始化新建")
# Check whether a user is an administrator
def isAdmin(id):
    """Return True when ``id`` names an active user whose ``isAdmin`` is truthy.

    Args:
        id: the user's ``_id`` as sent by the client (normally a 24-character
            hex string).

    Returns:
        ``False`` when ``id`` is not a valid ObjectId, when no active
        (``isExit == 1``) user has that id, or when the user has no truthy
        ``isAdmin`` field; ``True`` otherwise.
    """
    # A malformed id can never match a user; answer "not admin" instead of
    # letting ObjectId() raise inside every route that performs this check.
    if not ObjectId.is_valid(id):
        return False

    user = usercollection.find_one({"_id": ObjectId(id), "isExit": 1})
    if user is None:
        return False

    # A missing isAdmin field counts as "not admin".
    return bool(user.get("isAdmin", False))
# Start detection
@app.route("/startCheck", methods=["post"])
def startCheck():
    """Mark the student given by ``_id`` in the JSON body as being checked.

    Prints the student's status before and after the change and returns the
    standard success envelope. The "before" value is read with ``get`` so a
    student who is not yet present in ``studentStatus`` is reported as
    ``None`` instead of failing the request with a ``KeyError``.
    """
    payload = json.loads(request.data)
    student_id = payload["_id"]

    print(f"学生{student_id}开始检测", studentStatus.get(student_id))
    studentStatus[student_id] = True
    print(f"学生{student_id}开始检测", studentStatus[student_id])

    return {"code": 200, "msg": "开始检测", "list": [], "hasError": False}
# Stop detection
@app.route("/stopCheck", methods=["post"])
def stopCheck():
    """Mark the student given by ``_id`` in the JSON body as no longer checked.

    Sets the student's entry in ``studentStatus`` to ``False`` and returns the
    standard success envelope.
    """
    payload = json.loads(request.data)
    student_id = payload["_id"]
    studentStatus[student_id] = False
    return {"code": 200, "msg": "停止检测", "list": [], "hasError": False}
# User login
@app.route("/userLogin", methods=["post"])
def login():
    """Authenticate a user from the ``xuehao`` / ``mima`` fields of the JSON body.

    ``mima`` is compared as-is against the stored value. Returns the standard
    envelope: the matching user (with ``_id`` as a string) on success, or
    ``hasError: True`` with a message otherwise.

    Both credentials must be plain scalars. Passing request JSON straight
    into the query filter would let a client send operator objects such as
    ``{"$ne": ""}`` and match a user without knowing any credentials.
    An empty body, malformed JSON, or missing/non-scalar credentials all get
    the "please fill in the form" response instead of an unhandled exception.
    """
    incomplete = {
        "code": 200,
        "msg": "请填写信息后登录",
        "list": [],
        "hasError": True,
    }

    raw_body = request.data
    # Reject requests without a body.
    if not raw_body:
        return incomplete

    try:
        credentials = json.loads(raw_body)
    except ValueError:  # covers json.JSONDecodeError and bad encodings
        return incomplete
    if not isinstance(credentials, dict):
        return incomplete

    xuehao = credentials.get("xuehao")
    mima = credentials.get("mima")
    scalar_types = (str, int, float)
    if not isinstance(xuehao, scalar_types) or not isinstance(mima, scalar_types):
        return incomplete

    user = usercollection.find_one({"xuehao": xuehao, "mima": mima})
    if user is None:
        return {"code": 200, "msg": "暂无此用户", "list": [], "hasError": True}

    # ObjectId is not JSON-serialisable; send it as a string.
    user["_id"] = str(user["_id"])
    return {"code": 200, "msg": "登陆成功", "list": [user], "hasError": False}
# Create a user
@app.route("/addUser", methods=["post"])
def insert_data():
    """Insert a new student when the requester (``id`` in the body) is an admin.

    The student document copies the profile fields from the request, stores
    the MD5 of ``mima``, generates the exam link from the new ``_id``, and is
    flagged ``isAdmin = 0`` / ``isExit = 1``. After the insert the student
    status table is refreshed via ``dealStudentStatus``.

    NOTE(review): the source lost its indentation; the final ``return`` is
    placed at function level so the route always produces a response, which
    means non-admin callers also receive the success text — confirm.
    """
    payload = json.loads(request.data)

    if isAdmin(payload["id"]):
        new_id = ObjectId()
        student = {"_id": new_id}
        copied_fields = (
            "xuexiaomingcheng",
            "xuexiaodaihao",
            "zhuanyemingcheng",
            "zhuanyedaihao",
            "nianji",
            "banji",
            "xueshengxingming",
            "xuehao",
            "kaoshengtupian",
            "mima",
            "chengji",
            "zuobiqingkuang",
            "kaoshileixing",
            "kaoshikemu",
            "kaoshishijianduan",
        )
        for field in copied_fields:
            value = payload[field]
            # Only the password is transformed; everything else is stored verbatim.
            student[field] = md5_encrypt(value) if field == "mima" else value

        # The exam link is derived from the freshly generated id.
        student["kaoshilianjie"] = "/student/" + str(new_id)
        student["isAdmin"] = 0
        student["isExit"] = 1
        student["chuangjianshijian"] = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime()
        )
        student["gengxinshijian"] = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime()
        )

        usercollection.insert_one(student)
        dealStudentStatus()

    return "新增成功"
# Query users
@app.route("/getUser", methods=["post"])
def query_data():
    """Return all users for an admin, or the requester's own record otherwise.

    The requester is identified by ``id`` in the JSON body. Admins receive
    every document in ``usercollection``; other callers receive only their
    own active (``isExit == 1``) document. ``_id`` values are converted to
    strings so the result can be serialised.

    When a non-admin id is malformed or matches no active user, an error
    envelope with an empty list is returned instead of failing on a ``None``
    lookup result.
    """
    payload = json.loads(request.data)
    requester_id = payload["id"]

    if isAdmin(requester_id):
        users = []
        for doc in usercollection.find():
            doc["_id"] = str(doc["_id"])  # ObjectId -> string
            users.append(doc)
        return {
            "code": 200,
            "msg": "查询成功",
            "list": users,
            "hasError": False,
        }

    user = None
    if ObjectId.is_valid(requester_id):
        user = usercollection.find_one(
            {"_id": ObjectId(requester_id), "isExit": 1}
        )
    if user is None:
        return {"code": 200, "msg": "暂无此用户", "list": [], "hasError": True}

    user["_id"] = str(user["_id"])  # ObjectId -> string
    return {
        "code": 200,
        "msg": "查询成功",
        "list": [user],
        "hasError": False,
    }
# Update a user
@app.route("/updateUser", methods=["post"])
def update_data():
    """Overwrite a user's profile fields when the requester is an admin.

    ``id`` in the body identifies the requester and ``userid`` the user to
    update. The password is not touched here; the update timestamp is
    refreshed.

    NOTE(review): the source lost its indentation; the final ``return`` is
    placed at function level so the route always produces a response, which
    means non-admin callers also receive the success text — confirm.
    """
    payload = json.loads(request.data)

    if isAdmin(payload["id"]):
        editable_fields = (
            "xuexiaomingcheng",
            "xuexiaodaihao",
            "zhuanyemingcheng",
            "zhuanyedaihao",
            "nianji",
            "banji",
            "xueshengxingming",
            "xuehao",
            "kaoshengtupian",
            "chengji",
            "zuobiqingkuang",
            "kaoshileixing",
            "kaoshikemu",
            "kaoshishijianduan",
        )
        changes = {field: payload[field] for field in editable_fields}
        changes["gengxinshijian"] = time.strftime(
            "%Y-%m-%d %H:%M:%S", time.localtime()
        )

        target = {"_id": ObjectId(payload["userid"])}
        usercollection.update_one(target, {"$set": changes})

    return "修改成功"
# Delete users
@app.route("/delUser", methods=["post"])
def delete_data():
    """Delete the users listed in ``ids`` when the requester is an admin.

    ``id`` in the body identifies the requester; ``ids`` is a list of user
    ids given as strings.

    NOTE(review): the source lost its indentation; the final ``return`` is
    placed at function level so the route always produces a response, which
    means non-admin callers also receive the success text — confirm.
    """
    payload = json.loads(request.data)

    if isAdmin(payload["id"]):
        # Convert the string ids into ObjectId values for the query.
        object_ids = [ObjectId(raw_id) for raw_id in payload["ids"]]
        usercollection.delete_many({"_id": {"$in": object_ids}})

    return "删除成功"
# Change a password
@app.route("/updatePassword", methods=["post"])
def updatePassword():
    """Set a new password for ``userid`` when the requester (``id``) is an admin.

    The new password from ``mima`` is stored as its MD5 digest.

    NOTE(review): the source lost its indentation; the final ``return`` is
    placed at function level so the route always produces a response, which
    means non-admin callers also receive the success envelope — confirm.
    """
    payload = json.loads(request.data)

    if isAdmin(payload["id"]):
        target = {"_id": ObjectId(payload["userid"])}
        new_hash = md5_encrypt(payload["mima"])
        usercollection.update_one(target, {"$set": {"mima": new_hash}})

    return {"code": 200, "msg": "修改成功", "list": [], "hasError": False}
def addzuobi(zuobiItem):
    """Store one event record in ``zuobicollection``.

    Args:
        zuobiItem: the record dict to insert (callers pass ``userId``,
            ``msg``, ``type``, timestamps and ``isExit``).
    """
    zuobicollection.insert_one(document=zuobiItem)
# Delete cheating records
@app.route("/delzuobi", methods=["post"])
def delzuobi():
    """Delete the records listed in ``ids`` when the requester is an admin.

    ``id`` in the body identifies the requester; ``ids`` is a list of record
    ids given as strings.

    NOTE(review): the source lost its indentation; the final ``return`` is
    placed at function level so the route always produces a response, which
    means non-admin callers also receive the success text — confirm.
    """
    payload = json.loads(request.data)

    if isAdmin(payload["id"]):
        # Convert the string ids into ObjectId values for the query.
        object_ids = [ObjectId(raw_id) for raw_id in payload["ids"]]
        zuobicollection.delete_many({"_id": {"$in": object_ids}})

    return "删除成功"
# Query cheating records
@app.route("/getzuobi", methods=["post"])
def getzuobi():
    """Return event records: all of them for an admin, own active ones otherwise.

    The requester is identified by ``id`` in the JSON body. Non-admin callers
    only see records whose ``userId`` equals their id and whose ``isExit`` is
    1. ``_id`` values are converted to strings so the result can be serialised.
    """
    payload = json.loads(request.data)
    requester_id = payload["id"]

    if isAdmin(requester_id):
        cursor = zuobicollection.find()
    else:
        cursor = zuobicollection.find({"userId": requester_id, "isExit": 1})

    records = []
    for record in cursor:
        record["_id"] = str(record["_id"])  # ObjectId -> string
        records.append(record)

    return {
        "code": 200,
        "msg": "查询成功",
        "list": records,
        "hasError": False,
    }
# Warn a student
@app.route("/jinggao", methods=["post"])
def jinggao():
    """Send a fixed warning to the student given by ``studentId`` and log it.

    Looks up the student's session id in ``clients`` (the last matching entry
    wins, ``""`` when there is none), emits the warning as
    ``"studentMsg<studentId>"`` and stores a record of type ``"jinggao"``.

    NOTE(review): unlike the other management routes this one performs no
    ``isAdmin`` check — confirm whether that is intended.
    """
    payload = json.loads(request.data)
    student_id = payload["studentId"]

    matching_sids = [entry["sid"] for entry in clients if entry["id"] == student_id]
    student_sid = matching_sids[-1] if matching_sids else ""

    warning = {"type": "jinggao", "data": "请遵守考试规则"}
    socketio.emit("studentMsg" + student_id, warning, to=student_sid)

    record = dict(
        userId=student_id,
        msg=warning["data"],
        type="jinggao",
        create_at=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        update_at=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        isExit=1,
    )
    addzuobi(record)

    return {
        "code": 200,
        "msg": "警告成功",
        "list": [],
        "hasError": False,
    }
if __name__ == "__main__":
    # Make sure an admin account exists before the server starts.
    initData()
    # app.run(debug=True)
    # Serve through Socket.IO's runner rather than app.run.
    # NOTE(review): debug=True combined with host="0.0.0.0" — confirm this
    # entry point is only used for development.
    socketio.run(app, host="0.0.0.0", port=5000, debug=True)