zuobijiancedaima
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

286 lines
9.7 KiB

from flask import Flask, request, jsonify
from pymongo import MongoClient
from bson import ObjectId
import hashlib
import json
from flask import Flask, render_template, request
from flask_socketio import SocketIO, emit, join_room, leave_room
import time
app = Flask(__name__)
client = MongoClient("mongodb://localhost:27019/")
db = client["back"]
collection = db["users"]
# sockitIo解决跨域问题
app.config["SECRET_KEY"] = "secret!"
socketio = SocketIO(app, cors_allowed_origins="*")
# 存储连接的客户端的房间号
clients = {}
@socketio.on("connect", namespace="/ws/video")
def on_connect():
print("Client connected")
@socketio.on("disconnect", namespace="/ws/video")
def on_disconnect():
print("Client disconnected")
# 移除客户端的房间关联
room = clients.pop(request.sid, None)
if room:
leave_room(room)
@socketio.on("join", namespace="/ws/video")
def on_join(data):
room = data["room"]
join_room(room)
clients[request.sid] = room
@socketio.on("leave", namespace="/ws/video")
def on_leave(data):
room = data["room"]
leave_room(room)
clients.pop(request.sid, None)
@socketio.on("message", namespace="/ws/video")
def handle_video_frame(message):
# 广播视频帧给房间内的所有客户端,除了发送该帧的客户端
room = clients.get(request.sid)
if room:
emit("message", message, room=room, include_self=False)
# md5加密
def md5_encrypt(data):
md5 = hashlib.md5()
md5.update(data.encode("utf-8"))
return md5.hexdigest()
# 初始化数据
def initData():
admin = collection.find_one({"isAdmin": 1, "isExit": 1})
if admin is None:
# 学校名称、学校代号、专业名称、专业代号、年级、班级、学生姓名、学号、密码、成绩、作弊情况(作弊类型、作弊时间、作弊图片)、考试类型、考试科目、考试时间段、考试链接、是否是管理员、是否删除、创建时间、更新时间
# xuexiaomingcheng、xuexiaodaihao、zhuanyemingcheng、zhuanyedaihao、nianji、banji、xueshengxingming、xuehao、mima、chengji、zuobiqingkuang(zuobileixing、zuobishijian、zuobitupian)、kaoshileixing、kaoshikemu、kaoshishijianduan、kaoshilianjie、isAdmin、isExit、chuangjianshijian、gengxinshijian
user = {
"xuexiaomingcheng": "",
"xuexiaodaihao": "",
"zhuanyemingcheng": "",
"zhuanyedaihao": "",
"nianji": "",
"banji": "",
"xueshengxingming": "老师",
"xuehao": "0000",
"mima": md5_encrypt("0000"),
"chengji": "",
"zuobiqingkuang": [],
"kaoshileixing": "",
"kaoshikemu": "",
"kaoshishijianduan": "",
"kaoshilianjie": "",
"isAdmin": 1,
"isExit": 1,
"chuangjianshijian": "",
"gengxinshijian": "",
}
collection.insert_one(user)
print("管理账号'老师'已被初始化新建")
else:
print(f"管理账号'{admin['xueshengxingming']}'已存在")
# 判断是否是管理员
def isAdmin(id):
mongoId = ObjectId(id)
userObj = collection.find_one({"_id": mongoId, "isExit": 1})
# 检查 userObj 是否为 None
if userObj is None:
return False
# 默认返回 False 如果 isAdmin 字段不存在
adminBool = userObj.get("isAdmin", False)
return bool(adminBool)
# 用户登录
@app.route("/userLogin", methods=["post"])
def login():
resData = request.data
# 检测是否有数据
if not resData:
return {"code": 200, "msg": "请填写信息后登录", "list": [], "hasError": True}
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
res = collection.find_one({"xuehao": userJson["xuehao"], "mima": userJson["mima"]})
if res is None:
return {"code": 200, "msg": "暂无此用户", "list": [], "hasError": True}
# serialized_data = []
# for item in data:
# item["_id"] = str(item["_id"]) # 将ObjectId转换为字符串
# serialized_data.append(item)
# 将ObjectId转换为字符串
res["_id"] = str(res["_id"])
return {"code": 200, "msg": "登陆成功", "list": [res], "hasError": False}
# 新增用户
@app.route("/addUser", methods=["post"])
def insert_data():
resData = request.data
# 检测是否有数据
if not resData:
return {"code": 200, "msg": "无数据", "list": [], "hasError": True}
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
admin = isAdmin(userJson["id"])
if admin:
_id = ObjectId()
print(77337, str(_id))
userItem = {
"_id": _id,
"xuexiaomingcheng": userJson["xuexiaomingcheng"],
"xuexiaodaihao": userJson["xuexiaodaihao"],
"zhuanyemingcheng": userJson["zhuanyemingcheng"],
"zhuanyedaihao": userJson["zhuanyedaihao"],
"nianji": userJson["nianji"],
"banji": userJson["banji"],
"xueshengxingming": userJson["xueshengxingming"],
"xuehao": userJson["xuehao"],
"mima": md5_encrypt(userJson["mima"]),
"chengji": userJson["chengji"],
"zuobiqingkuang": userJson["zuobiqingkuang"],
"kaoshileixing": userJson["kaoshileixing"],
"kaoshikemu": userJson["kaoshikemu"],
"kaoshishijianduan": userJson["kaoshishijianduan"],
"kaoshilianjie": "/student/" + str(_id), # 需要自动生成一个链接
"isAdmin": 0,
"isExit": 1,
"chuangjianshijian": time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(time.time())
),
"gengxinshijian": time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(time.time())
),
}
collection.insert_one(userItem)
return "新增成功"
# 查询用户
@app.route("/getUser", methods=["post"])
def query_data():
resData = request.data
# 检测是否有数据
if not resData:
return {"code": 200, "msg": "请填写信息后登录", "list": [], "hasError": True}
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
admin = isAdmin(userJson["id"])
print(777, userJson, admin)
if admin:
users = collection.find()
serialized_data = []
for item in users:
item["_id"] = str(item["_id"]) # 将ObjectId转换为字符串
serialized_data.append(item)
return {
"code": 200,
"msg": "查询成功",
"list": serialized_data,
"hasError": False,
}
else:
mongoId = ObjectId(userJson["id"])
userObj = collection.find_one({"_id": mongoId, "isExit": 1})
userObj["_id"] = str(userObj["_id"]) # 将ObjectId转换为字符串
return {
"code": 200,
"msg": "查询成功",
"list": [userObj],
"hasError": False,
}
# 更新用户
@app.route("/updateUser", methods=["post"])
def update_data():
resData = request.data
# 检测是否有数据
if not resData:
return {"code": 200, "msg": "无数据", "list": [], "hasError": True}
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
admin = isAdmin(userJson["id"])
if admin:
userItem = {
"xuexiaomingcheng": userJson["xuexiaomingcheng"],
"xuexiaodaihao": userJson["xuexiaodaihao"],
"zhuanyemingcheng": userJson["zhuanyemingcheng"],
"zhuanyedaihao": userJson["zhuanyedaihao"],
"nianji": userJson["nianji"],
"banji": userJson["banji"],
"xueshengxingming": userJson["xueshengxingming"],
"xuehao": userJson["xuehao"],
"chengji": userJson["chengji"],
"zuobiqingkuang": userJson["zuobiqingkuang"],
"kaoshileixing": userJson["kaoshileixing"],
"kaoshikemu": userJson["kaoshikemu"],
"kaoshishijianduan": userJson["kaoshishijianduan"],
"gengxinshijian": time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(time.time())
),
}
collection.update_one(
{"_id": ObjectId(userJson["userid"])},
{"$set": userItem},
)
return "修改成功"
# 删除用户
@app.route("/delUser", methods=["post"])
def delete_data():
resData = request.data
# 检测是否有数据
if not resData:
return {"code": 200, "msg": "无数据", "list": [], "hasError": True}
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
admin = isAdmin(userJson["id"])
if admin:
serialized_data = []
for item in userJson["ids"]:
serialized_data.append(ObjectId(item)) # 将ObjectId转换为字符串
collection.delete_many({"_id": {"$in": serialized_data}})
return "删除成功"
# 修改密码
@app.route("/updatePassword", methods=["post"])
def updatePassword():
resData = request.data
# 检测是否有数据
if not resData:
return {"code": 200, "msg": "无数据", "list": [], "hasError": True}
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
admin = isAdmin(userJson["id"])
print(777, userJson, admin)
if admin:
collection.update_one(
{"_id": ObjectId(userJson["userid"])},
{
"$set": {"mima": md5_encrypt(userJson["mima"])},
},
)
return {"code": 200, "msg": "修改成功", "list": [], "hasError": False}
if __name__ == "__main__":
initData()
# app.run(debug=True)
socketio.run(app, debug=True)