zuobijiancedaima
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

336 lines
12 KiB

from flask import Flask, request, jsonify
from pymongo import MongoClient
from bson import ObjectId
import hashlib
import json
from flask import Flask, render_template, request
from flask_socketio import SocketIO, Namespace, emit
import time
app = Flask(__name__)
client = MongoClient("mongodb://localhost:27019/")
db = client["back"]
collection = db["users"]
# sockitIo解决跨域问题
socketMsg = SocketIO(app, cors_allowed_origins="*")
socketVedio = SocketIO(app, cors_allowed_origins="*")
# 用来存放客户端的 sid,即 session id
# 可以不单独定义字典存放 sid与namespace,flask-socketio 默认将 sid 存放在 room 中
socket_pool = {}
# Websocket 通过namespace 和 sid 标识具体客户端
# 第一个 Websocket 类用于聊天
class MyCustomNamespace(Namespace):
name_space = "/msg"
# 连接成功调用的方法
def on_connect(self):
global socket_pool
socket_pool[request.sid] = self.name_space
print(1, socket_pool)
print(2, "Client connected")
print(3, "-----------------")
# 断开连接调用的方法
def on_disconnect(self):
global socket_pool
print(4, "disconnect...")
del socket_pool[request.sid]
print(5, socket_pool)
# 往 接收客户端标消息识为 'message' 的方法
def on_message(self, data):
print(6, data)
# 把消息发送到客户端的 'response' 标识的方法中, 一般是 on_response()
emit("response", "123", to=request.sid)
print(7, "--------------")
print(8, request.sid)
print(9, "--------------")
print(10, socketio.server.manager.rooms)
# 往 接收客户端标消息识为 'hello' 的方法
def on_hello(self, data):
print(11, "hello world")
# 发送消息
def send(self, data):
# 向 namespace中的所有 Websocket 连接广播消息, namespace参数不能少, to缺省是广播模式
emit("response", data, namespace=self.name_space)
# 向 sid 所标识的客户端 单播
emit("response", data, namespace=self.name_space, to=request.sid)
# 为了详细展示不同类,这里就不做继承
# 第二个 Websocket 类用于视频
class MyCustomNamespace1(Namespace):
name_space = "/vedio"
def on_connect(self):
global socket_pool
print(12, "connect..")
print(13, request.namespace)
socket_pool[request.sid] = self.name_space
print(14, socket_pool)
def on_disconnect(self):
global socket_pool
print(15, "disconnect...")
del socket_pool[request.sid]
print(16, socket_pool)
def on_message(self, data):
print(17, data)
emit("response", "123")
self.send("asd")
# 发送消息
def send(self, data):
emit("response", data, namespace=self.name_space)
socketMsg.on_namespace(MyCustomNamespace("/msg"))
socketVedio.on_namespace(MyCustomNamespace("/vedio"))
@app.route("/sendMsg")
def sendMsg():
for sid, namespace_value in socket_pool.items():
print(18, sid)
emit("response", "123456", namespace=namespace_value, to=sid)
print(19, "ok")
return "ok"
# md5加密
def md5_encrypt(data):
md5 = hashlib.md5()
md5.update(data.encode("utf-8"))
return md5.hexdigest()
# 初始化数据
def initData():
admin = collection.find_one({"isAdmin": 1, "isExit": 1})
if admin is None:
# 学校名称、学校代号、专业名称、专业代号、年级、班级、学生姓名、学号、密码、成绩、作弊情况(作弊类型、作弊时间、作弊图片)、考试类型、考试科目、考试时间段、考试链接、是否是管理员、是否删除、创建时间、更新时间
# xuexiaomingcheng、xuexiaodaihao、zhuanyemingcheng、zhuanyedaihao、nianji、banji、xueshengxingming、xuehao、mima、chengji、zuobiqingkuang(zuobileixing、zuobishijian、zuobitupian)、kaoshileixing、kaoshikemu、kaoshishijianduan、kaoshilianjie、isAdmin、isExit、chuangjianshijian、gengxinshijian
user = {
"xuexiaomingcheng": "",
"xuexiaodaihao": "",
"zhuanyemingcheng": "",
"zhuanyedaihao": "",
"nianji": "",
"banji": "",
"xueshengxingming": "老师",
"xuehao": "0000",
"mima": md5_encrypt("0000"),
"chengji": "",
"zuobiqingkuang": [],
"kaoshileixing": "",
"kaoshikemu": "",
"kaoshishijianduan": "",
"kaoshilianjie": "",
"isAdmin": 1,
"isExit": 1,
"chuangjianshijian": "",
"gengxinshijian": "",
}
collection.insert_one(user)
print("管理账号'老师'已被初始化新建")
else:
print(f"管理账号'{admin['xueshengxingming']}'已存在")
# 判断是否是管理员
def isAdmin(id):
mongoId = ObjectId(id)
userObj = collection.find_one({"_id": mongoId, "isExit": 1})
# 检查 userObj 是否为 None
if userObj is None:
return False
# 默认返回 False 如果 isAdmin 字段不存在
adminBool = userObj.get("isAdmin", False)
return bool(adminBool)
# 用户登录
@app.route("/userLogin", methods=["post"])
def login():
resData = request.data
# 检测是否有数据
if not resData:
return {"code": 200, "msg": "请填写信息后登录", "list": [], "hasError": True}
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
res = collection.find_one({"xuehao": userJson["xuehao"], "mima": userJson["mima"]})
if res is None:
return {"code": 200, "msg": "暂无此用户", "list": [], "hasError": True}
# serialized_data = []
# for item in data:
# item["_id"] = str(item["_id"]) # 将ObjectId转换为字符串
# serialized_data.append(item)
# 将ObjectId转换为字符串
res["_id"] = str(res["_id"])
return {"code": 200, "msg": "登陆成功", "list": [res], "hasError": False}
# 新增用户
@app.route("/addUser", methods=["post"])
def insert_data():
resData = request.data
# 检测是否有数据
if not resData:
return {"code": 200, "msg": "无数据", "list": [], "hasError": True}
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
admin = isAdmin(userJson["id"])
if admin:
_id = ObjectId()
print(77337, str(_id))
userItem = {
"_id": _id,
"xuexiaomingcheng": userJson["xuexiaomingcheng"],
"xuexiaodaihao": userJson["xuexiaodaihao"],
"zhuanyemingcheng": userJson["zhuanyemingcheng"],
"zhuanyedaihao": userJson["zhuanyedaihao"],
"nianji": userJson["nianji"],
"banji": userJson["banji"],
"xueshengxingming": userJson["xueshengxingming"],
"xuehao": userJson["xuehao"],
"mima": md5_encrypt(userJson["mima"]),
"chengji": userJson["chengji"],
"zuobiqingkuang": userJson["zuobiqingkuang"],
"kaoshileixing": userJson["kaoshileixing"],
"kaoshikemu": userJson["kaoshikemu"],
"kaoshishijianduan": userJson["kaoshishijianduan"],
"kaoshilianjie": "/student/" + str(_id), # 需要自动生成一个链接
"isAdmin": 0,
"isExit": 1,
"chuangjianshijian": time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(time.time())
),
"gengxinshijian": time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(time.time())
),
}
collection.insert_one(userItem)
return "新增成功"
# 查询用户
@app.route("/getUser", methods=["post"])
def query_data():
resData = request.data
# 检测是否有数据
if not resData:
return {"code": 200, "msg": "请填写信息后登录", "list": [], "hasError": True}
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
admin = isAdmin(userJson["id"])
print(777, userJson, admin)
if admin:
users = collection.find()
serialized_data = []
for item in users:
item["_id"] = str(item["_id"]) # 将ObjectId转换为字符串
serialized_data.append(item)
return {
"code": 200,
"msg": "查询成功",
"list": serialized_data,
"hasError": False,
}
else:
mongoId = ObjectId(userJson["id"])
userObj = collection.find_one({"_id": mongoId, "isExit": 1})
userObj["_id"] = str(userObj["_id"]) # 将ObjectId转换为字符串
return {
"code": 200,
"msg": "查询成功",
"list": [userObj],
"hasError": False,
}
# 更新用户
@app.route("/updateUser", methods=["post"])
def update_data():
resData = request.data
# 检测是否有数据
if not resData:
return {"code": 200, "msg": "无数据", "list": [], "hasError": True}
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
admin = isAdmin(userJson["id"])
if admin:
userItem = {
"xuexiaomingcheng": userJson["xuexiaomingcheng"],
"xuexiaodaihao": userJson["xuexiaodaihao"],
"zhuanyemingcheng": userJson["zhuanyemingcheng"],
"zhuanyedaihao": userJson["zhuanyedaihao"],
"nianji": userJson["nianji"],
"banji": userJson["banji"],
"xueshengxingming": userJson["xueshengxingming"],
"xuehao": userJson["xuehao"],
"chengji": userJson["chengji"],
"zuobiqingkuang": userJson["zuobiqingkuang"],
"kaoshileixing": userJson["kaoshileixing"],
"kaoshikemu": userJson["kaoshikemu"],
"kaoshishijianduan": userJson["kaoshishijianduan"],
"gengxinshijian": time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(time.time())
),
}
collection.update_one(
{"_id": ObjectId(userJson["userid"])},
{"$set": userItem},
)
return "修改成功"
# 删除用户
@app.route("/delUser", methods=["post"])
def delete_data():
resData = request.data
# 检测是否有数据
if not resData:
return {"code": 200, "msg": "无数据", "list": [], "hasError": True}
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
admin = isAdmin(userJson["id"])
if admin:
serialized_data = []
for item in userJson["ids"]:
serialized_data.append(ObjectId(item)) # 将ObjectId转换为字符串
collection.delete_many({"_id": {"$in": serialized_data}})
return "删除成功"
# 修改密码
@app.route("/updatePassword", methods=["post"])
def updatePassword():
resData = request.data
# 检测是否有数据
if not resData:
return {"code": 200, "msg": "无数据", "list": [], "hasError": True}
# 获取到POST过来的数据,转为json形式
userJson = json.loads(resData)
admin = isAdmin(userJson["id"])
print(777, userJson, admin)
if admin:
collection.update_one(
{"_id": ObjectId(userJson["userid"])},
{
"$set": {"mima": md5_encrypt(userJson["mima"])},
},
)
return {"code": 200, "msg": "修改成功", "list": [], "hasError": False}
if __name__ == "__main__":
initData()
# app.run(debug=True)
socketMsg.run(app, debug=True)
# socketVedio.run(app, debug=True)