import socket import threading from ultralytics import YOLO import ffmpegcv import cv2 import time import ctypes import json # 加载YOLOv8模型 model = YOLO("yolov8n.pt") # yolov8i默认目标检测(i= n,m,l,x)尾缀有-pose,-seg print("模型加载成功") isconnected = False data_thread = None count = 1 # 1.创建一个udp套接字 udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # 假设你的文本文件每行都是由空格分隔的数据项,且你想将这些数据以某种结构处理(这里仅作演示,实际情况可能需要调整) def read_and_process_txt(file_path): processed_data = [] with open(file_path, "r") as file: for line in file: # 去除每行末尾的换行符 line = line.strip() # 按空格分割每行 items = line.split() # 这里简单地将分割后的数据作为一个列表加入到结果列表中 # 实际应用中,你可能需要根据具体数据结构进行更复杂的处理 processed_data.append(items) # 如果你确实需要将这些数据转换为JSON,可能需要构造一个合适的结构 # 例如,如果你的文本文件每行代表一个对象的属性,你可能需要构建字典 # 下面是一个假设的例子,将每行转换为字典并尝试转为JSON字符串 json_data = {items[0]: items[1] for items in processed_data} try: return json_data except TypeError as e: print(f"Error converting to JSON: {e}") baseConfig = read_and_process_txt("./base.txt") config = read_and_process_txt("./config.txt") notPng = int(baseConfig["notPng1"]) def stop_thread(thread): """尝试强制停止一个线程""" if not thread.is_alive(): return tid = ctypes.c_long(thread.ident) res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(SystemExit)) if res == 0: raise ValueError("线程可能已经终止") elif res > 1: # 如果返回值大于1,说明有多个线程ID与给定ID相同,需要清理 ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) raise SystemError("未知错误") # 定义发送数据的函数 def send_data(video_path, udpPngPort, udpLabelPort, yasuo, confident): global count global udp_socket global notPng print("第" + str(count) + "次连接" + " " + video_path) count += 1 global isconnected cap = ffmpegcv.ReadLiveLast(ffmpegcv.VideoCaptureStreamRT, video_path) # 不缓存 print("第" + str(count - 1) + "次连接" + " " + video_path + "成功") isconnected = True frame_rate_divider = 3 # 设置帧率除数 frame_count = 0 # 初始化帧计数器 while True: try: ret, frame = cap.read() if ret: frameCopy = frame.copy() # 在该帧上运行YOLOv8推理 if frame_count % frame_rate_divider == 0: # 获取处理结果 texts = [] results = model(frame) data = results[0].boxes.data # 解析检测结果 for i in range(data.shape[0]): label_id = int(data[i, 5]) label = results[0].names[label_id] confidence = float(data[i, 4]) bbox = data[i, :4] texts.append( { "label": label, "confidence": confidence, "bbox": "({:.2f}, {:.2f}, {:.2f}, {:.2f})".format( bbox[0], bbox[1], bbox[2], bbox[3] ), } ) frame_count += 1 # 更新帧计数器 personList = [] # 处理结果转换为图片 for text in texts: if text["label"] == "person" and text["confidence"] > confident: personList.append(text) bbox = eval(text["bbox"]) cv2.rectangle( frameCopy, (int(bbox[0]), int(bbox[1])), (int(bbox[2]), int(bbox[3])), (0, 0, 255), 2, ) # cv2.putText( # frameCopy, # f"person:{'%.2f' % text['confidence']}", # (int(bbox[0]) + 5, int(bbox[1]) + 25), # cv2.FONT_HERSHEY_SIMPLEX, # 1, # (255, 0, 0), # 2, # ) if len(personList) != 0: message_bytes = json.dumps(personList).encode("utf-8") udp_socket.sendto(message_bytes, ("127.0.0.1", udpLabelPort)) _, img_buffer = cv2.imencode( ".jpg", frameCopy, [cv2.IMWRITE_JPEG_QUALITY, yasuo] ) if notPng == 0: print(7777, img_buffer.size) udp_socket.sendto(img_buffer, ("127.0.0.1", udpPngPort)) # 如果按下'q'则中断循环 if cv2.waitKey(1) & 0xFF == ord("q"): break else: isconnected = False except BrokenPipeError: # 当客户端断开连接时,发送数据会抛出 BrokenPipeError print("Client disconnected. Stopping data sending.") break except Exception as e: print(f"An error occurred while sending data: {e}") break def continue_start(video_path, udpPngPort, udpLabelPort, yasuo, confident): while True: time.sleep(5) global isconnected global data_thread global count if isconnected == False: print("已断开正在尝试第" + str(count) + "次连接" + " " + video_path) stop_thread(data_thread) data_thread = threading.Thread( target=send_data, args=(video_path, udpPngPort, udpLabelPort, yasuo, confident), daemon=True, ) data_thread.start() def main(video_path, udpPngPort, udpLabelPort, yasuo, confident): global data_thread # 创建并启动发送数据的线程 data_thread = threading.Thread( target=send_data, args=(video_path, udpPngPort, udpLabelPort, yasuo, confident), daemon=True, ) data_thread.start() # 在这里可以处理其他客户端逻辑,如接收客户端数据等 # ... time.sleep(1) data_thread2 = threading.Thread( target=continue_start, args=(video_path, udpPngPort, udpLabelPort, yasuo, confident), ) data_thread2.start() if __name__ == "__main__": # rtsp流 video_path = baseConfig["rtsp1"] # 图片传输端口udpPngPort udpPngPort = int(baseConfig["udpPngPort1"]) # 标签传输端口udpPngPort udpLabelPort = int(baseConfig["udpLabelPort1"]) # 压缩率 yasuo = int(config["yasuolv1"]) # 置信度 confident = float(config["confidence1"]) print("启动", video_path, udpPngPort, udpLabelPort, yasuo, confident) main(video_path, udpPngPort, udpLabelPort, yasuo, confident)