import socket import threading from ultralytics import YOLO import ffmpegcv import cv2 import time import ctypes import json # 加载YOLOv8模型 model = YOLO("yolov8n.pt") # yolov8i默认目标检测(i= n,m,l,x)尾缀有-pose,-seg print("模型加载成功") isconnected = False data_thread = None count = 1 # 1.创建一个udp套接字 udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # 假设你的文本文件每行都是由空格分隔的数据项,且你想将这些数据以某种结构处理(这里仅作演示,实际情况可能需要调整) def read_and_process_txt(file_path): processed_data = [] with open(file_path, "r") as file: for line in file: # 去除每行末尾的换行符 line = line.strip() # 按空格分割每行 items = line.split() # 这里简单地将分割后的数据作为一个列表加入到结果列表中 # 实际应用中,你可能需要根据具体数据结构进行更复杂的处理 processed_data.append(items) # 如果你确实需要将这些数据转换为JSON,可能需要构造一个合适的结构 # 例如,如果你的文本文件每行代表一个对象的属性,你可能需要构建字典 # 下面是一个假设的例子,将每行转换为字典并尝试转为JSON字符串 json_data = {items[0]: items[1] for items in processed_data} try: return json_data except TypeError as e: print(f"Error converting to JSON: {e}") config = read_and_process_txt("./config.txt") def stop_thread(thread): """尝试强制停止一个线程""" if not thread.is_alive(): return tid = ctypes.c_long(thread.ident) res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(SystemExit)) if res == 0: raise ValueError("线程可能已经终止") elif res > 1: # 如果返回值大于1,说明有多个线程ID与给定ID相同,需要清理 ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) raise SystemError("未知错误") # 定义发送数据的函数 def send_data(client_socket, video_path1, yasuo=90, confident=0.8, udpPort=8967): global count global udp_socket print("第" + str(count) + "次连接" + " " + video_path1) count += 1 global isconnected cap = ffmpegcv.ReadLiveLast(ffmpegcv.VideoCaptureStreamRT, video_path1) # 不缓存 print("第" + str(count - 1) + "次连接" + " " + video_path1 + "成功") isconnected = True frame_rate_divider = 8 # 设置帧率除数 frame_count = 0 # 初始化帧计数器 while True: try: ret, frame = cap.read() if ret: frameCopy = frame.copy() # 在该帧上运行YOLOv8推理 if frame_count % frame_rate_divider == 0: # 获取处理结果 texts = [] results = model(frame) data = results[0].boxes.data # 解析检测结果 for i in range(data.shape[0]): label_id = int(data[i, 5]) label = results[0].names[label_id] confidence = float(data[i, 4]) bbox = data[i, :4] texts.append( { "label": label, "confidence": confidence, "bbox": "({:.2f}, {:.2f}, {:.2f}, {:.2f})".format( bbox[0], bbox[1], bbox[2], bbox[3] ), } ) frame_count += 1 # 更新帧计数器 personList = [] # 处理结果转换为图片 for text in texts: if text["label"] == "person" and text["confidence"] > confident: personList.append(text) bbox = eval(text["bbox"]) cv2.rectangle( frameCopy, (int(bbox[0]), int(bbox[1])), (int(bbox[2]), int(bbox[3])), (255, 0, 0), 2, ) cv2.putText( frameCopy, f"person:{'%.2f' % text['confidence']}", (int(bbox[0]) + 5, int(bbox[1]) + 25), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, ) if len(personList) != 0: message_bytes = json.dumps(personList).encode("utf-8") udp_socket.sendto(message_bytes, ("127.0.0.1", udpPort)) _, img_buffer = cv2.imencode( ".jpg", frameCopy, [cv2.IMWRITE_JPEG_QUALITY, yasuo] ) client_socket.sendall(img_buffer) # 如果按下'q'则中断循环 if cv2.waitKey(1) & 0xFF == ord("q"): break else: isconnected = False except BrokenPipeError: # 当客户端断开连接时,发送数据会抛出 BrokenPipeError print("Client disconnected. Stopping data sending.") break except Exception as e: print(f"An error occurred while sending data: {e}") break def continue_start(client_socket, video_path1, yasuo, confident, udpPort): while True: time.sleep(5) global isconnected global data_thread global count if isconnected == False: print("已断开正在尝试第" + str(count) + "次连接" + " " + video_path1) stop_thread(data_thread) data_thread = threading.Thread( target=send_data, args=(client_socket, video_path1, yasuo, confident, udpPort), daemon=True, ) data_thread.start() def main(video_path1, port=8965, yasuo=90, confident=0.8, udpPort=8967): global data_thread # host = "192.168.1.194" # 服务器 IP 地址,这里使用 localhost host = "127.0.0.1" # 服务器 IP 地址,这里使用 localhost # 创建 TCP 服务器套接字 server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 绑定服务器地址和端口 server_socket.bind((host, port)) # 开始监听连接,最大连接队列长度为 5 server_socket.listen(5) print(f"Server listening on {host}:{port}.") # 接受客户端连接请求 while True: client_socket, client_addr = server_socket.accept() print(f"Accepted connection from {client_addr[0]}:{client_addr[1]}.") # 创建并启动发送数据的线程 data_thread = threading.Thread( target=send_data, args=(client_socket, video_path1, yasuo, confident, udpPort), daemon=True, ) data_thread.start() # 在这里可以处理其他客户端逻辑,如接收客户端数据等 # ... time.sleep(1) data_thread2 = threading.Thread( target=continue_start, args=(client_socket, video_path1, yasuo, confident, udpPort), ) data_thread2.start() # 如果需要在某个条件满足时关闭连接,可以在此处添加相应代码 # ... # 关闭客户端连接 # client_socket.close() # print(f"Connection from {client_addr[0]}:{client_addr[1]} conneted.") if __name__ == "__main__": # rtsp流 video_path = config["rtsp1"] # 本地启动端口 port = int(config["tcpPort1"]) # 压缩率 yasuo = int(config["yasuolv1"]) print(config["confidence1"]) confident = float(config["confidence1"]) udpPort = int(config["udpPort1"]) main(video_path, port, yasuo, confident, udpPort)