Commit 9a5a1106 by xmh

任务调度:

1. 添加全天不间断运行任务
2. 优化任务删除逻辑
3. 下发任务和终止任务的 runner 共用同一个的分布式锁
4. 添加获取默认算法配置接口,修改'输出分析流接口'为GET请求

任务管理:
1. 新添加的任务如果没有 'scenes' 或 '存储配置' 则不进行任务调度
2. 增加 '手动调度某个任务' 的接口
3. 修改任务逻辑变更,也要对 'scene' 和 '存储配置' 进行判断, 内部调用修改状态接口使用新的方法

运维服务:
1. 增加 nvs3000 拉取接口

网关:
1. 修改配置文件, 'health-check' 改外10s

commons:
1. 添加实时推送分析结果的redisTopicKey
2. 修改 fanXingExceptionHandler 对 FanXingException 异常信息的打印

转发服务:
1. 打开分析结果处理中对任务的判断
2. 添加分析结果实时推送的 websocket
3. 修改 PicProcessor , 在图片处理和保存后进行分析结果的发布
1 parent 4c0f5b97
Showing 25 changed files with 422 additions and 66 deletions
......@@ -27,6 +27,8 @@ public class GlobalExceptionHandler {
public Object fanXingExceptionHandler(FanXingException fanXingException) {
JsonMessageUtil.JsonMessage errorJsonMsg = JsonMessageUtil.getErrorJsonMsg(fanXingException.getMessage());
errorJsonMsg.setData(fanXingException.getData());
StackTraceElement[] stackTrace = fanXingException.getStackTrace();
log.info("\n接口调用出错\n错误信息:[{}]\n错误原因:[{}]", fanXingException.getMessage(), stackTrace.length > 0 ? stackTrace[0].toString() : "未知");
return errorJsonMsg;
}
}
......@@ -28,6 +28,9 @@ public class RedisKeys {
public static final String FORWARD_BEHAVIOR_QUEUE = "forward:behaviorQueue";
/** 用来接收所有数据的队列,用于转发 */
public static final String FORWARD_FORWARD_QUEUE = "forward:forwardQueue";
/** topic,发布图片实时推送信息 */
public static final String FORWARD_PIC_PUSH_REAL_TIME = "forward:picPushRealTime";
/**
* 用来获取 vaServer 的心跳在 redis 中对应的 key
......
......@@ -19,6 +19,10 @@
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>com.viontech</groupId>
<artifactId>fanxing-commons</artifactId>
<version>${parent.version}</version>
......
package com.viontech.fanxing.forward;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
......@@ -12,9 +11,9 @@ import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
......@@ -30,6 +29,7 @@ import java.util.concurrent.TimeUnit;
@EnableFeignClients
@EnableTransactionManagement
@EnableBatchProcessing
@EnableWebSocket
@Slf4j
public class ForwardApp {
......
......@@ -17,7 +17,7 @@ public class WorkQueueClearChunkListener implements ChunkListener {
@Override
public void beforeChunk(ChunkContext context) {
logger.info("WorkQueueClearChunkListener.beforeChunk");
logger.debug("WorkQueueClearChunkListener.beforeChunk");
}
@Override
......
......@@ -32,19 +32,21 @@ public class BehaviorProcessor implements ItemStream, ItemProcessor<JSONObject,
@Override
public Behavior process(JSONObject item) throws Exception {
log.info("收到 behavior 消息 , eventId:{}",item.getString("event_refid"));
// final Map<String, Task> taskMap = cacheUtils.getTaskMap();
// String taskUnid = item.getString("task_id");
// final Task task = taskMap.get(taskUnid);
// if (task == null) {
// log.info("无法找到对应task:{}", item.toJSONString());
// return null;
// }
// final Long taskId = task.getId();
log.info("收到 behavior 消息 , eventId:{}", item.getString("event_refid"));
final Map<String, Task> taskMap = cacheUtils.getTaskMap();
String taskUnid = item.getString("subtask_id");
final Task task = taskMap.get(taskUnid);
Long taskId;
if (task == null) {
log.info("无法找到对应task:{}", item.toJSONString());
taskId = null;
} else {
taskId = task.getId();
}
Behavior behavior = new Behavior();
// behavior.setTaskId(taskId);
behavior.setTaskId(taskId);
String eventType = item.getString("event_type");
String eventRefid = item.getString("event_refid");
......
......@@ -2,9 +2,13 @@ package com.viontech.fanxing.forward.batch.processor;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.forward.util.PicUtils;
import com.viontech.keliu.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStream;
......@@ -27,14 +31,16 @@ public class PicProcessor implements ItemStream, ItemProcessor<JSONObject, JSONO
@Resource
private PicUtils picUtils;
@Resource
private RedissonClient redissonClient;
@Override
public JSONObject process(JSONObject item) throws Exception {
JSONArray pics = item.getJSONArray("pics");
ArrayList<String> picList = new ArrayList<>();
if (pics != null && pics.size() > 0) {
String eventDt = item.getString("event_dt");
Date eventTime = DateUtil.parse(DateUtil.FORMAT_FULL, eventDt);
ArrayList<String> picList = new ArrayList<>();
for (int i = 0; i < pics.size(); i++) {
JSONObject picObject = pics.getJSONObject(i);
String base64 = picObject.getString("pic_base64");
......@@ -51,6 +57,16 @@ public class PicProcessor implements ItemStream, ItemProcessor<JSONObject, JSONO
String picPathArray = String.join(",", picList);
item.put("pic_path_array", picPathArray);
}
// 实时推送
if (picList.size() > 0 && StringUtils.isNotBlank(item.getString("vchan_refid"))) {
try {
RTopic topic = redissonClient.getTopic(RedisKeys.FORWARD_PIC_PUSH_REAL_TIME);
topic.publish(item);
} catch (Exception e) {
log.info("发布实时图片信息失败", e);
}
}
return item;
}
......
......@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.model.FlowData;
import com.viontech.fanxing.commons.model.FlowEvent;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.forward.model.TrafficFlowContent;
import com.viontech.fanxing.forward.util.CacheUtils;
import com.viontech.keliu.util.DateUtil;
......@@ -19,6 +20,7 @@ import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* .
......@@ -37,15 +39,16 @@ public class TrafficFlowProcessor implements ItemProcessor<JSONObject, TrafficFl
public TrafficFlowContent process(JSONObject item) throws Exception {
log.info("收到 flow 消息 , eventId:{}", item.getString("event_refid"));
// final Map<String, Task> taskMap = cacheUtils.getTaskMap();
// String taskUnid = item.getString("task_id");
// final Task task = taskMap.get(taskUnid);
// if (task == null) {
// log.info("无法找到对应task:{}", item.toJSONString());
// return null;
// }
// final Long taskId = task.getId();
Long taskId = 0L;
final Map<String, Task> taskMap = cacheUtils.getTaskMap();
String taskUnid = item.getString("subtask_id");
final Task task = taskMap.get(taskUnid);
Long taskId;
if (task == null) {
log.info("无法找到对应task:{}", item.toJSONString());
taskId = null;
} else {
taskId = task.getId();
}
TrafficFlowContent content = new TrafficFlowContent();
FlowEvent flowEvent = new FlowEvent();
......@@ -68,7 +71,7 @@ public class TrafficFlowProcessor implements ItemProcessor<JSONObject, TrafficFl
flowEvent.setEventId(eventRefid);
flowEvent.setEventTime(eventTime);
flowEvent.setPics(picArray);
// flowEvent.setTaskId(taskId);
flowEvent.setTaskId(taskId);
if (eventData != null) {
// 地理位置
......@@ -111,7 +114,7 @@ public class TrafficFlowProcessor implements ItemProcessor<JSONObject, TrafficFl
JSONObject item = array.getJSONObject(i);
FlowData flowData = new FlowData();
// flowData.setTaskId(taskId);
flowData.setTaskId(taskId);
flowData.setDetectionType(detectionType);
flowData.setEventTime(eventTime);
flowData.setDetectionType(detectionType);
......
......@@ -2,6 +2,7 @@ package com.viontech.fanxing.forward.batch.processor;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.Traffic;
import com.viontech.fanxing.commons.model.TrafficFace;
import com.viontech.fanxing.forward.model.TrafficContent;
......@@ -18,6 +19,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Date;
import java.util.Map;
/**
* .
......@@ -36,15 +38,17 @@ public class TrafficProcessor implements ItemProcessor<JSONObject, TrafficConten
@Override
public TrafficContent process(JSONObject item) throws Exception {
log.info("收到 traffic 消息 , eventId:{}", item.getString("event_refid"));
// final Map<String, Task> taskMap = cacheUtils.getTaskMap();
//
// String taskUnid = item.getString("task_id");
// final Task task = taskMap.get(taskUnid);
// if (task == null) {
// log.info("无法找到对应task:{}", item.toJSONString());
// return null;
// }
// final Long taskId = task.getId();
final Map<String, Task> taskMap = cacheUtils.getTaskMap();
String taskUnid = item.getString("subtask_id");
final Task task = taskMap.get(taskUnid);
Long taskId;
if (task == null) {
log.info("无法找到对应task:{}", item.toJSONString());
taskId = null;
} else {
taskId = task.getId();
}
TrafficContent content = new TrafficContent();
Traffic traffic = new Traffic();
......@@ -67,14 +71,13 @@ public class TrafficProcessor implements ItemProcessor<JSONObject, TrafficConten
eventRefid = eventData.getString("ID");
}
// todo taskId
traffic.setEventCate(eventCate);
traffic.setEventType(eventType);
traffic.setEventId(eventRefid);
traffic.setEventTime(eventTime);
traffic.setChannelUnid(channelUnid);
traffic.setPics(picArray);
// traffic.setTaskId(taskId);
traffic.setTaskId(taskId);
//非机动车类型
......
package com.viontech.fanxing.forward.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
* .
*
* @author 谢明辉
* @date 2021/8/30
*/
@Configuration
public class ForwardWebConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
package com.viontech.fanxing.forward.runner;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.forward.ws.PicRealTimeEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* .
*
* @author 谢明辉
* @date 2021/8/30
*/
@Component
@Slf4j
public class PicRealTimeRunner implements CommandLineRunner {
@Resource
private RedissonClient redissonClient;
@Override
public void run(String... args) throws Exception {
RTopic topic = redissonClient.getTopic(RedisKeys.FORWARD_PIC_PUSH_REAL_TIME);
topic.addListener(JSONObject.class, (channel, msg) -> {
try {
String channelUnid = msg.getString("vchan_refid");
PicRealTimeEndpoint.sendInfo(channelUnid, msg);
} catch (Exception e) {
log.info("推送失败", e);
}
});
}
}
......@@ -29,7 +29,7 @@ import java.util.stream.Collectors;
@Slf4j
public class CacheUtils {
private static final Cache<Object, Object> CACHE = CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(10)).build();
private static final Cache<Object, Object> CACHE = CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(2)).build();
@Resource
private TaskManagerFeignClient taskManagerFeignClient;
......
package com.viontech.fanxing.forward.ws;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.MultimapBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;
/**
* .
*
* @author 谢明辉
* @date 2021/8/30
*/
@ServerEndpoint("/picRealTime/{channelUnid}")
@Component
@Slf4j
public class PicRealTimeEndpoint {
private final static ListMultimap<String, Session> SESSION_MAP = MultimapBuilder.hashKeys().linkedListValues().build();
public static void sendInfo(String channelUnid, JSONObject info) {
try {
List<Session> sessions = SESSION_MAP.get(channelUnid);
for (Session session : sessions) {
try {
sendInfo(session, info.toJSONString());
} catch (Exception ignore) {
}
}
} catch (Exception e) {
log.error("", e);
}
}
public static void sendInfo(Session session, String msg) throws IOException {
session.getBasicRemote().sendText(msg);
}
@OnOpen
public void onOpen(Session session, @PathParam("channelUnid") String channelUnid) {
log.info("websocket 上线, channelUnid:{}, sessionId:{}", channelUnid, session.getId());
SESSION_MAP.put(channelUnid, session);
}
@OnClose
public void onClose(Session session, @PathParam("channelUnid") String channelUnid) {
log.info("websocket 下线, channelUnid:{}, sessionId:{}", channelUnid, session.getId());
SESSION_MAP.remove(channelUnid, session);
}
@OnError
public void onError(Session session, Throwable error) {
log.info("", error);
}
}
package com.viontech.fanxing.forward;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.constant.RedisKeys;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
/**
* .
*
......@@ -16,9 +23,14 @@ import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
public class Test0 {
@Resource
private RedissonClient redissonClient;
@Test
public void test() {
JSONObject jsonObject = JSON.parseObject("{\"subtask_id\":\"abcd-xxxx-xxxxxx-xxxxxx\",\"dev_unid\":\"\",\"event_dt\":\"2021-08-30 05:34:26.034\",\"vchan_refid\":\"12333\",\"aux_dev_info\":{\"camera_hwid\":\"\"},\"source_type\":\"pull_video_stream\",\"task_id\":\"\",\"event_data\":{\"start_dt\":\"2021-08-30 05:34:26.034\",\"end_dt\":\"2021-08-30 05:34:26.034\",\"location\":{\"code\":\"\",\"name\":\"\",\"department\":{\"code\":\"\",\"name\":\"\"},\"drive_direction\":{\"code\":\"1\"},\"direction\":{\"code\":\"1\"}},\"model\":\"1\",\"ID\":\"2021083013342719900000000020242ac110009\",\"device\":{\"code\":\"\",\"ip\":\"127.0.0.1\",\"host\":\"172.17.0.9\",\"name\":\"\",\"model\":\"\"},\"illegal\":{\"state\":0},\"uservehicle\":{\"vehicletype\":\"NULL\",\"platetype\":\"NULL\",\"platecolor\":\"NULL\",\"logotype\":\"NULL\",\"vehiclecolor\":\"NULL\"},\"version\":\"VT-CAM-S2E-1.0.0\",\"lane\":{\"number\":0,\"code\":\"00\"},\"speed\":19.440,\"vehicle\":{\"vehicleDetRect\":{\"top\":0.2810,\"left\":0.8330,\"bottom\":0.4010,\"right\":0.9270},\"driver\":{\"safe_belt\":[{\"score\":0,\"state\":\"1\"},{\"score\":0,\"state\":\"1\"}],\"face\":[{\"rect\":{\"top\":0.0,\"left\":0.0,\"bottom\":0.0,\"right\":0.0},\"state\":\"-1\"},{\"rect\":{\"top\":0.0,\"left\":0.0,\"bottom\":0.0,\"right\":0.0},\"state\":\"-1\"}],\"holdingchild\":{\"score\":0,\"state\":\"0\"},\"phone\":{\"score\":0,\"state\":\"0\"}},\"objectClass\":1,\"model\":1,\"plate\":{\"rect\":{\"top\":0.2657407522201538,\"left\":0.8067708611488342,\"bottom\":0.3962962925434113,\"right\":0.9369791746139526},\"score\":0,\"type_code_alg\":\"0\",\"char_count\":2,\"head_text\":\"\",\"text\":\"无牌\",\"color_code\":\"0\",\"other_text\":\"\",\"type_code\":\"02\"},\"body\":{\"color\":{\"score\":47,\"code\":\"A\"},\"logo\":{\"image\":\"全景1\",\"rect\":{\"top\":0.0,\"left\":0.0,\"bottom\":1.0,\"right\":1.0},\"score\":18,\"code\":\"74\",\"name\":\"致观\"},\"type\":{\"special_type\":0,\"score\":82,\"code\":\"K33\"}},\"b_dangerous\":0}},\"pic_path_array\":\"/images/20210830/11/acb72636-a1d5-4687-ac4b-edaec8a5aef1.jpg\",\"event_cate\":\"traffic\",\"thread_id\":\"abcd-xxxx-xxxxxx-xxxxxx\",\"event_refid\":\"\",\"event_type\":\"vehicle\",\"vdev_unid\":\"\",\"pics\":[{\"ofilename\":\"20210830133426034_无牌_车道0.jpg\",\"format\":\"jpg\",\"hight\":1160,\"width\":3860,\"object_rect\":{\"top\":0.3163793103448276,\"left\":0.4012953367875648,\"bottom\":0.4379310344827586,\"right\":0.4660621761658031},\"shoot_dt\":\"2021-08-30 05:34:26.034\",\"feature_rect\":{\"top\":0.4137931034482759,\"left\":0.4295336787564767,\"bottom\":0.4224137931034483,\"right\":0.4375647668393782}}]}");
RTopic topic = redissonClient.getTopic(RedisKeys.FORWARD_PIC_PUSH_REAL_TIME);
topic.publish(jsonObject);
}
}
......@@ -3,6 +3,8 @@ spring:
loadbalancer:
ribbon:
enabled: false
health-check:
interval: 10s
gateway:
enabled: true
globalcors:
......
package com.viontech.fanxing.ops.controller.web;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.base.BaseExample;
import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.commons.model.ChannelExample;
import com.viontech.fanxing.commons.vo.ChannelVo;
import com.viontech.fanxing.ops.controller.base.ChannelBaseController;
import com.viontech.keliu.util.JsonMessageUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.reactive.function.client.WebClient;
@Controller
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@RestController
@RequestMapping("/channels")
@Slf4j
public class ChannelController extends ChannelBaseController {
@Override
......@@ -16,4 +29,69 @@ public class ChannelController extends ChannelBaseController {
ChannelExample channelExample = (ChannelExample) super.getExample(channelVo, type);
return channelExample;
}
@PostMapping("/nvs3000")
public Object nvs3000(@RequestBody JSONObject jsonObject) {
String addressUnid = jsonObject.getString("addressUnid");
String nvsUrl = jsonObject.getString("nvsUrl");
String nvsRegex = jsonObject.getString("nvsRegex");
ChannelExample channelExample = new ChannelExample();
channelExample.createCriteria().andAddressUnidEqualTo(addressUnid).andExpandEqualTo("nvs3000");
channelExample.createColumns().hasChannelUnidColumn();
List<Channel> channels = getService().selectByExample(channelExample);
Set<String> channelUnidSet = channels.stream().map(Channel::getChannelUnid).collect(Collectors.toSet());
JSONObject nvsPostData = new JSONObject();
nvsPostData.put("fromindex", 0);
nvsPostData.put("toindex", -1);
JSONObject nvsResponse = WebClient.create()
.post()
.uri(nvsUrl + "/nvsthird/getcamlist")
.bodyValue(nvsPostData)
.retrieve()
.bodyToMono(JSONObject.class)
.block(Duration.ofSeconds(10));
if (nvsResponse == null || !nvsResponse.containsKey("group")) {
return JsonMessageUtil.getSuccessJsonMsg("没有数据");
}
JSONArray group = nvsResponse.getJSONArray("group");
int repeat = 0;
int filtered = 0;
int success = 0;
for (int i = 0; i < group.size(); i++) {
JSONObject item = group.getJSONObject(i);
String description = item.getString("description");
if (StringUtils.isBlank(description)) {
filtered++;
continue;
}
String name = item.getString("name");
String id = item.getString("id");
if (channelUnidSet.contains(id)) {
repeat++;
continue;
}
Channel channel = new Channel();
channel.setChannelUnid(id);
channel.setName(name);
channel.setAddressUnid(addressUnid);
channel.setType(1);
channel.setExpand("nvs3000");
channel.setStreamType(0);
channel.setStreamPath(nvsRegex + id);
getService().insertSelective(channel);
success++;
}
jsonObject.put("filtered", filtered);
jsonObject.put("repeat", repeat);
jsonObject.put("insert", success);
log.info("拉取nvs3000 : {}", jsonObject);
return JsonMessageUtil.getSuccessJsonMsg(jsonObject);
}
}
\ No newline at end of file
......@@ -45,10 +45,18 @@ public class TaskController extends TaskBaseController {
@PutMapping("/{id}")
public JsonMessageUtil.JsonMessage<TaskVo> updateStatus(@PathVariable("id") Long id, @RequestParam Integer status) {
TaskVo taskVo = new TaskVo();
taskVo.setStatus(status);
taskVo.setId(id);
int i = taskService.updateByPrimaryKeySelective(taskVo);
taskService.updateStatus(id, status);
return JsonMessageUtil.getSuccessJsonMsg("success");
}
@GetMapping("/startTask/{id}")
public JsonMessageUtil.JsonMessage<TaskVo> startTask(@PathVariable("id") Long id) {
try {
taskService.startTask(id);
} catch (Exception e) {
log.error("启动任务异常", e);
return JsonMessageUtil.getErrorJsonMsg(e.getMessage());
}
return JsonMessageUtil.getSuccessJsonMsg("success");
}
}
\ No newline at end of file
......@@ -10,4 +10,8 @@ public interface TaskService extends BaseService<Task> {
TaskVo updateTask(Task task);
void removeTask(Long id);
void updateStatus(Long id, Integer status);
void startTask(Long id) throws Exception;
}
\ No newline at end of file
......@@ -2,12 +2,14 @@ package com.viontech.fanxing.task.manager.service.impl;
import com.viontech.fanxing.commons.base.BaseMapper;
import com.viontech.fanxing.commons.base.BaseServiceImpl;
import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.vo.TaskVo;
import com.viontech.fanxing.task.manager.feign.TaskSchedulingClient;
import com.viontech.fanxing.task.manager.mapper.TaskMapper;
import com.viontech.fanxing.task.manager.service.adapter.TaskService;
import com.viontech.keliu.util.JsonMessageUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
......@@ -31,25 +33,28 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
task = insertSelective(task);
task = selectByPrimaryKey(task.getId());
JsonMessageUtil.JsonMessage add = taskSchedulingClient.add(task);
if (add.isSuccess()) {
return new TaskVo(task);
} else {
throw new RuntimeException(add.getMsg());
if (StringUtils.isNotBlank(task.getScene()) && task.getStoreConfigId() != null) {
JsonMessageUtil.JsonMessage add = taskSchedulingClient.add(task);
if (!add.isSuccess()) {
throw new RuntimeException(add.getMsg());
}
}
return new TaskVo(task);
}
@Transactional(rollbackFor = Exception.class)
@Override
public TaskVo updateTask(Task task) {
updateByPrimaryKeySelective(task);
task = selectByPrimaryKey(task.getId());
JsonMessageUtil.JsonMessage update = taskSchedulingClient.update(task);
if (update.isSuccess()) {
updateByPrimaryKey(task);
return new TaskVo(task);
} else {
throw new RuntimeException(update.getMsg());
if (StringUtils.isNotBlank(task.getScene()) && task.getStoreConfigId() != null) {
JsonMessageUtil.JsonMessage add = taskSchedulingClient.add(task);
if (!add.isSuccess()) {
throw new RuntimeException(add.getMsg());
}
}
return new TaskVo(task);
}
@Override
......@@ -63,4 +68,35 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
throw new RuntimeException(delete.getMsg());
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public void updateStatus(Long id, Integer status) {
TaskVo taskVo = new TaskVo();
taskVo.setStatus(status);
taskVo.setId(id);
updateByPrimaryKeySelective(taskVo);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void startTask(Long id) throws Exception {
Task task = selectByPrimaryKey(id);
if (StringUtils.isBlank(task.getScene())) {
throw new IllegalArgumentException("场景配置为空,无法执行");
}
if (task.getStoreConfigId() == null) {
throw new IllegalArgumentException("存储配置为空,无法执行");
}
TaskVo taskVo = new TaskVo();
taskVo.setStatus(TaskStatus.PAUSE.val);
taskVo.setId(id);
updateByPrimaryKeySelective(taskVo);
JsonMessageUtil.JsonMessage add = taskSchedulingClient.add(task);
if (!add.isSuccess()) {
throw new RuntimeException(add.getMsg());
}
}
}
\ No newline at end of file
......@@ -77,9 +77,13 @@ public class TaskController implements TaskSchedulingTasksAdapter {
@Override
@DeleteMapping
public JsonMessageUtil.JsonMessage delete(@RequestParam("taskUnid") String taskUnid) {
vaServerService.terminateTask(taskUnid);
taskService.removeTaskDataAll(taskUnid);
// todo
return JsonMessageUtil.getSuccessJsonMsg("success");
boolean success = vaServerService.terminateTask(taskUnid);
if (success) {
taskService.removeTaskDataAll(taskUnid);
return JsonMessageUtil.getSuccessJsonMsg("success");
} else {
return JsonMessageUtil.getErrorJsonMsg("failed");
}
}
}
package com.viontech.fanxing.task.scheduling.controller;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.scheduling.service.VAServerService;
import com.viontech.keliu.util.JsonMessageUtil;
......@@ -76,8 +77,10 @@ public class VAServerController {
/**
* 输出分析流,每30秒调一次,不调用就不再输出视频流
*/
@GetMapping("/startAnalyzeStream")
public Object startAnalyzeStream(@RequestParam String taskUnid, @RequestParam String url) {
@PostMapping("/startAnalyzeStream")
public Object startAnalyzeStream(@RequestBody JSONObject object) {
String taskUnid = object.getString("taskUnid");
String url = object.getString("url");
return vaServerService.startAnalyzeStream(taskUnid, url);
}
......@@ -116,5 +119,12 @@ public class VAServerController {
return vaServerService.getRotationStatus(taskUnid);
}
/**
* @param type 任务的类型 0 交通, 1 客流, 2 安防, 3 违停, 4 人脸
*/
@GetMapping("/getDefaultAlgorithmConfig")
public Object getDefaultAlgorithmConfig(@RequestParam String type) {
return vaServerService.getDefaultAlgorithmConfig(type);
}
}
......@@ -84,10 +84,15 @@ public class RuntimeConfig implements Serializable {
this.weekConfigMap.put(i, weekConfig);
}
break;
// 随机执行
case 3:
Long runningTime = config.getLong("runningTime");
this.singleConfig = new Config().setRunningTime(runningTime);
break;
// 开始后一直执行
case 4:
this.singleConfig = new Config().setStartTime(LocalDateTime.now());
break;
default:
throw new InvalidParameterException("错误的type");
}
......@@ -158,6 +163,9 @@ public class RuntimeConfig implements Serializable {
case 3:
// 随机执行,待完善
break;
case 4:
executeTimestamp = singleConfig.getStartTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
break;
default:
return ImmutablePair.nullPair();
}
......
......@@ -2,11 +2,10 @@ package com.viontech.fanxing.task.scheduling.runner;
import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.vo.TaskVo;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.scheduling.feign.TaskClient;
import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.scheduling.service.TaskService;
import com.viontech.fanxing.task.scheduling.service.VAServerService;
import lombok.extern.slf4j.Slf4j;
......@@ -41,7 +40,7 @@ public class TaskRunner {
@Scheduled(fixedDelay = 5000)
public void executedTaskListener() {
RLock jobLock = redisService.getLockMust("lock:executedTaskListener");
RLock jobLock = redisService.getLockMust("lock:taskRunner");
RLock devLock = null;
try {
RScoredSortedSet<String> set = redisService.getToBeExecutedTaskUnidSet();
......@@ -110,7 +109,7 @@ public class TaskRunner {
@Scheduled(fixedDelay = 5000)
public void terminatedTaskListener() {
RLock jobLock = redisService.getLockMust("lock:terminatedTaskListener");
RLock jobLock = redisService.getLockMust("lock:taskRunner");
try {
RScoredSortedSet<String> set = redisService.getToBeTerminatedTaskUnidSet();
......
......@@ -226,5 +226,24 @@ public class VAServerHttpService {
return JSON.parseObject(response);
}
/**
* 获取默认算法配置
*/
public Object getDefaultAlgorithmConfig(VaServerInfo vaServerInfo, String taskAlgType) {
String path = "/api/vaserver/v1/get_algo_param_template";
JSONObject jsonObject = new JSONObject();
jsonObject.put("task_algo_type", taskAlgType);
Mono<JSONObject> mono = WebClient.create(vaServerInfo.getServiceBaseUrl())
.post()
.uri(path)
.bodyValue(jsonObject)
.retrieve()
.bodyToMono(JSONObject.class);
JSONObject block = mono.block(Duration.ofSeconds(20));
log.info("默认配置获取:{}", block);
return block;
}
}
......@@ -96,6 +96,9 @@ public class VAServerService {
// 如果vaServerId不为空,需要终止任务
if (vaServerId != null) {
VaServerInfo vaServerInfo = vaServerRedisRepository.getVAServerInfoById(vaServerId);
if (0 == vaServerInfo.getStatus()) {
return false;
}
// 下发终止任务请求
vaServerHttpService.rmTask(taskUnid, vaServerInfo);
// 解除任务和 vaServer 关联, 恢复资源数量
......@@ -222,6 +225,21 @@ public class VAServerService {
}
}
public Object getDefaultAlgorithmConfig(String taskAlgType) {
RMap<String, VaServerInfo> map = vaServerRedisRepository.getVaServerInfoMap();
VaServerInfo temp = null;
for (VaServerInfo item : map.readAllValues()) {
if (item.getStatus() == 1) {
temp = item;
break;
}
}
if (temp == null) {
throw new FanXingException("没有在线的VAServer");
}
return vaServerHttpService.getDefaultAlgorithmConfig(temp, taskAlgType);
}
public VAServerRedisRepository getVaServerRedisRepository() {
return vaServerRedisRepository;
}
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!