Commit 14e8731e by xmh

fanxing-forward:

1. 修改 CacheUtils 使用 guava cache
2. 添加是否启用转发的配置
3. 转发结果持久化
4. 转发时3次重发机制

fanxing-task-manager
1. 实现对任务的创建更新和删除动作
2. 修复对scheduling的调用

fanxing-task-scheduling
优化
1 parent dd570351
Showing 27 changed files with 307 additions and 119 deletions
spring:
cloud:
loadbalancer:
ribbon:
enabled: false
consul:
# 服务发现配置
discovery:
......
......@@ -106,5 +106,10 @@
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>3.0.3</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -29,7 +29,6 @@ public class RedisKeys {
/** 用来接收所有数据的队列,用于转发 */
public static final String FORWARD_FORWARD_QUEUE = "forward:forwardQueue";
/**
* 用来获取 vaServer 的心跳在 redis 中对应的 key
*
......@@ -41,5 +40,9 @@ public class RedisKeys {
return devId == null ? "scheduling:keepalive" : "scheduling:keepalive" + ":" + devId;
}
public static String getForwardResultMap(Long forwardId) {
return "forward:result:" + forwardId;
}
}
......@@ -16,6 +16,6 @@ public interface TaskSchedulingTasksAdapter {
JsonMessageUtil.JsonMessage update(Task task);
JsonMessageUtil.JsonMessage delete(Task task);
JsonMessageUtil.JsonMessage delete(String taskUnid);
}
package com.viontech.fanxing.forward.batch.writer;
import com.viontech.fanxing.commons.config.ApplicationContextProvider;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.commons.model.Forward;
import com.viontech.fanxing.forward.ForwardApp;
import com.viontech.fanxing.forward.model.ForwardContent;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.springframework.batch.item.ItemWriter;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Future;
/**
* .
......@@ -23,21 +26,14 @@ import java.util.concurrent.Future;
@Component
public class ForwardWriter implements ItemWriter<ForwardContent> {
@Resource
private RestTemplateBuilder restTemplateBuilder;
@Override
public void write(List<? extends ForwardContent> items) {
RestTemplate restTemplate = restTemplateBuilder.build();
// List<ImmutablePair<Future<ResponseEntity<String>>, HttpCallable>> data = new ArrayList<>();
for (ForwardContent item : items) {
List<Forward> forwardList = item.getForwardList();
String json = item.getJson();
for (Forward forward : forwardList) {
HttpCallable httpCallable = new HttpCallable(restTemplate, forward, json);
HttpCallable httpCallable = new HttpCallable(forward, json);
ForwardApp.THREAD_POOL_EXECUTOR.submit(httpCallable);
// ImmutablePair<Future<ResponseEntity<String>>, HttpCallable> pair = ImmutablePair.of(future, httpCallable);
// data.add(pair);
}
}
......@@ -47,28 +43,44 @@ public class ForwardWriter implements ItemWriter<ForwardContent> {
private @Slf4j
static class HttpCallable implements Runnable {
private final RestTemplate restTemplate;
private final Forward forward;
private final String json;
private final RedissonClient redissonClient;
private int failed = 0;
public HttpCallable(RestTemplate restTemplate, Forward forward, String json) {
this.restTemplate = restTemplate;
public HttpCallable(Forward forward, String json) {
this.forward = forward;
this.json = json;
this.redissonClient = ApplicationContextProvider.getBean(RedissonClient.class);
}
@Override
public void run() {
try {
ResponseEntity<String> responseEntity = restTemplate.postForEntity(forward.getUrl(), json, String.class);
Mono<String> response = WebClient.create()
.post()
.uri(forward.getUrl())
.bodyValue(json)
.retrieve()
.bodyToMono(String.class);
String block = response.block(Duration.ofSeconds(20));
} catch (Exception e) {
failed += 1;
}
if (failed < 3) {
RMap<String, Object> map = redissonClient.getMap(RedisKeys.getForwardResultMap(forward.getId()));
if (failed == 0) {
map.put("id", forward.getId());
map.addAndGet("total", 1);
map.put("lastSendTime", new Date());
} else if (failed < 3) {
ForwardApp.THREAD_POOL_EXECUTOR.submit(this);
} else {
log.info("失败次数超过三次,不再发送,forwardId:{}", forward.getId());
map.put("id", forward.getId());
map.addAndGet("total", 1);
map.addAndGet("failed", 1);
map.put("lastSendTime", new Date());
}
}
}
......
......@@ -5,8 +5,8 @@ import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.keliu.util.JsonMessageUtil;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
......@@ -24,6 +24,8 @@ import javax.annotation.Resource;
public class DataReceiveController {
@Resource
private RedissonClient redissonClient;
@Value("${vion.forward.enable:false}")
private Boolean enableForward;
@PostMapping("/result")
public Object result(@RequestBody String analysisResultStr) {
......@@ -49,7 +51,10 @@ public class DataReceiveController {
if (queue != null) {
queue.offerFirst(jsonObject);
}
// forwardQueue.offer(jsonObject);
if (enableForward) {
forwardQueue.offer(jsonObject);
}
return JsonMessageUtil.getSuccessJsonMsg("OK");
}
......
......@@ -5,6 +5,9 @@ import com.viontech.keliu.util.JsonMessageUtil;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/**
* .
......@@ -20,4 +23,7 @@ public interface OpsFeignClient {
@GetMapping("/forwards")
JsonMessageUtil.JsonMessage<Forward> getForwards();
@PostMapping("/forwards/{id}")
JsonMessageUtil.JsonMessage<Forward> updateById(@PathVariable("id") Long id, @RequestBody Forward forward);
}
package com.viontech.fanxing.forward.runner;
import com.viontech.fanxing.commons.model.Forward;
import com.viontech.fanxing.forward.feign.OpsFeignClient;
import com.viontech.keliu.util.JsonMessageUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RKeys;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
/**
* .
*
* @author 谢明辉
* @date 2021/8/3
*/
@Component
@Slf4j
public class ForwardResultPersistenceRunner {
@Resource
private RedissonClient redissonClient;
@Resource
private OpsFeignClient opsFeignClient;
@Scheduled(cron = "1 0/5 * * * ?")
public void forwardResultPersistenceRunner() {
RLock lock = redissonClient.getLock("lock:forwardResultPersistence");
boolean success = lock.tryLock();
if (!success) {
return;
}
try {
RKeys keys = redissonClient.getKeys();
Iterable<String> forwardResultKeys = keys.getKeysByPattern("forward:result:*");
for (String forwardResultKey : forwardResultKeys) {
Forward forward = new Forward();
RMap<String, Object> map = redissonClient.getMap(forwardResultKey);
Object total = map.get("total");
forward.setTotal((Long) total);
Object lastSendTime = map.get("lastSendTime");
forward.setLastSendTime((Date) lastSendTime);
Object failed = map.get("failed");
forward.setFailed((Long) failed);
Long id = (Long) map.get("id");
JsonMessageUtil.JsonMessage<Forward> forwardJsonMessage = opsFeignClient.updateById(id, forward);
}
} catch (Exception e) {
log.error("ForwardResultPersistenceRunner failed", e);
} finally {
lock.unlock();
}
}
}
package com.viontech.fanxing.forward.util;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.viontech.fanxing.commons.model.Forward;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.forward.feign.OpsFeignClient;
......@@ -9,10 +11,11 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
/**
......@@ -21,21 +24,22 @@ import java.util.stream.Collectors;
* @author 谢明辉
* @date 2021/7/16
*/
@SuppressWarnings("unchecked")
@Component
@Slf4j
public class CacheUtils {
private static final ConcurrentHashMap<String, Object> CACHE_MAP = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, Long> EXPIRE_MAP = new ConcurrentHashMap<>();
private static final Cache<Object, Object> CACHE = CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(10)).build();
@Resource
private TaskManagerFeignClient taskManagerFeignClient;
@Resource
private OpsFeignClient opsFeignClient;
public synchronized Map<String, Task> getTaskMap() {
String key = "task_map";
Long expire = EXPIRE_MAP.get(key);
if (expire == null || expire < System.currentTimeMillis()) {
Map<String, Task> result;
try {
result = (Map<String, Task>) CACHE.get("task_map", () -> {
JsonMessageUtil.JsonMessage<Task> response = null;
try {
response = taskManagerFeignClient.getAllTask();
......@@ -44,21 +48,22 @@ public class CacheUtils {
}
if (response != null && response.getData() != null) {
List<Task> data = (List<Task>) response.getData();
Map<String, Task> taskMap = data.stream().collect(Collectors.toMap(Task::getUnid, x -> x, (x, y) -> x));
CACHE_MAP.put(key, taskMap);
return data.stream().collect(Collectors.toMap(Task::getUnid, x -> x, (x, y) -> x));
} else {
CACHE_MAP.put(key, Collections.emptyMap());
return Collections.emptyMap();
}
EXPIRE_MAP.put(key, System.currentTimeMillis());
});
} catch (ExecutionException e) {
log.error("", e);
result = Collections.emptyMap();
}
return (Map<String, Task>) CACHE_MAP.get(key);
return result;
}
public synchronized List<Forward> getAllForward() {
String key = "forward_list";
Long expire = EXPIRE_MAP.get(key);
if (expire == null || expire < System.currentTimeMillis()) {
List<Forward> result;
try {
result = (List<Forward>) CACHE.get("forward_list", () -> {
JsonMessageUtil.JsonMessage<Forward> response = null;
try {
response = opsFeignClient.getForwards();
......@@ -66,15 +71,16 @@ public class CacheUtils {
log.info("获取 forward_list 失败:", e);
}
if (response != null && response.getData() != null) {
List<Forward> data = (List<Forward>) response.getData();
CACHE_MAP.put(key, data);
return response.getData();
} else {
CACHE_MAP.put(key, Collections.emptyMap());
return Collections.emptyList();
}
EXPIRE_MAP.put(key, System.currentTimeMillis());
});
} catch (ExecutionException e) {
log.error("", e);
result = Collections.emptyList();
}
return (List<Forward>) CACHE_MAP.get(key);
return result;
}
......
spring:
cloud:
loadbalancer:
ribbon:
enabled: false
consul:
# 服务发现配置
discovery:
......@@ -51,6 +54,8 @@ vion:
pic:
path: G:\data
keep: 1
forward:
enable: false
redisson:
path: F:\myIDEAworkspace\jt\fanxing3\fanxing-commons\src\main\resources\redisson.yml
debug: true
\ No newline at end of file
......@@ -44,7 +44,7 @@ public class AuthorizationFilter implements GlobalFilter {
response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
response.setStatusCode(HttpStatus.UNAUTHORIZED);
return response.writeWith(Flux.just(msg).map(x -> {
return response.writeWith(Mono.just(msg).map(x -> {
byte[] bytes = x.getBytes(StandardCharsets.UTF_8);
return response.bufferFactory().wrap(bytes);
}));
......
spring:
cloud:
loadbalancer:
ribbon:
enabled: false
gateway:
enabled: true
globalcors:
cors-configurations:
'[/**]':
allow-credentials: true
allowedOrigins: "*"
allowedMethods:
- ALL
allowedMethods: "*"
allowedHeaders: "*"
discovery:
locator:
enabled: true
......
spring:
cloud:
loadbalancer:
ribbon:
enabled: false
consul:
# 服务发现配置
discovery:
......
spring:
cloud:
loadbalancer:
ribbon:
enabled: false
consul:
# 服务发现配置
discovery:
......
......@@ -6,10 +6,7 @@ import com.viontech.fanxing.commons.vo.TaskVo;
import com.viontech.fanxing.task.manager.controller.base.TaskBaseController;
import com.viontech.keliu.util.JsonMessageUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/tasks")
......@@ -31,4 +28,18 @@ public class TaskController extends TaskBaseController {
return JsonMessageUtil.getSuccessJsonMsg(taskVo);
}
@Override
@PostMapping("/{id}")
public Object update(@PathVariable(value = "id") Long id, @RequestBody TaskVo taskVo) {
taskVo.setId(id);
taskVo = taskService.updateTask(taskVo);
return JsonMessageUtil.getSuccessJsonMsg("success", taskVo);
}
@Override
@DeleteMapping("/{id}")
public Object del(@PathVariable(value = "id") Long id) {
taskService.removeTask(id);
return JsonMessageUtil.getSuccessJsonMsg("success");
}
}
\ No newline at end of file
......@@ -4,11 +4,8 @@ import com.viontech.fanxing.commons.feing.TaskSchedulingTasksAdapter;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.keliu.util.JsonMessageUtil;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.*;
/**
* .
......@@ -17,19 +14,19 @@ import org.springframework.web.bind.annotation.RequestBody;
* @date 2021/7/12
*/
@Service
@Component
@FeignClient(value = "fanxing-task-scheduling")
public interface TaskSchedulingClient extends TaskSchedulingTasksAdapter {
@Override
@PostMapping
@PostMapping("/tasks")
JsonMessageUtil.JsonMessage add(@RequestBody Task task);
@Override
@PutMapping
@PutMapping("/tasks")
JsonMessageUtil.JsonMessage update(@RequestBody Task task);
@Override
@DeleteMapping
JsonMessageUtil.JsonMessage delete(@RequestBody Task task);
@DeleteMapping("/tasks")
JsonMessageUtil.JsonMessage delete(@RequestParam("taskUnid") String taskUnid);
}
......@@ -6,4 +6,8 @@ import com.viontech.fanxing.commons.vo.TaskVo;
public interface TaskService extends BaseService<Task> {
TaskVo addTask(Task task);
TaskVo updateTask(Task task);
void removeTask(Long id);
}
\ No newline at end of file
......@@ -29,11 +29,40 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
@Override
public TaskVo addTask(Task task) {
task = insertSelective(task);
task = selectByPrimaryKey(task.getId());
JsonMessageUtil.JsonMessage add = taskSchedulingClient.add(task);
if (add.isSuccess()) {
return new TaskVo(task);
} else {
throw new RuntimeException("添加任务失败");
throw new RuntimeException(add.getMsg());
}
}
@Transactional(rollbackFor = Exception.class)
@Override
public TaskVo updateTask(Task task) {
task = insertSelective(task);
task = selectByPrimaryKey(task.getId());
JsonMessageUtil.JsonMessage update = taskSchedulingClient.update(task);
if (update.isSuccess()) {
updateByPrimaryKey(task);
return new TaskVo(task);
} else {
throw new RuntimeException(update.getMsg());
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public void removeTask(Long id) {
Task task = selectByPrimaryKey(id);
JsonMessageUtil.JsonMessage delete = taskSchedulingClient.delete(task.getUnid());
if (delete.isSuccess()) {
deleteByPrimaryKey(id);
} else {
throw new RuntimeException(delete.getMsg());
}
}
}
\ No newline at end of file
spring:
cloud:
loadbalancer:
ribbon:
enabled: false
consul:
# 服务发现配置
discovery:
......
......@@ -35,7 +35,7 @@ public class TaskController implements TaskSchedulingTasksAdapter {
@Override
@PostMapping
public JsonMessageUtil.JsonMessage add(Task task) {
public JsonMessageUtil.JsonMessage add(@RequestBody Task task) {
TaskData taskData = new TaskData(task);
// 获取存储配置
Long storeConfigId = task.getStoreConfigId();
......@@ -53,7 +53,9 @@ public class TaskController implements TaskSchedulingTasksAdapter {
Long nextTerminateTime = nextTime.right;
if (nextExecuteTime != null) {
taskService.addExecutableTaskData(unid, nextExecuteTime);
if (nextTerminateTime != null) {
taskService.addTerminatableTaskData(unid, nextTerminateTime);
}
taskService.addTaskData(taskData);
return JsonMessageUtil.getSuccessJsonMsg("success");
} else {
......@@ -63,13 +65,13 @@ public class TaskController implements TaskSchedulingTasksAdapter {
@Override
@PutMapping
public JsonMessageUtil.JsonMessage update(Task task) {
public JsonMessageUtil.JsonMessage update(@RequestBody Task task) {
String taskUnid = task.getUnid();
VaServerInfo vaServerInfo = taskService.taskRunOn(taskUnid);
// vaServerId 为空说明任务未执行可以先删除再建立新任务
if (vaServerInfo == null) {
JsonMessageUtil.JsonMessage delete = delete(task);
JsonMessageUtil.JsonMessage delete = delete(taskUnid);
if (delete.isSuccess()) {
JsonMessageUtil.JsonMessage add = add(task);
return add;
......@@ -84,8 +86,7 @@ public class TaskController implements TaskSchedulingTasksAdapter {
@Override
@DeleteMapping
public JsonMessageUtil.JsonMessage delete(Task task) {
String taskUnid = task.getUnid();
public JsonMessageUtil.JsonMessage delete(@RequestParam("taskUnid") String taskUnid) {
vaServerService.terminateTask(taskUnid);
taskService.removeTaskDataAll(taskUnid);
// todo
......
......@@ -9,6 +9,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.Collection;
import java.util.HashMap;
/**
......@@ -44,6 +45,7 @@ public class VAServerController {
result.put("code", 200);
result.put("msg", "success");
result.put("resultRecvUrl", "http://" + vionGatewayIp + ":" + vionGatewayPort + "/fanxing-forward/result");
result.put("videoUploadUrl", "http://" + vionGatewayIp + ":" + vionGatewayPort + "/fanxing-forward/result");
return result;
}
......@@ -59,7 +61,8 @@ public class VAServerController {
@GetMapping("/vaServerInfo")
public Object getVaServerInfo() {
return vaServerService.getVaServerInfo();
Collection<VaServerInfo> vaServerInfo = vaServerService.getVaServerInfo();
return JsonMessageUtil.getSuccessJsonMsg(vaServerInfo);
}
/**
......
......@@ -116,7 +116,7 @@ public class RuntimeConfig {
executeTime = singleConfig.getStartTime();
terminateTime = singleConfig.getEndTime();
if (terminateTime.isBefore(now)) {
return null;
return ImmutablePair.nullPair();
}
executeTimestamp = executeTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
terminateTimestamp = terminateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
......@@ -145,14 +145,14 @@ public class RuntimeConfig {
dayOfWeek = (dayOfWeek + 1) % 7;
}
if (!success) {
return null;
return ImmutablePair.nullPair();
}
break;
case 3:
// 随机执行,待完善
break;
default:
return null;
return ImmutablePair.nullPair();
}
return ImmutablePair.of(executeTimestamp, terminateTimestamp);
......
......@@ -23,6 +23,8 @@ public class VaServerInfo implements Serializable {
private String proxy;
/** 可用资源数量 */
private Float availableResources;
/** 状态 1在线,2离线 */
private int status = 1;
public VaServerInfo setVideoResource(Float videoResource) {
this.videoResource = videoResource;
......
package com.viontech.fanxing.task.scheduling.runner;
import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.vo.TaskVo;
import com.viontech.fanxing.task.scheduling.feign.TaskClient;
import com.viontech.fanxing.task.scheduling.model.RuntimeConfig;
import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.scheduling.service.RedisCacheService;
import com.viontech.fanxing.task.scheduling.service.TaskService;
import com.viontech.fanxing.task.scheduling.service.VAServerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
......@@ -42,10 +47,10 @@ public class TaskRunner {
@Scheduled(fixedDelay = 5000)
public void executedTaskListener() {
RLock lock = redissonClient.getLock("executedTaskListener");
RLock lock = redissonClient.getLock("lock:executedTaskListener");
boolean isLock;
try {
isLock = lock.tryLock(10, 8, TimeUnit.SECONDS);
isLock = lock.tryLock(30, 25, TimeUnit.SECONDS);
} catch (InterruptedException e) {
return;
}
......@@ -58,35 +63,35 @@ public class TaskRunner {
Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true);
for (String taskUnid : entryCollection) {
log.info("开始任务 : {}", taskUnid);
// TaskData taskData = taskDataMap.get(taskUnid);
// Task task = taskData.getTask();
// String taskVaType = task.getVaType();
// Float resourceNeed = task.getResourceNeed();
// Collection<VaServerInfo> vaServerInfos = vaServerMap.values();
// // todo 暂时先找有可用资源的vaserver,以后再进行算法优化
// VaServerInfo server = null;
// for (VaServerInfo vaServerInfo : vaServerInfos) {
// if (vaServerInfo.getAvailableResources() > resourceNeed) {
// server = vaServerInfo;
// break;
// }
// }
//
// // 找不到可以用来执行的设备,需要修改状态
// if (server == null) {
// TaskVo taskVo = new TaskVo();
// taskVo.setStatus(TaskStatus.CAN_NOT_RUN.val);
// taskClient.updateTask(task.getId(), taskVo);
// continue;
// }
//
// boolean success = vaServerService.executeTask(taskData, server);
//
// // 修改任务状态
// TaskVo taskVo = new TaskVo();
// taskVo.setStatus(TaskStatus.RUNNING.val);
// taskClient.updateTask(task.getId(), taskVo);
// // 移除任务
TaskData taskData = taskDataMap.get(taskUnid);
Task task = taskData.getTask();
String taskVaType = task.getVaType();
Float resourceNeed = task.getResourceNeed();
Collection<VaServerInfo> vaServerInfos = vaServerMap.values();
// todo 暂时先找有可用资源的vaserver,以后再进行算法优化
VaServerInfo server = null;
for (VaServerInfo vaServerInfo : vaServerInfos) {
if (vaServerInfo.getAvailableResources() > resourceNeed) {
server = vaServerInfo;
break;
}
}
// 找不到可以用来执行的设备,需要修改状态
if (server == null) {
TaskVo taskVo = new TaskVo();
taskVo.setStatus(TaskStatus.CAN_NOT_RUN.val);
taskClient.updateTask(task.getId(), taskVo);
continue;
}
boolean success = vaServerService.executeTask(taskData, server);
// 修改任务状态
TaskVo taskVo = new TaskVo();
taskVo.setStatus(TaskStatus.RUNNING.val);
taskClient.updateTask(task.getId(), taskVo);
// 移除任务
set.remove(taskUnid);
}
......@@ -104,10 +109,10 @@ public class TaskRunner {
@Scheduled(fixedDelay = 5000)
public void terminatedTaskListener() {
RLock lock = redissonClient.getLock("terminatedTaskListener");
RLock lock = redissonClient.getLock("lock:terminatedTaskListener");
boolean isLock;
try {
isLock = lock.tryLock(10, 8, TimeUnit.SECONDS);
isLock = lock.tryLock(30, 25, TimeUnit.SECONDS);
} catch (InterruptedException e) {
return;
}
......@@ -120,15 +125,19 @@ public class TaskRunner {
for (String taskUnid : entryCollection) {
log.info("停止任务 : {}", taskUnid);
// TaskData taskData = taskDataMap.get(taskUnid);
// RuntimeConfig runtimeConfig = taskData.getRuntimeConfig();
// boolean success = vaServerService.terminateTask(taskUnid);
// // todo 获取可用的 vaserver ,执行任务终止动作,如果成功,解除 taskData和 vaServer 的关联,并且从 zset 中移除任务,恢复vaserver资源数,计算下次任务执行时间,放入zset中
// if (success) {
// ImmutablePair<Long, Long> nextTime = runtimeConfig.getNextTimeOfExecutionAndTerminal();
// taskService.addExecutableTaskData(taskUnid, nextTime.left);
// taskService.addTerminatableTaskData(taskUnid, nextTime.right);
// }
TaskData taskData = taskDataMap.get(taskUnid);
RuntimeConfig runtimeConfig = taskData.getRuntimeConfig();
// 获取可用的 vaserver ,执行任务终止动作,如果成功,解除 taskData和 vaServer 的关联,并且从 zset 中移除任务,恢复vaserver资源数,计算下次任务执行时间,放入zset中
boolean success = vaServerService.terminateTask(taskUnid);
if (success) {
ImmutablePair<Long, Long> nextTime = runtimeConfig.getNextTimeOfExecutionAndTerminal();
Long nextExecuteTime = nextTime.left;
Long nextTerminateTime = nextTime.right;
if (nextExecuteTime != null) {
taskService.addExecutableTaskData(taskUnid, nextExecuteTime);
taskService.addTerminatableTaskData(taskUnid, nextTerminateTime);
}
}
set.remove(taskUnid);
}
} catch (Exception e) {
......
......@@ -75,10 +75,6 @@ public class TaskService {
return ImmutablePair.of(taskUnid, devId);
}
public void linkTaskAndVaServer(String taskUnid, String devId) {
RMap<String, String> map = redisCacheService.getTaskVaServerMap();
map.put(taskUnid, devId);
}
public void updateTask(Task task) {
RMap<String, TaskData> taskDataMap = redisCacheService.getTaskDataMap();
......
......@@ -10,7 +10,9 @@ import org.redisson.api.RMap;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
......@@ -47,8 +49,17 @@ public class VAServerService {
return map.get(devId);
}
public Object getVaServerInfo() {
public Collection<VaServerInfo> getVaServerInfo() {
RMap<String, VaServerInfo> map = redisCacheService.getVaServerMap();
for (Map.Entry<String, VaServerInfo> entry : map.entrySet()) {
String devId = entry.getKey();
RBucket<Date> bucket = redisCacheService.getValue(RedisKeys.getVAServerKeepAliveKey(devId));
if (!bucket.isExists()) {
VaServerInfo value = entry.getValue();
value.setStatus(0);
map.put(devId, value);
}
}
return map.values();
}
......@@ -67,11 +78,12 @@ public class VAServerService {
*/
public boolean executeTask(TaskData taskData, VaServerInfo server) {
Task task = taskData.getTask();
// todo 执行任务下发,成功后关联任务和vaServer,减少vaServer的可用资源数量
// 执行任务下发,成功后关联任务和vaServer,减少vaServer的可用资源数量
vaServerHttpService.addTask(taskData, server);
taskService.linkTaskAndVaServer(task.getUnid(), server.getDevID());
RMap<String, String> map = redisCacheService.getTaskVaServerMap();
map.put(task.getUnid(), server.getDevID());
server.setAvailableResources(server.getAvailableResources() - task.getResourceNeed());
RMap<String, VaServerInfo> vaServerMap = redisCacheService.getVaServerMap();
vaServerMap.put(server.getDevID(), server);
......
spring:
cloud:
loadbalancer:
ribbon:
enabled: false
consul:
# 服务发现配置
discovery:
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!