// TaskDataService.java 5.12 KB
package com.viontech.fanxing.task.service;

import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.feign.OpsClient;
import com.viontech.fanxing.task.model.RuntimeConfig;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.repository.TaskDataRedisRepository;
import com.viontech.keliu.util.JsonMessageUtil;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * Maintains task data in Redis and the scheduling entries derived from it.
 *
 * <p>The service works on three Redis structures obtained from {@link RedisService}:
 * the sorted set of task unids waiting to be executed, the sorted set of task unids
 * waiting to be terminated (both scored by the corresponding time), and the map from
 * task unid to the id of the VA server the task is linked to.
 *
 * @author 谢明辉
 * @date 2021/7/13
 */
@Service
public class TaskDataService {

    @Resource
    private RedisService redisService;
    @Resource
    private VAServerService vaServerService;
    @Resource
    private TaskDataRedisRepository taskDataRedisRepository;
    @Resource
    private OpsClient opsClient;

    /**
     * Stores the task data of a new task and schedules it.
     *
     * @param task the task to add
     * @throws FanXingException if the store configuration cannot be obtained, or if
     *                          {@link #distributeTask(TaskData)} reports that the task
     *                          could not be scheduled
     */
    public void addTask(Task task) {
        TaskData taskData = buildTaskData(task);
        taskDataRedisRepository.addOrUpdateTaskData(taskData);
        // Compute the run window and queue the task.
        if (!distributeTask(taskData)) {
            throw new FanXingException("任务找不到可执行时间");
        }
    }

    /**
     * Queues the task for execution (and, when known, for termination).
     *
     * @param taskData the task data to schedule
     * @return {@code true} if the task was added to the to-be-executed set;
     *         {@code false} if the task is already linked to a VA server or its
     *         runtime configuration yields no next execution time
     */
    public boolean distributeTask(TaskData taskData) {
        RuntimeConfig runtimeConfig = taskData.getRuntimeConfig();
        String taskUnid = taskData.getTask().getUnid();

        // A task that is currently running must not be scheduled again.
        if (taskRunOn(taskUnid) != null) {
            return false;
        }

        ImmutablePair<Long, Long> nextTime = runtimeConfig.getNextTimeOfExecutionAndTerminal();
        Long nextExecuteTime = nextTime.left;
        Long nextTerminateTime = nextTime.right;
        if (nextExecuteTime == null) {
            return false;
        }

        // The score is the time, the member is the task unid.
        redisService.getToBeExecutedTaskUnidSet().add(nextExecuteTime, taskUnid);
        if (nextTerminateTime != null) {
            redisService.getToBeTerminatedTaskUnidSet().add(nextTerminateTime, taskUnid);
        }
        return true;
    }


    /**
     * Removes the task from both scheduling sets and deletes its stored task data.
     *
     * @param taskUnid unid of the task to remove
     */
    public void removeTaskDataAll(String taskUnid) {
        RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisService.getToBeExecutedTaskUnidSet();
        RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redisService.getToBeTerminatedTaskUnidSet();

        toBeExecutedTaskUnidSet.remove(taskUnid);
        toBeTerminatedTaskUnidSet.remove(taskUnid);
        taskDataRedisRepository.remove(taskUnid);
    }

    /**
     * Looks up the VA server the task is linked to.
     *
     * @param taskUnid unid of the task
     * @return the server info found for the linked device id, or {@code null} if the
     *         task has no entry in the task-to-VA-server map
     */
    public VaServerInfo taskRunOn(String taskUnid) {
        RMap<String, String> taskVaServerMap = redisService.getTaskVaServerMap();
        String devId = taskVaServerMap.get(taskUnid);
        if (devId == null) {
            return null;
        }
        return vaServerService.getVaServerRedisRepository().getVAServerInfoById(devId);
    }

    /**
     * Removes the link between a task and its VA server.
     *
     * @param taskUnid unid of the task to unlink
     * @return ImmutablePair<String, String>
     * <li>left <code>taskUnid</code></li>
     * <li>right <code>devId</code> ({@code null} if the task was not linked)</li>
     */
    public ImmutablePair<String, String> unlinkTaskAndVaServer(String taskUnid) {
        RMap<String, String> taskVaServerMap = redisService.getTaskVaServerMap();
        // remove() hands back the previous value, so the returned device id is exactly
        // the one that was unlinked; a separate get() could observe a different value.
        String devId = taskVaServerMap.remove(taskUnid);
        return ImmutablePair.of(taskUnid, devId);
    }

    /**
     * Terminates the task through {@link VAServerService} and, on success, removes all
     * of its data.
     *
     * @param taskUnid unid of the task to delete
     * @throws FanXingException if the termination is reported as unsuccessful
     */
    public void deleteTask(String taskUnid) {
        if (!vaServerService.terminateTask(taskUnid)) {
            throw new FanXingException("failed");
        }
        removeTaskDataAll(taskUnid);
    }


    /**
     * Applies a changed task definition.
     *
     * <p>A task that is not linked to a VA server is deleted and added again. A linked
     * task has its stored task data replaced and the new data sent to the VA server.
     *
     * @param task the updated task
     * @throws FanXingException if the store configuration cannot be obtained, or if
     *                          deleting / re-adding the task fails
     */
    public void updateTask(Task task) {
        String taskUnid = task.getUnid();
        VaServerInfo vaServerInfo = taskRunOn(taskUnid);

        if (vaServerInfo == null) {
            // Not running: drop the old task and create it anew.
            deleteTask(taskUnid);
            addTask(task);
        } else {
            // Running: refresh the stored task data and push the change to the VA server.
            // The data is built with the store configuration so the stored record does
            // not lose it when overwritten.
            TaskData taskData = buildTaskData(task);
            taskDataRedisRepository.addOrUpdateTaskData(taskData);
            vaServerService.updateTask(taskData);
        }
    }

    /**
     * @return the repository used to persist task data
     */
    public TaskDataRedisRepository getRepository() {
        return taskDataRedisRepository;
    }

    /**
     * Creates the task data for a task and attaches the content of its store configuration,
     * which is fetched through {@link OpsClient} by the task's store config id.
     *
     * @throws FanXingException if no store configuration is returned
     */
    private TaskData buildTaskData(Task task) {
        TaskData taskData = new TaskData(task);
        Long storeConfigId = task.getStoreConfigId();
        JsonMessageUtil.JsonMessage<StoreConfig> storeConfigRes = opsClient.getStoreConfigById(storeConfigId);
        // Treat a missing response the same as a response without data.
        StoreConfig storeConfigVo = storeConfigRes == null ? null : storeConfigRes.getData();
        if (storeConfigVo == null) {
            throw new FanXingException("无法获取对应的存储配置");
        }
        taskData.setStoreConfig(storeConfigVo.getContent());
        return taskData;
    }
}