zuobijiancedaima
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

279 lines
9.5 KiB

9 months ago
from flask import Flask, request, jsonify
9 months ago
from pymongo import MongoClient
9 months ago
from bson import ObjectId
import hashlib
import json
from flask import Flask, render_template, request
9 months ago
from flask_socketio import SocketIO, emit, join_room, leave_room
9 months ago
import time
9 months ago
# --- Flask application and Socket.IO server ---------------------------------
app = Flask(__name__)
# NOTE(review): the secret key is hard-coded in source — confirm this is
# acceptable for the deployment this file targets.
app.config["SECRET_KEY"] = "secret!"
# cors_allowed_origins="*" accepts Socket.IO connections from any origin;
# this is how the file works around cross-origin restrictions.
socketio = SocketIO(app, cors_allowed_origins="*")

# --- MongoDB handles: database "back", collection "users" -------------------
client = MongoClient("mongodb://localhost:27019/")
db = client["back"]
collection = db["users"]

# --- In-memory connection state for the /ws/video namespace -----------------
# One {"id", "sid", "isAdmin"} dict per client that has connected.
clients = []
# Socket.IO session id of the most recently connected admin ("" until one connects).
adminSid = ""
9 months ago
9 months ago
@socketio.on("connect", namespace="/ws/video")
def on_connect():
    """Register a client that just connected to the ``/ws/video`` namespace.

    Reads ``id`` and ``isAdmin`` from the connection query string. When
    ``isAdmin`` equals ``"1"`` the connection's session id is stored in the
    module-level ``adminSid``. Every connection is appended to ``clients`` as
    a ``{"id", "sid", "isAdmin"}`` dict.
    """
    global adminSid
    session_id = request.sid
    query = request.args
    # The client is expected to pass these as query-string parameters.
    client_id = query.get("id")
    admin_flag = query.get("isAdmin")
    if admin_flag == "1":
        adminSid = session_id
        print(87874, adminSid)
    clients.append({"id": client_id, "sid": session_id, "isAdmin": admin_flag})
9 months ago
9 months ago
@socketio.on("disconnect", namespace="/ws/video")
def on_disconnect():
    """Forget a client that disconnected from the ``/ws/video`` namespace.

    Entries in ``clients`` are matched on their ``"sid"`` field, because
    ``request.sid`` is a Socket.IO session id and not the client-supplied
    ``"id"``. If the departing session was the registered admin, ``adminSid``
    is reset to ``""`` so later frames are not sent to a dead session.
    """
    global adminSid
    sid = request.sid
    departed = [entry for entry in clients if entry["sid"] == sid]
    # Rebuild the list instead of calling remove() while iterating over it;
    # slice assignment mutates in place so other references stay valid.
    clients[:] = [entry for entry in clients if entry["sid"] != sid]
    if adminSid == sid:
        adminSid = ""
    for entry in departed:
        print(f"Client disconnected with id {entry['id']}")
9 months ago
9 months ago
@socketio.on("video", namespace="/ws/video")
def handle_video_frame(message):
    """Forward a video frame to the registered admin (teacher).

    Args:
        message: The payload of the incoming ``video`` event; it is relayed
            unchanged as a ``teacherVideo`` event.
    """
    # No admin has connected yet: there is nobody to deliver the frame to.
    if not adminSid:
        return
    # SocketIO.emit() targets the default namespace unless told otherwise,
    # while the admin's session lives in /ws/video, so name it explicitly.
    socketio.emit(
        "teacherVideo",
        message,
        room=adminSid,
        namespace="/ws/video",
    )
9 months ago
# MD5 digest helper
def md5_encrypt(data):
    """Return the hexadecimal MD5 digest of ``data`` encoded as UTF-8.

    NOTE(review): MD5 is a plain digest, not encryption, and is weak for
    password storage. It is kept as-is because stored ``mima`` values in this
    file are produced by this function.
    """
    return hashlib.md5(data.encode("utf-8")).hexdigest()
9 months ago
9 months ago
# Seed the initial data
def initData():
    """Make sure an admin account exists, inserting a default one if not.

    Looks for a document with ``isAdmin == 1`` and ``isExit == 1``. If one is
    found its name is printed; otherwise a default admin (student number
    ``"0000"``, password the MD5 digest of ``"0000"``) is inserted.
    """
    existing = collection.find_one({"isAdmin": 1, "isExit": 1})
    if existing is not None:
        print(f"管理账号'{existing['xueshengxingming']}'已存在")
        return

    # Field glossary (pinyin key -> meaning), in stored order:
    #   xuexiaomingcheng school name     xuexiaodaihao     school code
    #   zhuanyemingcheng major name      zhuanyedaihao     major code
    #   nianji           grade           banji             class
    #   xueshengxingming student name    xuehao            student number
    #   mima             password        chengji           score
    #   zuobiqingkuang   cheating records (type, time, image)
    #   kaoshileixing    exam type       kaoshikemu        exam subject
    #   kaoshishijianduan exam time slot kaoshilianjie     exam link
    #   isAdmin          is admin        isExit            deletion flag
    #   chuangjianshijian created time   gengxinshijian    updated time
    field_order = (
        "xuexiaomingcheng",
        "xuexiaodaihao",
        "zhuanyemingcheng",
        "zhuanyedaihao",
        "nianji",
        "banji",
        "xueshengxingming",
        "xuehao",
        "mima",
        "chengji",
        "zuobiqingkuang",
        "kaoshileixing",
        "kaoshikemu",
        "kaoshishijianduan",
        "kaoshilianjie",
        "isAdmin",
        "isExit",
        "chuangjianshijian",
        "gengxinshijian",
    )
    # Start with every field blank, then fill in the non-blank ones; updating
    # existing keys keeps the field order above.
    user = dict.fromkeys(field_order, "")
    user.update(
        {
            "xueshengxingming": "老师",
            "xuehao": "0000",
            "mima": md5_encrypt("0000"),
            "zuobiqingkuang": [],
            "isAdmin": 1,
            "isExit": 1,
        }
    )
    collection.insert_one(user)
    print("管理账号'老师'已被初始化新建")
# Check whether a user id belongs to an admin
def isAdmin(id):
    """Return whether ``id`` identifies an existing admin user.

    Args:
        id: The user's MongoDB ``_id``, normally a 24-character hex string
            taken from the request body.

    Returns:
        ``True`` only when a document with that ``_id`` and ``isExit == 1``
        exists and its ``isAdmin`` field is truthy. ``False`` otherwise,
        including when ``id`` is not a valid ObjectId.
    """
    # ObjectId(...) raises on malformed input; an unusable id is simply
    # "not an admin" rather than a server error.
    if not ObjectId.is_valid(id):
        return False
    userObj = collection.find_one({"_id": ObjectId(id), "isExit": 1})
    if userObj is None:
        return False
    # A document without an isAdmin field counts as non-admin.
    return bool(userObj.get("isAdmin", False))
9 months ago
9 months ago
# User login
@app.route("/userLogin", methods=["post"])
def login():
    """Authenticate a user from a JSON body containing ``xuehao`` and ``mima``.

    Returns:
        The file's usual response envelope (``code``, ``msg``, ``list``,
        ``hasError``). On success ``list`` holds the matched user document
        with ``_id`` converted to a string.
    """
    missing_info = {
        "code": 200,
        "msg": "请填写信息后登录",
        "list": [],
        "hasError": True,
    }
    resData = request.data
    # Reject an empty body.
    if not resData:
        return missing_info
    try:
        userJson = json.loads(resData)
    except ValueError:
        # Body is not valid JSON.
        return missing_info
    if not isinstance(userJson, dict):
        return missing_info

    xuehao = userJson.get("xuehao")
    mima = userJson.get("mima")
    # Only scalar credentials are allowed. A dict or list would be read by
    # MongoDB as a query operator (e.g. {"$ne": ""}) and could match any user.
    scalar_types = (str, int, float)
    if not isinstance(xuehao, scalar_types) or not isinstance(mima, scalar_types):
        return missing_info

    # NOTE(review): "mima" is compared as received, while other routes store
    # md5_encrypt(...) digests — presumably the client sends the digest here;
    # confirm against the front end.
    res = collection.find_one({"xuehao": xuehao, "mima": mima})
    if res is None:
        return {"code": 200, "msg": "暂无此用户", "list": [], "hasError": True}
    # ObjectId is not JSON-serialisable; send it as a string.
    res["_id"] = str(res["_id"])
    # NOTE(review): the returned document still contains the "mima" field.
    return {"code": 200, "msg": "登陆成功", "list": [res], "hasError": False}
9 months ago
# Create a user
@app.route("/addUser", methods=["post"])
def insert_data():
    """Insert a new non-admin user; only an admin caller may do this.

    The JSON body must contain the caller's ``id`` plus every user field read
    below. A missing field raises ``KeyError``.

    Returns:
        ``"新增成功"`` after the insert, or an error envelope when the body
        is empty or the caller is not an admin.
    """
    resData = request.data
    # Reject an empty body.
    if not resData:
        return {"code": 200, "msg": "无数据", "list": [], "hasError": True}
    userJson = json.loads(resData)
    # Refuse non-admin callers explicitly instead of skipping the insert and
    # leaving them without a proper response.
    if not isAdmin(userJson["id"]):
        return {"code": 200, "msg": "无权限", "list": [], "hasError": True}

    # Generate the id up front because the exam link embeds it.
    _id = ObjectId()
    # One timestamp for both fields so created/updated cannot differ.
    now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    userItem = {
        "_id": _id,
        "xuexiaomingcheng": userJson["xuexiaomingcheng"],
        "xuexiaodaihao": userJson["xuexiaodaihao"],
        "zhuanyemingcheng": userJson["zhuanyemingcheng"],
        "zhuanyedaihao": userJson["zhuanyedaihao"],
        "nianji": userJson["nianji"],
        "banji": userJson["banji"],
        "xueshengxingming": userJson["xueshengxingming"],
        "xuehao": userJson["xuehao"],
        "mima": md5_encrypt(userJson["mima"]),
        "chengji": userJson["chengji"],
        "zuobiqingkuang": userJson["zuobiqingkuang"],
        "kaoshileixing": userJson["kaoshileixing"],
        "kaoshikemu": userJson["kaoshikemu"],
        "kaoshishijianduan": userJson["kaoshishijianduan"],
        "kaoshilianjie": "/student/" + str(_id),  # auto-generated per-student link
        "isAdmin": 0,
        "isExit": 1,
        "chuangjianshijian": now,
        "gengxinshijian": now,
    }
    collection.insert_one(userItem)
    return "新增成功"
9 months ago
# Query users
@app.route("/getUser", methods=["post"])
def query_data():
    """Return all users for an admin caller, or the caller's own record.

    The JSON body must contain the caller's ``id``.

    Returns:
        The file's usual response envelope. For an admin, ``list`` holds
        every document in the collection. Otherwise it holds the single
        document whose ``_id`` is ``id`` and whose ``isExit`` is 1, or an
        error envelope when no such document exists.
    """
    resData = request.data
    # Reject an empty body.
    if not resData:
        return {"code": 200, "msg": "请填写信息后登录", "list": [], "hasError": True}
    userJson = json.loads(resData)

    if isAdmin(userJson["id"]):
        # ObjectId is not JSON-serialisable; send each _id as a string.
        # NOTE(review): the documents are returned with their "mima" field.
        serialized_data = [
            {**item, "_id": str(item["_id"])} for item in collection.find()
        ]
        return {
            "code": 200,
            "msg": "查询成功",
            "list": serialized_data,
            "hasError": False,
        }

    userObj = collection.find_one({"_id": ObjectId(userJson["id"]), "isExit": 1})
    # An unknown id used to fail on the subscript below; report it instead.
    if userObj is None:
        return {"code": 200, "msg": "暂无此用户", "list": [], "hasError": True}
    userObj["_id"] = str(userObj["_id"])
    return {
        "code": 200,
        "msg": "查询成功",
        "list": [userObj],
        "hasError": False,
    }
9 months ago
# Update a user
@app.route("/updateUser", methods=["post"])
def update_data():
    """Overwrite a user's profile fields; only an admin caller may do this.

    The JSON body must contain the caller's ``id``, the target's ``userid``
    and every profile field listed below. A missing field raises
    ``KeyError``. The password is not touched here.

    Returns:
        ``"修改成功"`` after the update, or an error envelope when the body
        is empty or the caller is not an admin.
    """
    resData = request.data
    # Reject an empty body.
    if not resData:
        return {"code": 200, "msg": "无数据", "list": [], "hasError": True}
    userJson = json.loads(resData)
    # Refuse non-admin callers explicitly instead of skipping the update and
    # leaving them without a proper response.
    if not isAdmin(userJson["id"]):
        return {"code": 200, "msg": "无权限", "list": [], "hasError": True}

    editable_fields = (
        "xuexiaomingcheng",
        "xuexiaodaihao",
        "zhuanyemingcheng",
        "zhuanyedaihao",
        "nianji",
        "banji",
        "xueshengxingming",
        "xuehao",
        "chengji",
        "zuobiqingkuang",
        "kaoshileixing",
        "kaoshikemu",
        "kaoshishijianduan",
    )
    userItem = {field: userJson[field] for field in editable_fields}
    userItem["gengxinshijian"] = time.strftime(
        "%Y-%m-%d %H:%M:%S", time.localtime()
    )
    collection.update_one(
        {"_id": ObjectId(userJson["userid"])},
        {"$set": userItem},
    )
    return "修改成功"
9 months ago
# Delete users
@app.route("/delUser", methods=["post"])
def delete_data():
    """Delete the users listed in ``ids``; only an admin caller may do this.

    The JSON body must contain the caller's ``id`` and ``ids``, a list of
    user ``_id`` strings.

    Returns:
        ``"删除成功"`` after the delete, or an error envelope when the body
        is empty or the caller is not an admin.
    """
    resData = request.data
    # Reject an empty body.
    if not resData:
        return {"code": 200, "msg": "无数据", "list": [], "hasError": True}
    userJson = json.loads(resData)
    # Refuse non-admin callers explicitly instead of skipping the delete and
    # leaving them without a proper response.
    if not isAdmin(userJson["id"]):
        return {"code": 200, "msg": "无权限", "list": [], "hasError": True}

    # Convert the id strings to ObjectId so they match the stored _id values.
    object_ids = [ObjectId(item) for item in userJson["ids"]]
    collection.delete_many({"_id": {"$in": object_ids}})
    return "删除成功"
# Change a password
@app.route("/updatePassword", methods=["post"])
def updatePassword():
    """Set a user's password; only an admin caller may do this.

    The JSON body must contain the caller's ``id``, the target's ``userid``
    and the new plain-text ``mima``, which is stored as its MD5 digest.

    Returns:
        The file's usual response envelope; ``hasError`` is ``True`` when the
        body is empty or the caller is not an admin.
    """
    resData = request.data
    # Reject an empty body.
    if not resData:
        return {"code": 200, "msg": "无数据", "list": [], "hasError": True}
    userJson = json.loads(resData)
    # Do not report success to a non-admin caller when nothing was changed.
    if not isAdmin(userJson["id"]):
        return {"code": 200, "msg": "无权限", "list": [], "hasError": True}

    collection.update_one(
        {"_id": ObjectId(userJson["userid"])},
        {"$set": {"mima": md5_encrypt(userJson["mima"])}},
    )
    return {"code": 200, "msg": "修改成功", "list": [], "hasError": False}
9 months ago
if __name__ == "__main__":
    # Seed the default admin account before accepting requests.
    initData()
    # The server is started through the SocketIO wrapper rather than
    # app.run(), the call this line replaced.
    # NOTE(review): debug=True is enabled — confirm this entry point is for
    # development only.
    socketio.run(app, debug=True)