"""Flask backend exposing user-account routes backed by MongoDB.

The module also defines two Socket.IO namespaces (``/msg`` and ``/vedio``)
and a ``/sendMsg`` route that pushes a message to every recorded client.
"""

import hashlib
import json
import time

from bson import ObjectId
from bson.errors import InvalidId
from flask import Flask, jsonify, render_template, request
from flask_socketio import Namespace, SocketIO, emit
from pymongo import MongoClient

app = Flask(__name__)
client = MongoClient("mongodb://localhost:27019/")
db = client["back"]
collection = db["users"]

# Socket.IO server.
socketio = SocketIO()
# Allow cross-origin connections.
socketio.init_app(app, cors_allowed_origins="*")

# Maps each connected client's sid (session id) to the namespace it joined.
# A separate dict is not strictly required: flask-socketio already keeps
# every sid in a room by default.
socket_pool = {}

# Profile fields copied verbatim from the request body by /addUser and
# /updateUser.
_PROFILE_FIELDS = (
    "xuexiaomingcheng",
    "xuexiaodaihao",
    "zhuanyemingcheng",
    "zhuanyedaihao",
    "nianji",
    "banji",
    "xueshengxingming",
    "xuehao",
    "chengji",
    "zuobiqingkuang",
    "kaoshileixing",
    "kaoshikemu",
    "kaoshishijianduan",
)

_MSG_NO_DATA = "无数据"
_MSG_FILL_IN = "请填写信息后登录"
_MSG_NOT_ADMIN = "您不是管理员用户"
_MSG_BAD_ID = "用户ID无效"
_MSG_BAD_SUBTYPE = "不支持的操作类型"


def _error(msg):
    """Build the error payload shape shared by every route."""
    return {"code": 200, "msg": msg, "list": [], "hasError": True}


def _load_json_body():
    """Return the request body parsed as a JSON object, or ``None``.

    ``None`` is returned when the body is empty, is not valid JSON, or is
    valid JSON but not an object, so callers need a single check.
    """
    raw = request.data
    if not raw:
        return None
    try:
        payload = json.loads(raw)
    except ValueError:  # json.JSONDecodeError and decode errors
        return None
    return payload if isinstance(payload, dict) else None


def _to_object_id(value):
    """Convert *value* to an ``ObjectId``; return ``None`` if it is not one."""
    if value is None:
        # ObjectId(None) would silently generate a brand-new id.
        return None
    try:
        return ObjectId(value)
    except (InvalidId, TypeError):
        return None


def _is_scalar(value):
    """Return True for plain string/number values.

    Used to keep dicts and lists (which MongoDB would interpret as query
    operators such as ``{"$ne": None}``) out of query filters.
    """
    return isinstance(value, (str, int, float))


def _missing_fields(payload, names):
    """Return the entries of *names* that are absent from *payload*."""
    return [name for name in names if name not in payload]


def _now():
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))


# A websocket client is identified by its namespace together with its sid.
# NOTE(review): nothing visible in this file registers these namespace
# classes (e.g. socketio.on_namespace(...)) and the entry point calls
# app.run rather than socketio.run - confirm that is handled elsewhere.


# First websocket namespace class.
class MyCustomNamespace(Namespace):
    """Socket.IO handlers for the ``/msg`` namespace."""

    name_space = "/msg"

    def on_connect(self):
        """Record the connecting client's sid in ``socket_pool``."""
        print("connect..")
        print("-----------------")
        print(self.server.manager)
        print(self.socketio)
        print("-----------------")
        print(request.namespace)
        socket_pool[request.sid] = self.name_space
        print(socket_pool)

    def on_disconnect(self):
        """Forget the disconnecting client's sid."""
        print("disconnect...")
        # pop() instead of del: a client that was never recorded must not
        # raise KeyError while disconnecting.
        socket_pool.pop(request.sid, None)
        print(socket_pool)

    def on_message(self, data):
        """Handle a client event named 'message'."""
        print(data)
        # Reply on the client's 'response' event (typically handled by an
        # on_response() callback on the client side).
        emit("response", "123", to=request.sid)
        print("--------------")
        print(request.sid)
        print("--------------")
        print(socketio.server.manager.rooms)

    def on_hello(self, data):
        """Handle a client event named 'hello'."""
        print("hello world")

    def send(self, data):
        """Emit *data* on the 'response' event of this namespace."""
        # NOTE(review): the original comment describes this first call as a
        # broadcast to every connection in the namespace (the namespace
        # argument being mandatory). flask_socketio.emit without
        # broadcast=True may address only the current client - confirm.
        emit("response", data, namespace=self.name_space)
        # Unicast to the client identified by the current sid.
        emit("response", data, namespace=self.name_space, to=request.sid)


# Second websocket namespace class. It deliberately does not inherit from
# the first one so that each class can be read on its own.
class MyCustomNamespace1(Namespace):
    """Socket.IO handlers for the ``/vedio`` namespace."""

    name_space = "/vedio"

    def on_connect(self):
        """Record the connecting client's sid in ``socket_pool``."""
        print("connect..")
        print(request.namespace)
        socket_pool[request.sid] = self.name_space
        print(socket_pool)

    def on_disconnect(self):
        """Forget the disconnecting client's sid."""
        print("disconnect...")
        # pop() instead of del: tolerate an sid that was never recorded.
        socket_pool.pop(request.sid, None)
        print(socket_pool)

    def on_message(self, data):
        """Handle a client event named 'message'."""
        print(data)
        emit("response", "123")
        self.send("asd")

    def send(self, data):
        """Emit *data* on the 'response' event of this namespace."""
        emit("response", data, namespace=self.name_space)


@app.route("/sendMsg")
def sendMsg():
    """Emit a fixed 'response' message to every client in ``socket_pool``."""
    # Iterate over a snapshot: connect/disconnect handlers mutate the dict.
    for sid, namespace_value in list(socket_pool.items()):
        print(sid)
        emit("response", "123456", namespace=namespace_value, to=sid)
    print("ok")
    return "ok"


def md5_encrypt(data):
    """Return the hexadecimal MD5 digest of the UTF-8 encoding of *data*.

    NOTE(review): this digest is what gets stored in the ``mima`` (password)
    field; MD5 is not a suitable password hash - consider migrating.
    """
    return hashlib.md5(data.encode("utf-8")).hexdigest()


def initData():
    """Create the default admin account when no active admin exists."""
    admin = collection.find_one({"isAdmin": 1, "isExit": 1})
    if admin is not None:
        print(f"管理账号'{admin['xueshengxingming']}'已存在")
        return

    user = {
        "xuexiaomingcheng": "",  # school name
        "xuexiaodaihao": "",  # school code
        "zhuanyemingcheng": "",  # major name
        "zhuanyedaihao": "",  # major code
        "nianji": "",  # grade
        "banji": "",  # class
        "xueshengxingming": "老师",  # student name
        "xuehao": "0000",  # student number
        "mima": md5_encrypt("0000"),  # password
        "chengji": "",  # score
        "zuobiqingkuang": [],  # cheating records (type, time, image)
        "kaoshileixing": "",  # exam type
        "kaoshikemu": "",  # exam subject
        "kaoshishijianduan": "",  # exam time slot
        "kaoshilianjie": "",  # exam link
        "isAdmin": 1,  # whether the account is an administrator
        "isExit": 1,  # deletion flag; the queries here treat 1 as active
        "chuangjianshijian": "",  # creation time
        "gengxinshijian": "",  # update time
    }
    collection.insert_one(user)
    print("管理账号'老师'已被初始化新建")


def isAdmin(id):
    """Return True when *id* names an active user whose ``isAdmin`` is truthy.

    A missing or malformed id yields False instead of raising.
    """
    mongo_id = _to_object_id(id)
    if mongo_id is None:
        return False
    user = collection.find_one({"_id": mongo_id, "isExit": 1})
    if user is None:
        return False
    # A missing isAdmin field counts as "not an admin".
    return bool(user.get("isAdmin", False))


@app.route("/userLogin", methods=["post"])
def login():
    """Log a user in by student number (``xuehao``) and password (``mima``).

    NOTE(review): ``mima`` is compared as received, while the other routes
    store ``md5_encrypt(mima)``; presumably the client sends the digest at
    login - confirm.
    """
    payload = _load_json_body()
    if payload is None:
        return _error(_MSG_FILL_IN)

    xuehao = payload.get("xuehao")
    mima = payload.get("mima")
    # Only scalars may reach the filter: a dict such as {"$ne": None} would
    # be run as a query operator and match an arbitrary account.
    if not _is_scalar(xuehao) or not _is_scalar(mima):
        return _error(_MSG_FILL_IN)

    res = collection.find_one({"xuehao": xuehao, "mima": mima})
    if res is None:
        return _error("暂无此用户")

    # ObjectId is not JSON serializable; send it as a string.
    res["_id"] = str(res["_id"])
    return {"code": 200, "msg": "登陆成功", "list": [res], "hasError": False}


@app.route("/addUser", methods=["post"])
def insert_data():
    """Insert a new non-admin user; the caller (``id``) must be an admin."""
    payload = _load_json_body()
    if payload is None:
        return _error(_MSG_NO_DATA)

    admin = isAdmin(payload.get("id"))
    print(777, payload, admin)
    if not admin:
        return _error(_MSG_NOT_ADMIN)
    # Equality, not identity: `is "add"` is unreliable for parsed strings.
    if payload.get("subType") != "add":
        return _error(_MSG_BAD_SUBTYPE)

    missing = _missing_fields(payload, (*_PROFILE_FIELDS, "mima"))
    if not missing and not isinstance(payload["mima"], str):
        missing = ["mima"]  # md5_encrypt needs a string
    if missing:
        return _error(f"缺少字段: {', '.join(missing)}")

    # One timestamp so created/updated can never differ by a tick.
    stamp = _now()
    user_item = {field: payload[field] for field in _PROFILE_FIELDS}
    user_item.update(
        {
            "mima": md5_encrypt(payload["mima"]),
            "kaoshilianjie": "",  # TODO: generate the exam link automatically
            "isAdmin": 0,
            "isExit": 1,
            "chuangjianshijian": stamp,
            "gengxinshijian": stamp,
        }
    )
    collection.insert_one(user_item)
    return "新增成功"


@app.route("/getUser", methods=["post"])
def query_data():
    """Return every user document; the caller (``id``) must be an admin."""
    payload = _load_json_body()
    if payload is None:
        return _error(_MSG_FILL_IN)

    admin = isAdmin(payload.get("id"))
    print(777, payload, admin)
    if not admin:
        return _error(_MSG_NOT_ADMIN)

    serialized_data = []
    for item in collection.find():
        # ObjectId is not JSON serializable; send it as a string.
        item["_id"] = str(item["_id"])
        serialized_data.append(item)
    return {
        "code": 200,
        "msg": "查询成功",
        "list": serialized_data,
        "hasError": False,
    }


@app.route("/updateUser", methods=["post"])
def update_data():
    """Update the profile of user ``userid``; the caller must be an admin."""
    payload = _load_json_body()
    if payload is None:
        return _error(_MSG_NO_DATA)
    if not isAdmin(payload.get("id")):
        return _error(_MSG_NOT_ADMIN)

    target_id = _to_object_id(payload.get("userid"))
    if target_id is None:
        return _error(_MSG_BAD_ID)
    missing = _missing_fields(payload, _PROFILE_FIELDS)
    if missing:
        return _error(f"缺少字段: {', '.join(missing)}")

    user_item = {field: payload[field] for field in _PROFILE_FIELDS}
    user_item["gengxinshijian"] = _now()
    collection.update_one({"_id": target_id}, {"$set": user_item})
    return "修改成功"


@app.route("/delUser", methods=["post"])
def delete_data():
    """Delete the users listed in ``ids``; the caller must be an admin."""
    payload = _load_json_body()
    if payload is None:
        return _error(_MSG_NO_DATA)
    if not isAdmin(payload.get("id")):
        return _error(_MSG_NOT_ADMIN)

    raw_ids = payload.get("ids")
    if not isinstance(raw_ids, list):
        return _error(_MSG_NO_DATA)
    # Convert the string ids to ObjectId for the $in filter.
    object_ids = [_to_object_id(item) for item in raw_ids]
    if any(oid is None for oid in object_ids):
        # Reject the whole request rather than deleting a partial set.
        return _error(_MSG_BAD_ID)

    collection.delete_many({"_id": {"$in": object_ids}})
    return "删除成功"


@app.route("/updatePassword", methods=["post"])
def updatePassword():
    """Set a new password for user ``userid``; the caller must be an admin."""
    payload = _load_json_body()
    if payload is None:
        return _error(_MSG_NO_DATA)

    admin = isAdmin(payload.get("id"))
    print(777, payload, admin)
    if not admin:
        return _error(_MSG_NOT_ADMIN)

    target_id = _to_object_id(payload.get("userid"))
    if target_id is None:
        return _error(_MSG_BAD_ID)
    new_password = payload.get("mima")
    if not isinstance(new_password, str):
        return _error("缺少字段: mima")

    collection.update_one(
        {"_id": target_id},
        {"$set": {"mima": md5_encrypt(new_password)}},
    )
    return {"code": 200, "msg": "修改成功", "list": [], "hasError": False}


if __name__ == "__main__":
    initData()
    app.run(debug=True)