playcheck.py 2.57 KB
# -*- coding: utf-8 -*-
# @Time    :  2020/12/20 18:24
# @Author  :  Young Lee
# @Email   :  young_lee2017@163.com

import subprocess
import json


def stream_check(stream_info):
    """ 判断流信息 """
    width = stream_info['streams'][0]['width']
    height = stream_info['streams'][0]['height']
    if width > 0 and height > 0:
        print("已监测到分析流!")
        return True
    else:
        return False

def play_check(ffprobe_path, address):
    """判断视频流是否可播放

    :param ffprobe_path: ffprobe程序的路径
    :param address: 视频流地址
    :return True or False
    """
    command = f"{ffprobe_path} -print_format json -show_streams -stimeout 2000000 -i {address}"
    try:
        subp = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
        stdout, _ = subp.communicate(timeout=10)
        stream_info = json.loads(stdout)
        return stream_check(stream_info)
    except Exception as e:
        print("[play_check]错误信息: %s" % e)
        return False

# def play_check(address):
#
#     cap = cv2.VideoCapture()
#     cap.open(address)
#     sampling = []
#     play_value = False
#     if cap.isOpened():
#         print("开始分析...")
#         for i in range(30):
#             sampling.append(cap.read()[0])
#             cv2.waitKey(30)
#     if sampling and sampling.count(True) > sampling.count(False):
#         print("点播成功!")
#         play_value = True
#     else:
#         print("点播失败!")
#     cap.release()
#     return play_value


if __name__ == "__main__":
    from cfg.fxinfo import *
    # address = "rtsp://192.168.9.233:8555/rtsp://192.168.8.108:10087/ffae4bfb48d94a3688a5b503bc30282e"
    # print(play_check(ffprobe_path, address))
    # with open('264.list', 'r') as f:
    #     rtsp_str = f.read()
    #
    # rtsp_list = [stream for stream in rtsp_str.split('\n') if stream != '']
    # for stream in rtsp_list:
    #     play_result = play_check(ffprobe_path, stream)
    #     if play_result == False:
    #         play_result = play_check(ffprobe_path, stream)
    #     print(f"{stream}: {play_result}")

    # i = 0
    # while True:
    #     i += 1
    #     play_result = play_check(ffprobe_path, "rtsp://admin:vion1234@192.168.9.93:554/h264/ch1/main/av_stream")
    #     print(f"{i}: {play_result}")
    #
    # from multiprocessing import Pool
    #
    # p = Pool(2)
    # for i in range(100):
    #     p.apply_async(play_check, args=[ffprobe_path, "rtsp://admin:vion1234@192.168.9.93:554/h264/ch1/main/av_stream",])
    # p.close()
    # p.join()