Commit dd570351 by xmh

添加图片删除机制,任务调度服务开发

1 parent 4e429a9d
Showing 16 changed files with 317 additions and 157 deletions
package com.viontech.fanxing.commons.base;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.keliu.util.JsonMessageUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -16,9 +17,16 @@ import org.springframework.web.bind.annotation.RestControllerAdvice;
public class GlobalExceptionHandler {
private static final Logger log = LoggerFactory.getLogger(GlobalExceptionHandler.class);
@ExceptionHandler
@ExceptionHandler(Exception.class)
public Object exceptionHandler(Exception e) {
log.info("", e);
return JsonMessageUtil.getErrorJsonMsg(e.getMessage());
}
@ExceptionHandler(FanXingException.class)
public Object fanXingExceptionHandler(FanXingException fanXingException) {
JsonMessageUtil.JsonMessage errorJsonMsg = JsonMessageUtil.getErrorJsonMsg(fanXingException.getMessage());
errorJsonMsg.setData(fanXingException.getData());
return errorJsonMsg;
}
}
......@@ -6,7 +6,7 @@ package com.viontech.fanxing.commons.constant;
* @author 谢明辉
* @date 2021/7/12
*/
@SuppressWarnings("ALL")
public class RedisKeys {
/** 将要被执行的任务的 unid 有序集合,执行时间作为分数 */
......
package com.viontech.fanxing.commons.exception;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
/**
* .
*
* @author 谢明辉
* @date 2021/7/26
*/
@Getter
@Setter
public class FanXingException extends RuntimeException implements Serializable {
private Integer code;
private Object data;
public FanXingException() {}
public FanXingException(String message) {
super(message);
}
public FanXingException(String message, Integer code, Object data) {
super(message);
this.code = code;
this.data = data;
}
public FanXingException(String message, Object data) {
super(message);
this.data = data;
}
}
package com.viontech.fanxing.forward.runner;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.io.File;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
/**
* .
*
* @author 谢明辉
* @date 2021/7/27
*/
@Component
@Slf4j
public class PicKeepRunner {
@Value("${vion.pic.keep:7}")
private Integer keep;
@Value("${vion.pic.path:/images}")
private String basePath;
@Scheduled(cron = "0 0 08 * * ?")
public void run() {
if (keep == -1) {
return;
}
try {
long nowDay = LocalDate.now().toEpochDay();
DateTimeFormatter yyyyMMdd = DateTimeFormatter.ofPattern("yyyyMMdd");
File baseDir = new File(basePath);
File[] files = baseDir.listFiles((dir, name) -> name.matches("[1-9]\\d{7}"));
for (File file : files) {
String name = file.getName();
LocalDate day = LocalDate.parse(name, yyyyMMdd);
long l = day.toEpochDay();
if (nowDay - l > keep) {
log.info("删除文件夹:{}", name);
FileUtils.deleteDirectory(file);
}
}
} catch (Exception e) {
log.error("", e);
}
}
}
......@@ -50,6 +50,7 @@ pagehelper:
vion:
pic:
path: G:\data
keep: 1
redisson:
path: F:\myIDEAworkspace\jt\fanxing3\fanxing-commons\src\main\resources\redisson.yml
debug: true
\ No newline at end of file
......@@ -5,9 +5,11 @@ import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.scheduling.service.VAServerService;
import com.viontech.keliu.util.JsonMessageUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.HashMap;
/**
* .
......@@ -23,15 +25,26 @@ public class VAServerController {
@Resource
private VAServerService vaServerService;
@Value("${vion.gateway.ip}")
private String vionGatewayIp;
@Value("${vion.gateway.port}")
private String vionGatewayPort;
/**
* 注册
*/
@PostMapping("/register")
public Object register(@RequestBody VaServerInfo vaServerInfo) {
log.info("收到注册消息:{}", JSON.toJSONString(vaServerInfo));
vaServerService.registeVAServer(vaServerInfo);
return JsonMessageUtil.getSuccessJsonMsg("success");
HashMap<String, Object> result = new HashMap<>();
result.put("code", 200);
result.put("msg", "success");
result.put("resultRecvUrl", "http://" + vionGatewayIp + ":" + vionGatewayPort + "/fanxing-forward/result");
return result;
}
/**
......@@ -44,6 +57,11 @@ public class VAServerController {
return JsonMessageUtil.getSuccessJsonMsg("success");
}
@GetMapping("/vaServerInfo")
public Object getVaServerInfo() {
return vaServerService.getVaServerInfo();
}
/**
* 获取分析视频点播地址
*/
......@@ -56,16 +74,16 @@ public class VAServerController {
* 输出分析流,每30秒调一次,不调用就不再输出视频流
*/
@GetMapping("/startAnalyzeStream")
public Object startAnalyzeStream(@RequestParam String taskUnid) {
return vaServerService.startAnalyzeStream(taskUnid);
public Object startAnalyzeStream(@RequestParam String taskUnid, @RequestParam String url) {
return vaServerService.startAnalyzeStream(taskUnid, url);
}
/**
* 获取vaServer运行状态,配置等信息
*/
@GetMapping("/status")
public Object status(@RequestParam String taskUnid) {
return vaServerService.getStatus(taskUnid);
public Object status(@RequestParam String devId) {
return vaServerService.getStatus(devId);
}
......@@ -85,5 +103,15 @@ public class VAServerController {
return vaServerService.snapshot(taskUnid);
}
@GetMapping("/updateRotationStatus")
public Object updateRotationStatus(@RequestParam String taskUnid, @RequestParam Integer status) {
return vaServerService.updateRotationStatus(taskUnid, status);
}
@GetMapping("/getRotationStatus")
public Object getRotationStatus(@RequestParam String taskUnid) {
return vaServerService.getRotationStatus(taskUnid);
}
}
package com.viontech.fanxing.task.scheduling.feign;
import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.vo.StoreConfigVo;
import com.viontech.fanxing.commons.vo.TaskVo;
import com.viontech.keliu.util.JsonMessageUtil;
import org.springframework.cloud.openfeign.FeignClient;
......
......@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.model.Task;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import java.security.InvalidParameterException;
......@@ -157,57 +158,16 @@ public class RuntimeConfig {
return ImmutablePair.of(executeTimestamp, terminateTimestamp);
}
public static class Config {
public
@Getter
@Setter
@Accessors(chain = true)
static class Config {
private LocalTime start;
private LocalTime end;
private Long runningTime;
private LocalDateTime startTime;
private LocalDateTime endTime;
public LocalTime getStart() {
return start;
}
public Config setStart(LocalTime start) {
this.start = start;
return this;
}
public LocalTime getEnd() {
return end;
}
public Config setEnd(LocalTime end) {
this.end = end;
return this;
}
public Long getRunningTime() {
return runningTime;
}
public Config setRunningTime(Long runningTime) {
this.runningTime = runningTime;
return this;
}
public LocalDateTime getStartTime() {
return startTime;
}
public Config setStartTime(LocalDateTime startTime) {
this.startTime = startTime;
return this;
}
public LocalDateTime getEndTime() {
return endTime;
}
public Config setEndTime(LocalDateTime endTime) {
this.endTime = endTime;
return this;
}
}
}
......@@ -17,10 +17,11 @@ public class VaServerInfo implements Serializable {
private String devID;
private String serviceName;
/** 最大资源数量 */
private Float videoResource;
private String serviceBaseUrl;
private String proxy;
/** 可用资源数量 */
private Float availableResources;
public VaServerInfo setVideoResource(Float videoResource) {
......
package com.viontech.fanxing.task.scheduling.runner;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.vo.TaskVo;
import com.viontech.fanxing.task.scheduling.feign.TaskClient;
import com.viontech.fanxing.task.scheduling.model.RuntimeConfig;
import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.scheduling.service.RedisCacheService;
import com.viontech.fanxing.task.scheduling.service.TaskService;
import com.viontech.fanxing.task.scheduling.service.VAServerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
......@@ -33,9 +28,12 @@ import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class TaskRunner {
@Resource
private RedissonClient redissonClient;
@Resource
private RedisCacheService redisCacheService;
@Resource
private VAServerService vaServerService;
@Resource
private TaskService taskService;
......@@ -53,9 +51,10 @@ public class TaskRunner {
}
if (isLock) {
try {
RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_EXECUTED_TASK_UNID_SET);
RMap<String, TaskData> taskDataMap = redissonClient.getMap(RedisKeys.SCHEDULING_TASK_DATA_MAP);
RMap<String, VaServerInfo> vaServerMap = redissonClient.getMap(RedisKeys.SCHEDULING_VASERVER_MAP);
RScoredSortedSet<String> set = redisCacheService.getToBeExecutedTaskUnidSet();
RMap<String, TaskData> taskDataMap = redisCacheService.getTaskDataMap();
RMap<String, VaServerInfo> vaServerMap = redisCacheService.getVaServerMap();
Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true);
for (String taskUnid : entryCollection) {
log.info("开始任务 : {}", taskUnid);
......@@ -114,8 +113,9 @@ public class TaskRunner {
}
if (isLock) {
try {
RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_TERMINATED_TASK_UNID_SET);
RMap<String, TaskData> taskDataMap = redissonClient.getMap(RedisKeys.SCHEDULING_TASK_DATA_MAP);
RScoredSortedSet<String> set = redisCacheService.getToBeTerminatedTaskUnidSet();
RMap<String, TaskData> taskDataMap = redisCacheService.getTaskDataMap();
Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true);
for (String taskUnid : entryCollection) {
......
package com.viontech.fanxing.task.scheduling.service;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import org.redisson.api.RBucket;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
/**
* .
*
* @author 谢明辉
* @date 2021/7/26
*/
@Service
public class RedisCacheService {
@Resource
private RedissonClient redissonClient;
public RMap<String, String> getTaskVaServerMap() {
return redissonClient.getMap(RedisKeys.SCHEDULING_TASK_VASERVER_MAP);
}
public RScoredSortedSet<String> getToBeExecutedTaskUnidSet() {
return redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_EXECUTED_TASK_UNID_SET);
}
public RScoredSortedSet<String> getToBeTerminatedTaskUnidSet() {
return redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_TERMINATED_TASK_UNID_SET);
}
public RMap<String, TaskData> getTaskDataMap() {
return redissonClient.getMap(RedisKeys.SCHEDULING_TASK_DATA_MAP);
}
public RMap<String, VaServerInfo> getVaServerMap() {
return redissonClient.getMap(RedisKeys.SCHEDULING_VASERVER_MAP);
}
public <T> RBucket<T> getValue(String key) {
return redissonClient.getBucket(key);
}
}
package com.viontech.fanxing.task.scheduling.service;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
......@@ -22,34 +20,34 @@ import javax.annotation.Resource;
public class TaskService {
@Resource
private RedissonClient redissonClient;
private RedisCacheService redisCacheService;
@Resource
private VAServerService vaServerService;
public boolean addExecutableTaskData(String taskUnid, Long timestamp) {
RScoredSortedSet<String> toBeExecutedTaskUnidSet = redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_EXECUTED_TASK_UNID_SET);
RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisCacheService.getToBeExecutedTaskUnidSet();
return toBeExecutedTaskUnidSet.add(timestamp, taskUnid);
}
public boolean addTerminatableTaskData(String taskUnid, Long timestamp) {
RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_TERMINATED_TASK_UNID_SET);
RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redisCacheService.getToBeTerminatedTaskUnidSet();
return toBeTerminatedTaskUnidSet.add(timestamp, taskUnid);
}
public void addTaskData(TaskData taskData) {
RMap<String, TaskData> taskDataMap = redissonClient.getMap(RedisKeys.SCHEDULING_TASK_DATA_MAP);
RMap<String, TaskData> taskDataMap = redisCacheService.getTaskDataMap();
taskDataMap.put(taskData.getTask().getUnid(), taskData);
}
public TaskData getTaskDataByUnid(String taskUnid) {
RMap<String, TaskData> taskDataMap = redissonClient.getMap(RedisKeys.SCHEDULING_TASK_DATA_MAP);
RMap<String, TaskData> taskDataMap = redisCacheService.getTaskDataMap();
return taskDataMap.get(taskUnid);
}
public void removeTaskDataAll(String taskUnid) {
RScoredSortedSet<String> toBeExecutedTaskUnidSet = redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_EXECUTED_TASK_UNID_SET);
RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_TERMINATED_TASK_UNID_SET);
RMap<String, TaskData> taskDataMap = redissonClient.getMap(RedisKeys.SCHEDULING_TASK_DATA_MAP);
RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisCacheService.getToBeExecutedTaskUnidSet();
RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redisCacheService.getToBeTerminatedTaskUnidSet();
RMap<String, TaskData> taskDataMap = redisCacheService.getTaskDataMap();
toBeExecutedTaskUnidSet.remove(taskUnid);
toBeTerminatedTaskUnidSet.remove(taskUnid);
......@@ -57,13 +55,10 @@ public class TaskService {
}
public VaServerInfo taskRunOn(String taskUnid) {
RMap<String, String> taskVaServerMap = redissonClient.getMap(RedisKeys.SCHEDULING_TASK_VASERVER_MAP);
RMap<String, String> taskVaServerMap = redisCacheService.getTaskVaServerMap();
String devId = taskVaServerMap.get(taskUnid);
if (devId != null) {
return vaServerService.getVaServerInfoByDevId(devId);
} else {
return null;
}
return devId == null ? null : vaServerService.getVaServerInfoByDevId(devId);
}
/**
......@@ -74,19 +69,19 @@ public class TaskService {
* <li>right <code>devId</code></li>
*/
public ImmutablePair<String, String> unlinkTaskAndVaServer(String taskUnid) {
RMap<String, String> map = redissonClient.getMap(RedisKeys.SCHEDULING_TASK_VASERVER_MAP);
String devId = map.get(taskUnid);
map.remove(taskUnid);
RMap<String, String> taskVaServerMap = redisCacheService.getTaskVaServerMap();
String devId = taskVaServerMap.get(taskUnid);
taskVaServerMap.remove(taskUnid);
return ImmutablePair.of(taskUnid, devId);
}
public void linkTaskAndVaServer(String taskUnid, String devId) {
RMap<String, String> map = redissonClient.getMap(RedisKeys.SCHEDULING_TASK_VASERVER_MAP);
RMap<String, String> map = redisCacheService.getTaskVaServerMap();
map.put(taskUnid, devId);
}
public void updateTask(Task task) {
RMap<String, TaskData> taskDataMap = redissonClient.getMap(RedisKeys.SCHEDULING_TASK_DATA_MAP);
RMap<String, TaskData> taskDataMap = redisCacheService.getTaskDataMap();
TaskData taskData = new TaskData(task);
// 需要更新taskData,并且向vaServer更新任务信息
taskDataMap.put(task.getUnid(), taskData);
......
package com.viontech.fanxing.task.scheduling.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VATask;
......@@ -37,7 +36,7 @@ public class VAServerHttpService {
.bodyToMono(String.class);
String response = stringMono.block(Duration.ofSeconds(20));
log.info("下发任务结果:{}", response);
return null;
return response;
}
......@@ -56,7 +55,7 @@ public class VAServerHttpService {
.bodyToMono(String.class);
String response = stringMono.block(Duration.ofSeconds(20));
log.info("下发任务结果:{}", response);
return null;
return response;
}
/**
......@@ -77,7 +76,7 @@ public class VAServerHttpService {
log.info("删除任务结果:{}", response);
return null;
return response;
}
/**
......@@ -96,13 +95,13 @@ public class VAServerHttpService {
String response = stringMono.block(Duration.ofSeconds(20));
log.info("截图结果:{}", response);
return null;
return response;
}
/**
* 获取点播地址
*/
public String getAnalyzeStream(String taskUnid, VaServerInfo vaServerInfo) {
public Object getAnalyzeStream(String taskUnid, VaServerInfo vaServerInfo) {
String path = "/api/vaserver/v1/get_analyze_stream";
JSONObject jsonObject = new JSONObject();
......@@ -115,25 +114,21 @@ public class VAServerHttpService {
.retrieve()
.bodyToMono(String.class);
JSONObject response = stringMono.blockOptional(Duration.ofSeconds(10)).map(JSON::parseObject).get();
Integer code = response.getInteger("code");
if (code == null || -1 == code) {
throw new RuntimeException(response.getString("msg"));
} else {
return response.getString("stream_url");
}
String response = stringMono.block(Duration.ofSeconds(10));
log.info("获取分析流地址结果 : {}", response);
return response;
}
/**
* 开始输出分析流
*/
public Object startAnalyzeStream(String taskUnid, VaServerInfo vaServerInfo) {
public Object startAnalyzeStream(String taskUnid, VaServerInfo vaServerInfo, String url) {
String path = "/api/vaserver/v1/start_analyze_stream";
JSONObject jsonObject = new JSONObject();
jsonObject.put("task_unid", taskUnid);
jsonObject.put("isDrawRect", 1);
jsonObject.put("mediaServerPushUrl", "rtmp://192.168.9.159:1200/task1111111");
jsonObject.put("mediaServerPushUrl", url);
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
.post()
......@@ -145,13 +140,13 @@ public class VAServerHttpService {
String response = stringMono.block(Duration.ofSeconds(20));
log.info("输出分析流结果:{}", response);
return null;
return response;
}
/**
* 切换预置位
*/
public Object switchScene(String taskUnid, VaServerInfo vaServerInfo,String sceneId) {
public Object switchScene(String taskUnid, VaServerInfo vaServerInfo, String sceneId) {
String path = "/api/vaserver/v1/switch_scene";
JSONObject jsonObject = new JSONObject();
......@@ -168,18 +163,18 @@ public class VAServerHttpService {
String response = stringMono.block(Duration.ofSeconds(20));
log.info("场景切换结果:{}", response);
return null;
return response;
}
/**
* 任务状态轮询控制
* 任务轮训状态切换
*/
public Object updateAlternate(String taskUnid, VaServerInfo vaServerInfo) {
public Object updateRotationStatus(String taskUnid, Integer rotationStatus , VaServerInfo vaServerInfo) {
String path = "/api/vaserver/v1/alternate";
JSONObject jsonObject = new JSONObject();
jsonObject.put("task_unid", taskUnid);
jsonObject.put("alternateStatus", "1");
jsonObject.put("alternateStatus", rotationStatus.toString());
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
.post()
......@@ -191,13 +186,13 @@ public class VAServerHttpService {
String response = stringMono.block(Duration.ofSeconds(20));
log.info("轮训状态控制结果:{}", response);
return null;
return response;
}
/**
* 获取轮询状态
* 任务轮训状态查询
*/
public Object getAlternate(String taskUnid, VaServerInfo vaServerInfo) {
public Object getRotationStatus(String taskUnid, VaServerInfo vaServerInfo) {
String path = "/api/vaserver/v1/alternate";
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
......@@ -210,9 +205,9 @@ public class VAServerHttpService {
.bodyToMono(String.class);
String response = stringMono.block(Duration.ofSeconds(20));
log.info("轮训状态控制结果:{}", response);
log.info("获取轮训状态:{}", response);
return null;
return response;
}
/**
......@@ -230,9 +225,9 @@ public class VAServerHttpService {
.bodyToMono(String.class);
String response = stringMono.block(Duration.ofSeconds(20));
log.info("轮训状态控制结果:{}", response);
log.info("运行状态查询:{}", response);
return null;
return response;
}
......
package com.viontech.fanxing.task.scheduling.service;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import org.redisson.api.RBucket;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
......@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
public class VAServerService {
@Resource
private RedissonClient redissonClient;
private RedisCacheService redisCacheService;
@Resource
private TaskService taskService;
@Resource
......@@ -34,24 +34,29 @@ public class VAServerService {
* 设备注册
*/
public void registeVAServer(VaServerInfo vaServerInfo) {
String devID = vaServerInfo.getDevID();
RMap<String, VaServerInfo> map = redissonClient.getMap(RedisKeys.SCHEDULING_VASERVER_MAP);
RBucket<Date> bucket = redissonClient.getBucket(RedisKeys.getVAServerKeepAliveKey(devID));
String devId = vaServerInfo.getDevID();
RMap<String, VaServerInfo> map = redisCacheService.getVaServerMap();
RBucket<Date> bucket = redisCacheService.getValue(RedisKeys.getVAServerKeepAliveKey(devId));
bucket.set(new Date());
bucket.expire(2, TimeUnit.MINUTES);
map.put(devID, vaServerInfo);
map.put(devId, vaServerInfo);
}
public VaServerInfo getVaServerInfoByDevId(String devId) {
RMap<String, VaServerInfo> map = redissonClient.getMap(RedisKeys.SCHEDULING_VASERVER_MAP);
RMap<String, VaServerInfo> map = redisCacheService.getVaServerMap();
return map.get(devId);
}
public Object getVaServerInfo() {
RMap<String, VaServerInfo> map = redisCacheService.getVaServerMap();
return map.values();
}
/**
* 设备心跳
*/
public void keepalive(String devId) {
RBucket<Date> bucket = redissonClient.getBucket(RedisKeys.getVAServerKeepAliveKey(devId));
RBucket<Date> bucket = redisCacheService.getValue(RedisKeys.getVAServerKeepAliveKey(devId));
bucket.set(new Date());
bucket.expire(2, TimeUnit.MINUTES);
}
......@@ -68,7 +73,7 @@ public class VAServerService {
taskService.linkTaskAndVaServer(task.getUnid(), server.getDevID());
server.setAvailableResources(server.getAvailableResources() - task.getResourceNeed());
RMap<String, VaServerInfo> vaServerMap = redissonClient.getMap(RedisKeys.SCHEDULING_VASERVER_MAP);
RMap<String, VaServerInfo> vaServerMap = redisCacheService.getVaServerMap();
vaServerMap.put(server.getDevID(), server);
return false;
}
......@@ -84,12 +89,12 @@ public class VAServerService {
TaskData taskData = taskService.getTaskDataByUnid(taskUnid);
Task task = taskData.getTask();
RMap<String, String> map = redissonClient.getMap(RedisKeys.SCHEDULING_TASK_VASERVER_MAP);
RMap<String, String> map = redisCacheService.getTaskVaServerMap();
String vaServerId = map.get(taskUnid);
// 如果vaServerId不为空,需要终止任务
if (vaServerId != null) {
RMap<String, VaServerInfo> vaServerMap = redissonClient.getMap(RedisKeys.SCHEDULING_VASERVER_MAP);
RMap<String, VaServerInfo> vaServerMap = redisCacheService.getVaServerMap();
VaServerInfo vaServerInfo = vaServerMap.get(vaServerId);
// 下发终止任务请求
vaServerHttpService.rmTask(taskUnid, vaServerInfo);
......@@ -119,7 +124,7 @@ public class VAServerService {
if (vaServerInfo != null) {
return vaServerHttpService.snapshot(taskUnid, vaServerInfo);
} else {
throw new RuntimeException("任务不在运行状态");
throw new FanXingException("任务不在运行状态", taskUnid);
}
}
......@@ -131,19 +136,19 @@ public class VAServerService {
if (vaServerInfo != null) {
return vaServerHttpService.getAnalyzeStream(taskUnid, vaServerInfo);
} else {
throw new RuntimeException("任务不在运行状态");
throw new FanXingException("任务不在运行状态", taskUnid);
}
}
/**
* 输出分析流
*/
public Object startAnalyzeStream(String taskUnid) {
public Object startAnalyzeStream(String taskUnid, String url) {
VaServerInfo vaServerInfo = taskService.taskRunOn(taskUnid);
if (vaServerInfo != null) {
return vaServerHttpService.startAnalyzeStream(taskUnid, vaServerInfo);
return vaServerHttpService.startAnalyzeStream(taskUnid, vaServerInfo, url);
} else {
throw new RuntimeException("任务不在运行状态");
throw new FanXingException("任务不在运行状态", taskUnid);
}
}
......@@ -163,7 +168,7 @@ public class VAServerService {
if (vaServerInfo != null) {
return vaServerHttpService.status(vaServerInfo);
} else {
throw new RuntimeException("任务不在运行状态");
throw new FanXingException("无法获取到对应的设备", devId);
}
}
......@@ -175,8 +180,33 @@ public class VAServerService {
if (vaServerInfo != null) {
return vaServerHttpService.switchScene(taskUnid, vaServerInfo, sceneId);
} else {
throw new RuntimeException("任务不在运行状态");
throw new FanXingException("任务不在运行状态", taskUnid);
}
}
/**
* 任务轮训状态切换
*/
public Object updateRotationStatus(String taskUnid, Integer rotationStatus) {
VaServerInfo vaServerInfo = taskService.taskRunOn(taskUnid);
if (vaServerInfo != null) {
return vaServerHttpService.updateRotationStatus(taskUnid, rotationStatus, vaServerInfo);
} else {
throw new FanXingException("任务不在运行状态", taskUnid);
}
}
/**
* 任务轮训状态查询
*/
public Object getRotationStatus(String taskUnid) {
VaServerInfo vaServerInfo = taskService.taskRunOn(taskUnid);
if (vaServerInfo != null) {
return vaServerHttpService.getRotationStatus(taskUnid, vaServerInfo);
} else {
throw new FanXingException("任务不在运行状态", taskUnid);
}
}
}
......@@ -36,3 +36,6 @@ logging:
vion:
redisson:
path: F:\myIDEAworkspace\jt\fanxing3\fanxing-commons\src\main\resources\redisson.yml
gateway:
ip: 192.168.9.233
port: 30000
\ No newline at end of file
......@@ -4,28 +4,20 @@ import com.alibaba.fastjson.JSON;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.vo.StoreConfigVo;
import com.viontech.fanxing.task.scheduling.feign.TaskClient;
import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import com.viontech.keliu.util.DateUtil;
import com.viontech.keliu.util.JsonMessageUtil;
import io.micrometer.core.instrument.Statistic;
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.lang3.builder.ToStringExclude;
import org.junit.Before;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.text.ParseException;
import java.util.Date;
/**
* .
......@@ -37,20 +29,20 @@ import java.util.Date;
@RunWith(SpringRunner.class)
class VAServerHttpServiceTest {
private static final String TASK_UNID = "0a263320bd274b529e5185e3b05aa157";
@Resource
VAServerHttpService vaServerHttpService;
@Resource
private RedissonClient redissonClient;
RedisCacheService redisCacheService;
@Resource
private TaskClient taskClient;
private VaServerInfo vaServerInfo;
private TaskData taskData;
private static final String TASK_UNID = "8af60a07c86d4571b037af3f4ccf681c";
@BeforeEach
public void before() {
RMap<String, VaServerInfo> map = redissonClient.getMap(RedisKeys.SCHEDULING_VASERVER_MAP);
RMap<String, VaServerInfo> map = redisCacheService.getVaServerMap();
this.vaServerInfo = map.get("xxx-xx");
this.taskData = new TaskData();
Task task = new Task();
......@@ -59,7 +51,6 @@ class VAServerHttpServiceTest {
}
@Test
void snapshot() {
vaServerHttpService.snapshot(TASK_UNID, vaServerInfo);
......@@ -67,28 +58,27 @@ class VAServerHttpServiceTest {
@Test
void getAnalyzeStream() {
String analyzeStream = vaServerHttpService.getAnalyzeStream(TASK_UNID, vaServerInfo);
System.out.println(analyzeStream);
Object analyzeStream = vaServerHttpService.getAnalyzeStream(TASK_UNID, vaServerInfo);
}
@Test
void startAnalyzeStream() {
vaServerHttpService.startAnalyzeStream(TASK_UNID, vaServerInfo);
vaServerHttpService.startAnalyzeStream(TASK_UNID, vaServerInfo, "rtsp://192.168.9.159:10087/0a263320bd274b529e5185e3b05aa157");
}
@Test
void switchScene() {
vaServerHttpService.switchScene(TASK_UNID, vaServerInfo,"1");
vaServerHttpService.switchScene(TASK_UNID, vaServerInfo, "1");
}
@Test
void updateAlternate() {
vaServerHttpService.updateAlternate(TASK_UNID, vaServerInfo);
vaServerHttpService.updateRotationStatus(TASK_UNID,1, vaServerInfo);
}
@Test
void getAlternate() {
vaServerHttpService.getAlternate(TASK_UNID, vaServerInfo);
vaServerHttpService.getRotationStatus(TASK_UNID, vaServerInfo);
}
@Test
......@@ -105,10 +95,13 @@ class VAServerHttpServiceTest {
@Test
void test() throws ParseException {
RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_EXECUTED_TASK_UNID_SET);
RScoredSortedSet<String> set2 = redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_TERMINATED_TASK_UNID_SET);
set.add(DateUtil.parse("yyyy-MM-dd HH:mm:ss", "2021-07-22 16:15:00").getTime(), "tttttttttttttttttt");
set2.add(DateUtil.parse("yyyy-MM-dd HH:mm:ss", "2021-07-22 16:16:00").getTime(), "tttttttttttttttttt");
RMap<String, String> map = redisCacheService.getTaskVaServerMap();
map.put("0a263320bd274b529e5185e3b05aa157", "xxx-xx");
// RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_EXECUTED_TASK_UNID_SET);
// RScoredSortedSet<String> set2 = redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_TERMINATED_TASK_UNID_SET);
//
// set.add(DateUtil.parse("yyyy-MM-dd HH:mm:ss", "2021-07-22 16:15:00").getTime(), "tttttttttttttttttt");
// set2.add(DateUtil.parse("yyyy-MM-dd HH:mm:ss", "2021-07-22 16:16:00").getTime(), "tttttttttttttttttt");
}
}
\ No newline at end of file
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!