You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

208 lines
8.0 KiB

import socket
import threading
from ultralytics import YOLO
import ffmpegcv
import cv2
import time
import ctypes
import json
# 加载YOLOv8模型
model = YOLO("yolov8n.pt") # yolov8i默认目标检测(i= n,m,l,x)尾缀有-pose,-seg
print("模型加载成功")
isconnected = False
data_thread = None
count = 1
# 1.创建一个udp套接字
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 假设你的文本文件每行都是由空格分隔的数据项,且你想将这些数据以某种结构处理(这里仅作演示,实际情况可能需要调整)
def read_and_process_txt(file_path):
processed_data = []
with open(file_path, "r") as file:
for line in file:
# 去除每行末尾的换行符
line = line.strip()
# 按空格分割每行
items = line.split()
# 这里简单地将分割后的数据作为一个列表加入到结果列表中
# 实际应用中,你可能需要根据具体数据结构进行更复杂的处理
processed_data.append(items)
# 如果你确实需要将这些数据转换为JSON,可能需要构造一个合适的结构
# 例如,如果你的文本文件每行代表一个对象的属性,你可能需要构建字典
# 下面是一个假设的例子,将每行转换为字典并尝试转为JSON字符串
json_data = {items[0]: items[1] for items in processed_data}
try:
return json_data
except TypeError as e:
print(f"Error converting to JSON: {e}")
config = read_and_process_txt("./config.txt")
def stop_thread(thread):
"""尝试强制停止一个线程"""
if not thread.is_alive():
return
tid = ctypes.c_long(thread.ident)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(SystemExit))
if res == 0:
raise ValueError("线程可能已经终止")
elif res > 1:
# 如果返回值大于1,说明有多个线程ID与给定ID相同,需要清理
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("未知错误")
# 定义发送数据的函数
def send_data(client_socket, video_path1, yasuo=90, confident=0.8, udpPort=8967):
global count
global udp_socket
print("" + str(count) + "次连接" + " " + video_path1)
count += 1
global isconnected
cap = ffmpegcv.ReadLiveLast(ffmpegcv.VideoCaptureStreamRT, video_path1) # 不缓存
print("" + str(count - 1) + "次连接" + " " + video_path1 + "成功")
isconnected = True
frame_rate_divider = 8 # 设置帧率除数
frame_count = 0 # 初始化帧计数器
while True:
try:
ret, frame = cap.read()
if ret:
frameCopy = frame.copy()
# 在该帧上运行YOLOv8推理
if frame_count % frame_rate_divider == 0:
# 获取处理结果
texts = []
results = model(frame)
data = results[0].boxes.data
# 解析检测结果
for i in range(data.shape[0]):
label_id = int(data[i, 5])
label = results[0].names[label_id]
confidence = float(data[i, 4])
bbox = data[i, :4]
texts.append(
{
"label": label,
"confidence": confidence,
"bbox": "({:.2f}, {:.2f}, {:.2f}, {:.2f})".format(
bbox[0], bbox[1], bbox[2], bbox[3]
),
}
)
frame_count += 1 # 更新帧计数器
personList = []
# 处理结果转换为图片
for text in texts:
if text["label"] == "person" and text["confidence"] > confident:
personList.append(text)
bbox = eval(text["bbox"])
cv2.rectangle(
frameCopy,
(int(bbox[0]), int(bbox[1])),
(int(bbox[2]), int(bbox[3])),
(255, 0, 0),
2,
)
cv2.putText(
frameCopy,
f"person:{'%.2f' % text['confidence']}",
(int(bbox[0]) + 5, int(bbox[1]) + 25),
cv2.FONT_HERSHEY_SIMPLEX,
1,
(255, 0, 0),
2,
)
if len(personList) != 0:
message_bytes = json.dumps(personList).encode("utf-8")
udp_socket.sendto(message_bytes, ("127.0.0.1", udpPort))
_, img_buffer = cv2.imencode(
".jpg", frameCopy, [cv2.IMWRITE_JPEG_QUALITY, yasuo]
)
client_socket.sendall(img_buffer)
# 如果按下'q'则中断循环
if cv2.waitKey(1) & 0xFF == ord("q"):
break
else:
isconnected = False
except BrokenPipeError:
# 当客户端断开连接时,发送数据会抛出 BrokenPipeError
print("Client disconnected. Stopping data sending.")
break
except Exception as e:
print(f"An error occurred while sending data: {e}")
break
def continue_start(client_socket, video_path1, yasuo, confident, udpPort):
while True:
time.sleep(5)
global isconnected
global data_thread
global count
if isconnected == False:
print("已断开正在尝试第" + str(count) + "次连接" + " " + video_path1)
stop_thread(data_thread)
data_thread = threading.Thread(
target=send_data,
args=(client_socket, video_path1, yasuo, confident, udpPort),
daemon=True,
)
data_thread.start()
def main(video_path1, port=8965, yasuo=90, confident=0.8, udpPort=8967):
global data_thread
# host = "192.168.1.194" # 服务器 IP 地址,这里使用 localhost
host = "127.0.0.1" # 服务器 IP 地址,这里使用 localhost
# 创建 TCP 服务器套接字
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 绑定服务器地址和端口
server_socket.bind((host, port))
# 开始监听连接,最大连接队列长度为 5
server_socket.listen(5)
print(f"Server listening on {host}:{port}.")
# 接受客户端连接请求
while True:
client_socket, client_addr = server_socket.accept()
print(f"Accepted connection from {client_addr[0]}:{client_addr[1]}.")
# 创建并启动发送数据的线程
data_thread = threading.Thread(
target=send_data,
args=(client_socket, video_path1, yasuo, confident, udpPort),
daemon=True,
)
data_thread.start()
# 在这里可以处理其他客户端逻辑,如接收客户端数据等
# ...
time.sleep(1)
data_thread2 = threading.Thread(
target=continue_start,
args=(client_socket, video_path1, yasuo, confident, udpPort),
)
data_thread2.start()
# 如果需要在某个条件满足时关闭连接,可以在此处添加相应代码
# ...
# 关闭客户端连接
# client_socket.close()
# print(f"Connection from {client_addr[0]}:{client_addr[1]} conneted.")
if __name__ == "__main__":
# rtsp流
video_path = config["rtsp1"]
# 本地启动端口
port = int(config["tcpPort1"])
# 压缩率
yasuo = int(config["yasuolv1"])
print(config["confidence1"])
confident = float(config["confidence1"])
udpPort = int(config["udpPort1"])
main(video_path, port, yasuo, confident, udpPort)