You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
207 lines
8.0 KiB
207 lines
8.0 KiB
import socket
|
|
import threading
|
|
from ultralytics import YOLO
|
|
import ffmpegcv
|
|
import cv2
|
|
import time
|
|
import ctypes
|
|
import json
|
|
|
|
# 加载YOLOv8模型
|
|
model = YOLO("yolov8n.pt") # yolov8i默认目标检测(i= n,m,l,x)尾缀有-pose,-seg
|
|
print("模型加载成功")
|
|
isconnected = False
|
|
data_thread = None
|
|
count = 1
|
|
# 1.创建一个udp套接字
|
|
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
|
|
|
|
# 假设你的文本文件每行都是由空格分隔的数据项,且你想将这些数据以某种结构处理(这里仅作演示,实际情况可能需要调整)
|
|
def read_and_process_txt(file_path):
|
|
processed_data = []
|
|
with open(file_path, "r") as file:
|
|
for line in file:
|
|
# 去除每行末尾的换行符
|
|
line = line.strip()
|
|
# 按空格分割每行
|
|
items = line.split()
|
|
# 这里简单地将分割后的数据作为一个列表加入到结果列表中
|
|
# 实际应用中,你可能需要根据具体数据结构进行更复杂的处理
|
|
processed_data.append(items)
|
|
|
|
# 如果你确实需要将这些数据转换为JSON,可能需要构造一个合适的结构
|
|
# 例如,如果你的文本文件每行代表一个对象的属性,你可能需要构建字典
|
|
# 下面是一个假设的例子,将每行转换为字典并尝试转为JSON字符串
|
|
json_data = {items[0]: items[1] for items in processed_data}
|
|
try:
|
|
return json_data
|
|
except TypeError as e:
|
|
print(f"Error converting to JSON: {e}")
|
|
|
|
|
|
config = read_and_process_txt("./config.txt")
|
|
|
|
|
|
def stop_thread(thread):
|
|
"""尝试强制停止一个线程"""
|
|
if not thread.is_alive():
|
|
return
|
|
tid = ctypes.c_long(thread.ident)
|
|
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(SystemExit))
|
|
if res == 0:
|
|
raise ValueError("线程可能已经终止")
|
|
elif res > 1:
|
|
# 如果返回值大于1,说明有多个线程ID与给定ID相同,需要清理
|
|
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
|
|
raise SystemError("未知错误")
|
|
|
|
|
|
# 定义发送数据的函数
|
|
def send_data(client_socket, video_path1, yasuo=90, confident=0.8, udpPort=8967):
|
|
global count
|
|
global udp_socket
|
|
print("第" + str(count) + "次连接" + " " + video_path1)
|
|
count += 1
|
|
global isconnected
|
|
cap = ffmpegcv.ReadLiveLast(ffmpegcv.VideoCaptureStreamRT, video_path1) # 不缓存
|
|
print("第" + str(count - 1) + "次连接" + " " + video_path1 + "成功")
|
|
isconnected = True
|
|
frame_rate_divider = 8 # 设置帧率除数
|
|
frame_count = 0 # 初始化帧计数器
|
|
while True:
|
|
try:
|
|
ret, frame = cap.read()
|
|
if ret:
|
|
frameCopy = frame.copy()
|
|
# 在该帧上运行YOLOv8推理
|
|
if frame_count % frame_rate_divider == 0:
|
|
# 获取处理结果
|
|
texts = []
|
|
results = model(frame)
|
|
data = results[0].boxes.data
|
|
# 解析检测结果
|
|
for i in range(data.shape[0]):
|
|
label_id = int(data[i, 5])
|
|
label = results[0].names[label_id]
|
|
confidence = float(data[i, 4])
|
|
bbox = data[i, :4]
|
|
texts.append(
|
|
{
|
|
"label": label,
|
|
"confidence": confidence,
|
|
"bbox": "({:.2f}, {:.2f}, {:.2f}, {:.2f})".format(
|
|
bbox[0], bbox[1], bbox[2], bbox[3]
|
|
),
|
|
}
|
|
)
|
|
frame_count += 1 # 更新帧计数器
|
|
personList = []
|
|
# 处理结果转换为图片
|
|
|
|
for text in texts:
|
|
if text["label"] == "person" and text["confidence"] > confident:
|
|
personList.append(text)
|
|
bbox = eval(text["bbox"])
|
|
cv2.rectangle(
|
|
frameCopy,
|
|
(int(bbox[0]), int(bbox[1])),
|
|
(int(bbox[2]), int(bbox[3])),
|
|
(255, 0, 0),
|
|
2,
|
|
)
|
|
cv2.putText(
|
|
frameCopy,
|
|
f"person:{'%.2f' % text['confidence']}",
|
|
(int(bbox[0]) + 5, int(bbox[1]) + 25),
|
|
cv2.FONT_HERSHEY_SIMPLEX,
|
|
1,
|
|
(255, 0, 0),
|
|
2,
|
|
)
|
|
if len(personList) != 0:
|
|
message_bytes = json.dumps(personList).encode("utf-8")
|
|
udp_socket.sendto(message_bytes, ("127.0.0.1", udpPort))
|
|
_, img_buffer = cv2.imencode(
|
|
".jpg", frameCopy, [cv2.IMWRITE_JPEG_QUALITY, yasuo]
|
|
)
|
|
client_socket.sendall(img_buffer)
|
|
# 如果按下'q'则中断循环
|
|
if cv2.waitKey(1) & 0xFF == ord("q"):
|
|
break
|
|
else:
|
|
isconnected = False
|
|
except BrokenPipeError:
|
|
# 当客户端断开连接时,发送数据会抛出 BrokenPipeError
|
|
print("Client disconnected. Stopping data sending.")
|
|
break
|
|
except Exception as e:
|
|
print(f"An error occurred while sending data: {e}")
|
|
break
|
|
|
|
|
|
def continue_start(client_socket, video_path1, yasuo, confident, udpPort):
|
|
while True:
|
|
time.sleep(5)
|
|
global isconnected
|
|
global data_thread
|
|
global count
|
|
if isconnected == False:
|
|
print("已断开正在尝试第" + str(count) + "次连接" + " " + video_path1)
|
|
stop_thread(data_thread)
|
|
data_thread = threading.Thread(
|
|
target=send_data,
|
|
args=(client_socket, video_path1, yasuo, confident, udpPort),
|
|
daemon=True,
|
|
)
|
|
data_thread.start()
|
|
|
|
|
|
def main(video_path1, port=8965, yasuo=90, confident=0.8, udpPort=8967):
|
|
global data_thread
|
|
# host = "192.168.1.194" # 服务器 IP 地址,这里使用 localhost
|
|
host = "127.0.0.1" # 服务器 IP 地址,这里使用 localhost
|
|
# 创建 TCP 服务器套接字
|
|
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
# 绑定服务器地址和端口
|
|
server_socket.bind((host, port))
|
|
# 开始监听连接,最大连接队列长度为 5
|
|
server_socket.listen(5)
|
|
print(f"Server listening on {host}:{port}.")
|
|
|
|
# 接受客户端连接请求
|
|
while True:
|
|
client_socket, client_addr = server_socket.accept()
|
|
print(f"Accepted connection from {client_addr[0]}:{client_addr[1]}.")
|
|
# 创建并启动发送数据的线程
|
|
data_thread = threading.Thread(
|
|
target=send_data,
|
|
args=(client_socket, video_path1, yasuo, confident, udpPort),
|
|
daemon=True,
|
|
)
|
|
data_thread.start()
|
|
# 在这里可以处理其他客户端逻辑,如接收客户端数据等
|
|
# ...
|
|
time.sleep(1)
|
|
data_thread2 = threading.Thread(
|
|
target=continue_start,
|
|
args=(client_socket, video_path1, yasuo, confident, udpPort),
|
|
)
|
|
data_thread2.start()
|
|
# 如果需要在某个条件满足时关闭连接,可以在此处添加相应代码
|
|
# ...
|
|
# 关闭客户端连接
|
|
# client_socket.close()
|
|
# print(f"Connection from {client_addr[0]}:{client_addr[1]} conneted.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# rtsp流
|
|
video_path = config["rtsp2"]
|
|
# 本地启动端口
|
|
port = int(config["tcpPort2"])
|
|
# 压缩率
|
|
yasuo = int(config["yasuolv2"])
|
|
confident = float(config["confidence2"])
|
|
udpPort = int(config["udpPort2"])
|
|
main(video_path, port, yasuo, confident, udpPort)
|
|
|