Commit 8112ccf4 by 李杨杨

创建项目

1 parent 65c3da5e
# -*- coding: utf-8 -*-
# @Time : 2021/1/22 16:11
# @Author : Young Lee
# @Email : young_lee2017@163.com
import pytest
import configparser
import os
def pytest_addoption(parser):
parser.addini('overall_skips', type='linelist', default=[], help='skip all markers if the specified one failed')
parser.addini('patriarch_skips', type='linelist', default=[], help='skip some markers if first one failed')
parser.addini('group_skips', type='linelist', default=[], help='skip some markers of group if one failed')
def gain_skips(cfg, item='pytest'):
raw = configfile.get(item, cfg).split('\n')
return [x for x in raw if x]
current_dir = os.path.dirname(os.path.abspath(__file__))
ini_path = os.path.join(current_dir, 'pytest.ini')
configfile = configparser.ConfigParser()
configfile.read(ini_path)
overall_skips = gain_skips('overall_skips')
patriarch_skips = gain_skips('patriarch_skips')
group_skips = gain_skips('group_skips')
def add_failedmarkers(item, markers):
if markers:
item.session.failedmarkers.extend(markers)
item.session.failedmarkers = list(set(item.session.failedmarkers))
def pytest_sessionstart(session):
session.failedmarkers = []
session.skip_all_flag = False
session.patriarch_skip_flag = {skip: 1 for skip in patriarch_skips}
def pytest_runtest_makereport(item, call):
if not item.session.skip_all_flag:
patriarch_markers = [marker.name for marker in item.iter_markers() if marker.name in patriarch_skips]
for marker in patriarch_markers:
number = item.session.patriarch_skip_flag[marker]
if number < 2:
number += 1
item.session.patriarch_skip_flag[marker] = number
elif number == 2:
if call.excinfo:
add_failedmarkers(item, patriarch_markers)
number += 1
item.session.patriarch_skip_flag[marker] = number
if call.excinfo is not None:
overall_markers = [marker.name for marker in item.iter_markers() if marker.name in overall_skips]
if overall_markers:
item.session.skip_all_flag = True
return
group_markers = [marker.name for marker in item.iter_markers() if marker.name in group_skips]
add_failedmarkers(item, group_markers)
def pytest_runtest_setup(item):
if item.session.skip_all_flag:
pytest.skip(f"Dependency Skip")
markers = [marker.name for marker in item.iter_markers()]
for marker in markers:
if marker in item.session.failedmarkers:
pytest.skip(f"Dependency Skip")
\ No newline at end of file
[pytest]
markers = global_gateway
task_precondition
overall_skips =
patriarch_skips =
group_skips = task_precondition
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2021/1/14 17:36
# @Author : Young Lee
# @Email : young_lee2017@163.com
import sys
sys.path.append('..')
import pytest
import random
import string
from selenium import webdriver
from cfg.fxinfo import *
from log.generatelog import generate_log
import allure
from lib import *
logger = generate_log('testing.log')
# 测试中需要使用的变量
add_result = {}
import_result = {}
@allure.epic('繁星平台基本操作流程测试')
@allure.feature('测试流程')
class TestBaseProcess:
""" 测试繁星基本功能的类 """
def setup_class(self):
""" 执行用例前准备工作 """
self.driver = webdriver.Chrome(driver_path)
self.driver.implicitly_wait(5)
self.driver.maximize_window()
self.stor_name = '测试' + ''.join(random.sample(string.ascii_letters + string.digits, 5))
self.task_name = '测试任务' + ''.join(random.sample(string.ascii_letters + string.digits, 5))
# self.stor_name = '测试MIwue'
# self.task_name = '测试任务njF9Y'
def teardown_class(self):
""" 测试用例执行完成之后收尾工作 """
self.driver.quit()
""" 以下为测试用例 """
@allure.story('step1: 登录')
@allure.title(f'登录平台 {plat_ip}')
@pytest.mark.global_gateway
def test_login(self):
""" 测试登录 """
page_url = login_fx(self.driver, login_url, user, passwd)
logger.info(page_url)
assert page_url == f"http://{plat_ip}:20080/#/video_task/config"
@allure.story('step2: 准备操作')
@allure.title('上传存储配置')
def test_upload_storageconfig(self):
""" 测试上传存储配置 """
upload_msg = upload_storage_config(self.driver, self.stor_name, storage_config_path)
logger.info(upload_msg)
assert upload_msg == self.stor_name or upload_msg == '存储配置名称不能重复'
@allure.story('step2: 准备操作')
@allure.title('上传视频文件')
@pytest.mark.task_precondition
def test_upload_videos(self):
""" 测试视频上传 """
uploaded_videos = upload_videos(self.driver, videos_info)
logger.info(uploaded_videos)
assert len(videos_name) == len(uploaded_videos)
@allure.story('step2: 准备操作')
@allure.title('检查分析资源')
@pytest.mark.task_precondition
def test_device_resource(self):
""" 检查分析资源 """
resource = device_resource(self.driver)
assert resource['free_resource'] > 0 and resource['total_resource'] >= resource['free_resource']
@allure.story('step3: 创建任务')
@allure.title('创建大任务')
@pytest.mark.task_precondition
@pytest.mark.global_gateway
def test_create_task(self):
""" 测试大任务创建 """
create_result = create_task(tasks_url, self.driver, self.task_name, store_conf=self.stor_name, device_type='9.245_gpu0')
logger.info(create_result)
assert create_result == {self.task_name: '未部署'} or create_result == '已有该任务'
@allure.story('step3: 创建任务')
@allure.title('添加子任务')
@pytest.mark.master_slave_subtask
def test_create_subtask(self):
""" 测试添加子任务 """
videos_batch_info = {'短视频': videos_name}
add_result_realtime = create_subtask_batch(self.driver, self.task_name, videos_batch_info, monitor_status=True, monitor_times=30)
add_result.update({result[0]:result[1] for result in add_result_realtime})
logger.info(add_result)
assert videos_name.sort() == [item[0] for item in add_result_realtime].sort()
@allure.story('step4: 任务下发状态检查')
@allure.title('子任务下发状态检查: {name}')
@pytest.mark.parametrize('name', videos_name)
def test_create_subtask_status(self, name):
""" 检查子任务下发状态 """
print(f'校验子任务下发状态: {name}')
if name in add_result.keys():
assert add_result[name] == '工作中'
else:
assert f'{name}不存在' == '工作中'
@allure.story('step5: 导入任务配置')
@allure.title('导入任务配置')
def test_import_config(self):
""" 测试任务配置导入 """
import_result_realtime = import_config(self.driver, self.task_name, task_cfg_info, retry_check_times=6)
import_result.update(import_result_realtime)
logger.info(import_result)
assert videos_name.sort() == list(import_result.keys()).sort()
@allure.story('step6: 配置下发状态检查')
@allure.title('配置下发状态检查: {name}')
@pytest.mark.parametrize('name', videos_name)
def test_import_config_status(self, name):
""" 检查配置导入状态 """
assert import_result[name] == {'参数设置': '运行中', '区域设置': '运行中', '标定设置': '运行中'}
@allure.story('step7: 任务分析情况检查')
@allure.title('视频点播和抓拍结果检查: {video_name}')
@pytest.mark.parametrize('video_name', videos_name)
def test_monitor_subtask_result(self, video_name):
""" 检查视频点播和抓拍结果 """
index_value = videos_name.index(video_name)
wait_time = 20 if index_value == 0 else 8
subtask_result = monitor_subtask_result(self.driver, wait_time, ffprobe_path, self.task_name, subtask_name=video_name)
logger.info(f"{video_name}: {subtask_result}")
assert subtask_result == {"capture_result": True, "stream_result": True}
@allure.story('step8: 任务分析情况检查')
@allure.title('过车数据检索: {name}')
@pytest.mark.parametrize('name', videos_name)
def test_result_search(self, name):
""" 测试过车记录检索 """
search_result = vehicle_search(self.driver, self.task_name, name)
logger.info(search_result)
assert isinstance(search_result, dict)
assert search_result['first_page'] > 0 and search_result['total'] > search_result['first_page']
@allure.story('step9: 删除任务')
@allure.title('删除任务')
def test_delete_task(self):
""" 测试任务删除 """
delete_result = delete_task(self.driver, self.task_name)
logger.info(delete_result)
assert delete_result == None
if __name__ == '__main__':
import shutil
import os
# pytest.main(['-s', '-v', 'test_base_process.py'])
pytest.main(['-s', '-v', '--alluredir=report/testing_report', '--clean-alluredir'])
shutil.copy('../report/environment.properties', '../report/testing_report/')
os.system('allure serve report/testing_report')
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2020/12/10 23:00
# @Author : Young Lee
# @Email : young_lee2017@163.com
import os
import time
import re
# 测试平台信息
driver_path = r"D:\chromedriver.exe"
plat_ip = "192.168.9.245"
# chrome_path = r"D:\Program Files\Google\Chrome\Application\chrome.exe"
user = "admin"
passwd = "admin"
login_url = f"http://{plat_ip}:20080"
home_url = f"http://{plat_ip}:20080/#/video_task/config"
# 本地短时录像信息和任务配置信息
videos_dir = r"\\192.168.9.1\t测试\任务信息库\video"
task_cfg_dir = r"\\192.168.9.1\t测试\任务信息库\config"
videos_fullname = [file for file in os.listdir(videos_dir) if file.endswith(".avi") or
file.endswith(".mp4") or file.endswith(".h264")]
videos_name = [os.path.splitext(file)[0] for file in videos_fullname]
videos_path = [os.path.join(videos_dir, file) for file in videos_fullname]
videos_info = {videos_name[i]: videos_path[i] for i in range(len(videos_name))}
task_cfg_fullname = [file for file in os.listdir(task_cfg_dir) if file.endswith(".tar.gz")]
task_cfg_name = [re.search("(.*)_\w+.tar.gz", file).group(1) for file in task_cfg_fullname]
task_cfg_path = [os.path.join(task_cfg_dir, file) for file in task_cfg_fullname]
task_cfg_info = {task_cfg_name[i]: task_cfg_path[i] for i in range(len(task_cfg_name))}
# 获取所有任务信息的接口
tasks_url = f"http://{plat_ip}:20080/api/v1/devconf_fx/tasks?_t={int(time.time())}&offset=0&source_type=pull_video_stream&task_type=normal"
# ffprobe工具路径
ffprobe_path = r"D:\ffmpeg\bin\ffprobe.exe"
# 存储配置路径
storage_config_path = r"D:\StorageConfigTool_r4348\StorageConfig.xml"
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2021/1/28 23:43
# @Author : Young Lee
# @Email : young_lee2017@163.com
import sys
sys.path.append(__path__[0])
from .login import login_fx
from .uploadvideos import upload_videos
from .uploadstorageconfig import upload_storage_config
from .deviceresource import device_resource
from .createtask import create_task
from .createsubtask import create_subtask_batch
from .importconfig import import_config
from .monitor_subtask_result import monitor_subtask_result
from .dataretrieval import vehicle_search
from .deletetask import delete_task
__all__ = ['login_fx', 'upload_videos', 'upload_storage_config', 'device_resource', 'create_task', 'create_subtask_batch', 'import_config',
'monitor_subtask_result', 'vehicle_search', 'delete_task']
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2020/12/23 9:09
# @Author : Young Lee
# @Email : young_lee2017@163.com
from selenium.webdriver.common.keys import Keys
from jumppage import jump_page
import time
import math
expanded_bar = "./div[@class='el-tree-node__content']/span[contains(@class, 'el-tree-node__expand-icon')]"
add_bar_xpath = "./div[@class='el-tree-node__content']/span[@class='custom-tree-node']/span[@class='tree-btn']" \
"/i[@class='el-icon-plus']"
def ensure_all_expanded(expanded_all):
""" 确保所有的组织/地点处于展开状态 """
for list_ele in expanded_all:
whether_expanded = list_ele.find_element_by_xpath(expanded_bar).get_attribute("class")
if "is-leaf" not in whether_expanded and "expanded" not in whether_expanded:
list_ele.find_element_by_xpath(expanded_bar).click()
time.sleep(0.3)
def build_structure(driver, expanded_all, structure):
""" 创建组织/地点 """
videos_list = expanded_all[-1]
for i in range(len(structure)):
number = int(time.time() * math.pow(10, 6))
videos_list.find_element_by_xpath(add_bar_xpath).click()
if i == len(structure)-1:
print(f"开始创建地点 '{structure[i]}'")
# 选择地点
driver.find_element_by_css_selector("input[type='text'][placeholder='请选择']").click()
driver.find_element_by_css_selector(".el-scrollbar__view>li:nth-last-child(1)").click()
else:
print(f"开始创建组织 '{structure[i]}'")
# 填入编号
driver.find_element_by_xpath(
"//label[contains(text(), '编号')]/following-sibling::div[1]//input"
).send_keys(number)
# 填入名称
driver.find_element_by_xpath(
"//label[contains(text(), '名称')]/following-sibling::div[1]//input"
).send_keys(structure[i])
# 点击确定
driver.find_element_by_xpath(
"//div[@aria-label='添加']/div[@class='el-dialog__footer']//button//span[text()='确 定']"
).click()
# videos_list.find_element_by_xpath(expanded_bar).click()
time.sleep(0.5)
# 全部展开
ensure_all_expanded(expanded_all)
videos_list = videos_list.find_element_by_xpath(
f"./div[@role='group']/div[@role='treeitem']/div[@class='el-tree-node__content']/span[@class='custom-"
f"tree-node']/span/span[text()='{structure[i]}']/../../../..")
expanded_all.append(videos_list)
def add_various_info(driver, expanded_all, cameras_info):
""" 添加相机信息 """
camera_xpath = "./div[@role='group']/div[@role='treeitem']/div[@class='el-tree-node__content']/span[@class='custom-tree-node']/span/span"
camera_names = []
videos_list = expanded_all[-1]
camera_port = 554 # 端口固定为554
ensure_all_expanded(expanded_all)
time.sleep(0.5)
last_child_class = videos_list.find_element_by_xpath("./div[last()]").get_attribute("class")
if last_child_class == "el-tree-node__children":
camera_names = [ele.get_attribute("textContent") for ele in videos_list.find_elements_by_xpath(camera_xpath)]
for info in cameras_info:
camera_name = info["camera_name"]
camera_address = info["camera_address"]
if not camera_name:
print("未填写相机名称")
continue
if not camera_address:
print("未填写视频串")
continue
if camera_names and camera_name in camera_names:
print(f"相机 '{camera_name}' 已存在!")
continue
camera_number = info["camera_number"] if "camera_number" in info.keys() else int(time.time() * math.pow(10, 6))
print(f"开始添加相机 '{camera_name}'")
# 点开添加界面
videos_list.find_element_by_xpath(add_bar_xpath).click()
# 填入设备编号
number_ele = driver.find_element_by_css_selector("label[for='vchan_refid']+.el-form-item__content>div>input")
number_ele.send_keys(Keys.CONTROL, 'a')
number_ele.send_keys(camera_number)
# 填入设备名称
name_ele = driver.find_element_by_css_selector("div[aria-label='添加相机'] label[for='name']+.el-form-item__content>div>input")
name_ele.send_keys(Keys.CONTROL, 'a')
name_ele.send_keys(camera_name)
# 填入端口
port_ele = driver.find_element_by_css_selector("label[for='port']+.el-form-item__content>div>input")
port_ele.send_keys(Keys.CONTROL, 'a')
port_ele.send_keys(camera_port)
# 选择协议‘rtsp’
protocol_ele = driver.find_element_by_xpath("//label[@class='el-form-item__label'][text()='协议']/following-sib"
"ling::div[@class='el-form-item__content']/div")
protocol_ele.click()
protocol_ele.find_element_by_xpath("//span[text()='rtsp']").click()
# 填入视频串
address_ele = driver.find_element_by_css_selector("label[for='video_source_url']+.el-form-item__content>div>input")
address_ele.send_keys(Keys.CONTROL, 'a')
address_ele.send_keys(camera_address)
# 点击确定
driver.find_element_by_xpath(
"//div[contains(@class, 'treeBox')]//div[@aria-label='添加相机']/div[@class='el-dialog__footer']/span/"
"button/span[text()='确 定']").click()
# 捕捉添加返回消息
message = driver.find_element_by_css_selector("p[class='el-message__content']").get_attribute("textContent")
if message != "添加成功":
print(f"相机 '{camera_name}' 添加失败, 提示信息: {message}")
# 点击取消
driver.find_element_by_xpath(
"//div[contains(@class, 'treeBox')]//div[@aria-label='添加相机']/div[@class='el-dialog__footer']/span/"
"button/span[text()='取 消']").click()
# 展开
time.sleep(0.5)
ensure_all_expanded(expanded_all)
# 已添加相机校验
time.sleep(0.5)
last_child_class = videos_list.find_element_by_xpath("./div[last()]").get_attribute("class")
if last_child_class == "el-tree-node__children":
camera_names = [ele.get_attribute("textContent") for ele in videos_list.find_elements_by_xpath(camera_xpath)]
return [check_info['camera_name'] for check_info in cameras_info if check_info['camera_name'] in camera_names]
def add_camera(driver, group_path, cameras_info):
"""
添加相机资源
group_path: 组织结构,如'cs1/cs2'(组织:cs1, 地点:cs2)
camera_info: 相机信息,
[{'camera_name': <camera name>, 'camera_number': <camera_number>, 'camera_address': <camera_address>}, {...}]
camera_name: 相机名称, camera_number(可选): 设备编号, camera_address: 视频串
return: 返回已成功添加的相机名称
"""
common_xpath = "//div[contains(@class, 'treeBox')]//span[text()='手动添加相机资源']/../../../.."
jump_page(driver, "资源管理", "视频设备")
expanded_all = []
group_structure = group_path.split("/")
videos_list = driver.find_element_by_xpath(common_xpath)
expanded_all.append(videos_list)
# 组织/地点若存在则展开,不存在则添加
for i in range(len(group_structure)):
whether_expanded = videos_list.find_element_by_xpath(expanded_bar).get_attribute("class")
if "is-leaf" not in whether_expanded and "expanded" not in whether_expanded:
videos_list.find_element_by_xpath(expanded_bar).click()
time.sleep(0.5)
try:
find_xpath = f"./div[@role='group']/div[@role='treeitem']/div[@class='el-tree-node__content']/span[@class" \
f"='custom-tree-node']/span/span[text()='{group_structure[i]}']/../../../.."
videos_list = videos_list.find_element_by_xpath(find_xpath)
expanded_all.append(videos_list)
if i < len(group_structure) - 1:
print(f"组织 '{group_structure[i]}' 已存在!")
else:
print(f"地点 '{group_structure[i]}' 已存在!")
except Exception:
build_structure(driver, expanded_all, group_structure[i:])
break
# 添加相机
return add_various_info(driver, expanded_all, cameras_info)
if __name__ == "__main__":
from unitinit import *
login_fx(driver, login_url, user, passwd)
group_apth = "cs1/cs2"
group_apth1 = "危险品/ces111"
info1 = {"camera_name": "test1", "camera_number": "cs2", "camera_address": "rtsp://test1.264"}
info2 = {"camera_name": "test2", "camera_number": "cs5", "camera_address": "rtsp://test2.264"}
info3 = {"camera_name": "121", "camera_address": "rtsp://test2.264"}
cameras_info = [info1, info2]
print(add_camera(driver, group_apth1, [info3]))
time.sleep(5)
driver.quit()
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2020/12/14 9:05
# @Author : Young Lee
# @Email : young_lee2017@163.com
import time
from findtask import find_task
from monitor_subtask_status import monitor_subtask_status
def select_videos(videos_list, add_videos):
""" 勾选视频 """
for add_video in add_videos:
try:
check_ele = videos_list.find_element_by_xpath(f".//span[text()='{add_video}']/../preceding-sibling::label")
if "is-checked" not in check_ele.get_attribute("class"):
check_ele.click()
except Exception:
print(f"未找到视频: {add_video}")
def create_subtask(driver, group_path, add_videos):
"""创建子任务
:param group_path: 找寻视频的路径
:param add_videos: list, 要添加的视频列表
"""
print(f"开始勾选>> '{group_path}'下的视频列表: {add_videos}")
group_structure = group_path.split("/")
if group_structure[0] == "外部设备":
group_structure[0] = 1
elif group_structure[0] == "内部设备":
group_structure[0] = 2
if len(group_structure) < 2:
print("填写的视频寻找路径不全[内部设备]!")
return
elif group_structure[0] == "短视频":
group_structure[0] = 3
else:
print("所给video_path错误!")
return
videos_list = driver.find_element_by_css_selector(f"div[role='tree']>div:nth-child({group_structure[0]})")
whether_expanded = videos_list.find_element_by_css_selector(
"div[class='el-tree-node__content']>span:nth-child(1)"
).get_attribute("class")
if "expanded" not in whether_expanded:
videos_list.find_element_by_css_selector(
"div[class='el-tree-node__content']>span[class='custom-tree-node']"
).click()
time.sleep(1)
ele = "."
detail_ele = ele
if group_structure[0] == 1 or group_structure[0] == 3:
spec_videos_list = videos_list.find_element_by_xpath("./div[@role='group']")
select_videos(spec_videos_list, add_videos)
else:
for i in range(1, len(group_structure)):
ele = ele + "/div[@role='group']/div[@role='treeitem']"
detail_ele = f"{ele}/div[@class='el-tree-node__content']/span[@class='custom-tree-node']/span[text()='{group_structure[i]}']"
try:
whether_expanded = videos_list.find_element_by_xpath(
f"{detail_ele}/../preceding-sibling::span"
).get_attribute("class")
except Exception as e:
print(f"未找到{group_structure[i]}", e)
return
if "expanded" not in whether_expanded:
videos_list.find_element_by_xpath(detail_ele).click()
spec_videos_list = videos_list.find_element_by_xpath(
f"{detail_ele}/../../following-sibling::div[@role='group']")
select_videos(spec_videos_list, add_videos)
# videos_list.find_element_by_xpath(f"{ele}//span[text()='{group_structure[-1]}']").click()
def create_subtask_batch(driver, task_name, videos_batch_info, monitor_status=False, monitor_times=30):
"""批量添加子任务
:param task_name: 要添加子任务的大任务名称
:param videos_batch_info: {group_path(视频寻找路径,/分割): add_videos(视频名称,list)}
:param monitor_status: 是否开启子任务下发状态监测,默认False
:param monitor_times: 重复监测次数(下发状态为"工作中"时停止),默认为30次,两次监测间隔2S
:return: monitor_status为True时返回subtasks_status: [subtask_name, subtask_status, subtask_id]
monitor_status为False时返回已下发任务列表
"""
subtasks_status = []
# 寻找任务
subtasks_id_before_create = find_task(driver, task_name)
if not subtasks_id_before_create:
return
# 打开‘修改视频源配置’
driver.find_element_by_xpath(f"//div[text()='{task_name}']/../following-sibling::td[last()]//span[contains(@class,"
f"'el-icon-video-camera-solid')]").click()
# 批量勾选视频
for group_path, add_videos in videos_batch_info.items():
create_subtask(driver, group_path, add_videos)
# 点击确定下发任务
driver.find_element_by_xpath(
"//div[@aria-label='视频源配置修改']/div[@class='el-dialog__footer']//span[text()='确 定']"
).click()
time.sleep(1)
if monitor_status:
for i in range(monitor_times):
print(f"第{i+1}次:")
# subtasks_status = monitor_subtask_status(driver, task_name, subtasks_id_before_create)
subtasks_status = monitor_subtask_status(driver, task_name)
if "部署中" not in [value[1] for value in subtasks_status]:
break
time.sleep(2)
return subtasks_status
else:
subtasks_status = monitor_subtask_status(driver, task_name, subtasks_id_before_create)
return [value[0] for value in subtasks_status]
if __name__ == "__main__":
from unitinit import *
from login import login_fx
task_name = "顺德逆行"
group_path1 = "内部设备/123123/9.179"
add_videos1 = ["泥头车", "运输车"]
group_path2 = "短视频"
add_videos2 = ["大货车合并文件"]
group_path3 = "外部设备"
add_videos3 = ["34020000001339990002", "34020000001339990003"]
# batch_info = {group_path1: add_videos1, group_path2: add_videos2, group_path3: add_videos3}
batch_info = {group_path1: add_videos1}
login_fx(driver, login_url, user, passwd)
# print(find_task(driver, task_name))
# print(monitor_subtask_status(driver, task_name, []))
retrun_value = create_subtask_batch(driver, task_name, batch_info)
print(retrun_value)
# create_subtask(driver, task_name, {group_path: add_videos})
# start = time.time()
# find_task(driver, task_name)
# end = time.time()
# print(end-start)
# time.sleep(10)
driver.quit()
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2020/12/13 14:47
# @Author : Young Lee
# @Email : young_lee2017@163.com
from selenium.webdriver.common.keys import Keys
from jumppage import jump_page
import time
import requests
def check_exist(tasks_url, task_name):
""" 调用任务api查询是否有同名任务 """
try:
list_data = requests.get(tasks_url, timeout=7).json()["list_data"]
if list_data:
exist_tasks = [task['task_name'] for task in list_data]
return True if task_name in exist_tasks else False
else:
return False
except Exception:
return False
def create_task(tasks_url, driver, task_name, task_algo_type="交通", resource_use=1, running_time=None, store_conf=None, device_type=None):
""""创建任务
:param task_name: 大任务名称
:param task_algo_type: 算法类型,默认"交通"
:param resource_use: 场景占用,默认1
:param running_time: 默认None(全天), 需要指定时间段时需要传入列表[开始时间, 结束时间]
:param device_type: 默认None(随机下发), 需要指定下发到某设备需要传入设备类型
:return: {first_task_name: first_task_status}
"""
whether_exising = check_exist(tasks_url, task_name)
jump_page(driver, "任务管理", "任务设置")
driver.find_element_by_xpath("//div[@class='content']/div[1]//span[text()='新建任务']").click()
# 填入任务名称
driver.find_element_by_css_selector("label[for='task_name'] + div input").send_keys(task_name)
# 选择算法类型
driver.find_element_by_css_selector("label[for='task_algo_type'] + div input").click()
time.sleep(0.5)
driver.find_element_by_xpath(
f"//label[@for='task_algo_type']/following-sibling::div//li/span[text()='{task_algo_type}']"
).click()
# 选择场景占用
driver.find_element_by_css_selector("label[for='resource_use'] + div input").click()
time.sleep(0.5)
driver.find_element_by_xpath(f"//label[@for='resource_use']/following-sibling::div//li[{resource_use}]").click()
# 选择任务运行时间段
if running_time:
start_time = running_time[0]
end_time = running_time[1]
driver.find_element_by_xpath("//label[text()='时间计划']/following-sibling::*//input").click() # 点开时间计划进行选择
time.sleep(0.5)
driver.find_element_by_xpath("//span[text()='时间']").click()
start_time_ele = driver.find_element_by_css_selector("input[placeholder='开始时间']")
start_time_ele.send_keys(Keys.CONTROL,'a') # 全选开始时间文本
start_time_ele.send_keys(start_time) # 输入开始时间
end_time_ele = driver.find_element_by_css_selector("input[placeholder='结束时间']")
end_time_ele.send_keys(Keys.CONTROL,'a') # 全选结束时间文本
end_time_ele.send_keys(end_time) # 输入结束时间
driver.find_element_by_css_selector("button[class='el-time-panel__btn confirm']").click() # 点击确定
# 选择存储配置
driver.find_element_by_css_selector("label[for='store_conf_unid'] + div input").click() # 点开存储配置列表
time.sleep(0.5)
if store_conf:
stor_num = driver.execute_script("return document.evaluate(\"//label[@for='store_conf_unid']/following-sibling"
"::div//ul\", document).iterateNext().childNodes.length")
if stor_num == 0:
return "未发现存储配置"
ul_xpath = "//label[@for='store_conf_unid']/following-sibling::div//ul/li"
li_eles = driver.find_elements_by_xpath(ul_xpath)
stor_names = [ele.get_attribute('textContent') for ele in li_eles]
if store_conf in stor_names:
driver.find_element_by_xpath(f"{ul_xpath}/span[text()='{store_conf}']").click() # 选择指定存储配置
else:
print(f"未找到存储配置{store_conf}, 选择列表内第一个替代")
driver.find_element_by_css_selector("label[for='store_conf_unid'] + div li:nth-child(1)").click() # 选择第一个存储配置
else:
print("未指定存储配置,自动选择列表内第一个")
driver.find_element_by_css_selector("label[for='store_conf_unid'] + div li:nth-child(1)").click() # 选择第一个存储配置
# 选择设备类型
if device_type:
driver.find_element_by_xpath("//label[text()='设备类型']/following-sibling::div").click() # 展开设备类型列表
time.sleep(0.5)
driver.find_element_by_xpath(f"//li[text()=' {device_type} ']").click() # 选择指定设备类型
# 单击确定进行新建
driver.find_element_by_css_selector(
"div[aria-label='新建任务']>div[class='el-dialog__footer'] button:nth-child(2)"
).click()
if whether_exising:
print("存在同名任务")
try:
click_result = driver.find_element_by_css_selector("div[role='alert']>p").get_attribute('textContent')
driver.find_element_by_xpath("//div[@aria-label='新建任务']/div[@class='el-dialog__footer']/span/button/"
"span[text()='取 消']").click()
return click_result
except Exception:
return
time.sleep(2)
# 获得当前第一个任务的名称和部署状态
first_task_name = driver.find_element_by_css_selector(
"table[class='el-table__body'] tr:nth-child(1) td:nth-child(3)>div"
).text
first_task_status = driver.find_element_by_css_selector(
"table[class='el-table__body'] tr:nth-child(1) td:nth-child(4)>div"
).text
return {first_task_name: first_task_status}
if __name__ == "__main__":
from unitinit import *
from login import login_fx
login_fx(driver, login_url, user, passwd)
task_info = create_task(
tasks_url, driver, "测试任务njF9Y",
task_algo_type="交通",
# resource_use=1,
# running_time=["00:00:00", "12:00:00"],
store_conf="基础",
# device_type="9.133_gpu0"
)
print(task_info)
# from createsubtask import create_subtask_batch
# create_subtask_batch(driver, '100辆车', {'短视频': ['100辆车']}, monitor_status=True, monitor_times=20)
driver.quit()
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2020/12/22 9:12
# @Author : Young Lee
# @Email : young_lee2017@163.com
from jumppage import jump_page
import time
import re
def select_condition(driver, conditopn_type, specific_value):
"""选择条件"""
common_xpath = f"//label[@class='el-form-item__label'][text()='{conditopn_type}']"
input_select = f"{common_xpath}/following-sibling::div[1]//input"
# li_select = f"{common_xpath}/following-sibling::div[1]//li/span[text()='{specific_value}']"
driver.find_element_by_xpath(input_select).click() # 点击选择框
time.sleep(0.5)
li_eles = driver.find_elements_by_xpath(f"{common_xpath}/following-sibling::div[1]//li/span")
li_dic = {ele.get_attribute('textContent'): ele for ele in li_eles}
if specific_value not in li_dic.keys():
return False
li_dic[specific_value].click()
return True
# driver.find_element_by_xpath(li_select).click() # 选择具体值
def vehicle_search(driver, task_name=None, subtask_name=None):
"""过车记录检索
:param task_name: 大任务名称
:param subtask_name: 子任务名称
:return: {'first_page': <first page num>, 'total': <total num>}
"""
jump_page(driver, "智能检索", "过车检索")
if task_name:
select_result = select_condition(driver, "任务名称", task_name)
if not select_result:
print(f"未找到大任务'{task_name}'")
return f"未找到大任务'{task_name}'"
if subtask_name:
select_result = select_condition(driver, "视频名称", subtask_name)
if not select_result:
print(f"未找到子任务'{subtask_name}'")
return f"未找到子任务'{subtask_name}'"
driver.find_element_by_xpath("//div[@class='innnerBox']/div[2]//button/span[text()='查询']").click() # 点击查询
# 等待查询数据出现,30S超时
start = time.time()
while True:
style = driver.find_element_by_class_name("el-loading-mask").get_attribute('style')
now = time.time()
if style == 'display: none;':
break
elif (now - start) > 30:
print("超时未检索到数据")
return "超时未检索到数据"
time.sleep(0.2)
judge_name = driver.find_element_by_css_selector(".el-table__body-wrapper>*:nth-last-child(1)").get_attribute('class')
if judge_name == 'el-table__empty-block':
vehiclesnumber_of_firstpage = 0
else:
li_eles = driver.find_elements_by_css_selector("table[class='el-table__body']>tbody>tr")
vehiclesnumber_of_firstpage = len(li_eles)
total_ele = driver.find_element_by_css_selector("div[class='innnerBox']>div:nth-child(3) span[class='el-pagination__total']")
# driver.execute_script("arguments[0].style.display='block'", total_ele)
total_number = total_ele.get_attribute("textContent")
vehiclesnumber_of_all = re.search("[0-9]+", total_number).group()
search_result = {"first_page": vehiclesnumber_of_firstpage, "total": int(vehiclesnumber_of_all)}
print(search_result)
return search_result
if __name__ == "__main__":
from unitinit import *
from login import login_fx
login_fx(driver, login_url, user, passwd)
vehicle_search(driver, '测试任务DOmxF', '大货车合并文件')
driver.quit()
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2021/1/4 22:05
# @Author : Young Lee
# @Email : young_lee2017@163.com
from findtask import find_task
from monitor_subtask_status import monitor_subtask_status
import time
def delete_task(driver, task_name, monitor_times=30):
"""删除大任务
:param task_name: 大任务名称
:param monitor_times: 最大检测次数,默认为30
:return: 删除成功返回None, 删除失败返回task_name
"""
subtasks_id = find_task(driver, task_name)
if isinstance(subtasks_id, list):
delete_subtask(driver, task_name, monitor_times=monitor_times, subtasks_id=subtasks_id)
elif not subtasks_id:
print(f"任务 '{task_name}' 不存在, 无需删除!")
return
# 删除大任务
driver.find_element_by_xpath(f"//div[text()='{task_name}']/../following-sibling::td[last()]//span[contains(@class, "
f"'el-icon-delete')]").click()
driver.find_element_by_xpath("//div[@aria-label='提示']//button/span[contains(text(), '确定')]").click()
check_task = find_task(driver, task_name)
return task_name if check_task else None
def delete_subtask(driver, task_name, subtasks_list=None, monitor_times=30, subtasks_id=None):
"""删除子任务
:param task_name: 大任务名称
:param subtasks_list: 要删除子任务名称列表
:param monitor_times: 最大检测次数,默认为30
:param subtask_id: 大任务下子任务ID列表,为None时则自动收集
:return: 返回未删掉的任务列表
"""
rest_subtasks = []
subtasks_id = subtasks_id if subtasks_id else find_task(driver, task_name)
if not subtasks_id:
print("未找到任何任务!")
return
# 打开‘修改视频源配置’
driver.find_element_by_xpath(f"//div[text()='{task_name}']/../following-sibling::td[last()]//span[contains(@class,"
f"'el-icon-video-camera-solid')]").click()
# 删除列表中存在的任务
existing_eles = driver.find_elements_by_css_selector("div[aria-label='视频源配置修改'] tbody>tr")
for i in range(len(existing_eles)-1, -1, -1):
existing_ele = existing_eles[i]
subtask_name = existing_ele.find_element_by_css_selector("td:nth-child(1)>div>div").get_attribute("textContent").strip()
if not subtasks_list or subtask_name in subtasks_list:
print(f"删除 '{subtask_name}'")
existing_ele.find_element_by_css_selector("td:nth-last-child(1)>div>.el-icon-delete").click()
driver.find_element_by_xpath("//div[@class='el-message-box']//button/span[contains(text(),'确定')]").click()
time.sleep(0.5)
driver.find_element_by_xpath("//div[@aria-label='视频源配置修改']/div[@class='el-dialog__footer']/span/button/"
"span[text()='确 定']").click()
for i in range(monitor_times):
print(f"第{i}次:")
subtasks_status = monitor_subtask_status(driver, task_name)
if subtasks_status:
subtasks_name = [subtask[0] for subtask in subtasks_status]
rest_subtasks = [name for name in subtasks_list if name in subtasks_name] if subtasks_list else subtasks_name
else:
rest_subtasks = []
if not rest_subtasks:
break
time.sleep(2)
return rest_subtasks
if __name__ == '__main__':
from unitinit import *
login_fx(driver, login_url, user, passwd)
task_name = "测试"
subtask_list = ["100辆车"]
print(delete_task(driver, task_name))
# print(delete_subtask(driver, task_name, subtask_list))
driver.quit()
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2020/12/11 17:48
# @Author : Young Lee
# @Email : young_lee2017@163.com
from jumppage import jump_page
def device_resource(driver):
"""获取分析资源数量
:return: {"total": total_resource, "free": free_resource}
"""
jump_page(driver, "系统运维", "设备管理")
try:
total_element = driver.find_element_by_xpath("//span[text()='分析资源数:']/following-sibling::span")
free_element = driver.find_element_by_xpath("//span[text()='离线资源数:']/following-sibling::span")
total_resource = int(total_element.text)
free_resource = int(free_element.text)
return {"total": total_resource, "free": free_resource}
except Exception:
return {"total": 0, "free": 0}
if __name__ == "__main__":
from unitinit import *
from login import login_fx
login_fx(driver, login_url, user, passwd)
print(device_resource(driver))
driver.quit()
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2020/12/17 9:15
# @Author : Young Lee
# @Email : young_lee2017@163.com
from jumppage import jump_page
from selenium.webdriver.common.keys import Keys
import time
def get_subtasks_id(driver, common_xpath):
"""获取子任务ID列表
:return: 返回子任务ID列表,不存在子任务时返回'notaskinfo'
"""
expanded_bar = f"{common_xpath}/../preceding-sibling::td[2]/div/div"
driver.find_element_by_xpath(expanded_bar).click()
time.sleep(1)
exist_subtask = driver.find_element_by_xpath(
f"{common_xpath}/../../following-sibling::tr[1]/td/div[1]"
).get_attribute("class")
if exist_subtask == "notaskinfo":
return exist_subtask
subtasks_id = []
subtasks_id_ele = driver.find_elements_by_xpath(
f"{common_xpath}/../../following-sibling::tr[1]/td//span[@class='label'][text()='任务ID']/following-sibling::span[1]")
for subtask_id_ele in subtasks_id_ele:
subtasks_id.append(subtask_id_ele.text)
driver.find_element_by_xpath(expanded_bar).click()
return subtasks_id
def find_task(driver, task_name):
"""定位任务位置
:return: 找不到任务时返回None; 找到大任务但其下无子任务时返回"notaskinfo"; 找到大任务并其下有子任务时返回子任务ID列表
"""
jump_page(driver, "任务管理", "任务设置")
input_ele = driver.find_element_by_css_selector("input[placeholder='请输入任务名称']")
input_ele.send_keys(Keys.CONTROL, 'a')
input_ele.send_keys(task_name)
driver.find_element_by_xpath("//div[@class='content']/div[1]//span[text()='查询']").click()
time.sleep(1)
common_xpath = f"//div[@class='content']/div[2]//tbody/tr/td[3]/div[text()='{task_name}']"
num_of_pages = int(driver.find_element_by_css_selector(".el-pager>li:nth-last-child(1)").text)
for num in range(num_of_pages):
if num > 0:
driver.find_element_by_css_selector(f".el-pager>li:nth-child({num+1})").click()
time.sleep(1)
find_result_check = driver.find_element_by_css_selector(".el-table__body-wrapper>*:nth-last-child(1)").get_attribute("class")
if find_result_check == "el-table__empty-block":
continue
find_out_task_eles = driver.find_elements_by_css_selector(".content>div:nth-child(2) tbody>tr>td:nth-child(3)>div")
find_out_tasks = [ele.get_attribute("textContent") for ele in find_out_task_eles]
if task_name in find_out_tasks:
return get_subtasks_id(driver, common_xpath)
else:
print(f"第{num+1}页: 未找到 '{task_name}'")
if num + 1 == num_of_pages:
return
else:
continue
if __name__ == "__main__":
from unitinit import *
login_fx(driver, login_url, user, passwd)
print(find_task(driver, "测试2021"))
driver.quit()
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2021/1/11 22:14
# @Author : Young Lee
# @Email : young_lee2017@163.com
import time
import os
from selenium.webdriver import ActionChains
from jumppage import jump_page
def check_send_status(driver, review_list, target_task_ele, import_result):
""" 检查配置下发状态 """
if target_task_ele and review_list:
for name in review_list:
target_task_ele.find_element_by_css_selector(f"ol>li[title='{name}']").click()
mask_ele = driver.find_element_by_class_name("el-loading-mask")
flag = False
for i in range(30):
if mask_ele.get_attribute('style') == 'display: none;':
break
time.sleep(0.3)
if i == 29:
import_result[name] = 'review: loding failed'
flag = True
# 任务信息加载失败则跳过本次检测
if flag:
continue
# 移动鼠标到参数、区域、标定设置之上,获取配置下发状态
set_send_status = {}
for set_ele in driver.find_elements_by_css_selector(".table-set>span"):
set_type = set_ele.get_attribute('textContent')
ActionChains(driver).move_to_element(set_ele).perform()
set_status = driver.find_element_by_css_selector(
"div[role='tooltip'][aria-hidden='false']").get_attribute('textContent')
set_send_status[set_type] = set_status
# 记录配置下发状态
import_result[name] = set_send_status
def import_config(driver, task_name, subtasks_cfg_info, retry_check_times=5):
"""导入任务配置
:param task_name: 大任务名称
:param subtasks_cfg_info: 配置信息 {<任务名称>: <本地配置文件路径>, ...}
:param retry_check_times: 导入配置响应状态监测的最大次数,默认5次
:return: {任务名称: 导入结果},导入成功时'导入结果'为配置下发状态(dict),导入失败时'导入结果'为失败原因(str)
"""
import_result = {}
review_list = []
target_task_ele = None
jump_page(driver, '任务管理', '场景设置')
tasks_ele = driver.find_elements_by_css_selector(".tasklist>li")
for task_ele in tasks_ele:
task_title = task_ele.find_element_by_css_selector(".task-title")
ele_name = task_title.get_attribute('textContent')
if ele_name != task_name:
continue
target_task_ele = task_ele
tag_name = driver.find_element_by_css_selector(".tasklist>li>*:nth-last-child(1)").get_attribute('tagName')
if tag_name != 'OL':
task_title.click()
subtasks_ele = task_ele.find_elements_by_css_selector('ol>li')
subtasks_title = [title.get_attribute('title') for title in subtasks_ele]
# 挨个任务导入
for name, cfg in subtasks_cfg_info.items():
if name not in subtasks_title or not os.path.exists(cfg):
import_result[name] = 'preparation: subtask/config not found'
continue
subtasks_ele[subtasks_title.index(name)].click()
mask_ele = driver.find_element_by_class_name("el-loading-mask")
for i in range(30):
if mask_ele.get_attribute('style') == 'display: none;':
break
time.sleep(0.3)
# 清除已存在的alert
while True:
last_ele = driver.find_element_by_css_selector("#theme-name>*:nth-last-child(1)")
if last_ele.get_attribute('tagName') == 'DIV' and last_ele.get_attribute('role') == 'alert':
try:
driver.execute_script("document.querySelector(\"div[role='alert']\").remove()")
except Exception:
pass
else:
break
# 导入配置
driver.find_element_by_xpath("//div[@class='setting-box']/div[contains(text(), '配置导入')]/span").click()
driver.find_element_by_css_selector(
"div[aria-label='配置导入'] .el-dialog__body .upload-demo input").send_keys(cfg)
driver.find_element_by_xpath(
"//div[@aria-label='配置导入']/div[@class='el-dialog__footer']/span/button/span[text()='确 定']").click()
response_class = driver.find_element_by_css_selector(
"div[role='alert']:nth-last-child(1)>i").get_attribute('class')
response_message = driver.find_element_by_css_selector("div[role='alert']>p").get_attribute('textContent')
print(f"导入操作响应信息: {response_message}")
if 'el-icon-success' in response_class:
review_list.append(name)
import_result[name] = None
else:
import_result[name] = 'import: import failed'
break
# 复检配置导入状态
time.sleep(1)
while review_list:
check_send_status(driver, review_list, target_task_ele, import_result)
for name in review_list:
status = import_result[name]
if isinstance(status, dict) and '下发中' not in status.values():
review_list.remove(name)
retry_check_times -= 1
if retry_check_times < 1:
break
return import_result
if __name__ == '__main__':
from unitinit import *
from login import login_fx
login_fx(driver, login_url, user, passwd)
# cfg_path = r"\\192.168.9.1\t测试\任务信息库\config\100辆车_6f52d8381807461b96f102a2d73c5c62.tar.gz"
for i in range(30):
import_result = import_config(driver, '测试任务njF9Y', task_cfg_info)
print(import_result)
driver.quit()
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2020/12/10 23:49
# @Author : Young Lee
# @Email : young_lee2017@163.com
import time
def jump_page(driver, large_module, small_module):
"""跳转到指定页面
:param large_module: 大模块名称
:param small_module: 子模块名称
"""
large_xpath = f"//div[@class='el-submenu__title']//span[text()='{large_module}']"
large_li_xpath = f"{large_xpath}/../.."
small_xpath = f"//ul[@role='menu']/li[contains(text(),'{small_module}')]"
if "is-opened" not in driver.find_element_by_xpath(large_li_xpath).get_attribute("class"):
driver.find_element_by_xpath(large_xpath).click()
if "is-active" not in driver.find_element_by_xpath(small_xpath).get_attribute("class"):
time.sleep(0.5)
driver.find_element_by_xpath(small_xpath).click()
time.sleep(1)
if __name__ == "__main__":
from unitinit import *
login_fx(driver, login_url, user, passwd)
jump_page(driver, '资源管理', '视频设备')
time.sleep(3)
driver.quit()
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2020/12/10 23:07
# @Author : Young Lee
# @Email : young_lee2017@163.com
import time
def login_fx(driver, login_url, user, passwd):
"""登录繁星平台
:param login_url: 登录地址
:param user: 用户名
:param passwd: 密码
:return: 登陆后首页地址
"""
driver.get(login_url)
elements = driver.find_elements_by_class_name("el-input__inner")
# print(elements)
elements[0].send_keys(user)
elements[1].send_keys(passwd)
driver.find_element_by_tag_name("button").click()
time.sleep(2)
currentPage_url = driver.current_url
return currentPage_url
if __name__ == "__main__":
from selenium import webdriver
from cfg.fxinfo import *
driver = webdriver.Chrome(driver_path)
driver.implicitly_wait(5)
page_url = login_fx(driver, login_url, user, passwd)
print(page_url)
driver.quit()
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2020/12/17 14:29
# @Author : Young Lee
# @Email : young_lee2017@163.com
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from jumppage import jump_page
from playcheck import play_check
import requests
import time
import re
from mythread import MyThread
def monitor_subtask_capture(driver, wait_time, subtask_flag):
""" 监测是否接收到抓拍结果 """
try:
WebDriverWait(driver, wait_time).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "#pane-first .el-row"))
)
print("已检测到实时抓拍结果!")
except Exception:
print(f"未监测到任务'{subtask_flag}'的实时抓拍数据!")
return False
return True
def monitor_subtask_stream(plat_ip, ffprobe_path, subtask_id):
""" 监测是否接收到分析流 """
req_url = f"http://{plat_ip}:20080/api/v1/devconf_fx/analyse/live/play_url?subtask_id={subtask_id}&_t={int(time.time())}"
try:
address = requests.get(req_url, timeout=10).json()['rtsp_url']
if not address:
print("分析流播放地址为空!")
return False
except Exception:
print("获取分析流播放地址出错!")
return False
return play_check(ffprobe_path, address)
def monitor_subtask_result(driver, wait_time, ffprobe_path, task_name, subtask_name=None, subtask_id=None):
"""监测是否有抓拍数据
:param wait_time: 等候抓拍结果的时间
:param ffprobe_path: ffprobe的路径
:param task_name: 大任务名称
:param subtask_name: 子任务名称
:param subtask_id: 子任务ID, 优先级大于subtask_name,subtask_id不为None时使用subtask_id进行定位
:return: {'capture_result': capture_result, 'stream_result': stream_result} 分别为抓拍结果和流播放结果
"""
common_xpath = f"//span[@class='task-title'][text()='{task_name}']"
subtask_flag = subtask_name if subtask_name else subtask_id
subtask_li = f"{common_xpath}/../following-sibling::ol[1]/li[@data-subtask='{subtask_id}']" if subtask_id \
else f"{common_xpath}/../following-sibling::ol[1]/li[@title='{subtask_name}']"
jump_page(driver, "任务管理", "场景设置")
# 展开大任务
try:
driver.find_element_by_xpath(common_xpath).click()
except Exception:
print(f"未找到大任务'{task_name}'!")
return
# 点击子任务
try:
driver.find_element_by_xpath(subtask_li).click()
except Exception:
print(f"未找到子任务'{subtask_flag}'!")
return
# 将分析流监测放入线程
if not subtask_id:
subtask_id = driver.find_element_by_xpath(subtask_li).get_attribute("data-subtask")
plat_ip = re.search("\d{1,3}.\d{1,3}.\d{1,3}.\d{1,3}", driver.current_url).group()
# stream_result = monitor_subtask_stream(plat_ip, ffprobe_path, subtask_id)
stream_result_thread = MyThread(monitor_subtask_stream, plat_ip, ffprobe_path, subtask_id)
stream_result_thread.start()
# 抓拍结果监测
capture_result = monitor_subtask_capture(driver, wait_time, subtask_flag)
stream_result_thread.join()
stream_result = stream_result_thread.get_result()
return {"capture_result": capture_result, "stream_result": stream_result}
if __name__ == "__main__":
from unitinit import *
from login import login_fx
login_fx(driver, login_url, user, passwd)
# monitor_subtask_capture(driver, "非机动车逆行", 30, subtask_name="非机动车逆行", subtask_id="f6343c3ce4a34a0d888ac75164ef59c5")
# driver.quit()
from cfg.fxinfo import *
import datetime
# address = "rtsp://192.168.9.233:8555/rtsp://192.168.8.108:10087/ffae4bfb48d94a3688a5b503bc30282e"
# play_check(address)
# for i in range(100):
# stream = play_check(ffprobe_path, address)
# if not stream:
# print("*********************************************************")
# print(f"{datetime.datetime.now().strftime('%H:%M:%S')} : {stream}")
# print("again")
result = monitor_subtask_result(driver, 10, ffprobe_path, "流量", subtask_name="流量")
print(result)
driver.quit()
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2020/12/17 9:17
# @Author : Young Lee
# @Email : young_lee2017@163.com
import time
def monitor_subtask_status(driver, task_name, excluded_list=None):
"""子任务下发状态监控
:param task_name: 大任务名称
:param excluded_list: 要排除的子任务ID列表
:return: 返回subtasks_status [subtask_name, subtask_status, subtask_id]
"""
if excluded_list is None:
excluded_list = []
print("监测子任务状态...")
subtasks_status = []
common_xpath = f"//div[@class='content']/div[2]//tbody/tr/td[3]/div[text()='{task_name}']"
expanded_bar = f"{common_xpath}/../preceding-sibling::td[2]/div/div"
subtasks_td = f"{common_xpath}/../../following-sibling::tr[1]/td/div"
if "el-table__expand-icon--expanded" not in driver.find_element_by_xpath(expanded_bar).get_attribute("class"):
driver.find_element_by_xpath(expanded_bar).click()
time.sleep(1)
if driver.find_element_by_xpath(subtasks_td).get_attribute("class") == "notaskinfo":
return
subtasks_ele = driver.find_elements_by_xpath(subtasks_td)
for subtask_ele in subtasks_ele:
subtask_id = subtask_ele.find_element_by_xpath("./span[@class='label'][text()='任务ID']/following-sibling::span[1]").text
if subtask_id not in excluded_list:
subtask_name = subtask_ele.find_element_by_xpath("./span[@class='label'][text()='视频源文件']/following-sibling::span[1]").text
subtask_status = subtask_ele.find_element_by_css_selector(".taskstatus>span").text
subtasks_status.append([subtask_name, subtask_status, subtask_id])
driver.find_element_by_xpath(expanded_bar).click()
return subtasks_status
if __name__ == '__main__':
from unitinit import *
from login import login_fx
from jumppage import jump_page
login_fx(driver, login_url, user, passwd)
jump_page(driver, '任务管理', '任务设置')
monitor_subtask_status(driver, '测试任务wDShB')
# -*- coding: utf-8 -*-
# @Time : 2020/12/21 14:23
# @Author : Young Lee
# @Email : young_lee2017@163.com
from threading import Thread
class MyThread(Thread):
def __init__(self, func, *args):
super(MyThread, self).__init__()
self.func = func
self.args = args
def run(self):
self.result = self.func(*self.args)
def get_result(self):
return self.result
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2020/12/20 18:24
# @Author : Young Lee
# @Email : young_lee2017@163.com
import subprocess
import json
def stream_check(stream_info):
""" 判断流信息 """
width = stream_info['streams'][0]['width']
height = stream_info['streams'][0]['height']
if width > 0 and height > 0:
print("已监测到分析流!")
return True
else:
return False
def play_check(ffprobe_path, address):
"""判断视频流是否可播放
:param ffprobe_path: ffprobe程序的路径
:param address: 视频流地址
:return True or False
"""
command = f"{ffprobe_path} -print_format json -show_streams -stimeout 2000000 -i {address}"
try:
subp = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
subp.wait(10)
stdout = subp.stdout.read()
stream_info = json.loads(stdout)
return stream_check(stream_info)
except Exception as e:
print("[play_check]错误信息: %s" % e)
return False
# def play_check(address):
#
# cap = cv2.VideoCapture()
# cap.open(address)
# sampling = []
# play_value = False
# if cap.isOpened():
# print("开始分析...")
# for i in range(30):
# sampling.append(cap.read()[0])
# cv2.waitKey(30)
# if sampling and sampling.count(True) > sampling.count(False):
# print("点播成功!")
# play_value = True
# else:
# print("点播失败!")
# cap.release()
# return play_value
if __name__ == "__main__":
from cfg.fxinfo import *
# address = "rtsp://192.168.9.233:8555/rtsp://192.168.8.108:10087/ffae4bfb48d94a3688a5b503bc30282e"
# print(play_check(ffprobe_path, address))
# with open('264.list', 'r') as f:
# rtsp_str = f.read()
#
# rtsp_list = [stream for stream in rtsp_str.split('\n') if stream != '']
# for stream in rtsp_list:
# play_result = play_check(ffprobe_path, stream)
# if play_result == False:
# play_result = play_check(ffprobe_path, stream)
# print(f"{stream}: {play_result}")
# i = 0
# while True:
# i += 1
# play_result = play_check(ffprobe_path, "rtsp://admin:vion1234@192.168.9.93:554/h264/ch1/main/av_stream")
# print(f"{i}: {play_result}")
#
# from multiprocessing import Pool
#
# p = Pool(2)
# for i in range(100):
# p.apply_async(play_check, args=[ffprobe_path, "rtsp://admin:vion1234@192.168.9.93:554/h264/ch1/main/av_stream",])
# p.close()
# p.join()
# -*- coding: utf-8 -*-
# @Time : 2020/12/10 23:15
# @Author : Young Lee
# @Email : young_lee2017@163.com
from selenium import webdriver
# from selenium.webdriver.common.by import By
# from selenium.webdriver.support.ui import WebDriverWait
# from selenium.webdriver.support import expected_conditions as EC
# from selenium.webdriver.common.keys import Keys
from cfg.fxinfo import *
driver = webdriver.Chrome(driver_path)
driver.implicitly_wait(5)
driver.maximize_window()
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2020/12/22 14:33
# @Author : Young Lee
# @Email : young_lee2017@163.com
from jumppage import jump_page
from selenium.webdriver.common.keys import Keys
import time
def upload_storage_config(driver, config_name, config_path):
"""上传存储配置
:param config_name: 存储配置名称
:param config_path: 存储配置路径
:return: first_sto_name 已上传的第一个存储配置名称 或者失败信息
"""
existing = False
jump_page(driver, "资源管理", "存储配置")
# 输入存储配置名称进行查询过滤
search_ele = driver.find_element_by_css_selector("div[class='content']>div input[placeholder='请输入存储配置名称']")
search_ele.send_keys(Keys.CONTROL, 'a')
search_ele.send_keys(config_name)
driver.find_element_by_xpath("//div[@class='content']/div[1]//button/span[text()='查询']").click()
time.sleep(1)
res_class = driver.find_element_by_css_selector(".el-table__body-wrapper>*:nth-last-child(1)").get_attribute('class')
if res_class != 'el-table__empty-block':
# 获取页数
page_common_xpath = "//div[@class='content']/div[2]//ul[@class='el-pager']"
page_ele = driver.find_element_by_xpath(f"{page_common_xpath}/li[last()]")
page_nums = int(page_ele.get_attribute("textContent"))
for page_num in range(1, page_nums+1):
if page_num != 1:
driver.find_element_by_xpath(f"{page_common_xpath}/li[{page_num}]").click()
time.sleep(0.5)
sto_eles = driver.find_elements_by_css_selector("tbody>tr>td:nth-child(1)>div")
storage_list = [sto_ele.get_attribute("textContent") for sto_ele in sto_eles]
print(storage_list)
if config_name in storage_list:
print("存在同名存储配置")
existing = True
# 添加存储配置
driver.find_element_by_xpath("//div[@class='content']/div[1]//button/span[text()='添加']").click()
# 清除已存在的存储配置上传列表
while True:
child_nodes_num = driver.execute_script("return document.getElementsByClassName('el-upload-list')[0].childNodes.length")
if child_nodes_num == 0:
break
# 若存在存储配置文件,则清除
driver.find_element_by_class_name("el-upload-list__item-name").click()
driver.find_element_by_class_name("el-upload-list__item").send_keys(Keys.DELETE)
time.sleep(0.5)
# 输入存储配置名称
name_input_ele = driver.find_element_by_css_selector("div[aria-label='添加'] input[type='text']")
name_input_ele.send_keys(Keys.CONTROL, 'a')
name_input_ele.send_keys(config_name)
# 输入存储配置文件路径
driver.find_element_by_css_selector("div[aria-label='添加'] input[type='file']").send_keys(config_path)
time.sleep(0.5)
# 点击上传
driver.find_element_by_xpath("//div[@class='el-dialog__footer']//span[text()='上 传']").click()
# 捕捉响应消息
upload_msg_class = driver.find_element_by_css_selector("div[role='alert']>i").get_attribute('class')
if 'el-icon-error' in upload_msg_class:
# 关闭上传窗口,返回错误信息
driver.find_element_by_css_selector("div[aria-label='添加']>.el-dialog__header>button[aria-label='Close']").click()
return driver.find_element_by_css_selector('.el-message__content').get_attribute('textContent') if existing else '上传失败'
first_sto_name = driver.find_element_by_css_selector(
"tbody>tr>td:nth-child(1)>div:nth-child(1)"
).get_attribute("textContent")
return first_sto_name
if __name__ == "__main__":
from unitinit import *
from login import login_fx
login_fx(driver, login_url, user, passwd)
for i in range(100):
first_sto_name = upload_storage_config(driver, "测试MIwue", storage_config_path)
print(first_sto_name)
driver.quit()
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2020/12/10 23:41
# @Author : Young Lee
# @Email : young_lee2017@163.com
import time
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
from jumppage import jump_page
def get_uploded(driver):
""" 获取已上传视频列表 """
expanded_bar = "//div[contains(@class, 'treeBox')]//span[text()='手动添加录像资源']/../../preceding-sibling::span"
if "expanded" not in driver.find_element_by_xpath(expanded_bar).get_attribute("class"):
driver.find_element_by_xpath(expanded_bar).click()
time.sleep(1)
elements = driver.find_elements_by_css_selector(".el-tree-node__children span[class='tree-label']")
return [element.text for element in elements]
def upload_videos(driver, videos_info):
"""上传指定目录的视频文件
:param videos_info: 视频信息 {name: path, ...}
:return: 已上传任务名称列表
"""
add_bar = "//div[contains(@class, 'treeBox')]//span[text()='手动添加录像资源']/../following-sibling::span"
jump_page(driver, "资源管理", "视频设备")
uploded_videos = get_uploded(driver)
videos_name = list(videos_info.keys())
if [ele for ele in videos_name if ele in uploded_videos] == videos_name:
print("***无需上传任何视频文件***")
return videos_name
driver.find_element_by_xpath(add_bar).click()
input_element = driver.find_element_by_css_selector("input[type='file']")
for video_name, video_path in videos_info.items():
if video_name in uploded_videos:
print(f"{video_name} 已存在,无需上传!")
continue
print(f"开始上传 --> {video_path}...")
input_element.send_keys(video_path)
driver.find_element_by_xpath("//span[text()='上 传']").click()
try:
WebDriverWait(driver, 30).until(
EC.presence_of_element_located((By.CLASS_NAME, "is-success"))
)
except Exception:
print(f"{video_name}似乎没有上传成功!")
finally:
# 清除上传列表
video_display = driver.find_element_by_class_name("el-upload-list__item")
video_display.click()
time.sleep(1)
video_display.send_keys(Keys.DELETE)
time.sleep(1)
driver.find_element_by_xpath("//span[text()='上 传']/../preceding-sibling::button[1]").click()
upload_list = get_uploded(driver)
return list(set(videos_info.keys()) & set(upload_list))
if __name__ == "__main__":
from unitinit import *
from login import login_fx
login_fx(driver, login_url, user, passwd)
print(upload_videos(driver, videos_info))
driver.quit()
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2021/1/19 9:17
# @Author : Young Lee
# @Email : young_lee2017@163.com
import logging
import os
import configparser
import re
from logging.config import fileConfig
def generate_log(logfile_name='testing.log', logfile_path=None, config_path=None, logger_name='simpleLog'):
"""动态复用日志配置文件,更具需要更换日志存储路径
:param logfile_name: 日志名称,logfile_path为None时logfile_name生效
:param logfile_path: 日志存储路径,为空时存储在generatelog.py同目录下的testing.log中
:param config_path: 日志配置文件路径,为空时为generatelog.py同目录下的logging.conf文件
:param logger_name: 要使用的日志器名称
:return: logger对象
"""
current_dir = os.path.dirname(os.path.abspath(__file__))
if logfile_path is None:
logfile_path = os.path.join(current_dir, logfile_name)
logfile_path_str = f"r'{logfile_path}'"
if config_path is None:
config_path = os.path.join(current_dir, 'logging.conf')
config = configparser.ConfigParser()
config.read(config_path)
for sec in config.sections():
if not re.match('handler_.*', sec):
continue
if 'args' not in config.options(sec):
continue
cfg_args = config.get(sec, 'args').replace('LogFilePath', logfile_path_str)
config.set(sec, 'args', cfg_args)
fileConfig(config)
logger = logging.getLogger(logger_name)
return logger
if __name__ == '__main__':
import time
logger = generate_log('test_2021.log')
while True:
logger.debug('debug message')
logger.info('info message')
logger.warning('warning message')
logger.error('error message')
logger.critical('critical message')
time.sleep(1)
\ No newline at end of file
[loggers]
keys = root, simpleLog
[handlers]
keys = consoleHandler, fileHandler, timedRotatingHandler, rotatingFileHandler
[formatters]
keys = simpleFormatter
[logger_root]
level = DEBUG
handlers = consoleHandler
[logger_simpleLog]
level = DEBUG
handlers = consoleHandler, rotatingFileHandler
qualname = simpleLog
propagate = 0
[handler_consoleHandler]
class = StreamHandler
args = (sys.stdout,)
level = INFO
formatter = simpleFormatter
[handler_fileHandler]
class = FileHandler
args = (LogFilePath,)
level = WARNING
formatter = simpleFormatter
[handler_timedRotatingHandler]
class = handlers.TimedRotatingFileHandler
#('testing.log', when='M', interval=1, backupCount=2, encoding='utf-8')
args = (LogFilePath, 'midnight', 1, 3, 'utf-8',)
level = WARNING
formatter = simpleFormatter
[handler_rotatingFileHandler]
; logging.handlers.RotatingFileHandler('all.log', maxBytes=1024, backupCount=2, encoding='utf-8')
class = logging.handlers.RotatingFileHandler
args = (LogFilePath, 'a', 10*1024*1024, 3, 'utf-8',)
level = INFO
formatter = simpleFormatter
[formatter_simpleFormatter]
format = [%(asctime)s] [%(levelname)s] [%(funcName)s] [%(message)s]
; datefmt = %Y-%m-%d %H:%M:%S
\ No newline at end of file
# -*- coding: utf-8 -*-
# @Time : 2021/2/4 15:48
# @Author : Young Lee
# @Email : young_lee2017@163.com
import shutil
import os
import sys
import re
import subprocess
import winreg
# 检查python版本
python_version = re.search('\d+.\d+', sys.version).group()
if float(python_version) < 3.7:
print(f"The current version of Python is {python_version}, less than 3.7 !")
print("This project is based on the development of Python 3.9. It is recommended to install Python 3.9.")
sys.exit(0)
# 检查必备模块是否安装
pip_stdout = subprocess.Popen('pip list', stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True).stdout.readlines()
installed_modules = []
for i in pip_stdout:
i = str(i, encoding='utf-8')
i = i[:i.find(' ')]
installed_modules.append(i)
installed_modules = installed_modules[2:]
requirements = ['requests', 'selenium', 'pytest', 'allure-pytest']
need_install = [module for module in requirements if module not in installed_modules]
if need_install:
print(f'Please install those modules: {need_install}.')
sys.exit(0)
# 获取谷歌浏览器版本号
try:
# 从注册表中获得版本号
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r'Software\Google\Chrome\BLBeacon')
chrome_version, type = winreg.QueryValueEx(key, 'version')
except WindowsError as e:
chrome_version = '未知'
# 生成环境配置文件
with open('report/environment.properties', 'w', encoding='utf-8') as f:
f.write('Browser=Chrome\n')
f.write(f'Browser.Version={chrome_version}\n')
f.write('Stand=Production\n')
f.write(f'Python.Version={python_version}')
if __name__ == '__main__':
import pytest
pytest.main(['-s', '-v', f'--alluredir=report/testing_report', '--clean-alluredir', './cases'])
shutil.copy('report/environment.properties', 'report/testing_report/')
os.system('allure serve report/testing_report')
\ No newline at end of file
Browser=Chrome
Browser.Version=88.0.4324.104
Stand=Production
Python.Version=3.9
\ No newline at end of file
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!