// TaskRunner.java (6.04 KB)
package com.viontech.fanxing.task.runner;

import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.feign.OpsClient;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.service.TaskDataService;
import com.viontech.fanxing.task.service.VAServerService;
import com.viontech.fanxing.task.service.adapter.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Collection;

/**
 * Scheduled runner that drains two Redis sorted sets of task unids: the set of tasks that are
 * due to be started and the set of tasks that are due to be terminated.
 *
 * <p>Both listeners select the entries whose score lies between {@code 0} and the current epoch
 * milliseconds, and both run under the same Redis job lock, so at most one of them is working
 * on the queues at any time.
 *
 * @author 谢明辉
 * @date 2021/7/13
 */
@Component
@Slf4j
public class TaskRunner {

    /** Name of the Redis lock that serialises both scheduled listeners. */
    private static final String JOB_LOCK_KEY = "lock:taskRunner";

    @Resource
    private RedisService redisService;
    @Resource
    private VAServerService vaServerService;
    @Resource
    private TaskDataService taskDataService;
    @Resource
    private OpsClient opsClient;
    @Resource
    private TaskService taskService;

    /**
     * Starts every task whose scheduled start time has been reached.
     *
     * <p>For each due task unid this method loads the task data, picks the first online VAServer
     * that matches the task's VA type (when one is set) and has enough available resources, and
     * asks {@code vaServerService} to execute the task on it. The server's availability is
     * re-read after taking the per-device lock, so the decision is made on fresh data.
     *
     * <ul>
     *   <li>No task data found: all data for the unid is removed.</li>
     *   <li>No suitable server: the task is marked {@code CAN_NOT_RUN} and stays queued.</li>
     *   <li>Execution succeeded: the task is marked {@code RUNNING} and removed from the queue.</li>
     *   <li>Execution failed: the task is left queued and untouched so a later run retries it.</li>
     * </ul>
     *
     * <p>Any exception aborts the current run; it is logged and the remaining tasks are picked up
     * by the next scheduled run.
     */
    @Scheduled(fixedDelay = 5000)
    public void executedTaskListener() {
        // presumably blocks until the lock is obtained - confirm against RedisService
        RLock jobLock = redisService.getLockMust(JOB_LOCK_KEY);
        try {
            RScoredSortedSet<String> set = redisService.getToBeExecutedTaskUnidSet();
            RMap<String, VaServerInfo> vaServerMap = vaServerService.getVaServerRedisRepository().getVaServerInfoMap();

            // entries scored between 0 and "now", both bounds inclusive
            Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true);
            for (String taskUnid : entryCollection) {
                log.info("开始任务 : {}", taskUnid);
                TaskData taskData = taskDataService.getRepository().getTaskDataByUnid(taskUnid);
                if (taskData == null) {
                    log.info("找不到对应任务,移除所有:{}", taskUnid);
                    taskDataService.removeTaskDataAll(taskUnid);
                    continue;
                }
                Task task = taskData.getTask();
                String taskVaType = task.getVaType();
                Float resourceNeed = task.getResourceNeed();

                // The device lock is scoped to this single task so that it is always released
                // before the next task is handled, whatever path leaves the block below.
                RLock devLock = null;
                try {
                    // TODO: for now simply take the first VAServer with enough free resources;
                    // the selection algorithm is to be optimised later.
                    VaServerInfo server = null;
                    for (String devId : vaServerMap.keySet()) {
                        VaServerInfo temp = vaServerMap.get(devId);
                        // entry vanished between keySet() and get(), or the server is offline
                        if (temp == null || temp.getStatus() == 0) {
                            continue;
                        }
                        // the task is bound to a VA type and this server is of a different one
                        if (StringUtils.isNotBlank(taskVaType) && !taskVaType.equals(temp.getPlatType())) {
                            continue;
                        }
                        if (temp.getAvailableResources() >= resourceNeed) {
                            devLock = vaServerService.getVaServerRedisRepository().getDevLock(devId);
                            // re-read under the device lock: the first read may be stale
                            temp = vaServerMap.get(devId);
                            if (temp != null && temp.getAvailableResources() >= resourceNeed) {
                                server = temp;
                                break;
                            }
                            devLock.forceUnlock();
                            devLock = null;
                        }
                    }

                    // no device can run the task: record that in the task status
                    if (server == null) {
                        log.debug("找不到可用的 VAServer,跳过:{}", taskUnid);
                        taskService.updateStatus(task.getId(), TaskStatus.CAN_NOT_RUN.val);
                        continue;
                    }

                    boolean success = vaServerService.executeTask(taskData, server);
                    if (!success) {
                        // do not report a task as running when it was never started
                        log.warn("executeTask returned false, task stays queued for retry: {}", taskUnid);
                        continue;
                    }

                    // update the task status
                    taskService.updateStatus(task.getId(), TaskStatus.RUNNING.val);
                    // the task has been started, take it off the queue
                    set.remove(taskUnid);
                } finally {
                    if (devLock != null) {
                        devLock.forceUnlock();
                    }
                }
            }
        } catch (Exception e) {
            log.error("", e);
        } finally {
            // NOTE(review): forceUnlock() releases the lock regardless of its owner - confirm
            // this is intended rather than unlock().
            jobLock.forceUnlock();
        }

    }

    /**
     * Terminates every task whose scheduled stop time has been reached.
     *
     * <p>For each due task unid this method loads the task data and asks {@code vaServerService}
     * to terminate the task. On success the task is marked {@code PAUSE} and, unless its runtime
     * type equals {@code TaskStatus.PAUSE.val}, handed to {@code taskDataService.distributeTask}.
     * The unid is removed from the queue whether or not the termination succeeded. When no task
     * data is found, all data for the unid is removed instead.
     *
     * <p>Any exception aborts the current run; it is logged and the remaining tasks are picked up
     * by the next scheduled run.
     */
    @Scheduled(fixedDelay = 5000)
    public void terminatedTaskListener() {
        // presumably blocks until the lock is obtained - confirm against RedisService
        RLock jobLock = redisService.getLockMust(JOB_LOCK_KEY);
        try {
            RScoredSortedSet<String> set = redisService.getToBeTerminatedTaskUnidSet();

            // entries scored between 0 and "now", both bounds inclusive
            Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true);

            for (String taskUnid : entryCollection) {
                log.info("停止任务 : {}", taskUnid);
                TaskData taskData = taskDataService.getRepository().getTaskDataByUnid(taskUnid);
                if (taskData == null) {
                    log.info("找不到对应任务,移除所有:{}", taskUnid);
                    taskDataService.removeTaskDataAll(taskUnid);
                    continue;
                }
                // Author's note (translated): get the available VAServer and perform the
                // termination; on success unlink the taskData from the VAServer, remove the task
                // from the zset, restore the VAServer's resource count, compute the next
                // execution time and put the task back into the zset.
                boolean success = vaServerService.terminateTask(taskUnid);
                if (success) {
                    taskService.updateStatus(taskData.getTask().getId(), TaskStatus.PAUSE.val);
                    // Random tasks are not deployed again.
                    // NOTE(review): a runtime type is compared with a TaskStatus value here;
                    // this looks like a reused constant - confirm the intended one.
                    if (taskData.getTask().getRuntimeType() != TaskStatus.PAUSE.val) {
                        taskDataService.distributeTask(taskData);
                    }
                }
                // NOTE(review): removed even when the termination failed, so a failed stop is
                // not retried - confirm this is intended.
                set.remove(taskUnid);
            }
        } catch (Exception e) {
            log.error("", e);
        } finally {
            jobLock.forceUnlock();
        }
    }

}