// TaskRunner.java 6.82 KB
package com.viontech.fanxing.task.runner;

import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.service.TaskDataService;
import com.viontech.fanxing.task.service.VAServerService;
import com.viontech.fanxing.task.service.adapter.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Collection;

/**
 * .
 *
 * @author 谢明辉
 * @date 2021/7/13
 */

@Component
@Slf4j
@Profile("!test")
public class TaskRunner {

    @Resource
    private RedisService redisService;
    @Resource
    private VAServerService vaServerService;
    @Resource
    private TaskDataService taskDataService;
    @Resource
    private TaskService taskService;

    @Scheduled(fixedDelay = 5000)
    public void executedTaskListener() {
        RLock jobLock = redisService.getLockMust("lock:taskRunner");
        try {
            RScoredSortedSet<String> set = redisService.getToBeExecutedTaskUnidSet();
            RMap<String, VaServerInfo> vaServerMap = vaServerService.getVaServerRedisRepository().getVaServerInfoMap();

            Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true);
            for (String taskUnid : entryCollection) {
                RLock taskLock = taskDataService.getRepository().getTaskLock(taskUnid);
                try {
                    TaskData taskData = taskDataService.getRepository().getTaskDataByUnid(taskUnid);
                    if (taskData == null) {
                        log.info("找不到对应任务,移除所有:{}", taskUnid);
                        taskDataService.removeTaskDataAll(taskUnid);
                        continue;
                    }
                    log.info("开始任务 [{}] [{}]", taskData.getTask().getName(), taskUnid);
                    Task task = taskData.getTask();
                    String taskVaType = task.getVaType();
                    Float resourceNeed = task.getResourceNeed();
                    Collection<String> vaServerIdSet = vaServerMap.keySet();
                    // todo 暂时先找有可用资源的vaserver,以后再进行算法优化
                    VaServerInfo server = null;

                    for (String devId : vaServerIdSet) {
                        VaServerInfo temp = vaServerMap.get(devId);
                        // 不在线
                        if (temp.getStatus() == 0) {
                            continue;
                        }
                        // 指定了VAServer
                        if (StringUtils.isNotBlank(taskVaType)) {
                            if (!taskVaType.equals(temp.getPlatType())) {
                                continue;
                            }
                        }
                        if (temp.getAvailableResources() >= resourceNeed) {
                            server = temp;
                        }
                    }
                    // 找不到可以用来执行的设备,需要修改状态
                    if (server == null) {
                        log.debug("找不到可用的 VAServer,跳过:{}", taskUnid);
                        taskService.updateStatus(task.getId(), TaskStatus.CAN_NOT_RUN.val);
                        continue;
                    }
                    RLock devLock = vaServerService.getVaServerRedisRepository().getDevLock(server.getDevID());
                    try {
                        log.info("开始下发任务:[{}]", taskData.getTask().getName());
                        vaServerService.executeTask(taskData, server);
                    } catch (Exception e) {
                        log.error("下发任务失败", e);
                        taskService.updateStatus(task.getId(), TaskStatus.CAN_NOT_RUN.val);
                        continue;
                    } finally {
                        devLock.forceUnlock();
                    }

                    // 修改任务状态
                    taskService.updateStatus(task.getId(), TaskStatus.RUNNING.val);
                    // 移除任务
                    set.remove(taskUnid);
                } finally {
                    taskLock.forceUnlock();
                }
            }
        } catch (Exception e) {
            log.error("", e);
        } finally {
            jobLock.forceUnlock();
        }

    }

    @Scheduled(fixedDelay = 5000)
    public void terminatedTaskListener() {
        RLock jobLock = redisService.getLockMust("lock:taskRunner");
        try {
            RScoredSortedSet<String> set = redisService.getToBeTerminatedTaskUnidSet();
            RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisService.getToBeExecutedTaskUnidSet();

            Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true);

            for (String taskUnid : entryCollection) {
                TaskData taskData = taskDataService.getRepository().getTaskDataByUnid(taskUnid);
                if (taskData == null) {
                    log.info("找不到对应任务,移除所有:{}", taskUnid);
                    taskDataService.removeTaskDataAll(taskUnid);
                    continue;
                }
                log.info("停止任务 [{}] [{}]", taskData.getTask().getName(), taskUnid);
                //  获取可用的 vaserver ,执行任务终止动作,如果成功,解除 taskData和 vaServer 的关联,并且从 zset 中移除任务,恢复vaserver资源数,计算下次任务执行时间,放入zset中
                boolean success = vaServerService.terminateTask(taskUnid);
                if (success) {
                    set.remove(taskUnid);
                    // 防止任务持续无法运行导致超过运行时段
                    toBeExecutedTaskUnidSet.remove(taskUnid);
                    // 随机任务不进行部署,并且状态需要改成暂停
                    if (taskData.getTask().getRuntimeType() == 3) {
                        taskService.updateStatus(taskData.getTask().getId(), TaskStatus.STOP.val);
                    } else {
                        taskService.updateStatus(taskData.getTask().getId(), TaskStatus.STAY.val);
                        boolean b = taskDataService.distributeTask(taskData);
                    }
                }

            }
        } catch (Exception e) {
            log.error("", e);
        } finally {
            jobLock.forceUnlock();
        }
    }

}