# test_base_process.py (6.54 KB)
# -*- coding: utf-8 -*-
# @Time    :  2021/1/14 17:36
# @Author  :  Young Lee
# @Email   :  young_lee2017@163.com

import sys
sys.path.append('..')
import pytest
import random
import string
from selenium import webdriver
from cfg.fxinfo import *
from log.generatelog import generate_log
import allure
from lib import *

# Module-wide logger; 'testing.log' is presumably the log file name -- confirm in log.generatelog.
logger = generate_log('testing.log')
# Module-level state shared between test cases:
#   add_result    -- filled by test_create_subtask, read by test_create_subtask_status
#   import_result -- filled by test_import_config, read by test_import_config_status
add_result = {}
import_result = {}

@allure.epic('繁星平台基本操作流程测试')
@allure.feature('测试流程')
class TestBaseProcess:
    """End-to-end test of the basic workflow of the Fanxing platform.

    The test cases are written in workflow order (login, preparation, task
    creation, status checks, config import, result checks, deletion) and
    share one browser session plus the module-level ``add_result`` and
    ``import_result`` dictionaries, so later cases rely on earlier ones
    having run.
    """

    @classmethod
    def setup_class(cls):
        """Start the browser and pick random names before any test runs."""
        cls.driver = webdriver.Chrome(driver_path)
        cls.driver.implicitly_wait(5)
        cls.driver.maximize_window()
        # Random 5-character suffixes, presumably to avoid clashing with
        # names left over from earlier runs -- confirm.
        alphabet = string.ascii_letters + string.digits
        cls.stor_name = '测试' + ''.join(random.sample(alphabet, 5))
        cls.task_name = '测试任务' + ''.join(random.sample(alphabet, 5))
        # Fixed names that can be swapped in instead of the random ones:
        # cls.stor_name = '测试MIwue'
        # cls.task_name = '测试任务njF9Y'

    @classmethod
    def teardown_class(cls):
        """Close the browser once every test in the class has finished."""
        cls.driver.quit()

    # ---- Test cases follow ----

    @allure.story('step1: 登录')
    @allure.title(f'登录平台 {plat_ip}')
    @pytest.mark.global_gateway
    def test_login(self):
        """Log in and check the URL of the page reached afterwards."""
        page_url = login_fx(self.driver, login_url, user, passwd)
        logger.info(page_url)
        assert page_url == f"http://{plat_ip}:20080/#/video_task/config"

    @allure.story('step2: 准备操作')
    @allure.title('上传存储配置')
    def test_upload_storageconfig(self):
        """Upload the storage configuration.

        Passes when the returned message is the new configuration name or
        the duplicate-name message.
        """
        upload_msg = upload_storage_config(self.driver, self.stor_name, storage_config_path)
        logger.info(upload_msg)
        assert upload_msg == self.stor_name or upload_msg == '存储配置名称不能重复'

    @allure.story('step2: 准备操作')
    @allure.title('上传视频文件')
    @pytest.mark.task_precondition
    def test_upload_videos(self):
        """Upload the video files and check that the counts match."""
        uploaded_videos = upload_videos(self.driver, videos_info)
        logger.info(uploaded_videos)
        assert len(videos_name) == len(uploaded_videos)

    @allure.story('step2: 准备操作')
    @allure.title('检查分析资源')
    @pytest.mark.task_precondition
    def test_device_resource(self):
        """Check that free analysis resources exist and do not exceed the total."""
        resource = device_resource(self.driver)
        logger.info(resource)
        assert resource['free'] > 0 and resource['total'] >= resource['free']

    @allure.story('step3: 创建任务')
    @allure.title('创建大任务')
    @pytest.mark.task_precondition
    @pytest.mark.global_gateway
    def test_create_task(self):
        """Create the parent task.

        Passes when the task is reported as not yet deployed or when the
        already-exists message is returned.
        """
        create_result = create_task(tasks_url, self.driver, self.task_name, store_conf=self.stor_name)
        logger.info(create_result)
        assert create_result == {self.task_name: '未部署'} or create_result == '已有该任务'

    @allure.story('step3: 创建任务')
    @allure.title('添加子任务')
    @pytest.mark.master_slave_subtask
    def test_create_subtask(self):
        """Add one subtask per video and record each subtask's status."""
        videos_batch_info = {'短视频': videos_name}
        add_result_realtime = create_subtask_batch(self.driver, self.task_name, videos_batch_info, monitor_status=True, monitor_times=30)
        add_result.update({result[0]: result[1] for result in add_result_realtime})
        logger.info(add_result)
        # sorted() returns new lists.  list.sort() returns None, so comparing
        # two .sort() calls is always true, and it would also reorder the
        # shared videos_name list in place.
        assert sorted(videos_name) == sorted(item[0] for item in add_result_realtime)

    @allure.story('step4: 任务下发状态检查')
    @allure.title('子任务下发状态检查: {name}')
    @pytest.mark.parametrize('name', videos_name)
    def test_create_subtask_status(self, name):
        """Check the status recorded for subtask ``name`` by test_create_subtask."""
        print(f'校验子任务下发状态: {name}')
        # Fail with an explicit message when the subtask was never recorded.
        assert name in add_result, f'{name}不存在'
        assert add_result[name] == '工作中'

    @allure.story('step5: 导入任务配置')
    @allure.title('导入任务配置')
    def test_import_config(self):
        """Import the task configuration and record the per-video result."""
        import_result_realtime = import_config(self.driver, self.task_name, task_cfg_info, retry_check_times=6)
        import_result.update(import_result_realtime)
        logger.info(import_result)
        # Compare sorted copies; .sort() returns None and mutates in place.
        assert sorted(videos_name) == sorted(import_result)

    @allure.story('step6: 配置下发状态检查')
    @allure.title('配置下发状态检查: {name}')
    @pytest.mark.parametrize('name', videos_name)
    def test_import_config_status(self, name):
        """Check that every imported setting for ``name`` is reported as running."""
        # Give a readable failure instead of a bare KeyError.
        assert name in import_result, f'{name}不存在'
        assert import_result[name] == {'参数设置': '运行中', '区域设置': '运行中', '标定设置': '运行中'}

    @allure.story('step7: 任务分析情况检查')
    @allure.title('视频点播和抓拍结果检查: {video_name}')
    @pytest.mark.parametrize('video_name', videos_name)
    def test_monitor_subtask_result(self, video_name):
        """Check the stream and capture results of subtask ``video_name``."""
        index_value = videos_name.index(video_name)
        # The first video gets a longer wait (20 vs 8); presumably the first
        # subtask needs extra start-up time -- confirm.
        wait_time = 20 if index_value == 0 else 8
        subtask_result = monitor_subtask_result(self.driver, wait_time, ffprobe_path, self.task_name, subtask_name=video_name)
        logger.info(f"{video_name}: {subtask_result}")
        assert subtask_result == {"capture_result": True, "stream_result": True}

    @allure.story('step8: 任务分析情况检查')
    @allure.title('过车数据检索: {name}')
    @pytest.mark.parametrize('name', videos_name)
    def test_result_search(self, name):
        """Search the vehicle records of subtask ``name`` and check the counts."""
        search_result = vehicle_search(self.driver, self.task_name, name)
        logger.info(search_result)
        assert isinstance(search_result, dict)
        assert search_result['first_page'] > 0 and search_result['total'] > search_result['first_page']

    @allure.story('step9: 删除任务')
    @allure.title('删除任务')
    def test_delete_task(self):
        """Delete the task; ``delete_task`` is expected to return ``None``."""
        delete_result = delete_task(self.driver, self.task_name)
        logger.info(delete_result)
        assert delete_result is None


if __name__ == '__main__':
    import os
    import shutil

    # Alternative invocation limited to this module, without an Allure report:
    # pytest.main(['-s', '-v', 'test_base_process.py'])
    pytest_args = [
        '-s',
        '-v',
        '--alluredir=report/testing_report',
        '--clean-alluredir',
    ]
    pytest.main(pytest_args)

    # Copy the environment description next to the collected results.
    # NOTE(review): pytest and allure use 'report/testing_report' while this
    # copy targets '../report/testing_report/' -- confirm both resolve to the
    # same directory for the working directory this script is run from.
    shutil.copy('../report/environment.properties', '../report/testing_report/')

    # Serve the collected results with the Allure command-line tool.
    os.system('allure serve report/testing_report')