zuobijiancedaima
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

217 lines
6.7 KiB

9 months ago
from flask import Flask, request, jsonify
9 months ago
from pymongo import MongoClient
9 months ago
from bson import ObjectId
import hashlib
import json
from flask import Flask, render_template, request
from flask_socketio import SocketIO, Namespace, emit
9 months ago
# Flask application object; the @app.route handlers in this file attach to it.
app = Flask(__name__)
# MongoDB connection to a local server on port 27019.
client = MongoClient("mongodb://localhost:27019/")
# Database "back" and its "users" collection, used by every data-access
# function in this file.
db = client["back"]
collection = db["users"]
9 months ago
# Socket.IO server wrapper
socketio = SocketIO()
# Allow cross-origin connections (any origin)
socketio.init_app(app, cors_allowed_origins="*")
# Stores each client's sid (session id), mapped to the namespace it connected on.
# A separate dict is not strictly required: flask-socketio already keeps
# sids in its rooms by default.
socket_pool = {}
# Websocket 通过namespace 和 sid 标识具体客户端
# 第一个 Websocket 类
class MyCustomNamespace(Namespace):
    """Class-based Socket.IO event handlers for the "/msg" namespace.

    Each connected client's sid is recorded in the module-level
    ``socket_pool`` dict (sid -> namespace string) on connect and removed
    again on disconnect.
    """

    name_space = "/msg"

    def on_connect(self):
        """Log connection details and register the client's sid."""
        print("connect..")
        print("-----------------")
        print(self.server.manager)
        print(self.socketio)
        print("-----------------")
        print(request.namespace)
        socket_pool[request.sid] = self.name_space
        print(socket_pool)

    def on_disconnect(self):
        """Remove the client's sid from ``socket_pool``."""
        print("disconnect...")
        # pop() with a default instead of ``del``: the pool is keyed by sid
        # only and shared with the other namespace class, so the entry may
        # already be gone; a missing key must not raise inside the handler.
        socket_pool.pop(request.sid, None)
        print(socket_pool)

    def on_message(self, data):
        """Handle a client 'message' event.

        Prints the payload, replies to the sender with a fixed "123" on the
        'response' event, then prints the sid and the server's room table.
        """
        print(data)
        # Delivered to the client's handler for the 'response' event.
        emit("response", "123", to=request.sid)
        print("--------------")
        print(request.sid)
        print("--------------")
        print(socketio.server.manager.rooms)

    def on_hello(self, data):
        """Handle a client 'hello' event; the payload is ignored."""
        print("hello world")

    def send(self, data):
        """Emit *data* twice on the 'response' event of this namespace.

        NOTE(review): this overrides ``Namespace.send`` from the base class
        with a different signature - confirm that is intended.
        """
        # NOTE(review): the original comment called this first emit a
        # broadcast, but flask_socketio.emit() without ``broadcast=True`` or
        # ``to=`` is documented to reply to the originating client only -
        # confirm which behaviour is wanted.
        emit("response", data, namespace=self.name_space)
        # Unicast to the client identified by the current request's sid.
        emit("response", data, namespace=self.name_space, to=request.sid)
# 为了详细展示不同类,这里就不做继承
# 第二个 Websocket 类
class MyCustomNamespace1(Namespace):
    """Class-based Socket.IO event handlers for the "/vedio" namespace.

    Mirrors the bookkeeping of the first namespace class: the client's sid
    is stored in the module-level ``socket_pool`` dict on connect and
    removed on disconnect.

    NOTE(review): "/vedio" may be a misspelling of "/video"; it is kept
    as-is because clients connect using this exact string.
    """

    name_space = "/vedio"

    def on_connect(self):
        """Register the connecting client's sid in ``socket_pool``."""
        print("connect..")
        print(request.namespace)
        socket_pool[request.sid] = self.name_space
        print(socket_pool)

    def on_disconnect(self):
        """Remove the client's sid from ``socket_pool``."""
        print("disconnect...")
        # pop() with a default instead of ``del``: the pool is keyed by sid
        # only and shared between namespace classes, so a missing entry
        # must not raise KeyError inside the handler.
        socket_pool.pop(request.sid, None)
        print(socket_pool)

    def on_message(self, data):
        """Handle a client 'message' event.

        Prints the payload, emits "123" on 'response', then emits "asd"
        through :meth:`send`.
        """
        print(data)
        emit("response", "123")
        self.send("asd")

    def send(self, data):
        """Emit *data* on the 'response' event of this namespace.

        NOTE(review): this overrides ``Namespace.send`` from the base class
        with a different signature - confirm that is intended.
        """
        emit("response", data, namespace=self.name_space)
@app.route("/sendMsg")
def sendMsg():
    """Push the fixed payload "123456" to every client in ``socket_pool``.

    Each recorded sid receives a 'response' event on the namespace it
    connected on. Returns the plain-text body "ok".
    """
    # Iterate over a snapshot: connect/disconnect handlers add and remove
    # entries in socket_pool, and a dict that changes size while being
    # iterated raises RuntimeError.
    for sid, namespace_value in list(socket_pool.items()):
        print(sid)
        emit("response", "123456", namespace=namespace_value, to=sid)
    print("ok")
    return "ok"
# md5加密
def md5_encrypt(data):
    """Return the hexadecimal MD5 digest of the string *data*.

    The string is encoded as UTF-8 before hashing.
    """
    encoded = data.encode("utf-8")
    return hashlib.md5(encoded).hexdigest()
9 months ago
9 months ago
# 初始化数据
def initData():
    """Create the default administrator account if none exists yet.

    Looks for a user document with ``isAdmin == 1`` and ``isExit == 1``.
    When one is found its name is printed and nothing is written;
    otherwise a default admin ("老师", student number "0000", password
    "0000" stored as an MD5 hex digest) is inserted.
    """
    existing_admin = collection.find_one({"isAdmin": 1, "isExit": 1})
    if existing_admin is not None:
        print(f"管理账号'{existing_admin['xueshengxingming']}'已存在")
        return

    # Document fields (pinyin keys), in order: school name, school code,
    # major name, major code, grade, class, student name, student number,
    # password, score, cheating records (cheating type, cheating time,
    # cheating image), exam type, exam subject, exam time slot, is-admin
    # flag, deleted/exists flag (isExit), creation time, update time.
    default_admin = {
        "xuexiaomingcheng": "",
        "xuexiaodaihao": "",
        "zhuanyemingcheng": "",
        "zhuanyedaihao": "",
        "nianji": "",
        "banji": "",
        "xueshengxingming": "老师",
        "xuehao": "0000",
        "mima": md5_encrypt("0000"),
        "chengji": "",
        "zuobiqingkuang": [],
        "kaoshileixing": "",
        "kaoshikemu": "",
        "kaoshishijianduan": "",
        "isAdmin": 1,
        "isExit": 1,
        "chuangjianshijian": "",
        "gengxinshijian": "",
    }
    collection.insert_one(default_admin)
    print("管理账号'老师'已被初始化新建")
# 判断是否是管理员
def isAdmin(id):
    """Return True when the user with the given id is an administrator.

    Args:
        id: The user's MongoDB ``_id``, in any form ``ObjectId`` accepts
            (typically a 24-character hex string).

    Returns:
        ``True`` only if a document with that ``_id`` and ``isExit == 1``
        exists and its ``isAdmin`` field is truthy; ``False`` otherwise,
        including when *id* is not a valid ObjectId.
    """
    # Function-scope import: bson is already a dependency of this file.
    from bson.errors import InvalidId

    try:
        mongo_id = ObjectId(id)
    except (InvalidId, TypeError):
        # A malformed id cannot belong to any user, so answer "not admin"
        # instead of letting the exception escape from a yes/no check.
        return False

    user = collection.find_one({"_id": mongo_id, "isExit": 1})
    if user is None:
        return False
    # A missing isAdmin field counts as "not admin".
    return bool(user.get("isAdmin", False))
9 months ago
9 months ago
# 用户登录
@app.route("/userLogin", methods=["post"])
def login():
    """Log a user in from a JSON body containing ``xuehao`` and ``mima``.

    The request body must be a JSON object whose ``xuehao`` (student
    number) and ``mima`` (password value, compared as-is against the
    stored field) are scalar values.

    Returns:
        A dict with ``code``, ``msg``, ``list`` and ``hasError``. On
        success ``list`` holds the matched user document with ``_id``
        converted to a string; on any failure ``list`` is empty and
        ``hasError`` is True.
    """
    missing_info = {"code": 200, "msg": "请填写信息后登录", "list": [], "hasError": True}

    raw_body = request.data
    if not raw_body:
        return missing_info

    # A body that is not valid JSON is treated like a missing body
    # instead of surfacing as an unhandled exception.
    try:
        credentials = json.loads(raw_body)
    except ValueError:
        return missing_info
    if not isinstance(credentials, dict):
        return missing_info

    student_number = credentials.get("xuehao")
    password = credentials.get("mima")
    # Only scalar values may reach the query. A dict such as {"$ne": ""}
    # would be interpreted by MongoDB as an operator, and null matches
    # documents that lack the field; either would bypass the password check.
    if not all(isinstance(v, (str, int, float)) for v in (student_number, password)):
        return missing_info

    # NOTE(review): unlike isAdmin(), this lookup does not filter on
    # isExit == 1 - confirm whether removed users should be able to log in.
    res = collection.find_one({"xuehao": student_number, "mima": password})
    if res is None:
        return {"code": 200, "msg": "暂无此用户", "list": [], "hasError": True}

    # ObjectId is not JSON-serialisable; send it as a string.
    res["_id"] = str(res["_id"])
    # NOTE(review): the returned document still contains the stored "mima"
    # value - consider whether the client really needs it.
    return {"code": 200, "msg": "登陆成功", "list": [res], "hasError": False}
9 months ago
# 新增用户
9 months ago
@app.route("/addUser", methods=["post"])
def insert_data():
    """Insert one fixed sample document into the users collection.

    The request body is not read; the same hard-coded document is inserted
    on every call. Returns a plain-text confirmation.
    """
    sample_user = dict(name="John Doe", age=25, city="New York")
    collection.insert_one(sample_user)
    return "Data inserted successfully!"
# 查询用户
9 months ago
@app.route("/getUser", methods=["post"])
def query_data():
    """Return every document in the users collection as an HTML fragment.

    Each document becomes one ``Name: ..., Age: ..., City: ...<br>`` line.
    Fields a document does not have are rendered as empty strings.
    """
    # Function-scope import (stdlib) so the block stays self-contained.
    import html

    def cell(document, key):
        # .get(): documents written by initData() carry no name/age/city
        # keys, so plain indexing would raise KeyError on them.
        # escape(): stored values are placed inside HTML output.
        return html.escape(str(document.get(key, "")))

    lines = [
        f"Name: {cell(user, 'name')}, Age: {cell(user, 'age')}, City: {cell(user, 'city')}<br>"
        for user in collection.find()
    ]
    return "".join(lines)
# 更新用户
9 months ago
@app.route("/updateUser", methods=["post"])
def update_data():
    """Update the first document named "John Doe" with fixed values.

    Sets ``age`` to 30 and ``city`` to "San Francisco". The request body
    is not read. Returns a plain-text confirmation.
    """
    collection.update_one(
        {"name": "John Doe"},
        {"$set": {"age": 30, "city": "San Francisco"}},
    )
    return "Data updated successfully!"
# 删除用户
9 months ago
@app.route("/delUser", methods=["post"])
def delete_data():
    """Delete the first document named "John Doe".

    The request body is not read. Returns a plain-text confirmation.
    """
    collection.delete_one({"name": "John Doe"})
    return "Data deleted successfully!"
if __name__ == "__main__":
    # Make sure the default administrator account exists before serving.
    initData()
    # NOTE(review): Flask-SocketIO's documentation recommends starting the
    # server with socketio.run(app, ...) rather than app.run() - confirm
    # whether WebSocket transport works with this start-up.
    app.run(debug=True)