zuobijiancedaima
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

416 lines
14 KiB

from flask import Flask, request, jsonify
from pymongo import MongoClient
from bson import ObjectId
import hashlib
import json
from flask import Flask, request
from flask_socketio import SocketIO, emit, join_room, leave_room
import time
app = Flask(__name__)
client = MongoClient("mongodb://localhost:27019/")
db = client["back"]
usercollection = db["users"]
zuobicollection = db["zuobi"]
# sockitIo解决跨域问题
app.config["SECRET_KEY"] = "secret!"
socketio = SocketIO(app, cors_allowed_origins="*")
# 存储连接的客户端的房间号
clients = []
adminSid = ""
# 初始化考试状态
studentStatus = {}
def dealStudentStatus():
global studentStatus
userObj = usercollection.find({"isExit": 1, "isAdmin": 0})
for studentItem in userObj:
studentStatus[str(studentItem["_id"])] = False
dealStudentStatus()
@socketio.on("connect", namespace="/ws/video")
def on_connect():
global clients
global adminSid
sid = request.sid
# 假设客户端在连接时发送了一个 id
id = request.args.get("id")
isAdmin = request.args.get("isAdmin")
if isAdmin == "1":
adminSid = sid
print(87874, adminSid)
clients.append({"id": id, "sid": sid, "isAdmin": isAdmin})
@socketio.on("disconnect", namespace="/ws/video")
def on_disconnect():
global clients
for item in clients:
if item["id"] == request.sid:
clients.remove(item)
print(f"Client disconnected with id {item['id']}")
@socketio.on("video", namespace="/ws/video")
def handle_video_frame(message):
# 广播视频帧给老师
global adminSid
global studentStatus
# print(f"Received video frame from {message['userId']}")
socketio.emit(
"teacherVideo" + message["userId"], message, to=adminSid, namespace="/ws/video"
)
if studentStatus[message["userId"]]:
print(f"学生{message['userId']}已作弊", f"{studentStatus[message['userId']]}")
@socketio.on("talk", namespace="/ws/video")
def handle_talk(message):
# 说话实时传输给老师
global adminSid
# print(f"Received video frame from {message['userId']}")
socketio.emit(
"teacherTalk" + message["userId"], message, to=adminSid, namespace="/ws/video"
)
zuobiItem = {
"userId": message["userId"],
"msg": message["data"],
"type": "talk",
"create_at": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())),
"update_at": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())),
"isExit": 1,
}
addzuobi(zuobiItem)
@socketio.on("sendMsg", namespace="/ws/video")
def handle_msg(message):
# 说话实时传输给老师
global adminSid
# print(f"Received video frame from {message['userId']}")
socketio.emit(
"teacherMsg" + message["userId"], message, to=adminSid, namespace="/ws/video"
)
zuobiItem = {
"userId": message["userId"],
"msg": message["data"],
"type": "qustion",
"create_at": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())),
"update_at": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())),
"isExit": 1,
}
addzuobi(zuobiItem)
@socketio.on("answerMsg", namespace="/ws/video")
def handle_answer_msg(message):
sid = ""
for item in clients:
if item["id"] == message["userId"]:
sid = item["sid"]
socketio.emit(
"studentMsg" + message["userId"], message, to=sid, namespace="/ws/video"
)
zuobiItem = {
"userId": message["userId"],
"msg": message["data"],
"type": "answer",
"create_at": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())),
"update_at": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())),
"isExit": 1,
}
addzuobi(zuobiItem)
# md5加密
def md5_encrypt(data):
md5 = hashlib.md5()
md5.update(data.encode("utf-8"))
return md5.hexdigest()
# 初始化数据
def initData():
admin = usercollection.find_one({"isAdmin": 1, "isExit": 1})
if admin is None:
# 学校名称、学校代号、专业名称、专业代号、年级、班级、学生姓名、学号、考生图片、密码、成绩、考试类型、考试科目、考试时间段、考试链接、是否是管理员、是否删除、创建时间、更新时间
# xuexiaomingcheng、xuexiaodaihao、zhuanyemingcheng、zhuanyedaihao、nianji、banji、xueshengxingming、xuehao、kaoshengtupian、mima、chengji、zuobiqingkuang(zuobileixing、zuobishijian、zuobitupian)、kaoshileixing、kaoshikemu、kaoshishijianduan、kaoshilianjie、isAdmin、isExit、chuangjianshijian、gengxinshijian
user = {
"xuexiaomingcheng": "",
"xuexiaodaihao": "",
"zhuanyemingcheng": "",
"zhuanyedaihao": "",
"nianji": "",
"banji": "",
"xueshengxingming": "老师",
"xuehao": "0000",
"kaoshengtupian": "",
"mima": md5_encrypt("0000"),
"chengji": "",
"kaoshileixing": "",
"kaoshikemu": "",
"kaoshishijianduan": "",
"kaoshilianjie": "",
"isAdmin": 1,
"isExit": 1,
"chuangjianshijian": time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(time.time())
),
"gengxinshijian": time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(time.time())
),
}
usercollection.insert_one(user)
print("管理账号'老师'已被初始化新建")
else:
print(f"管理账号'{admin['xueshengxingming']}'已存在")
# 判断是否是管理员
def isAdmin(id):
mongoId = ObjectId(id)
userObj = usercollection.find_one({"_id": mongoId, "isExit": 1})
# 检查 userObj 是否为 None
if userObj is None:
return False
# 默认返回 False 如果 isAdmin 字段不存在
adminBool = userObj.get("isAdmin", False)
return bool(adminBool)
# 开始检测
@app.route("/startCheck", methods=["post"])
def startCheck():
global studentStatus
resData = request.data
userJson = json.loads(resData)
print(f"学生{userJson['_id']}开始检测", studentStatus[userJson["_id"]])
studentStatus[userJson["_id"]] = True
print(f"学生{userJson['_id']}开始检测", studentStatus[userJson["_id"]])
return {"code": 200, "msg": "开始检测", "list": [], "hasError": False}
# 停止检测
@app.route("/stopCheck", methods=["post"])
def stopCheck():
global studentStatus
resData = request.data
userJson = json.loads(resData)
studentStatus[userJson["_id"]] = False
return {"code": 200, "msg": "停止检测", "list": [], "hasError": False}
# 用户登录
@app.route("/userLogin", methods=["post"])
def login():
resData = request.data
# 检测是否有数据
if not resData:
return {"code": 200, "msg": "请填写信息后登录", "list": [], "hasError": True}
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
res = usercollection.find_one(
{"xuehao": userJson["xuehao"], "mima": userJson["mima"]}
)
if res is None:
return {"code": 200, "msg": "暂无此用户", "list": [], "hasError": True}
res["_id"] = str(res["_id"])
return {"code": 200, "msg": "登陆成功", "list": [res], "hasError": False}
# 新增用户
@app.route("/addUser", methods=["post"])
def insert_data():
resData = request.data
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
admin = isAdmin(userJson["id"])
if admin:
_id = ObjectId()
userItem = {
"_id": _id,
"xuexiaomingcheng": userJson["xuexiaomingcheng"],
"xuexiaodaihao": userJson["xuexiaodaihao"],
"zhuanyemingcheng": userJson["zhuanyemingcheng"],
"zhuanyedaihao": userJson["zhuanyedaihao"],
"nianji": userJson["nianji"],
"banji": userJson["banji"],
"xueshengxingming": userJson["xueshengxingming"],
"xuehao": userJson["xuehao"],
"mima": md5_encrypt(userJson["mima"]),
"chengji": userJson["chengji"],
"zuobiqingkuang": userJson["zuobiqingkuang"],
"kaoshileixing": userJson["kaoshileixing"],
"kaoshikemu": userJson["kaoshikemu"],
"kaoshishijianduan": userJson["kaoshishijianduan"],
"kaoshilianjie": "/student/" + str(_id), # 需要自动生成一个链接
"isAdmin": 0,
"isExit": 1,
"chuangjianshijian": time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(time.time())
),
"gengxinshijian": time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(time.time())
),
}
usercollection.insert_one(userItem)
dealStudentStatus()
return "新增成功"
# 查询用户
@app.route("/getUser", methods=["post"])
def query_data():
resData = request.data
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
admin = isAdmin(userJson["id"])
if admin:
users = usercollection.find()
serialized_data = []
for item in users:
item["_id"] = str(item["_id"]) # 将ObjectId转换为字符串
serialized_data.append(item)
return {
"code": 200,
"msg": "查询成功",
"list": serialized_data,
"hasError": False,
}
else:
mongoId = ObjectId(userJson["id"])
userObj = usercollection.find_one({"_id": mongoId, "isExit": 1})
userObj["_id"] = str(userObj["_id"]) # 将ObjectId转换为字符串
return {
"code": 200,
"msg": "查询成功",
"list": [userObj],
"hasError": False,
}
# 更新用户
@app.route("/updateUser", methods=["post"])
def update_data():
resData = request.data
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
admin = isAdmin(userJson["id"])
if admin:
userItem = {
"xuexiaomingcheng": userJson["xuexiaomingcheng"],
"xuexiaodaihao": userJson["xuexiaodaihao"],
"zhuanyemingcheng": userJson["zhuanyemingcheng"],
"zhuanyedaihao": userJson["zhuanyedaihao"],
"nianji": userJson["nianji"],
"banji": userJson["banji"],
"xueshengxingming": userJson["xueshengxingming"],
"xuehao": userJson["xuehao"],
"chengji": userJson["chengji"],
"zuobiqingkuang": userJson["zuobiqingkuang"],
"kaoshileixing": userJson["kaoshileixing"],
"kaoshikemu": userJson["kaoshikemu"],
"kaoshishijianduan": userJson["kaoshishijianduan"],
"gengxinshijian": time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(time.time())
),
}
usercollection.update_one(
{"_id": ObjectId(userJson["userid"])},
{"$set": userItem},
)
return "修改成功"
# 删除用户
@app.route("/delUser", methods=["post"])
def delete_data():
resData = request.data
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
admin = isAdmin(userJson["id"])
if admin:
serialized_data = []
for item in userJson["ids"]:
serialized_data.append(ObjectId(item)) # 将ObjectId转换为字符串
usercollection.delete_many({"_id": {"$in": serialized_data}})
return "删除成功"
# 修改密码
@app.route("/updatePassword", methods=["post"])
def updatePassword():
resData = request.data
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
admin = isAdmin(userJson["id"])
if admin:
usercollection.update_one(
{"_id": ObjectId(userJson["userid"])},
{
"$set": {"mima": md5_encrypt(userJson["mima"])},
},
)
return {"code": 200, "msg": "修改成功", "list": [], "hasError": False}
def addzuobi(zuobiItem):
zuobicollection.insert_one(zuobiItem)
# 删除作弊
@app.route("/delzuobi", methods=["post"])
def delzuobi():
resData = request.data
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
admin = isAdmin(userJson["id"])
if admin:
serialized_data = []
for item in userJson["ids"]:
serialized_data.append(ObjectId(item)) # 将ObjectId转换为字符串
zuobicollection.delete_many({"_id": {"$in": serialized_data}})
return "删除成功"
# 查询作弊
@app.route("/getzuobi", methods=["post"])
def getzuobi():
resData = request.data
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
admin = isAdmin(userJson["id"])
if admin:
users = zuobicollection.find()
serialized_data = []
for item in users:
item["_id"] = str(item["_id"]) # 将ObjectId转换为字符串
serialized_data.append(item)
return {
"code": 200,
"msg": "查询成功",
"list": serialized_data,
"hasError": False,
}
else:
users = zuobicollection.find({"userId": userJson["id"], "isExit": 1})
serialized_data = []
for item in users:
item["_id"] = str(item["_id"]) # 将ObjectId转换为字符串
serialized_data.append(item)
return {
"code": 200,
"msg": "查询成功",
"list": serialized_data,
"hasError": False,
}
if __name__ == "__main__":
initData()
# app.run(debug=True)
socketio.run(app, debug=True)