// TaskDataService.java
package com.viontech.fanxing.task.service;

import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.runtime.RuntimeConfig;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.repository.TaskDataRedisRepository;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * Service that keeps task data in Redis and schedules tasks for execution.
 *
 * <p>It stores {@link TaskData} through {@link TaskDataRedisRepository}, queues task unids in the
 * "to be executed" / "to be terminated" scored sets exposed by {@link RedisService}, and reads or
 * clears the task-to-VA-server mapping held in the task/VA-server map.
 *
 * @author 谢明辉
 * @date 2021/7/13
 */
@Service
public class TaskDataService {

    @Resource
    private RedisService redisService;
    @Resource
    private VAServerService vaServerService;
    @Resource
    private TaskDataRedisRepository taskDataRedisRepository;
    @Resource
    private OpsClientService opsClientService;

    /**
     * Stores the task data (including its store config) and queues the task for execution.
     *
     * @param task the task to add
     * @throws FanXingException if the store config cannot be loaded, or if no next execution time
     *                          can be determined for the task
     */
    public void addTask(Task task) {
        TaskData taskData = buildTaskData(task);
        taskDataRedisRepository.addOrUpdateTaskData(taskData);
        // Compute the next run time and queue the task.
        // NOTE(review): when distribution fails the task data written above stays in Redis;
        // confirm whether callers expect it to be cleaned up.
        if (!distributeTask(taskData)) {
            throw new FanXingException("任务找不到可执行时间");
        }
    }

    /**
     * Queues the task into the scheduling sets according to its runtime config.
     *
     * <p>If the task is already bound to a VA server nothing is queued and {@code true} is returned.
     * Otherwise the task unid is added to the "to be executed" set scored by the next execution
     * time and, when a terminate time exists, to the "to be terminated" set scored by that time.
     *
     * @param taskData the task data to schedule
     * @return {@code true} if the task is already running or was queued; {@code false} if the
     *         runtime config yields no next execution time
     */
    public boolean distributeTask(TaskData taskData) {
        RuntimeConfig runtimeConfig = taskData.getRuntimeConfig();
        String taskUnid = taskData.getTask().getUnid();

        // A task that is currently running must not be queued again.
        if (taskRunOn(taskUnid) != null) {
            return true;
        }

        ImmutablePair<Long, Long> nextTime = runtimeConfig.getNextTimeOfExecutionAndTerminal();
        Long nextExecuteTime = nextTime.left;
        Long nextTerminateTime = nextTime.right;
        if (nextExecuteTime == null) {
            return false;
        }

        redisService.getToBeExecutedTaskUnidSet().add(nextExecuteTime, taskUnid);
        if (nextTerminateTime != null) {
            redisService.getToBeTerminatedTaskUnidSet().add(nextTerminateTime, taskUnid);
        }
        return true;
    }


    /**
     * Removes the task unid from both scheduling sets and deletes its stored task data.
     *
     * @param taskUnid unid of the task to purge
     */
    public void removeTaskDataAll(String taskUnid) {
        RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisService.getToBeExecutedTaskUnidSet();
        RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redisService.getToBeTerminatedTaskUnidSet();

        toBeExecutedTaskUnidSet.remove(taskUnid);
        toBeTerminatedTaskUnidSet.remove(taskUnid);
        taskDataRedisRepository.remove(taskUnid);
    }

    /**
     * Looks up the VA server the task is currently mapped to.
     *
     * @param taskUnid unid of the task
     * @return the server info for the mapped device id, or {@code null} when the task has no
     *         mapping (the repository lookup result is returned as-is otherwise)
     */
    public VaServerInfo taskRunOn(String taskUnid) {
        RMap<String, String> taskVaServerMap = redisService.getTaskVaServerMap();
        String devId = taskVaServerMap.get(taskUnid);
        if (devId == null) {
            return null;
        }
        return vaServerService.getVaServerRedisRepository().getVAServerInfoById(devId);
    }

    /**
     * Removes the association between a task and its VA server.
     *
     * @param taskUnid unid of the task to unlink
     * @return ImmutablePair<String, String>
     * <li>left <code>taskUnid</code></li>
     * <li>right <code>devId</code> that was mapped, or <code>null</code> if there was none</li>
     */
    public ImmutablePair<String, String> unlinkTaskAndVaServer(String taskUnid) {
        RMap<String, String> taskVaServerMap = redisService.getTaskVaServerMap();
        // remove() returns the previous value, so the devId reported is exactly the one that was
        // unlinked; a separate get() followed by remove() could observe a different value.
        String devId = taskVaServerMap.remove(taskUnid);
        return ImmutablePair.of(taskUnid, devId);
    }

    /**
     * Terminates the task through the VA server service and, on success, purges its Redis data.
     *
     * @param taskUnid unid of the task to delete
     * @throws FanXingException if the termination call reports failure
     */
    public void deleteTask(String taskUnid) {
        boolean success = vaServerService.terminateTask(taskUnid);
        if (!success) {
            throw new FanXingException("failed");
        }
        removeTaskDataAll(taskUnid);
    }


    /**
     * Applies a changed task definition.
     *
     * <p>If the task is not mapped to a VA server it is deleted and added again. If it is mapped to
     * a server whose status is {@code 0}, the update is rejected. Otherwise the stored task data is
     * replaced and the new definition is pushed to the VA server.
     *
     * @param task the updated task
     * @throws FanXingException if the server status is {@code 0}, if the store config cannot be
     *                          loaded, or if the delete / re-add path fails
     */
    public void updateTask(Task task) {
        String taskUnid = task.getUnid();
        VaServerInfo vaServerInfo = taskRunOn(taskUnid);
        if (vaServerInfo == null) {
            // No server mapping means the task is not running: drop it and create it afresh.
            // NOTE(review): deleteTask still calls terminateTask for a task that is not running;
            // confirm terminateTask reports success in that case.
            deleteTask(taskUnid);
            addTask(task);
        } else if (vaServerInfo.getStatus() == 0) {
            throw new FanXingException("设备离线");
        } else {
            // Build the task data with its store config, as addTask does; otherwise the stored
            // entry would be overwritten without the store config.
            TaskData taskData = buildTaskData(task);
            // Update the stored task data, then send the new task definition to the VA server.
            taskDataRedisRepository.addOrUpdateTaskData(taskData);
            vaServerService.updateTask(taskData);
        }
    }

    /**
     * Exposes the underlying task data repository.
     *
     * @return the injected {@link TaskDataRedisRepository}
     */
    public TaskDataRedisRepository getRepository() {
        return taskDataRedisRepository;
    }

    /**
     * Creates the {@link TaskData} for a task and attaches the content of its store config.
     *
     * @param task the task to wrap
     * @return task data carrying the store config content
     * @throws FanXingException if no store config is found for the task's store config id
     */
    private TaskData buildTaskData(Task task) {
        TaskData taskData = new TaskData(task);
        // Fetch the store config referenced by the task.
        StoreConfig storeConfig = opsClientService.getStoreConfigById(task.getStoreConfigId());
        if (storeConfig == null) {
            throw new FanXingException("无法获取对应的存储配置");
        }
        taskData.setStoreConfig(storeConfig.getContent());
        return taskData;
    }
}