Commit 5fc02e5f by xmh

代码优化+重构

1 parent 9ce51e6a
package com.viontech.fanxing.task.scheduling.service; package com.viontech.fanxing.commons.service;
import com.viontech.fanxing.commons.constant.RedisKeys; import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import org.redisson.api.*; import org.redisson.api.*;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -17,7 +15,7 @@ import java.util.concurrent.TimeUnit; ...@@ -17,7 +15,7 @@ import java.util.concurrent.TimeUnit;
*/ */
@Service @Service
public class RedisCacheService { public class RedisService {
@Resource @Resource
private RedissonClient redissonClient; private RedissonClient redissonClient;
...@@ -35,13 +33,6 @@ public class RedisCacheService { ...@@ -35,13 +33,6 @@ public class RedisCacheService {
return redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_TERMINATED_TASK_UNID_SET); return redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_TERMINATED_TASK_UNID_SET);
} }
public RMap<String, TaskData> getTaskDataMap() {
return redissonClient.getMap(RedisKeys.SCHEDULING_TASK_DATA_MAP);
}
public RMap<String, VaServerInfo> getVaServerMap() {
return redissonClient.getMap(RedisKeys.SCHEDULING_VASERVER_MAP);
}
public <T> RBucket<T> getValue(String key) { public <T> RBucket<T> getValue(String key) {
return redissonClient.getBucket(key); return redissonClient.getBucket(key);
...@@ -59,4 +50,7 @@ public class RedisCacheService { ...@@ -59,4 +50,7 @@ public class RedisCacheService {
return lock; return lock;
} }
public RedissonClient getClient() {
return redissonClient;
}
} }
...@@ -5,11 +5,11 @@ import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo; ...@@ -5,11 +5,11 @@ import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.scheduling.service.VAServerService; import com.viontech.fanxing.task.scheduling.service.VAServerService;
import com.viontech.keliu.util.JsonMessageUtil; import com.viontech.keliu.util.JsonMessageUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RMap;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
/** /**
...@@ -39,7 +39,7 @@ public class VAServerController { ...@@ -39,7 +39,7 @@ public class VAServerController {
log.info("收到注册消息:{}", JSON.toJSONString(vaServerInfo)); log.info("收到注册消息:{}", JSON.toJSONString(vaServerInfo));
vaServerService.registeVAServer(vaServerInfo); vaServerService.registerVAServer(vaServerInfo);
HashMap<String, Object> result = new HashMap<>(); HashMap<String, Object> result = new HashMap<>();
result.put("code", 200); result.put("code", 200);
...@@ -61,8 +61,8 @@ public class VAServerController { ...@@ -61,8 +61,8 @@ public class VAServerController {
@GetMapping("/vaServerInfo") @GetMapping("/vaServerInfo")
public Object getVaServerInfo() { public Object getVaServerInfo() {
Collection<VaServerInfo> vaServerInfo = vaServerService.getVaServerInfo(); RMap<String, VaServerInfo> vaServerInfoMap = vaServerService.getVaServerRedisRepository().getVaServerInfoMap();
return JsonMessageUtil.getSuccessJsonMsg(vaServerInfo); return JsonMessageUtil.getSuccessJsonMsg(vaServerInfoMap.values());
} }
/** /**
......
package com.viontech.fanxing.task.scheduling.model.vaserver; package com.viontech.fanxing.task.scheduling.model.vaserver;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.task.scheduling.model.TaskData; import com.viontech.fanxing.task.scheduling.model.TaskData;
import lombok.Getter; import lombok.Getter;
...@@ -27,7 +29,7 @@ public class VATask { ...@@ -27,7 +29,7 @@ public class VATask {
private String channel_unid; private String channel_unid;
private String stream_path; private String stream_path;
private Integer stream_type; private Integer stream_type;
private String scene; private JSONArray scene;
public VATask(TaskData taskData) { public VATask(TaskData taskData) {
Task task = taskData.getTask(); Task task = taskData.getTask();
...@@ -38,6 +40,6 @@ public class VATask { ...@@ -38,6 +40,6 @@ public class VATask {
this.channel_unid = task.getChannelUnid(); this.channel_unid = task.getChannelUnid();
this.stream_path = task.getStreamPath(); this.stream_path = task.getStreamPath();
this.stream_type = task.getStreamType(); this.stream_type = task.getStreamType();
this.scene = task.getScene(); this.scene = JSON.parseArray(task.getScene());
} }
} }
...@@ -23,7 +23,7 @@ public class VaServerInfo implements Serializable { ...@@ -23,7 +23,7 @@ public class VaServerInfo implements Serializable {
private String proxy; private String proxy;
/** 可用资源数量 */ /** 可用资源数量 */
private Float availableResources; private Float availableResources;
/** 状态 1在线,2离线 */ /** 状态 1在线,0离线 */
private int status = 1; private int status = 1;
public VaServerInfo setVideoResource(Float videoResource) { public VaServerInfo setVideoResource(Float videoResource) {
......
package com.viontech.fanxing.task.scheduling.repository;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.commons.service.RedisService;
import org.redisson.api.RMap;
import org.springframework.stereotype.Repository;
/**
* .
*
* @author 谢明辉
* @date 2021/8/16
*/
@Repository
public class TaskDataRedisRepository {
private final RMap<String, TaskData> taskDataRMap;
private final RedisService redisService;
public TaskDataRedisRepository(RedisService redisService) {
this.taskDataRMap = redisService.getClient().getMap(RedisKeys.SCHEDULING_TASK_DATA_MAP);
this.redisService = redisService;
}
public TaskData getTaskDataByUnid(String unid) {
return taskDataRMap.get(unid);
}
public void addOrUpdateTaskData(TaskData data) {
taskDataRMap.put(data.getTask().getUnid(), data);
}
public TaskData remove(String unid) {
return taskDataRMap.remove(unid);
}
}
package com.viontech.fanxing.task.scheduling.repository;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import com.viontech.fanxing.commons.service.RedisService;
import org.redisson.api.RBucket;
import org.redisson.api.RMap;
import org.springframework.stereotype.Repository;
import java.util.Date;
import java.util.Map;
/**
* .
*
* @author 谢明辉
* @date 2021/8/16
*/
@Repository
public class VAServerRedisRepository {
private final RMap<String, VaServerInfo> vaServerMap;
private final RedisService redisService;
public VAServerRedisRepository(RedisService redisService) {
this.vaServerMap = redisService.getClient().getMap(RedisKeys.SCHEDULING_VASERVER_MAP);
this.redisService = redisService;
}
public VaServerInfo getVAServerInfoById(String devId) {
VaServerInfo vaServerInfo = vaServerMap.get(devId);
if (!online(devId)) {
vaServerInfo.setStatus(0);
addOrUpdate(devId, vaServerInfo);
}
return vaServerInfo;
}
public RMap<String, VaServerInfo> getVaServerInfoMap() {
for (Map.Entry<String, VaServerInfo> entry : vaServerMap.entrySet()) {
String devId = entry.getKey();
if (!online(devId)) {
VaServerInfo value = entry.getValue();
value.setStatus(0);
addOrUpdate(devId, value);
}
}
return vaServerMap;
}
public void addOrUpdate(String devId, VaServerInfo vaServerInfo) {
vaServerMap.put(devId, vaServerInfo);
}
public boolean online(String devId) {
RBucket<Date> bucket = redisService.getValue(RedisKeys.getVAServerKeepAliveKey(devId));
return bucket.isExists();
}
}
...@@ -4,24 +4,20 @@ import com.viontech.fanxing.commons.constant.TaskStatus; ...@@ -4,24 +4,20 @@ import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.vo.TaskVo; import com.viontech.fanxing.commons.vo.TaskVo;
import com.viontech.fanxing.task.scheduling.feign.TaskClient; import com.viontech.fanxing.task.scheduling.feign.TaskClient;
import com.viontech.fanxing.task.scheduling.model.RuntimeConfig;
import com.viontech.fanxing.task.scheduling.model.TaskData; import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo; import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.scheduling.service.RedisCacheService; import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.scheduling.service.TaskService; import com.viontech.fanxing.task.scheduling.service.TaskService;
import com.viontech.fanxing.task.scheduling.service.VAServerService; import com.viontech.fanxing.task.scheduling.service.VAServerService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.redisson.api.RLock; import org.redisson.api.RLock;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet; import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.TimeUnit;
/** /**
* . * .
...@@ -35,9 +31,7 @@ import java.util.concurrent.TimeUnit; ...@@ -35,9 +31,7 @@ import java.util.concurrent.TimeUnit;
public class TaskRunner { public class TaskRunner {
@Resource @Resource
private RedissonClient redissonClient; private RedisService redisService;
@Resource
private RedisCacheService redisCacheService;
@Resource @Resource
private VAServerService vaServerService; private VAServerService vaServerService;
@Resource @Resource
...@@ -47,108 +41,104 @@ public class TaskRunner { ...@@ -47,108 +41,104 @@ public class TaskRunner {
@Scheduled(fixedDelay = 5000) @Scheduled(fixedDelay = 5000)
public void executedTaskListener() { public void executedTaskListener() {
RLock lock = redissonClient.getLock("lock:executedTaskListener"); RLock jobLock = redisService.getLockMust("lock:executedTaskListener");
boolean isLock; RLock devLock = null;
try { try {
isLock = lock.tryLock(30, 25, TimeUnit.SECONDS); RScoredSortedSet<String> set = redisService.getToBeExecutedTaskUnidSet();
} catch (InterruptedException e) { RMap<String, VaServerInfo> vaServerMap = vaServerService.getVaServerRedisRepository().getVaServerInfoMap();
return;
} Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true);
if (isLock) { for (String taskUnid : entryCollection) {
try { log.info("开始任务 : {}", taskUnid);
RScoredSortedSet<String> set = redisCacheService.getToBeExecutedTaskUnidSet(); TaskData taskData = taskService.getRepository().getTaskDataByUnid(taskUnid);
RMap<String, TaskData> taskDataMap = redisCacheService.getTaskDataMap(); if (taskData == null) {
RMap<String, VaServerInfo> vaServerMap = redisCacheService.getVaServerMap(); log.info("找不到对应任务,移除所有:{}", taskUnid);
taskService.removeTaskDataAll(taskUnid);
Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true); continue;
for (String taskUnid : entryCollection) { }
log.info("开始任务 : {}", taskUnid); Task task = taskData.getTask();
TaskData taskData = taskDataMap.get(taskUnid); String taskVaType = task.getVaType();
Task task = taskData.getTask(); Float resourceNeed = task.getResourceNeed();
String taskVaType = task.getVaType(); Collection<String> vaServerIdSet = vaServerMap.keySet();
Float resourceNeed = task.getResourceNeed(); // todo 暂时先找有可用资源的vaserver,以后再进行算法优化
Collection<VaServerInfo> vaServerInfos = vaServerMap.values(); VaServerInfo server = null;
// todo 暂时先找有可用资源的vaserver,以后再进行算法优化
VaServerInfo server = null; for (String devId : vaServerIdSet) {
for (VaServerInfo vaServerInfo : vaServerInfos) { VaServerInfo temp = vaServerMap.get(devId);
if (vaServerInfo.getAvailableResources() > resourceNeed) { // 不在线
server = vaServerInfo; if (temp.getStatus() == 0) {
continue;
}
if (temp.getAvailableResources() > resourceNeed) {
devLock = redisService.getLockMust("lock:vaserver:" + devId);
temp = vaServerMap.get(devId);
if (temp.getAvailableResources() > resourceNeed) {
server = temp;
break; break;
} else {
devLock.forceUnlock();
devLock = null;
} }
} }
}
// 找不到可以用来执行的设备,需要修改状态 // 找不到可以用来执行的设备,需要修改状态
if (server == null) { if (server == null) {
TaskVo taskVo = new TaskVo();
taskVo.setStatus(TaskStatus.CAN_NOT_RUN.val);
taskClient.updateTask(task.getId(), taskVo);
continue;
}
boolean success = vaServerService.executeTask(taskData, server);
// 修改任务状态
TaskVo taskVo = new TaskVo(); TaskVo taskVo = new TaskVo();
taskVo.setStatus(TaskStatus.RUNNING.val); taskVo.setStatus(TaskStatus.CAN_NOT_RUN.val);
taskClient.updateTask(task.getId(), taskVo); taskClient.updateTask(task.getId(), taskVo);
// 移除任务 continue;
set.remove(taskUnid);
}
} catch (Exception e) {
log.error("", e);
} finally {
try {
lock.unlock();
} catch (Exception ignore) {
} }
boolean success = vaServerService.executeTask(taskData, server);
// 修改任务状态
TaskVo taskVo = new TaskVo();
taskVo.setStatus(TaskStatus.RUNNING.val);
taskClient.updateTask(task.getId(), taskVo);
// 移除任务
set.remove(taskUnid);
} }
} catch (Exception e) {
log.error("", e);
} finally {
if (devLock != null) {
devLock.forceUnlock();
}
jobLock.forceUnlock();
} }
} }
@Scheduled(fixedDelay = 5000) @Scheduled(fixedDelay = 5000)
public void terminatedTaskListener() { public void terminatedTaskListener() {
RLock lock = redissonClient.getLock("lock:terminatedTaskListener"); RLock jobLock = redisService.getLockMust("lock:terminatedTaskListener");
boolean isLock;
try { try {
isLock = lock.tryLock(30, 25, TimeUnit.SECONDS); RScoredSortedSet<String> set = redisService.getToBeTerminatedTaskUnidSet();
} catch (InterruptedException e) {
return; Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true);
}
if (isLock) { for (String taskUnid : entryCollection) {
try { log.info("停止任务 : {}", taskUnid);
RScoredSortedSet<String> set = redisCacheService.getToBeTerminatedTaskUnidSet(); TaskData taskData = taskService.getRepository().getTaskDataByUnid(taskUnid);
RMap<String, TaskData> taskDataMap = redisCacheService.getTaskDataMap(); if (taskData == null) {
log.info("找不到对应任务,移除所有:{}", taskUnid);
Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true); taskService.removeTaskDataAll(taskUnid);
continue;
for (String taskUnid : entryCollection) {
log.info("停止任务 : {}", taskUnid);
TaskData taskData = taskDataMap.get(taskUnid);
RuntimeConfig runtimeConfig = taskData.getRuntimeConfig();
// 获取可用的 vaserver ,执行任务终止动作,如果成功,解除 taskData和 vaServer 的关联,并且从 zset 中移除任务,恢复vaserver资源数,计算下次任务执行时间,放入zset中
boolean success = vaServerService.terminateTask(taskUnid);
if (success) {
ImmutablePair<Long, Long> nextTime = runtimeConfig.getNextTimeOfExecutionAndTerminal();
Long nextExecuteTime = nextTime.left;
Long nextTerminateTime = nextTime.right;
if (nextExecuteTime != null) {
taskService.addExecutableTaskData(taskUnid, nextExecuteTime);
taskService.addTerminatableTaskData(taskUnid, nextTerminateTime);
}
}
set.remove(taskUnid);
} }
} catch (Exception e) { // 获取可用的 vaserver ,执行任务终止动作,如果成功,解除 taskData和 vaServer 的关联,并且从 zset 中移除任务,恢复vaserver资源数,计算下次任务执行时间,放入zset中
log.error("", e); boolean success = vaServerService.terminateTask(taskUnid);
} finally { if (success) {
try { boolean b = taskService.distributeTask(taskData);
lock.unlock();
} catch (Exception ignore) {
} }
set.remove(taskUnid);
} }
} catch (Exception e) {
log.error("", e);
} finally {
jobLock.forceUnlock();
} }
} }
} }
package com.viontech.fanxing.task.scheduling.service; package com.viontech.fanxing.task.scheduling.service;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.scheduling.model.RuntimeConfig; import com.viontech.fanxing.task.scheduling.model.RuntimeConfig;
import com.viontech.fanxing.task.scheduling.model.TaskData; import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo; import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.scheduling.repository.TaskDataRedisRepository;
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet; import org.redisson.api.RScoredSortedSet;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import tech.powerjob.client.PowerJobClient;
import tech.powerjob.common.request.http.SaveJobInfoRequest;
import javax.annotation.Resource; import javax.annotation.Resource;
...@@ -24,9 +23,11 @@ import javax.annotation.Resource; ...@@ -24,9 +23,11 @@ import javax.annotation.Resource;
public class TaskService { public class TaskService {
@Resource @Resource
private RedisCacheService redisCacheService; private RedisService redisService;
@Resource @Resource
private VAServerService vaServerService; private VAServerService vaServerService;
@Resource
private TaskDataRedisRepository taskDataRedisRepository;
public boolean distributeTask(TaskData taskData) { public boolean distributeTask(TaskData taskData) {
RuntimeConfig runtimeConfig = taskData.getRuntimeConfig(); RuntimeConfig runtimeConfig = taskData.getRuntimeConfig();
...@@ -35,10 +36,10 @@ public class TaskService { ...@@ -35,10 +36,10 @@ public class TaskService {
Long nextExecuteTime = nextTime.left; Long nextExecuteTime = nextTime.left;
Long nextTerminateTime = nextTime.right; Long nextTerminateTime = nextTime.right;
if (nextExecuteTime != null) { if (nextExecuteTime != null) {
RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisCacheService.getToBeExecutedTaskUnidSet(); RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisService.getToBeExecutedTaskUnidSet();
toBeExecutedTaskUnidSet.add(nextExecuteTime, taskUnid); toBeExecutedTaskUnidSet.add(nextExecuteTime, taskUnid);
if (nextTerminateTime != null) { if (nextTerminateTime != null) {
RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redisCacheService.getToBeTerminatedTaskUnidSet(); RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redisService.getToBeTerminatedTaskUnidSet();
toBeTerminatedTaskUnidSet.add(nextTerminateTime, taskUnid); toBeTerminatedTaskUnidSet.add(nextTerminateTime, taskUnid);
} }
return true; return true;
...@@ -47,31 +48,20 @@ public class TaskService { ...@@ -47,31 +48,20 @@ public class TaskService {
} }
} }
public void addTaskData(TaskData taskData) {
RMap<String, TaskData> taskDataMap = redisCacheService.getTaskDataMap();
taskDataMap.put(taskData.getTask().getUnid(), taskData);
}
public TaskData getTaskDataByUnid(String taskUnid) {
RMap<String, TaskData> taskDataMap = redisCacheService.getTaskDataMap();
return taskDataMap.get(taskUnid);
}
public void removeTaskDataAll(String taskUnid) { public void removeTaskDataAll(String taskUnid) {
RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisCacheService.getToBeExecutedTaskUnidSet(); RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisService.getToBeExecutedTaskUnidSet();
RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redisCacheService.getToBeTerminatedTaskUnidSet(); RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redisService.getToBeTerminatedTaskUnidSet();
RMap<String, TaskData> taskDataMap = redisCacheService.getTaskDataMap();
toBeExecutedTaskUnidSet.remove(taskUnid); toBeExecutedTaskUnidSet.remove(taskUnid);
toBeTerminatedTaskUnidSet.remove(taskUnid); toBeTerminatedTaskUnidSet.remove(taskUnid);
taskDataMap.remove(taskUnid); taskDataRedisRepository.remove(taskUnid);
} }
public VaServerInfo taskRunOn(String taskUnid) { public VaServerInfo taskRunOn(String taskUnid) {
RMap<String, String> taskVaServerMap = redisCacheService.getTaskVaServerMap(); RMap<String, String> taskVaServerMap = redisService.getTaskVaServerMap();
String devId = taskVaServerMap.get(taskUnid); String devId = taskVaServerMap.get(taskUnid);
return devId == null ? null : vaServerService.getVaServerRedisRepository().getVAServerInfoById(devId);
return devId == null ? null : vaServerService.getVaServerInfoByDevId(devId);
} }
/** /**
...@@ -82,7 +72,7 @@ public class TaskService { ...@@ -82,7 +72,7 @@ public class TaskService {
* <li>right <code>devId</code></li> * <li>right <code>devId</code></li>
*/ */
public ImmutablePair<String, String> unlinkTaskAndVaServer(String taskUnid) { public ImmutablePair<String, String> unlinkTaskAndVaServer(String taskUnid) {
RMap<String, String> taskVaServerMap = redisCacheService.getTaskVaServerMap(); RMap<String, String> taskVaServerMap = redisService.getTaskVaServerMap();
String devId = taskVaServerMap.get(taskUnid); String devId = taskVaServerMap.get(taskUnid);
taskVaServerMap.remove(taskUnid); taskVaServerMap.remove(taskUnid);
return ImmutablePair.of(taskUnid, devId); return ImmutablePair.of(taskUnid, devId);
...@@ -90,11 +80,13 @@ public class TaskService { ...@@ -90,11 +80,13 @@ public class TaskService {
public void updateTask(Task task) { public void updateTask(Task task) {
RMap<String, TaskData> taskDataMap = redisCacheService.getTaskDataMap();
TaskData taskData = new TaskData(task); TaskData taskData = new TaskData(task);
// 需要更新taskData,并且向vaServer更新任务信息 // 需要更新taskData,并且向vaServer更新任务信息
taskDataMap.put(task.getUnid(), taskData); taskDataRedisRepository.addOrUpdateTaskData(taskData);
vaServerService.updateTask(taskData); vaServerService.updateTask(taskData);
} }
public TaskDataRedisRepository getRepository() {
return taskDataRedisRepository;
}
} }
...@@ -3,17 +3,17 @@ package com.viontech.fanxing.task.scheduling.service; ...@@ -3,17 +3,17 @@ package com.viontech.fanxing.task.scheduling.service;
import com.viontech.fanxing.commons.constant.RedisKeys; import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.commons.exception.FanXingException; import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.scheduling.model.TaskData; import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo; import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.scheduling.repository.VAServerRedisRepository;
import org.redisson.api.RBucket; import org.redisson.api.RBucket;
import org.redisson.api.RLock; import org.redisson.api.RLock;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Collection;
import java.util.Date; import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
...@@ -27,7 +27,9 @@ import java.util.concurrent.TimeUnit; ...@@ -27,7 +27,9 @@ import java.util.concurrent.TimeUnit;
public class VAServerService { public class VAServerService {
@Resource @Resource
private RedisCacheService redisCacheService; private RedisService redisService;
@Resource
private VAServerRedisRepository vaServerRedisRepository;
@Resource @Resource
private TaskService taskService; private TaskService taskService;
@Resource @Resource
...@@ -36,39 +38,17 @@ public class VAServerService { ...@@ -36,39 +38,17 @@ public class VAServerService {
/** /**
* 设备注册 * 设备注册
*/ */
public void registeVAServer(VaServerInfo vaServerInfo) { public void registerVAServer(VaServerInfo vaServerInfo) {
String devId = vaServerInfo.getDevID(); String devId = vaServerInfo.getDevID();
RMap<String, VaServerInfo> map = redisCacheService.getVaServerMap(); keepalive(devId);
RBucket<Date> bucket = redisCacheService.getValue(RedisKeys.getVAServerKeepAliveKey(devId)); vaServerRedisRepository.addOrUpdate(devId, vaServerInfo);
bucket.set(new Date());
bucket.expire(2, TimeUnit.MINUTES);
map.put(devId, vaServerInfo);
}
public VaServerInfo getVaServerInfoByDevId(String devId) {
RMap<String, VaServerInfo> map = redisCacheService.getVaServerMap();
return map.get(devId);
}
public Collection<VaServerInfo> getVaServerInfo() {
RMap<String, VaServerInfo> map = redisCacheService.getVaServerMap();
for (Map.Entry<String, VaServerInfo> entry : map.entrySet()) {
String devId = entry.getKey();
RBucket<Date> bucket = redisCacheService.getValue(RedisKeys.getVAServerKeepAliveKey(devId));
if (!bucket.isExists()) {
VaServerInfo value = entry.getValue();
value.setStatus(0);
map.put(devId, value);
}
}
return map.values();
} }
/** /**
* 设备心跳 * 设备心跳
*/ */
public void keepalive(String devId) { public void keepalive(String devId) {
RBucket<Date> bucket = redisCacheService.getValue(RedisKeys.getVAServerKeepAliveKey(devId)); RBucket<Date> bucket = redisService.getValue(RedisKeys.getVAServerKeepAliveKey(devId));
bucket.set(new Date()); bucket.set(new Date());
bucket.expire(2, TimeUnit.MINUTES); bucket.expire(2, TimeUnit.MINUTES);
} }
...@@ -83,11 +63,10 @@ public class VAServerService { ...@@ -83,11 +63,10 @@ public class VAServerService {
vaServerHttpService.addTask(taskData, server); vaServerHttpService.addTask(taskData, server);
RMap<String, String> map = redisCacheService.getTaskVaServerMap(); RMap<String, String> map = redisService.getTaskVaServerMap();
map.put(task.getUnid(), server.getDevID()); map.put(task.getUnid(), server.getDevID());
server.setAvailableResources(server.getAvailableResources() - task.getResourceNeed());
RMap<String, VaServerInfo> vaServerMap = redisCacheService.getVaServerMap(); modifyVAServerResource(server.getDevID(), -task.getResourceNeed());
vaServerMap.put(server.getDevID(), server);
return true; return true;
} }
...@@ -99,33 +78,43 @@ public class VAServerService { ...@@ -99,33 +78,43 @@ public class VAServerService {
* 删除任务 * 删除任务
*/ */
public boolean terminateTask(String taskUnid) { public boolean terminateTask(String taskUnid) {
TaskData taskData = taskService.getTaskDataByUnid(taskUnid); TaskData taskData = taskService.getRepository().getTaskDataByUnid(taskUnid);
if (taskData == null) {
return false;
}
Task task = taskData.getTask(); Task task = taskData.getTask();
RMap<String, String> map = redisCacheService.getTaskVaServerMap(); RMap<String, String> map = redisService.getTaskVaServerMap();
String vaServerId = map.get(taskUnid); String vaServerId = map.get(taskUnid);
// 如果vaServerId不为空,需要终止任务 // 如果vaServerId不为空,需要终止任务
if (vaServerId != null) { if (vaServerId != null) {
RMap<String, VaServerInfo> vaServerMap = redisCacheService.getVaServerMap(); VaServerInfo vaServerInfo = vaServerRedisRepository.getVAServerInfoById(vaServerId);
VaServerInfo vaServerInfo = vaServerMap.get(vaServerId);
// 下发终止任务请求 // 下发终止任务请求
vaServerHttpService.rmTask(taskUnid, vaServerInfo); vaServerHttpService.rmTask(taskUnid, vaServerInfo);
// 解除任务和 vaServer 关联, 恢复资源数量 // 解除任务和 vaServer 关联, 恢复资源数量
map.remove(taskUnid); map.remove(taskUnid);
RLock vaServerLock = redisCacheService.getLockMust("lock:vaserver:" + vaServerId); modifyVAServerResource(vaServerId, task.getResourceNeed());
try {
vaServerInfo.setAvailableResources(vaServerInfo.getAvailableResources() + task.getResourceNeed());
vaServerMap.put(vaServerId, vaServerInfo);
} finally {
vaServerLock.forceUnlock();
}
return true; return true;
} }
return true; return true;
} }
public void modifyVAServerResource(String devId, float param) {
RLock vaServerLock = redisService.getLockMust("lock:vaserver:" + devId);
try {
VaServerInfo vaServerInfo = vaServerRedisRepository.getVAServerInfoById(devId);
if (vaServerInfo == null) {
return;
}
vaServerInfo.setAvailableResources(vaServerInfo.getAvailableResources() + param);
vaServerRedisRepository.addOrUpdate(devId, vaServerInfo);
} finally {
vaServerLock.forceUnlock();
}
}
/** /**
* 修改任务 * 修改任务
*/ */
...@@ -183,7 +172,7 @@ public class VAServerService { ...@@ -183,7 +172,7 @@ public class VAServerService {
* 获取 VAServer 运行状态配置参数等 * 获取 VAServer 运行状态配置参数等
*/ */
public Object getStatus(String devId) { public Object getStatus(String devId) {
VaServerInfo vaServerInfo = getVaServerInfoByDevId(devId); VaServerInfo vaServerInfo = vaServerRedisRepository.getVAServerInfoById(devId);
if (vaServerInfo != null) { if (vaServerInfo != null) {
return vaServerHttpService.status(vaServerInfo); return vaServerHttpService.status(vaServerInfo);
} else { } else {
...@@ -227,5 +216,7 @@ public class VAServerService { ...@@ -227,5 +216,7 @@ public class VAServerService {
} }
} }
public VAServerRedisRepository getVaServerRedisRepository() {
return vaServerRedisRepository;
}
} }
...@@ -4,20 +4,21 @@ import com.alibaba.fastjson.JSON; ...@@ -4,20 +4,21 @@ import com.alibaba.fastjson.JSON;
import com.viontech.fanxing.commons.constant.RedisKeys; import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.commons.model.StoreConfig; import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.scheduling.feign.TaskClient; import com.viontech.fanxing.task.scheduling.feign.TaskClient;
import com.viontech.fanxing.task.scheduling.model.TaskData; import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo; import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.scheduling.repository.VAServerRedisRepository;
import com.viontech.keliu.util.DateUtil;
import com.viontech.keliu.util.JsonMessageUtil; import com.viontech.keliu.util.JsonMessageUtil;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.redisson.api.RMap; import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.text.ParseException;
/** /**
* . * .
...@@ -33,7 +34,9 @@ class VAServerHttpServiceTest { ...@@ -33,7 +34,9 @@ class VAServerHttpServiceTest {
@Resource @Resource
VAServerHttpService vaServerHttpService; VAServerHttpService vaServerHttpService;
@Resource @Resource
RedisCacheService redisCacheService; RedisService redisService;
@Resource
private VAServerRedisRepository vaServerRedisRepository;
@Resource @Resource
private TaskClient taskClient; private TaskClient taskClient;
...@@ -42,8 +45,7 @@ class VAServerHttpServiceTest { ...@@ -42,8 +45,7 @@ class VAServerHttpServiceTest {
@BeforeEach @BeforeEach
public void before() { public void before() {
RMap<String, VaServerInfo> map = redisCacheService.getVaServerMap(); this.vaServerInfo = vaServerRedisRepository.getVAServerInfoById("xxx-xx");
this.vaServerInfo = map.get("xxx-xx");
this.taskData = new TaskData(); this.taskData = new TaskData();
Task task = new Task(); Task task = new Task();
task.setUnid(TASK_UNID); task.setUnid(TASK_UNID);
...@@ -73,7 +75,7 @@ class VAServerHttpServiceTest { ...@@ -73,7 +75,7 @@ class VAServerHttpServiceTest {
@Test @Test
void updateAlternate() { void updateAlternate() {
vaServerHttpService.updateRotationStatus(TASK_UNID,1, vaServerInfo); vaServerHttpService.updateRotationStatus(TASK_UNID, 1, vaServerInfo);
} }
@Test @Test
...@@ -94,14 +96,19 @@ class VAServerHttpServiceTest { ...@@ -94,14 +96,19 @@ class VAServerHttpServiceTest {
} }
@Test @Test
void test() throws ParseException { void testJsonArray() {
RMap<String, String> map = redisCacheService.getTaskVaServerMap();
map.put("0a263320bd274b529e5185e3b05aa157", "xxx-xx"); }
@Test
void test() throws Exception {
// RMap<String, String> map = redisCacheService.getTaskVaServerMap();
// map.put("0a263320bd274b529e5185e3b05aa157", "xxx-xx");
// RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_EXECUTED_TASK_UNID_SET); RScoredSortedSet<String> set = redisService.getClient().getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_EXECUTED_TASK_UNID_SET);
// RScoredSortedSet<String> set2 = redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_TERMINATED_TASK_UNID_SET); RScoredSortedSet<String> set2 = redisService.getClient().getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_TERMINATED_TASK_UNID_SET);
// //
// set.add(DateUtil.parse("yyyy-MM-dd HH:mm:ss", "2021-07-22 16:15:00").getTime(), "tttttttttttttttttt"); set.add(DateUtil.parse("yyyy-MM-dd HH:mm:ss", "2021-08-11 13:18:00").getTime(), "tttttttttttttttttt");
// set2.add(DateUtil.parse("yyyy-MM-dd HH:mm:ss", "2021-07-22 16:16:00").getTime(), "tttttttttttttttttt"); set2.add(DateUtil.parse("yyyy-MM-dd HH:mm:ss", "2021-08-11 13:20:00").getTime(), "tttttttttttttttttt");
} }
} }
\ No newline at end of file \ No newline at end of file
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!