playcheck.py
2.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# -*- coding: utf-8 -*-
# @Time : 2020/12/20 18:24
# @Author : Young Lee
# @Email : young_lee2017@163.com
import subprocess
import json
def stream_check(stream_info):
    """Decide whether probed stream information describes a usable video stream.

    :param stream_info: dict parsed from ffprobe's JSON output; expected to
        hold a 'streams' list of per-stream dicts
    :return: True if any stream reports a positive width and height,
        otherwise False
    """
    # Scan every stream rather than only the first one: audio entries carry no
    # 'width'/'height' keys and may be listed ahead of the video stream.
    for stream in stream_info.get('streams') or []:
        # A missing or null dimension is treated as 0 so it cannot qualify.
        width = stream.get('width') or 0
        height = stream.get('height') or 0
        if width > 0 and height > 0:
            print("已监测到分析流!")
            return True
    return False
def play_check(ffprobe_path, address):
    """Check whether a video stream is playable by probing it with ffprobe.

    :param ffprobe_path: path of the ffprobe executable
    :param address: address of the video stream
    :return: True if ffprobe's JSON output passes stream_check, otherwise
        False (any error, including a timeout, is printed and yields False)
    """
    # Pass the arguments as a list and avoid the shell: the address is external
    # input, and shell metacharacters in it (e.g. '&' or ';' in a URL) would
    # otherwise be interpreted by the shell instead of reaching ffprobe.
    # NOTE(review): '-stimeout' looks like the RTSP socket timeout
    # (presumably in microseconds); newer FFmpeg releases may have renamed
    # it -- confirm against the deployed ffprobe version.
    command = [
        ffprobe_path,
        "-print_format", "json",
        "-show_streams",
        "-stimeout", "2000000",
        "-i", address,
    ]
    try:
        # subprocess.run kills and reaps the child when the timeout expires,
        # so a hung probe does not leave an orphaned ffprobe process behind.
        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            encoding="utf-8",
            timeout=10,
        )
        stream_info = json.loads(completed.stdout)
        return stream_check(stream_info)
    except Exception as e:
        # Best-effort check: report the failure and treat it as "not playable".
        print("[play_check]错误信息: %s" % e)
        return False
# def play_check(address):
#
# cap = cv2.VideoCapture()
# cap.open(address)
# sampling = []
# play_value = False
# if cap.isOpened():
# print("开始分析...")
# for i in range(30):
# sampling.append(cap.read()[0])
# cv2.waitKey(30)
# if sampling and sampling.count(True) > sampling.count(False):
# print("点播成功!")
# play_value = True
# else:
# print("点播失败!")
# cap.release()
# return play_value
if __name__ == "__main__":
    # Manual test entry point. The wildcard import pulls the project's
    # configuration names into scope; presumably this includes `ffprobe_path`,
    # which the commented-out examples below rely on -- verify against
    # cfg/fxinfo.
    from cfg.fxinfo import *
# address = "rtsp://192.168.9.233:8555/rtsp://192.168.8.108:10087/ffae4bfb48d94a3688a5b503bc30282e"
# print(play_check(ffprobe_path, address))
# with open('264.list', 'r') as f:
# rtsp_str = f.read()
#
# rtsp_list = [stream for stream in rtsp_str.split('\n') if stream != '']
# for stream in rtsp_list:
# play_result = play_check(ffprobe_path, stream)
# if play_result == False:
# play_result = play_check(ffprobe_path, stream)
# print(f"{stream}: {play_result}")
# i = 0
# while True:
# i += 1
# play_result = play_check(ffprobe_path, "rtsp://admin:vion1234@192.168.9.93:554/h264/ch1/main/av_stream")
# print(f"{i}: {play_result}")
#
# from multiprocessing import Pool
#
# p = Pool(2)
# for i in range(100):
# p.apply_async(play_check, args=[ffprobe_path, "rtsp://admin:vion1234@192.168.9.93:554/h264/ch1/main/av_stream",])
# p.close()
# p.join()