Commit 4c0f5b97 by xmh

优化代码

1 parent 5fc02e5f
Showing 15 changed files with 175 additions and 97 deletions
...@@ -44,7 +44,7 @@ public class RedisService { ...@@ -44,7 +44,7 @@ public class RedisService {
while (!isLock) { while (!isLock) {
try { try {
isLock = lock.tryLock(30, 25, TimeUnit.SECONDS); isLock = lock.tryLock(30, 25, TimeUnit.SECONDS);
} catch (InterruptedException ignore) { } catch (Exception ignore) {
} }
} }
return lock; return lock;
......
...@@ -2,8 +2,10 @@ package com.viontech.fanxing.commons.vo; ...@@ -2,8 +2,10 @@ package com.viontech.fanxing.commons.vo;
import com.viontech.fanxing.commons.model.StoreConfig; import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.vobase.StoreConfigVoBase; import com.viontech.fanxing.commons.vobase.StoreConfigVoBase;
import org.springframework.web.multipart.MultipartFile;
public class StoreConfigVo extends StoreConfigVoBase { public class StoreConfigVo extends StoreConfigVoBase {
private MultipartFile file;
public StoreConfigVo() { public StoreConfigVo() {
super(); super();
...@@ -12,4 +14,13 @@ public class StoreConfigVo extends StoreConfigVoBase { ...@@ -12,4 +14,13 @@ public class StoreConfigVo extends StoreConfigVoBase {
public StoreConfigVo(StoreConfig storeConfig) { public StoreConfigVo(StoreConfig storeConfig) {
super(storeConfig); super(storeConfig);
} }
public MultipartFile getFile() {
return file;
}
public StoreConfigVo setFile(MultipartFile file) {
this.file = file;
return this;
}
} }
\ No newline at end of file \ No newline at end of file
...@@ -16,15 +16,15 @@ import java.util.List; ...@@ -16,15 +16,15 @@ import java.util.List;
* 支持Stream * 支持Stream
* Created by suman on 2018/7/19. * Created by suman on 2018/7/19.
*/ */
public class CompositeItemStreamProcessor<I,O> extends CompositeItemProcessor<I,O> implements ItemStream,ItemProcessor<I, O>, InitializingBean { public class CompositeItemStreamProcessor<I, O> extends CompositeItemProcessor<I, O> implements ItemStream, ItemProcessor<I, O>, InitializingBean {
private static final Logger logger = LoggerFactory.getLogger(CompositeItemStreamProcessor.class);
private boolean ignoreItemStream = false; private boolean ignoreItemStream = false;
private Logger logger = LoggerFactory.getLogger(CompositeItemStreamProcessor.class); private List<? extends ItemProcessor<?, ?>> delegates;
public void setIgnoreItemStream(boolean ignoreItemStream) { public void setIgnoreItemStream(boolean ignoreItemStream) {
this.ignoreItemStream = ignoreItemStream; this.ignoreItemStream = ignoreItemStream;
} }
private List<? extends ItemProcessor<?, ?>> delegates;
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public O process(I item) throws Exception { public O process(I item) throws Exception {
...@@ -37,10 +37,8 @@ public class CompositeItemStreamProcessor<I,O> extends CompositeItemProcessor<I, ...@@ -37,10 +37,8 @@ public class CompositeItemStreamProcessor<I,O> extends CompositeItemProcessor<I,
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
result = processItem(delegate, result); result = processItem(delegate, result);
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
long c = end-start; long c = end - start;
if(c>1) { logger.info(delegate.getClass().getSimpleName() + "Processor执行时间:" + (end - start));
logger.info(delegate.getClass().getSimpleName() + "Processor执行时间:" + (end - start));
}
} }
logger.info(item.getClass().getSimpleName() + "整体处理时间:" + (System.currentTimeMillis() - startAll)); logger.info(item.getClass().getSimpleName() + "整体处理时间:" + (System.currentTimeMillis() - startAll));
return (O) result; return (O) result;
...@@ -64,8 +62,9 @@ public class CompositeItemStreamProcessor<I,O> extends CompositeItemProcessor<I, ...@@ -64,8 +62,9 @@ public class CompositeItemStreamProcessor<I,O> extends CompositeItemProcessor<I,
/** /**
* Establishes the {@link ItemProcessor} delegates that will work on the item to be * Establishes the {@link ItemProcessor} delegates that will work on the item to be
* processed. * processed.
*
* @param delegates list of {@link ItemProcessor} delegates that will work on the * @param delegates list of {@link ItemProcessor} delegates that will work on the
* item. * item.
*/ */
@Override @Override
public void setDelegates(List<? extends ItemProcessor<?, ?>> delegates) { public void setDelegates(List<? extends ItemProcessor<?, ?>> delegates) {
......
...@@ -6,10 +6,10 @@ import org.springframework.batch.core.JobParametersBuilder; ...@@ -6,10 +6,10 @@ import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher; import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
...@@ -36,6 +36,8 @@ public class JobStartRunner implements CommandLineRunner { ...@@ -36,6 +36,8 @@ public class JobStartRunner implements CommandLineRunner {
private Job trafficFlowJob; private Job trafficFlowJob;
@Resource @Resource
private Job behaviorJob; private Job behaviorJob;
@Value("${vion.forward.enable:false}")
private Boolean enableForward;
@Override @Override
public void run(String... args) throws Exception { public void run(String... args) throws Exception {
...@@ -43,11 +45,27 @@ public class JobStartRunner implements CommandLineRunner { ...@@ -43,11 +45,27 @@ public class JobStartRunner implements CommandLineRunner {
sjl.setJobRepository(jobRepository); sjl.setJobRepository(jobRepository);
sjl.setTaskExecutor(new SimpleAsyncTaskExecutor()); sjl.setTaskExecutor(new SimpleAsyncTaskExecutor());
sjl.afterPropertiesSet(); sjl.afterPropertiesSet();
JobParameters jobParameters = new JobParametersBuilder().addString("uuid", UUID.randomUUID().toString()).toJobParameters(); JobParameters jobParameters;
sjl.run(trafficJob, jobParameters); for (int i = 0; i < 5; i++) {
jobParameters = new JobParametersBuilder().addString("uuid", UUID.randomUUID().toString()).toJobParameters(); jobParameters = new JobParametersBuilder().addString("uuid", UUID.randomUUID().toString()).toJobParameters();
sjl.run(trafficFlowJob, jobParameters); sjl.run(trafficJob, jobParameters);
jobParameters = new JobParametersBuilder().addString("uuid", UUID.randomUUID().toString()).toJobParameters(); }
sjl.run(behaviorJob, jobParameters);
for (int i = 0; i < 5; i++) {
jobParameters = new JobParametersBuilder().addString("uuid", UUID.randomUUID().toString()).toJobParameters();
sjl.run(trafficFlowJob, jobParameters);
}
for (int i = 0; i < 5; i++) {
jobParameters = new JobParametersBuilder().addString("uuid", UUID.randomUUID().toString()).toJobParameters();
sjl.run(behaviorJob, jobParameters);
}
if (enableForward) {
for (int i = 0; i < 5; i++) {
jobParameters = new JobParametersBuilder().addString("uuid", UUID.randomUUID().toString()).toJobParameters();
sjl.run(forwardJob, jobParameters);
}
}
} }
} }
package com.viontech.fanxing.task.manager.controller.web; package com.viontech.fanxing.task.manager.controller.web;
import com.github.pagehelper.PageInfo;
import com.viontech.fanxing.commons.base.BaseExample; import com.viontech.fanxing.commons.base.BaseExample;
import com.viontech.fanxing.commons.base.BaseMapper;
import com.viontech.fanxing.commons.vo.StoreConfigVo;
import com.viontech.fanxing.task.manager.controller.base.StoreConfigBaseController;
import com.viontech.fanxing.commons.model.StoreConfig; import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.model.StoreConfigExample; import com.viontech.fanxing.commons.model.StoreConfigExample;
import com.viontech.fanxing.task.manager.mapper.StoreConfigMapper; import com.viontech.fanxing.commons.vo.StoreConfigVo;
import com.viontech.fanxing.task.manager.service.adapter.StoreConfigService; import com.viontech.fanxing.task.manager.controller.base.StoreConfigBaseController;
import com.viontech.keliu.util.JsonMessageUtil;
import org.springframework.stereotype.Controller; import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*; import org.springframework.util.StreamUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.multipart.MultipartFile;
import java.util.List; import java.io.IOException;
import java.nio.charset.Charset;
import static com.viontech.keliu.util.JsonMessageUtil.getSuccessJsonMsg; import java.nio.charset.StandardCharsets;
@Controller @Controller
@RequestMapping("/storeConfigs") @RequestMapping("/storeConfigs")
...@@ -22,7 +24,25 @@ public class StoreConfigController extends StoreConfigBaseController { ...@@ -22,7 +24,25 @@ public class StoreConfigController extends StoreConfigBaseController {
@Override @Override
protected BaseExample getExample(StoreConfigVo storeConfigVo, int type) { protected BaseExample getExample(StoreConfigVo storeConfigVo, int type) {
StoreConfigExample storeConfigExample = (StoreConfigExample)super.getExample(storeConfigVo,type); StoreConfigExample storeConfigExample = (StoreConfigExample) super.getExample(storeConfigVo, type);
return storeConfigExample; return storeConfigExample;
} }
@Override
@PostMapping
@ResponseBody
public Object add(StoreConfigVo storeConfigVo) {
MultipartFile file = storeConfigVo.getFile();
try {
String content = StreamUtils.copyToString(file.getInputStream(), Charset.forName("GBK"));
storeConfigVo.setContent(content);
getService().insertSelective(storeConfigVo);
storeConfigVo.setContent(null);
storeConfigVo.setFile(null);
return JsonMessageUtil.getSuccessJsonMsg(storeConfigVo);
} catch (IOException e) {
logger.error("", e);
return JsonMessageUtil.getErrorJsonMsg(e.getMessage());
}
}
} }
\ No newline at end of file \ No newline at end of file
...@@ -42,4 +42,13 @@ public class TaskController extends TaskBaseController { ...@@ -42,4 +42,13 @@ public class TaskController extends TaskBaseController {
taskService.removeTask(id); taskService.removeTask(id);
return JsonMessageUtil.getSuccessJsonMsg("success"); return JsonMessageUtil.getSuccessJsonMsg("success");
} }
@PutMapping("/{id}")
public JsonMessageUtil.JsonMessage<TaskVo> updateStatus(@PathVariable("id") Long id, @RequestParam Integer status) {
TaskVo taskVo = new TaskVo();
taskVo.setStatus(status);
taskVo.setId(id);
int i = taskService.updateByPrimaryKeySelective(taskVo);
return JsonMessageUtil.getSuccessJsonMsg("success");
}
} }
\ No newline at end of file \ No newline at end of file
...@@ -42,8 +42,6 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic ...@@ -42,8 +42,6 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
@Override @Override
public TaskVo updateTask(Task task) { public TaskVo updateTask(Task task) {
task = insertSelective(task);
task = selectByPrimaryKey(task.getId());
JsonMessageUtil.JsonMessage update = taskSchedulingClient.update(task); JsonMessageUtil.JsonMessage update = taskSchedulingClient.update(task);
if (update.isSuccess()) { if (update.isSuccess()) {
......
...@@ -47,6 +47,9 @@ public class TaskController implements TaskSchedulingTasksAdapter { ...@@ -47,6 +47,9 @@ public class TaskController implements TaskSchedulingTasksAdapter {
// 计算运行时间并生成任务 // 计算运行时间并生成任务
boolean success = taskService.distributeTask(taskData); boolean success = taskService.distributeTask(taskData);
if (success) {
taskService.getRepository().addOrUpdateTaskData(taskData);
}
return success ? JsonMessageUtil.getSuccessJsonMsg("success") : JsonMessageUtil.getErrorJsonMsg("任务找不到可执行时间"); return success ? JsonMessageUtil.getSuccessJsonMsg("success") : JsonMessageUtil.getErrorJsonMsg("任务找不到可执行时间");
} }
......
...@@ -5,10 +5,7 @@ import com.viontech.fanxing.commons.vo.TaskVo; ...@@ -5,10 +5,7 @@ import com.viontech.fanxing.commons.vo.TaskVo;
import com.viontech.keliu.util.JsonMessageUtil; import com.viontech.keliu.util.JsonMessageUtil;
import org.springframework.cloud.openfeign.FeignClient; import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
/** /**
* . * .
...@@ -21,8 +18,8 @@ import org.springframework.web.bind.annotation.RequestBody; ...@@ -21,8 +18,8 @@ import org.springframework.web.bind.annotation.RequestBody;
@FeignClient(value = "fanxing-task-manager") @FeignClient(value = "fanxing-task-manager")
public interface TaskClient { public interface TaskClient {
@PostMapping("/tasks/{id}") @PutMapping("/tasks/{id}")
JsonMessageUtil.JsonMessage<TaskVo> updateTask(@PathVariable("id") Long taskId, @RequestBody TaskVo taskVo); JsonMessageUtil.JsonMessage<TaskVo> updateTaskStatus(@PathVariable("id") Long taskId, @RequestParam Integer status);
@GetMapping("/storeConfigs/{id}") @GetMapping("/storeConfigs/{id}")
JsonMessageUtil.JsonMessage<StoreConfig> getStoreConfigById(@PathVariable("id") Long storeConfigId); JsonMessageUtil.JsonMessage<StoreConfig> getStoreConfigById(@PathVariable("id") Long storeConfigId);
......
...@@ -2,12 +2,18 @@ package com.viontech.fanxing.task.scheduling.model; ...@@ -2,12 +2,18 @@ package com.viontech.fanxing.task.scheduling.model;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
import lombok.Getter; import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter; import lombok.Setter;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
import java.io.Serializable;
import java.security.InvalidParameterException; import java.security.InvalidParameterException;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
...@@ -31,7 +37,8 @@ import java.util.HashMap; ...@@ -31,7 +37,8 @@ import java.util.HashMap;
@Getter @Getter
@Setter @Setter
public class RuntimeConfig { @NoArgsConstructor
public class RuntimeConfig implements Serializable {
private static final String[] WEEK_ARR = new String[]{"sun", "mon", "tue", "wed", "thu", "fri", "sat"}; private static final String[] WEEK_ARR = new String[]{"sun", "mon", "tue", "wed", "thu", "fri", "sat"};
private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss"); private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss");
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
...@@ -162,11 +169,19 @@ public class RuntimeConfig { ...@@ -162,11 +169,19 @@ public class RuntimeConfig {
@Getter @Getter
@Setter @Setter
@Accessors(chain = true) @Accessors(chain = true)
static class Config { static class Config implements Serializable {
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonSerialize(using = LocalDateTimeSerializer.class)
private LocalTime start; private LocalTime start;
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonSerialize(using = LocalDateTimeSerializer.class)
private LocalTime end; private LocalTime end;
private Long runningTime; private Long runningTime;
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonSerialize(using = LocalDateTimeSerializer.class)
private LocalDateTime startTime; private LocalDateTime startTime;
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonSerialize(using = LocalDateTimeSerializer.class)
private LocalDateTime endTime; private LocalDateTime endTime;
} }
......
...@@ -30,6 +30,9 @@ public class VAServerRedisRepository { ...@@ -30,6 +30,9 @@ public class VAServerRedisRepository {
public VaServerInfo getVAServerInfoById(String devId) { public VaServerInfo getVAServerInfoById(String devId) {
VaServerInfo vaServerInfo = vaServerMap.get(devId); VaServerInfo vaServerInfo = vaServerMap.get(devId);
if (vaServerInfo == null) {
return null;
}
if (!online(devId)) { if (!online(devId)) {
vaServerInfo.setStatus(0); vaServerInfo.setStatus(0);
addOrUpdate(devId, vaServerInfo); addOrUpdate(devId, vaServerInfo);
......
...@@ -84,18 +84,15 @@ public class TaskRunner { ...@@ -84,18 +84,15 @@ public class TaskRunner {
// 找不到可以用来执行的设备,需要修改状态 // 找不到可以用来执行的设备,需要修改状态
if (server == null) { if (server == null) {
TaskVo taskVo = new TaskVo(); log.info("找不到可用的 VAServer,跳过:{}", taskUnid);
taskVo.setStatus(TaskStatus.CAN_NOT_RUN.val); taskClient.updateTaskStatus(task.getId(), TaskStatus.CAN_NOT_RUN.val);
taskClient.updateTask(task.getId(), taskVo);
continue; continue;
} }
boolean success = vaServerService.executeTask(taskData, server); boolean success = vaServerService.executeTask(taskData, server);
// 修改任务状态 // 修改任务状态
TaskVo taskVo = new TaskVo(); taskClient.updateTaskStatus(task.getId(), TaskStatus.RUNNING.val);
taskVo.setStatus(TaskStatus.RUNNING.val);
taskClient.updateTask(task.getId(), taskVo);
// 移除任务 // 移除任务
set.remove(taskUnid); set.remove(taskUnid);
...@@ -130,6 +127,7 @@ public class TaskRunner { ...@@ -130,6 +127,7 @@ public class TaskRunner {
// 获取可用的 vaserver ,执行任务终止动作,如果成功,解除 taskData和 vaServer 的关联,并且从 zset 中移除任务,恢复vaserver资源数,计算下次任务执行时间,放入zset中 // 获取可用的 vaserver ,执行任务终止动作,如果成功,解除 taskData和 vaServer 的关联,并且从 zset 中移除任务,恢复vaserver资源数,计算下次任务执行时间,放入zset中
boolean success = vaServerService.terminateTask(taskUnid); boolean success = vaServerService.terminateTask(taskUnid);
if (success) { if (success) {
taskClient.updateTaskStatus(taskData.getTask().getId(), TaskStatus.PAUSE.val);
boolean b = taskService.distributeTask(taskData); boolean b = taskService.distributeTask(taskData);
} }
set.remove(taskUnid); set.remove(taskUnid);
......
package com.viontech.fanxing.task.scheduling.service; package com.viontech.fanxing.task.scheduling.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.task.scheduling.model.TaskData; import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VATask; import com.viontech.fanxing.task.scheduling.model.vaserver.VATask;
...@@ -36,8 +37,7 @@ public class VAServerHttpService { ...@@ -36,8 +37,7 @@ public class VAServerHttpService {
.bodyToMono(String.class); .bodyToMono(String.class);
String response = stringMono.block(Duration.ofSeconds(20)); String response = stringMono.block(Duration.ofSeconds(20));
log.info("下发任务结果:{}", response); log.info("下发任务结果:{}", response);
return response; return JSON.parseObject(response);
} }
/** /**
...@@ -55,28 +55,27 @@ public class VAServerHttpService { ...@@ -55,28 +55,27 @@ public class VAServerHttpService {
.bodyToMono(String.class); .bodyToMono(String.class);
String response = stringMono.block(Duration.ofSeconds(20)); String response = stringMono.block(Duration.ofSeconds(20));
log.info("下发任务结果:{}", response); log.info("下发任务结果:{}", response);
return response; return JSON.parseObject(response);
} }
/** /**
* 删除任务 * 删除任务
*/ */
public Object rmTask(String taskUnid, VaServerInfo vaServerInfo) { public Object rmTask(String taskUnid, VaServerInfo vaServerInfo) {
String path = "/api/vaserver/v1/task}"; String path = "/api/vaserver/v1/deleteTask";
JSONObject jsonObject = new JSONObject();
jsonObject.put("task_unid", taskUnid);
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl()) Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
.delete() .post()
.uri(uriBuilder -> uriBuilder .uri(uriBuilder -> uriBuilder.path(path).build())
.path(path) .bodyValue(jsonObject.toString())
.queryParam("task_unid", taskUnid)
.build())
.retrieve() .retrieve()
.bodyToMono(String.class); .bodyToMono(String.class);
String response = stringMono.block(Duration.ofSeconds(20)); String response = stringMono.block(Duration.ofSeconds(20));
log.info("删除任务结果:{}", response); log.info("删除任务结果:{}", response);
return JSON.parseObject(response);
return response;
} }
/** /**
...@@ -84,18 +83,19 @@ public class VAServerHttpService { ...@@ -84,18 +83,19 @@ public class VAServerHttpService {
*/ */
public Object snapshot(String taskUnid, VaServerInfo vaServerInfo) { public Object snapshot(String taskUnid, VaServerInfo vaServerInfo) {
String path = "/api/vaserver/v1/snapshot"; String path = "/api/vaserver/v1/snapshot";
JSONObject jsonObject = new JSONObject();
jsonObject.put("task_unid", taskUnid);
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl()) Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
.get() .post()
.uri(uriBuilder -> uriBuilder .uri(uriBuilder -> uriBuilder.path(path).build())
.path(path) .bodyValue(jsonObject.toString())
.queryParam("task_unid", taskUnid)
.build())
.retrieve() .retrieve()
.bodyToMono(String.class); .bodyToMono(String.class);
String response = stringMono.block(Duration.ofSeconds(20));
log.info("截图结果:{}", response);
return response; String block = stringMono.block(Duration.ofSeconds(20));
return JSON.parseObject(block);
} }
/** /**
...@@ -116,7 +116,7 @@ public class VAServerHttpService { ...@@ -116,7 +116,7 @@ public class VAServerHttpService {
String response = stringMono.block(Duration.ofSeconds(10)); String response = stringMono.block(Duration.ofSeconds(10));
log.info("获取分析流地址结果 : {}", response); log.info("获取分析流地址结果 : {}", response);
return response; return JSON.parseObject(response);
} }
/** /**
...@@ -139,8 +139,7 @@ public class VAServerHttpService { ...@@ -139,8 +139,7 @@ public class VAServerHttpService {
String response = stringMono.block(Duration.ofSeconds(20)); String response = stringMono.block(Duration.ofSeconds(20));
log.info("输出分析流结果:{}", response); log.info("输出分析流结果:{}", response);
return JSON.parseObject(response);
return response;
} }
/** /**
...@@ -162,14 +161,13 @@ public class VAServerHttpService { ...@@ -162,14 +161,13 @@ public class VAServerHttpService {
String response = stringMono.block(Duration.ofSeconds(20)); String response = stringMono.block(Duration.ofSeconds(20));
log.info("场景切换结果:{}", response); log.info("场景切换结果:{}", response);
return JSON.parseObject(response);
return response;
} }
/** /**
* 任务轮训状态切换 * 任务轮训状态切换
*/ */
public Object updateRotationStatus(String taskUnid, Integer rotationStatus , VaServerInfo vaServerInfo) { public Object updateRotationStatus(String taskUnid, Integer rotationStatus, VaServerInfo vaServerInfo) {
String path = "/api/vaserver/v1/alternate"; String path = "/api/vaserver/v1/alternate";
JSONObject jsonObject = new JSONObject(); JSONObject jsonObject = new JSONObject();
...@@ -185,29 +183,28 @@ public class VAServerHttpService { ...@@ -185,29 +183,28 @@ public class VAServerHttpService {
String response = stringMono.block(Duration.ofSeconds(20)); String response = stringMono.block(Duration.ofSeconds(20));
log.info("轮训状态控制结果:{}", response); log.info("轮训状态控制结果:{}", response);
return JSON.parseObject(response);
return response;
} }
/** /**
* 任务轮训状态查询 * 任务轮训状态查询
*/ */
public Object getRotationStatus(String taskUnid, VaServerInfo vaServerInfo) { public Object getRotationStatus(String taskUnid, VaServerInfo vaServerInfo) {
String path = "/api/vaserver/v1/alternate"; String path = "/api/vaserver/v1/getAlternate";
JSONObject jsonObject = new JSONObject();
jsonObject.put("task_unid", taskUnid);
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl()) Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
.get() .post()
.uri(uriBuilder -> uriBuilder .uri(uriBuilder -> uriBuilder.path(path).build())
.path(path) .bodyValue(jsonObject.toString())
.queryParam("task_unid", taskUnid)
.build())
.retrieve() .retrieve()
.bodyToMono(String.class); .bodyToMono(String.class);
String response = stringMono.block(Duration.ofSeconds(20)); String response = stringMono.block(Duration.ofSeconds(20));
log.info("获取轮训状态:{}", response); log.info("获取轮训状态:{}", response);
return JSON.parseObject(response);
return response;
} }
/** /**
...@@ -226,8 +223,7 @@ public class VAServerHttpService { ...@@ -226,8 +223,7 @@ public class VAServerHttpService {
String response = stringMono.block(Duration.ofSeconds(20)); String response = stringMono.block(Duration.ofSeconds(20));
log.info("运行状态查询:{}", response); log.info("运行状态查询:{}", response);
return JSON.parseObject(response);
return response;
} }
......
...@@ -51,6 +51,12 @@ public class VAServerService { ...@@ -51,6 +51,12 @@ public class VAServerService {
RBucket<Date> bucket = redisService.getValue(RedisKeys.getVAServerKeepAliveKey(devId)); RBucket<Date> bucket = redisService.getValue(RedisKeys.getVAServerKeepAliveKey(devId));
bucket.set(new Date()); bucket.set(new Date());
bucket.expire(2, TimeUnit.MINUTES); bucket.expire(2, TimeUnit.MINUTES);
VaServerInfo vaserverInfo = vaServerRedisRepository.getVAServerInfoById(devId);
if (vaserverInfo != null) {
vaserverInfo.setStatus(1);
vaServerRedisRepository.addOrUpdate(devId, vaserverInfo);
}
} }
......
package com.viontech.fanxing.task.scheduling.service; package com.viontech.fanxing.task.scheduling.service;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.viontech.fanxing.commons.constant.RedisKeys; import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.model.StoreConfig; import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.service.RedisService; import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.commons.vo.TaskVo;
import com.viontech.fanxing.task.scheduling.feign.TaskClient; import com.viontech.fanxing.task.scheduling.feign.TaskClient;
import com.viontech.fanxing.task.scheduling.model.TaskData; import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo; import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.scheduling.repository.TaskDataRedisRepository;
import com.viontech.fanxing.task.scheduling.repository.VAServerRedisRepository; import com.viontech.fanxing.task.scheduling.repository.VAServerRedisRepository;
import com.viontech.keliu.util.DateUtil;
import com.viontech.keliu.util.JsonMessageUtil; import com.viontech.keliu.util.JsonMessageUtil;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.redisson.api.RScoredSortedSet;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
...@@ -30,7 +30,9 @@ import javax.annotation.Resource; ...@@ -30,7 +30,9 @@ import javax.annotation.Resource;
@RunWith(SpringRunner.class) @RunWith(SpringRunner.class)
class VAServerHttpServiceTest { class VAServerHttpServiceTest {
private static final String TASK_UNID = "0a263320bd274b529e5185e3b05aa157"; private static final String TASK_UNID = "abcd-xxxx-xxxxxx-xxxxxx";
private static final String DEV_ID = "sn3device1";
@Resource @Resource
VAServerHttpService vaServerHttpService; VAServerHttpService vaServerHttpService;
@Resource @Resource
...@@ -45,7 +47,7 @@ class VAServerHttpServiceTest { ...@@ -45,7 +47,7 @@ class VAServerHttpServiceTest {
@BeforeEach @BeforeEach
public void before() { public void before() {
this.vaServerInfo = vaServerRedisRepository.getVAServerInfoById("xxx-xx"); this.vaServerInfo = vaServerRedisRepository.getVAServerInfoById(DEV_ID);
this.taskData = new TaskData(); this.taskData = new TaskData();
Task task = new Task(); Task task = new Task();
task.setUnid(TASK_UNID); task.setUnid(TASK_UNID);
...@@ -65,7 +67,7 @@ class VAServerHttpServiceTest { ...@@ -65,7 +67,7 @@ class VAServerHttpServiceTest {
@Test @Test
void startAnalyzeStream() { void startAnalyzeStream() {
vaServerHttpService.startAnalyzeStream(TASK_UNID, vaServerInfo, "rtsp://192.168.9.159:10087/0a263320bd274b529e5185e3b05aa157"); vaServerHttpService.startAnalyzeStream(TASK_UNID, vaServerInfo, "rtsp://192.168.9.245:10087/abcd-xxxx-xxxxxx-xxxxxx");
} }
@Test @Test
...@@ -90,7 +92,7 @@ class VAServerHttpServiceTest { ...@@ -90,7 +92,7 @@ class VAServerHttpServiceTest {
@Test @Test
void storeConfig() { void storeConfig() {
JsonMessageUtil.JsonMessage<StoreConfig> storeConfigById = taskClient.getStoreConfigById(3L); JsonMessageUtil.JsonMessage<StoreConfig> storeConfigById = taskClient.getStoreConfigById(6L);
System.out.println(JSON.toJSONString(storeConfigById.getData())); System.out.println(JSON.toJSONString(storeConfigById.getData()));
} }
...@@ -100,15 +102,18 @@ class VAServerHttpServiceTest { ...@@ -100,15 +102,18 @@ class VAServerHttpServiceTest {
} }
@Resource
private TaskDataRedisRepository taskDataRedisRepository;
@Test @Test
void test() throws Exception { void test() throws Exception {
// RMap<String, String> map = redisCacheService.getTaskVaServerMap(); // RMap<String, String> taskVaServerMap = redisService.getTaskVaServerMap();
// map.put("0a263320bd274b529e5185e3b05aa157", "xxx-xx"); // taskVaServerMap.put("b0c20c4a-fffd-11eb-a74d-0242ac11001d", DEV_ID);
JsonMessageUtil.JsonMessage<TaskVo> taskVoJsonMessage = taskClient.updateTaskStatus(19L, TaskStatus.RUNNING.val);
RScoredSortedSet<String> set = redisService.getClient().getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_EXECUTED_TASK_UNID_SET); System.out.println(JSON.toJSONString(taskVoJsonMessage.getData()));
RScoredSortedSet<String> set2 = redisService.getClient().getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_TERMINATED_TASK_UNID_SET);
// // RScoredSortedSet<String> set = redisService.getClient().getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_EXECUTED_TASK_UNID_SET);
set.add(DateUtil.parse("yyyy-MM-dd HH:mm:ss", "2021-08-11 13:18:00").getTime(), "tttttttttttttttttt"); // RScoredSortedSet<String> set2 = redisService.getClient().getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_TERMINATED_TASK_UNID_SET);
set2.add(DateUtil.parse("yyyy-MM-dd HH:mm:ss", "2021-08-11 13:20:00").getTime(), "tttttttttttttttttt"); // set.add(DateUtil.parse("yyyy-MM-dd HH:mm:ss", "2021-08-11 13:18:00").getTime(), "tttttttttttttttttt");
// set2.add(DateUtil.parse("yyyy-MM-dd HH:mm:ss", "2021-08-11 13:20:00").getTime(), "tttttttttttttttttt");
} }
} }
\ No newline at end of file \ No newline at end of file
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!