TaskDataService.java 5.95 KB
package com.viontech.fanxing.task.service;

import com.viontech.fanxing.commons.constant.ChannelType;
import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.runtime.RuntimeConfig;
import com.viontech.fanxing.task.repository.TaskDataRedisRepository;
import com.viontech.fanxing.task.service.adapter.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * Coordinates the runtime data of analysis tasks.
 *
 * <p>Builds {@link TaskData} from a {@link Task}, persists it through
 * {@link TaskDataRedisRepository}, and registers the task unid in the Redis scored sorted sets
 * of tasks waiting to be executed / terminated (scored by the corresponding timestamp).
 *
 * @author 谢明辉
 * @date 2021/7/13
 */
@Service
@Slf4j
public class TaskDataService {

    @Resource
    private RedisService redisService;
    @Resource
    private VAServerService vaServerService;
    @Resource
    private TaskDataRedisRepository taskDataRedisRepository;
    @Resource
    private OpsClientService opsClientService;
    @Resource
    private TaskService taskService;

    /**
     * Builds the task data for {@code task}, stores it, and schedules the task.
     *
     * <p>The task data is stored before scheduling is attempted and is not removed here when
     * scheduling fails.
     *
     * @param task the task to add
     * @throws FanXingException if the task data cannot be built or no execution time can be
     *                          computed for the task
     */
    public void addTask(Task task) {
        TaskData taskData = buildTaskData(task);
        taskDataRedisRepository.addOrUpdateTaskData(taskData);
        // Compute the run window and enqueue the task.
        if (!distributeTask(taskData)) {
            throw new FanXingException("任务找不到可执行时间");
        }
    }

    /**
     * Schedules a task by registering its unid in the to-be-executed set and, when a terminate
     * time exists, in the to-be-terminated set.
     *
     * <p>If the task is already bound to a VA server nothing is changed. Otherwise the task
     * status is set to {@link TaskStatus#STAY} before the next run window is computed, so the
     * status is updated even when no execution time is found.
     *
     * @param taskData the task data to schedule
     * @return {@code true} if the task is already running or was enqueued; {@code false} if the
     * runtime configuration yields no next execution time
     */
    public boolean distributeTask(TaskData taskData) {
        RuntimeConfig runtimeConfig = taskData.getRuntimeConfig();
        String taskUnid = taskData.getTask().getUnid();

        // A task that is already running on a VA server must not be distributed again.
        if (taskRunOn(taskUnid) != null) {
            return true;
        }

        taskService.updateStatus(taskData.getTask().getId(), TaskStatus.STAY.val);
        ImmutablePair<Long, Long> nextTime = runtimeConfig.getNextTimeOfExecutionAndTerminal();
        log.info("部署任务[{}],运行时间:[{}]", taskData.getTask().getName(), nextTime);

        Long nextExecuteTime = nextTime.left;
        if (nextExecuteTime == null) {
            return false;
        }
        redisService.getToBeExecutedTaskUnidSet().add(nextExecuteTime, taskUnid);

        // A terminate time is optional; only tasks that have one are queued for termination.
        Long nextTerminateTime = nextTime.right;
        if (nextTerminateTime != null) {
            redisService.getToBeTerminatedTaskUnidSet().add(nextTerminateTime, taskUnid);
        }
        return true;
    }


    /**
     * Removes every trace of the task kept by this service: its entries in the to-be-executed
     * and to-be-terminated sets and its stored task data.
     *
     * @param taskUnid unid of the task to remove
     */
    public void removeTaskDataAll(String taskUnid) {
        RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisService.getToBeExecutedTaskUnidSet();
        RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redisService.getToBeTerminatedTaskUnidSet();

        toBeExecutedTaskUnidSet.remove(taskUnid);
        toBeTerminatedTaskUnidSet.remove(taskUnid);
        taskDataRedisRepository.remove(taskUnid);
    }

    /**
     * Looks up the VA server the task is currently bound to.
     *
     * @param taskUnid unid of the task
     * @return the server info for the device id mapped to the task, or {@code null} when the
     * task has no mapping in the task/VA-server map
     */
    public VaServerInfo taskRunOn(String taskUnid) {
        RMap<String, String> taskVaServerMap = redisService.getTaskVaServerMap();
        String devId = taskVaServerMap.get(taskUnid);
        return devId == null ? null : vaServerService.getVaServerRedisRepository().getVAServerInfoById(devId);
    }

    /**
     * Removes the binding between a task and its VA server.
     *
     * @param taskUnid unid of the task to unbind
     * @return ImmutablePair<String, String>
     * <li>left <code>taskUnid</code></li>
     * <li>right <code>devId</code> the task was bound to, or {@code null} if it was not bound</li>
     */
    public ImmutablePair<String, String> unlinkTaskAndVaServer(String taskUnid) {
        // remove() returns the previous value, so the device id that is reported is exactly the
        // one that was removed (a separate get() followed by remove() could observe two
        // different values if the mapping changes in between).
        String devId = redisService.getTaskVaServerMap().remove(taskUnid);
        return ImmutablePair.of(taskUnid, devId);
    }

    /**
     * Terminates the task on its VA server and, if that succeeds, removes all of its data.
     *
     * @param taskUnid unid of the task to delete
     * @throws FanXingException if {@code vaServerService.terminateTask} reports failure
     */
    public void deleteTask(String taskUnid) {
        boolean success = vaServerService.terminateTask(taskUnid);
        if (success) {
            removeTaskDataAll(taskUnid);
        } else {
            throw new FanXingException("失败");
        }
    }


    /**
     * Applies a changed task definition.
     *
     * <ul>
     * <li>If the task is not running, or {@code rebuild} is set, the task is deleted and added
     * again.</li>
     * <li>If it is running on a server whose status is {@code 0}, the update is rejected.</li>
     * <li>Otherwise the stored task data is replaced and the change is pushed to the VA
     * server.</li>
     * </ul>
     *
     * @param task    the updated task
     * @param rebuild force delete-and-recreate even when the task is currently running
     * @throws FanXingException if the server is offline, or if deleting / re-adding fails
     */
    public void updateTask(Task task, boolean rebuild) {
        String taskUnid = task.getUnid();
        VaServerInfo vaServerInfo = taskRunOn(taskUnid);
        // No VA server means the task is not running, so it can simply be deleted and recreated.
        // NOTE(review): the task is deleted before the new task data is built; if addTask fails
        // the old task is already gone - confirm this is acceptable.
        if (vaServerInfo == null || rebuild) {
            deleteTask(taskUnid);
            addTask(task);
        } else if (vaServerInfo.getStatus() == 0) {
            // NOTE(review): status 0 is treated as "offline" per the error message below;
            // if getStatus() returns a boxed type a null status would fail here - confirm.
            throw new FanXingException("设备离线");
        } else {
            TaskData taskData = buildTaskData(task);
            // Refresh the stored task data and push the updated task to the VA server.
            taskDataRedisRepository.addOrUpdateTaskData(taskData);
            vaServerService.updateTask(taskData);
        }
    }

    /**
     * Assembles the {@link TaskData} for a task: attaches the store configuration and, for
     * {@link ChannelType#STREAM_VIDEO_CLOUD} streams, the device unid of the task's channel.
     *
     * @param task the task to wrap
     * @return the populated task data
     * @throws FanXingException if the store configuration or (for video-cloud streams) the
     *                          channel cannot be resolved
     */
    private TaskData buildTaskData(Task task) {
        TaskData taskData = new TaskData(task);

        // Resolve the store configuration referenced by the task.
        Long storeConfigId = task.getStoreConfigId();
        String config = opsClientService.getStoreConfigById(storeConfigId);
        if (config == null) {
            throw new FanXingException("无法获取对应的存储配置");
        }
        taskData.setStoreConfig(config);

        if (taskData.getTask().getStreamType().equals(ChannelType.STREAM_VIDEO_CLOUD.value)) {
            Channel channel = opsClientService.getChannelByChannelUnid(taskData.getTask().getChannelUnid());
            // Fail with a domain exception, like the store-config check above, rather than a
            // NullPointerException when the channel cannot be found.
            if (channel == null) {
                throw new FanXingException("无法获取对应的通道信息");
            }
            taskData.setDeviceUnid(channel.getDeviceUnid());
        }
        return taskData;
    }

    /**
     * @return the repository used to persist task data
     */
    public TaskDataRedisRepository getRepository() {
        return taskDataRedisRepository;
    }
}