monitor_subtask_result.py
4.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# -*- coding: utf-8 -*-
# @Time : 2020/12/17 14:29
# @Author : Young Lee
# @Email : young_lee2017@163.com
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from jumppage import jump_page
from playcheck import play_check
import requests
import time
import re
from mythread import MyThread
def monitor_subtask_capture(driver, wait_time, subtask_flag):
    """Report whether a real-time capture result appears on the current page.

    Waits for an element matching ``#pane-first .el-row`` to be present and
    prints a status message either way.

    :param driver: WebDriver instance handed to ``WebDriverWait``
    :param wait_time: timeout passed to ``WebDriverWait``
    :param subtask_flag: subtask label, used only in the failure message
    :return: True when the element was found; False when the wait raised
        any exception (a timeout included)
    """
    captured = False
    try:
        # Build the condition and the waiter inside the try so that every
        # failure is reported the same way as a timeout.
        capture_row_present = EC.presence_of_element_located(
            (By.CSS_SELECTOR, "#pane-first .el-row")
        )
        waiter = WebDriverWait(driver, wait_time)
        waiter.until(capture_row_present)
        print("已检测到实时抓拍结果!")
        captured = True
    except Exception:
        print(f"未监测到任务'{subtask_flag}'的实时抓拍数据!")
    return captured
def monitor_subtask_stream(plat_ip, ffprobe_path, subtask_id):
    """Report whether the analysis stream of a subtask can be played.

    Requests the live play URL for ``subtask_id`` from the platform API on
    port 20080, reads the ``rtsp_url`` field of the JSON response and hands
    that address to ``play_check``.

    :param plat_ip: host part of the platform API URL
    :param ffprobe_path: forwarded unchanged to ``play_check``
    :param subtask_id: subtask identifier sent as the ``subtask_id`` query value
    :return: False when the address cannot be fetched or is empty; otherwise
        whatever ``play_check(ffprobe_path, address)`` returns
    """
    # `_t` carries the current Unix timestamp as an extra query value.
    req_url = f"http://{plat_ip}:20080/api/v1/devconf_fx/analyse/live/play_url?subtask_id={subtask_id}&_t={int(time.time())}"

    try:
        response = requests.get(req_url, timeout=10)
        payload = response.json()
        address = payload['rtsp_url']
    except Exception:
        print("获取分析流播放地址出错!")
        return False

    if not address:
        print("分析流播放地址为空!")
        return False

    return play_check(ffprobe_path, address)
def monitor_subtask_result(driver, wait_time, ffprobe_path, task_name, subtask_name=None, subtask_id=None):
    """Check a subtask for real-time capture data and a playable analysis stream.

    Navigates to the scene-settings page, expands the parent task, selects the
    subtask, then runs the stream check in a background thread while the
    capture check runs in the calling thread.

    :param driver: WebDriver instance driving the platform UI
    :param wait_time: how long to wait for a capture result
    :param ffprobe_path: path to ffprobe, forwarded to the stream check
    :param task_name: name of the parent task
    :param subtask_name: name of the subtask
    :param subtask_id: subtask ID; takes priority over ``subtask_name`` and is
        used to locate the subtask whenever it is not None/empty
    :return: ``{'capture_result': ..., 'stream_result': ...}`` holding the
        capture and stream outcomes, or ``None`` when the parent task, the
        subtask, or the platform IP cannot be determined
    """
    common_xpath = f"//span[@class='task-title'][text()='{task_name}']"
    subtask_flag = subtask_name if subtask_name else subtask_id

    # The subtask entries live in the first <ol> following the task title's parent.
    subtask_list_xpath = f"{common_xpath}/../following-sibling::ol[1]"
    if subtask_id:
        subtask_li = f"{subtask_list_xpath}/li[@data-subtask='{subtask_id}']"
    else:
        subtask_li = f"{subtask_list_xpath}/li[@title='{subtask_name}']"

    jump_page(driver, "任务管理", "场景设置")

    # Expand the parent task.
    # find_element(By.XPATH, ...) is used because the find_element_by_* helpers
    # were removed in Selenium 4; this form works on Selenium 3 as well.
    try:
        driver.find_element(By.XPATH, common_xpath).click()
    except Exception:
        print(f"未找到大任务'{task_name}'!")
        return None

    # Select the subtask.
    try:
        driver.find_element(By.XPATH, subtask_li).click()
    except Exception:
        print(f"未找到子任务'{subtask_flag}'!")
        return None

    # When only the name was supplied, read the ID from the selected entry.
    if not subtask_id:
        subtask_id = driver.find_element(By.XPATH, subtask_li).get_attribute("data-subtask")

    # Raw string with escaped dots: match a dotted-quad only, not any
    # separator character between the digit groups.
    ip_match = re.search(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", driver.current_url)
    if ip_match is None:
        # Without an IPv4 address in the URL the stream API cannot be reached;
        # report it like the other lookup failures instead of crashing.
        print(f"无法从当前地址'{driver.current_url}'中解析出平台IP!")
        return None
    plat_ip = ip_match.group()

    # Run the stream check in a thread so it overlaps with the capture wait.
    stream_result_thread = MyThread(monitor_subtask_stream, plat_ip, ffprobe_path, subtask_id)
    stream_result_thread.start()

    # Capture check runs in the calling thread.
    capture_result = monitor_subtask_capture(driver, wait_time, subtask_flag)

    stream_result_thread.join()
    stream_result = stream_result_thread.get_result()
    return {"capture_result": capture_result, "stream_result": stream_result}
if __name__ == "__main__":
    # Manual smoke run: log in, monitor one subtask, print the outcome.
    # NOTE(review): `driver`, `login_url`, `user`, `passwd` and `ffprobe_path`
    # are presumably supplied by the wildcard imports below — confirm.
    from unitinit import *
    from login import login_fx

    try:
        login_fx(driver, login_url, user, passwd)
        # Kept after the login call, matching the original statement order.
        from cfg.fxinfo import *

        result = monitor_subtask_result(driver, 10, ffprobe_path, "流量", subtask_name="流量")
        print(result)
    finally:
        # Always close the browser, even when login or monitoring raises.
        driver.quit()