test_base_process.py
6.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# -*- coding: utf-8 -*-
# @Time : 2021/1/14 17:36
# @Author : Young Lee
# @Email : young_lee2017@163.com
import sys
sys.path.append('..')
import pytest
import random
import string
from selenium import webdriver
from cfg.fxinfo import *
from log.generatelog import generate_log
import allure
from lib import *
# Module-wide logger handed back by the project's generate_log helper
# (presumably backed by the file 'testing.log' -- confirm in log.generatelog).
logger = generate_log('testing.log')

# State shared between test steps: an earlier step fills a dict, and the
# parametrized status checks that run afterwards read from it.
# Filled by test_create_subtask: subtask name -> dispatch status.
add_result = dict()
# Filled by test_import_config: subtask name -> per-setting import status.
import_result = dict()
@allure.epic('繁星平台基本操作流程测试')
@allure.feature('测试流程')
class TestBaseProcess:
    """End-to-end test of the platform's basic workflow through the web UI.

    The test methods are ordered steps of one scenario (login, preparation,
    task creation, status checks, result checks, cleanup) and share a single
    browser session plus the randomly generated storage-config and task names
    created in ``setup_class``. Results that later steps need are passed
    through the module-level ``add_result`` and ``import_result`` dicts.
    """

    @classmethod
    def setup_class(cls):
        """Start one Chrome session and pick unique names for this run."""
        cls.driver = webdriver.Chrome(driver_path)
        cls.driver.implicitly_wait(5)
        cls.driver.maximize_window()
        # Five distinct random characters keep names from colliding with
        # data left behind by earlier runs.
        alphabet = string.ascii_letters + string.digits
        cls.stor_name = '测试' + ''.join(random.sample(alphabet, 5))
        cls.task_name = '测试任务' + ''.join(random.sample(alphabet, 5))
        # To debug against existing data, pin the names instead, e.g.:
        # cls.stor_name = '测试MIwue'
        # cls.task_name = '测试任务njF9Y'

    @classmethod
    def teardown_class(cls):
        """Close the browser once every test case in the class has run."""
        cls.driver.quit()

    # ------------------------------------------------------------------
    # Test cases
    # ------------------------------------------------------------------

    @allure.story('step1: 登录')
    @allure.title(f'登录平台 {plat_ip}')
    @pytest.mark.global_gateway
    def test_login(self):
        """Log in and check that the browser lands on the task config page."""
        page_url = login_fx(self.driver, login_url, user, passwd)
        logger.info(page_url)
        assert page_url == f"http://{plat_ip}:20080/#/video_task/config"

    @allure.story('step2: 准备操作')
    @allure.title('上传存储配置')
    def test_upload_storageconfig(self):
        """Upload a storage configuration under this run's storage name.

        Passes when the helper reports either the new name or the
        "name must be unique" message, i.e. the configuration exists.
        """
        upload_msg = upload_storage_config(self.driver, self.stor_name, storage_config_path)
        logger.info(upload_msg)
        assert upload_msg == self.stor_name or upload_msg == '存储配置名称不能重复'

    @allure.story('step2: 准备操作')
    @allure.title('上传视频文件')
    @pytest.mark.task_precondition
    def test_upload_videos(self):
        """Upload the test videos and check that none is missing by count."""
        uploaded_videos = upload_videos(self.driver, videos_info)
        logger.info(uploaded_videos)
        assert len(videos_name) == len(uploaded_videos)

    @allure.story('step2: 准备操作')
    @allure.title('检查分析资源')
    @pytest.mark.task_precondition
    def test_device_resource(self):
        """Check that some analysis resource is free and the totals are sane."""
        resource = device_resource(self.driver)
        assert 0 < resource['free_resource'] <= resource['total_resource']

    @allure.story('step3: 创建任务')
    @allure.title('创建大任务')
    @pytest.mark.task_precondition
    @pytest.mark.global_gateway
    def test_create_task(self):
        """Create the parent task bound to this run's storage configuration.

        Passes when the task is reported as created but not yet deployed, or
        when the helper reports that the task already exists.
        """
        # NOTE(review): device_type is hard-coded to one specific device.
        create_result = create_task(
            tasks_url,
            self.driver,
            self.task_name,
            store_conf=self.stor_name,
            device_type='9.245_gpu0',
        )
        logger.info(create_result)
        assert create_result == {self.task_name: '未部署'} or create_result == '已有该任务'

    @allure.story('step3: 创建任务')
    @allure.title('添加子任务')
    @pytest.mark.master_slave_subtask
    def test_create_subtask(self):
        """Add one subtask per video and record each subtask's status.

        The (name, status) pairs are stored in the module-level ``add_result``
        for ``test_create_subtask_status`` to check one by one.
        """
        videos_batch_info = {'短视频': videos_name}
        add_result_realtime = create_subtask_batch(
            self.driver,
            self.task_name,
            videos_batch_info,
            monitor_status=True,
            monitor_times=30,
        )
        add_result.update({result[0]: result[1] for result in add_result_realtime})
        logger.info(add_result)
        # sorted() rather than list.sort(): sort() returns None, which made
        # the comparison always "None == None", and it also reordered the
        # shared videos_name list in place.
        added_names = [item[0] for item in add_result_realtime]
        assert sorted(videos_name) == sorted(added_names)

    @allure.story('step4: 任务下发状态检查')
    @allure.title('子任务下发状态检查: {name}')
    @pytest.mark.parametrize('name', videos_name)
    def test_create_subtask_status(self, name):
        """Check that the subtask for ``name`` was dispatched and is working."""
        print(f'校验子任务下发状态: {name}')
        # A subtask that was never recorded fails the comparison, with the
        # "<name> does not exist" text showing up in the assertion diff.
        assert add_result.get(name, f'{name}不存在') == '工作中'

    @allure.story('step5: 导入任务配置')
    @allure.title('导入任务配置')
    def test_import_config(self):
        """Import the task configuration and record the per-subtask outcome.

        The outcome is stored in the module-level ``import_result`` for
        ``test_import_config_status`` to check one by one.
        """
        import_result_realtime = import_config(
            self.driver,
            self.task_name,
            task_cfg_info,
            retry_check_times=6,
        )
        import_result.update(import_result_realtime)
        logger.info(import_result)
        # sorted() rather than list.sort(): see test_create_subtask.
        assert sorted(videos_name) == sorted(import_result)

    @allure.story('step6: 配置下发状态检查')
    @allure.title('配置下发状态检查: {name}')
    @pytest.mark.parametrize('name', videos_name)
    def test_import_config_status(self, name):
        """Check that every imported setting of subtask ``name`` is running."""
        # .get() turns a missing subtask into a readable assertion failure
        # instead of a bare KeyError.
        assert import_result.get(name) == {'参数设置': '运行中', '区域设置': '运行中', '标定设置': '运行中'}

    @allure.story('step7: 任务分析情况检查')
    @allure.title('视频点播和抓拍结果检查: {video_name}')
    @pytest.mark.parametrize('video_name', videos_name)
    def test_monitor_subtask_result(self, video_name):
        """Check that the subtask both streams video and produces captures."""
        index_value = videos_name.index(video_name)
        # The first video in videos_name gets a longer wait than the rest
        # (presumably to let analysis warm up -- confirm).
        wait_time = 20 if index_value == 0 else 8
        subtask_result = monitor_subtask_result(
            self.driver,
            wait_time,
            ffprobe_path,
            self.task_name,
            subtask_name=video_name,
        )
        logger.info(f"{video_name}: {subtask_result}")
        assert subtask_result == {"capture_result": True, "stream_result": True}

    @allure.story('step8: 任务分析情况检查')
    @allure.title('过车数据检索: {name}')
    @pytest.mark.parametrize('name', videos_name)
    def test_result_search(self, name):
        """Search the vehicle records of subtask ``name``.

        Expects a dict whose first page is non-empty and whose total count
        is strictly larger than the first page.
        """
        search_result = vehicle_search(self.driver, self.task_name, name)
        logger.info(search_result)
        assert isinstance(search_result, dict)
        assert 0 < search_result['first_page'] < search_result['total']

    @allure.story('step9: 删除任务')
    @allure.title('删除任务')
    def test_delete_task(self):
        """Delete this run's task; the helper returns None on success."""
        delete_result = delete_task(self.driver, self.task_name)
        logger.info(delete_result)
        assert delete_result is None
if __name__ == '__main__':
    # Script entry point: run the suite with Allure result collection, add
    # the environment description to the results, then serve the report.
    import os
    import shutil

    # Plain run without Allure output:
    # pytest.main(['-s', '-v', 'test_base_process.py'])
    pytest_args = [
        '-s',
        '-v',
        '--alluredir=report/testing_report',
        '--clean-alluredir',
    ]
    pytest.main(pytest_args)

    # NOTE(review): the results directory above is 'report/testing_report'
    # while the copy below targets '../report/testing_report/'; the two
    # appear to be relative to different directories -- confirm which
    # working directory this script is meant to be launched from.
    shutil.copy('../report/environment.properties', '../report/testing_report/')
    os.system('allure serve report/testing_report')