You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

195 lines
7.4 KiB

import socket
import threading
from ultralytics import YOLO
import ffmpegcv
import cv2
import time
import ctypes
import json
# 加载YOLOv8模型
model = YOLO("yolov8n.pt") # yolov8i默认目标检测(i= n,m,l,x)尾缀有-pose,-seg
print("模型加载成功")
isconnected = False
data_thread = None
count = 1
# 1.创建一个udp套接字
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 假设你的文本文件每行都是由空格分隔的数据项,且你想将这些数据以某种结构处理(这里仅作演示,实际情况可能需要调整)
def read_and_process_txt(file_path):
processed_data = []
with open(file_path, "r") as file:
for line in file:
# 去除每行末尾的换行符
line = line.strip()
# 按空格分割每行
items = line.split()
# 这里简单地将分割后的数据作为一个列表加入到结果列表中
# 实际应用中,你可能需要根据具体数据结构进行更复杂的处理
processed_data.append(items)
# 如果你确实需要将这些数据转换为JSON,可能需要构造一个合适的结构
# 例如,如果你的文本文件每行代表一个对象的属性,你可能需要构建字典
# 下面是一个假设的例子,将每行转换为字典并尝试转为JSON字符串
json_data = {items[0]: items[1] for items in processed_data}
try:
return json_data
except TypeError as e:
print(f"Error converting to JSON: {e}")
baseConfig = read_and_process_txt("./base.txt")
config = read_and_process_txt("./config.txt")
notPng = int(baseConfig["notPng1"])
def stop_thread(thread):
"""尝试强制停止一个线程"""
if not thread.is_alive():
return
tid = ctypes.c_long(thread.ident)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(SystemExit))
if res == 0:
raise ValueError("线程可能已经终止")
elif res > 1:
# 如果返回值大于1,说明有多个线程ID与给定ID相同,需要清理
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("未知错误")
# 定义发送数据的函数
def send_data(video_path, udpPngPort, udpLabelPort, yasuo, confident):
global count
global udp_socket
global notPng
print("" + str(count) + "次连接" + " " + video_path)
count += 1
global isconnected
cap = ffmpegcv.ReadLiveLast(ffmpegcv.VideoCaptureStreamRT, video_path) # 不缓存
print("" + str(count - 1) + "次连接" + " " + video_path + "成功")
isconnected = True
frame_rate_divider = 3 # 设置帧率除数
frame_count = 0 # 初始化帧计数器
while True:
try:
ret, frame = cap.read()
if ret:
frameCopy = frame.copy()
# 在该帧上运行YOLOv8推理
if frame_count % frame_rate_divider == 0:
# 获取处理结果
texts = []
results = model(frame)
data = results[0].boxes.data
# 解析检测结果
for i in range(data.shape[0]):
label_id = int(data[i, 5])
label = results[0].names[label_id]
confidence = float(data[i, 4])
bbox = data[i, :4]
texts.append(
{
"label": label,
"confidence": confidence,
"bbox": "({:.2f}, {:.2f}, {:.2f}, {:.2f})".format(
bbox[0], bbox[1], bbox[2], bbox[3]
),
}
)
frame_count += 1 # 更新帧计数器
personList = []
# 处理结果转换为图片
for text in texts:
if text["label"] == "person" and text["confidence"] > confident:
personList.append(text)
bbox = eval(text["bbox"])
cv2.rectangle(
frameCopy,
(int(bbox[0]), int(bbox[1])),
(int(bbox[2]), int(bbox[3])),
(0, 0, 255),
2,
)
# cv2.putText(
# frameCopy,
# f"person:{'%.2f' % text['confidence']}",
# (int(bbox[0]) + 5, int(bbox[1]) + 25),
# cv2.FONT_HERSHEY_SIMPLEX,
# 1,
# (255, 0, 0),
# 2,
# )
if len(personList) != 0:
message_bytes = json.dumps(personList).encode("utf-8")
udp_socket.sendto(message_bytes, ("127.0.0.1", udpLabelPort))
_, img_buffer = cv2.imencode(
".jpg", frameCopy, [cv2.IMWRITE_JPEG_QUALITY, yasuo]
)
if notPng == 0:
print(7777, img_buffer.size)
udp_socket.sendto(img_buffer, ("127.0.0.1", udpPngPort))
# 如果按下'q'则中断循环
if cv2.waitKey(1) & 0xFF == ord("q"):
break
else:
isconnected = False
except BrokenPipeError:
# 当客户端断开连接时,发送数据会抛出 BrokenPipeError
print("Client disconnected. Stopping data sending.")
break
except Exception as e:
print(f"An error occurred while sending data: {e}")
break
def continue_start(video_path, udpPngPort, udpLabelPort, yasuo, confident):
while True:
time.sleep(5)
global isconnected
global data_thread
global count
if isconnected == False:
print("已断开正在尝试第" + str(count) + "次连接" + " " + video_path)
stop_thread(data_thread)
data_thread = threading.Thread(
target=send_data,
args=(video_path, udpPngPort, udpLabelPort, yasuo, confident),
daemon=True,
)
data_thread.start()
def main(video_path, udpPngPort, udpLabelPort, yasuo, confident):
global data_thread
# 创建并启动发送数据的线程
data_thread = threading.Thread(
target=send_data,
args=(video_path, udpPngPort, udpLabelPort, yasuo, confident),
daemon=True,
)
data_thread.start()
# 在这里可以处理其他客户端逻辑,如接收客户端数据等
# ...
time.sleep(1)
data_thread2 = threading.Thread(
target=continue_start,
args=(video_path, udpPngPort, udpLabelPort, yasuo, confident),
)
data_thread2.start()
if __name__ == "__main__":
# rtsp流
video_path = baseConfig["rtsp1"]
# 图片传输端口udpPngPort
udpPngPort = int(baseConfig["udpPngPort1"])
# 标签传输端口udpPngPort
udpLabelPort = int(baseConfig["udpLabelPort1"])
# 压缩率
yasuo = int(config["yasuolv1"])
# 置信度
confident = float(config["confidence1"])
print("启动", video_path, udpPngPort, udpLabelPort, yasuo, confident)
main(video_path, udpPngPort, udpLabelPort, yasuo, confident)