Commit adb48378 by xmh

转发服务:

1. <fix> 修复获取全部任务时参数解析的错误
2. <feat> 能够保存视频,解析结果保存视频字段
1 parent 9a5a1106
Showing 18 changed files with 133 additions and 17 deletions
...@@ -107,6 +107,7 @@ public class BehaviorJobConfiguration { ...@@ -107,6 +107,7 @@ public class BehaviorJobConfiguration {
delegates.add(behaviorProcessor); delegates.add(behaviorProcessor);
CompositeItemStreamProcessor<JSONObject, Behavior> item = new CompositeItemStreamProcessor<>(); CompositeItemStreamProcessor<JSONObject, Behavior> item = new CompositeItemStreamProcessor<>();
item.setName("behaviorProcessorCompose");
item.setDelegates(delegates); item.setDelegates(delegates);
return item; return item;
} }
......
...@@ -106,6 +106,7 @@ public class TrafficFlowJobConfiguration { ...@@ -106,6 +106,7 @@ public class TrafficFlowJobConfiguration {
delegates.add(trafficFlowProcessor); delegates.add(trafficFlowProcessor);
CompositeItemStreamProcessor<JSONObject, TrafficFlowContent> item = new CompositeItemStreamProcessor<>(); CompositeItemStreamProcessor<JSONObject, TrafficFlowContent> item = new CompositeItemStreamProcessor<>();
item.setName("trafficFlowProcessorCompose");
item.setDelegates(delegates); item.setDelegates(delegates);
return item; return item;
} }
......
...@@ -107,6 +107,7 @@ public class TrafficJobConfiguration { ...@@ -107,6 +107,7 @@ public class TrafficJobConfiguration {
delegates.add(trafficProcessor); delegates.add(trafficProcessor);
CompositeItemStreamProcessor<JSONObject, TrafficContent> item = new CompositeItemStreamProcessor<>(); CompositeItemStreamProcessor<JSONObject, TrafficContent> item = new CompositeItemStreamProcessor<>();
item.setName("trafficProcessorCompose");
item.setDelegates(delegates); item.setDelegates(delegates);
return item; return item;
} }
......
...@@ -69,6 +69,8 @@ public class BehaviorProcessor implements ItemStream, ItemProcessor<JSONObject, ...@@ -69,6 +69,8 @@ public class BehaviorProcessor implements ItemStream, ItemProcessor<JSONObject,
behavior.setChannelUnid(channelUnid); behavior.setChannelUnid(channelUnid);
behavior.setTaskName(taskName); behavior.setTaskName(taskName);
behavior.setPics(picArray); behavior.setPics(picArray);
String videoName = item.getString("video_name");
behavior.setVideo(videoName);
behavior.setEventData(eventData == null ? null : eventData.toJSONString()); behavior.setEventData(eventData == null ? null : eventData.toJSONString());
behavior.setVideo(video == null ? null : video.toJSONString()); behavior.setVideo(video == null ? null : video.toJSONString());
......
...@@ -20,6 +20,7 @@ public class CompositeItemStreamProcessor<I, O> extends CompositeItemProcessor<I ...@@ -20,6 +20,7 @@ public class CompositeItemStreamProcessor<I, O> extends CompositeItemProcessor<I
private static final Logger logger = LoggerFactory.getLogger(CompositeItemStreamProcessor.class); private static final Logger logger = LoggerFactory.getLogger(CompositeItemStreamProcessor.class);
private boolean ignoreItemStream = false; private boolean ignoreItemStream = false;
private List<? extends ItemProcessor<?, ?>> delegates; private List<? extends ItemProcessor<?, ?>> delegates;
private String name;
public void setIgnoreItemStream(boolean ignoreItemStream) { public void setIgnoreItemStream(boolean ignoreItemStream) {
this.ignoreItemStream = ignoreItemStream; this.ignoreItemStream = ignoreItemStream;
...@@ -37,10 +38,9 @@ public class CompositeItemStreamProcessor<I, O> extends CompositeItemProcessor<I ...@@ -37,10 +38,9 @@ public class CompositeItemStreamProcessor<I, O> extends CompositeItemProcessor<I
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
result = processItem(delegate, result); result = processItem(delegate, result);
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
long c = end - start;
logger.info(delegate.getClass().getSimpleName() + "Processor执行时间:" + (end - start)); logger.info(delegate.getClass().getSimpleName() + "Processor执行时间:" + (end - start));
} }
logger.info(item.getClass().getSimpleName() + "整体处理时间:" + (System.currentTimeMillis() - startAll)); logger.info(name == null ? item.getClass().getSimpleName() : name + "整体处理时间:" + (System.currentTimeMillis() - startAll));
return (O) result; return (O) result;
} }
...@@ -97,4 +97,13 @@ public class CompositeItemStreamProcessor<I, O> extends CompositeItemProcessor<I ...@@ -97,4 +97,13 @@ public class CompositeItemStreamProcessor<I, O> extends CompositeItemProcessor<I
} }
} }
} }
public String getName() {
return name;
}
public CompositeItemStreamProcessor<I, O> setName(String name) {
this.name = name;
return this;
}
} }
...@@ -36,11 +36,11 @@ public class PicProcessor implements ItemStream, ItemProcessor<JSONObject, JSONO ...@@ -36,11 +36,11 @@ public class PicProcessor implements ItemStream, ItemProcessor<JSONObject, JSONO
@Override @Override
public JSONObject process(JSONObject item) throws Exception { public JSONObject process(JSONObject item) throws Exception {
JSONArray pics = item.getJSONArray("pics");
ArrayList<String> picList = new ArrayList<>();
if (pics != null && pics.size() > 0) {
String eventDt = item.getString("event_dt"); String eventDt = item.getString("event_dt");
Date eventTime = DateUtil.parse(DateUtil.FORMAT_FULL, eventDt); Date eventTime = DateUtil.parse(DateUtil.FORMAT_FULL, eventDt);
ArrayList<String> picList = new ArrayList<>();
if (item.containsKey("pics") && item.getJSONArray("pics").size() > 0) {
JSONArray pics = item.getJSONArray("pics");
for (int i = 0; i < pics.size(); i++) { for (int i = 0; i < pics.size(); i++) {
JSONObject picObject = pics.getJSONObject(i); JSONObject picObject = pics.getJSONObject(i);
String base64 = picObject.getString("pic_base64"); String base64 = picObject.getString("pic_base64");
...@@ -57,6 +57,25 @@ public class PicProcessor implements ItemStream, ItemProcessor<JSONObject, JSONO ...@@ -57,6 +57,25 @@ public class PicProcessor implements ItemStream, ItemProcessor<JSONObject, JSONO
String picPathArray = String.join(",", picList); String picPathArray = String.join(",", picList);
item.put("pic_path_array", picPathArray); item.put("pic_path_array", picPathArray);
} }
// 视频
if (item.containsKey("video") && item.getJSONArray("video").size() > 0) {
ArrayList<String> videoNameList = new ArrayList<>();
JSONArray videoArr = item.getJSONArray("video");
for (int i = 0; i < videoArr.size(); i++) {
JSONObject videoObject = videoArr.getJSONObject(i);
String unid = videoObject.getString("unid");
String format = videoObject.getString("format");
String videoPath = picUtils.getFilePath(unid, eventTime, format == null ? "mp4" : format);
videoNameList.add(videoPath);
}
String videoName = String.join(",", videoNameList);
item.put("video_name", videoName);
}
// 实时推送 // 实时推送
if (picList.size() > 0 && StringUtils.isNotBlank(item.getString("vchan_refid"))) { if (picList.size() > 0 && StringUtils.isNotBlank(item.getString("vchan_refid"))) {
try { try {
......
...@@ -72,6 +72,8 @@ public class TrafficFlowProcessor implements ItemProcessor<JSONObject, TrafficFl ...@@ -72,6 +72,8 @@ public class TrafficFlowProcessor implements ItemProcessor<JSONObject, TrafficFl
flowEvent.setEventTime(eventTime); flowEvent.setEventTime(eventTime);
flowEvent.setPics(picArray); flowEvent.setPics(picArray);
flowEvent.setTaskId(taskId); flowEvent.setTaskId(taskId);
String videoName = item.getString("video_name");
flowEvent.setVideoName(videoName);
if (eventData != null) { if (eventData != null) {
// 地理位置 // 地理位置
......
...@@ -78,6 +78,8 @@ public class TrafficProcessor implements ItemProcessor<JSONObject, TrafficConten ...@@ -78,6 +78,8 @@ public class TrafficProcessor implements ItemProcessor<JSONObject, TrafficConten
traffic.setChannelUnid(channelUnid); traffic.setChannelUnid(channelUnid);
traffic.setPics(picArray); traffic.setPics(picArray);
traffic.setTaskId(taskId); traffic.setTaskId(taskId);
String videoName = item.getString("video_name");
traffic.setVideoName(videoName);
//非机动车类型 //非机动车类型
......
...@@ -3,7 +3,11 @@ package com.viontech.fanxing.forward.controller; ...@@ -3,7 +3,11 @@ package com.viontech.fanxing.forward.controller;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.constant.RedisKeys; import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.forward.model.VideoResult;
import com.viontech.fanxing.forward.util.PicUtils;
import com.viontech.keliu.util.JsonMessageUtil; import com.viontech.keliu.util.JsonMessageUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.redisson.api.RBlockingDeque; import org.redisson.api.RBlockingDeque;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
...@@ -12,6 +16,8 @@ import org.springframework.web.bind.annotation.RequestBody; ...@@ -12,6 +16,8 @@ import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.File;
import java.util.Date;
/** /**
* . * .
...@@ -21,11 +27,14 @@ import javax.annotation.Resource; ...@@ -21,11 +27,14 @@ import javax.annotation.Resource;
*/ */
@RestController @RestController
@Slf4j
public class DataReceiveController { public class DataReceiveController {
@Resource @Resource
private RedissonClient redissonClient; private RedissonClient redissonClient;
@Value("${vion.forward.enable:false}") @Value("${vion.forward.enable:false}")
private Boolean enableForward; private Boolean enableForward;
@Resource
private PicUtils picUtils;
@PostMapping("/result") @PostMapping("/result")
public Object result(@RequestBody String analysisResultStr) { public Object result(@RequestBody String analysisResultStr) {
...@@ -58,4 +67,19 @@ public class DataReceiveController { ...@@ -58,4 +67,19 @@ public class DataReceiveController {
return JsonMessageUtil.getSuccessJsonMsg("OK"); return JsonMessageUtil.getSuccessJsonMsg("OK");
} }
@PostMapping("/result/video")
public Object video(VideoResult videoResult) throws Exception {
String unid = videoResult.getRefid();
log.info("接收到视频录像文件,refId:{},文件名:{}", unid, videoResult.getFile().getOriginalFilename());
// todo 之后会加上 eventTime 字段,目前先 new Date()
String filePath = picUtils.getFilePath(unid, new Date(), videoResult.getFormat() == null ? "mp4" : videoResult.getFormat());
File file = new File(filePath);
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
FileUtils.copyToFile(videoResult.getFile().getInputStream(), file);
videoResult.setFile(null);
return videoResult;
}
} }
...@@ -6,6 +6,8 @@ import org.springframework.cloud.openfeign.FeignClient; ...@@ -6,6 +6,8 @@ import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import java.util.List;
/** /**
* . * .
* *
...@@ -18,6 +20,6 @@ import org.springframework.web.bind.annotation.GetMapping; ...@@ -18,6 +20,6 @@ import org.springframework.web.bind.annotation.GetMapping;
public interface TaskManagerFeignClient { public interface TaskManagerFeignClient {
@GetMapping("/tasks") @GetMapping("/tasks")
JsonMessageUtil.JsonMessage<Task> getAllTask(); JsonMessageUtil.JsonMessage<List<Task>> getAllTask();
} }
package com.viontech.fanxing.forward.model;
import lombok.Getter;
import lombok.Setter;
import org.springframework.web.multipart.MultipartFile;
/**
* .
*
* @author 谢明辉
* @date 2021/9/1
*/
@Getter
@Setter
public class VideoResult {
private String refid;
private String format;
private MultipartFile file;
private String unid;
}
...@@ -40,7 +40,7 @@ public class CacheUtils { ...@@ -40,7 +40,7 @@ public class CacheUtils {
Map<String, Task> result; Map<String, Task> result;
try { try {
result = (Map<String, Task>) CACHE.get("task_map", () -> { result = (Map<String, Task>) CACHE.get("task_map", () -> {
JsonMessageUtil.JsonMessage<Task> response = null; JsonMessageUtil.JsonMessage<List<Task>> response = null;
try { try {
response = taskManagerFeignClient.getAllTask(); response = taskManagerFeignClient.getAllTask();
} catch (Exception e) { } catch (Exception e) {
......
...@@ -28,11 +28,7 @@ public class PicUtils { ...@@ -28,11 +28,7 @@ public class PicUtils {
if (unid == null || "".equals(unid)) { if (unid == null || "".equals(unid)) {
unid = UUID.randomUUID().toString(); unid = UUID.randomUUID().toString();
} }
int i = unid.hashCode(); String path = getFilePath(unid, date, format);
i ^= (i >>> 16);
i = 15 & i;
String dateStr = DateUtil.format("yyyyMMdd", date);
String path = basePath + File.separator + dateStr + File.separator + i + File.separator + unid + "." + format;
File file = new File(path); File file = new File(path);
if (!file.getParentFile().exists()) { if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs(); file.getParentFile().mkdirs();
...@@ -47,5 +43,16 @@ public class PicUtils { ...@@ -47,5 +43,16 @@ public class PicUtils {
return savePic(unid, date, format, decode); return savePic(unid, date, format, decode);
} }
public String getFilePath(String unid, Date date, String format) {
if (unid == null) {
return null;
}
int i = unid.hashCode();
i ^= (i >>> 16);
i = 127 & i;
String dateStr = DateUtil.format("yyyyMMdd", date);
return basePath + File.separator + dateStr + File.separator + i + File.separator + unid + "." + format;
}
} }
...@@ -40,6 +40,10 @@ spring: ...@@ -40,6 +40,10 @@ spring:
batch: batch:
job: job:
enabled: false enabled: false
servlet:
multipart:
max-file-size: 10MB
max-request-size: 100MB
logging: logging:
config: classpath:logback-${spring.profiles.active}.xml config: classpath:logback-${spring.profiles.active}.xml
mybatis: mybatis:
......
...@@ -3,6 +3,9 @@ package com.viontech.fanxing.forward; ...@@ -3,6 +3,9 @@ package com.viontech.fanxing.forward;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.constant.RedisKeys; import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.forward.feign.TaskManagerFeignClient;
import com.viontech.keliu.util.JsonMessageUtil;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.redisson.api.RTopic; import org.redisson.api.RTopic;
...@@ -11,6 +14,7 @@ import org.springframework.boot.test.context.SpringBootTest; ...@@ -11,6 +14,7 @@ import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.List;
/** /**
* . * .
...@@ -25,6 +29,8 @@ public class Test0 { ...@@ -25,6 +29,8 @@ public class Test0 {
@Resource @Resource
private RedissonClient redissonClient; private RedissonClient redissonClient;
@Resource
private TaskManagerFeignClient taskManagerFeignClient;
@Test @Test
public void test() { public void test() {
...@@ -33,4 +39,10 @@ public class Test0 { ...@@ -33,4 +39,10 @@ public class Test0 {
topic.publish(jsonObject); topic.publish(jsonObject);
} }
@Test
public void allTask() {
JsonMessageUtil.JsonMessage<List<Task>> allTask = taskManagerFeignClient.getAllTask();
System.out.println(JSON.toJSONString(allTask));
}
} }
...@@ -46,7 +46,7 @@ public class VAServerController { ...@@ -46,7 +46,7 @@ public class VAServerController {
result.put("code", 200); result.put("code", 200);
result.put("msg", "success"); result.put("msg", "success");
result.put("resultRecvUrl", "http://" + vionGatewayIp + ":" + vionGatewayPort + "/fanxing-forward/result"); result.put("resultRecvUrl", "http://" + vionGatewayIp + ":" + vionGatewayPort + "/fanxing-forward/result");
result.put("videoUploadUrl", "http://" + vionGatewayIp + ":" + vionGatewayPort + "/fanxing-forward/result"); result.put("videoUploadUrl", "http://" + vionGatewayIp + ":" + vionGatewayPort + "/fanxing-forward/result/video");
return result; return result;
} }
......
...@@ -30,6 +30,7 @@ import java.util.HashMap; ...@@ -30,6 +30,7 @@ import java.util.HashMap;
* <li>1 运行一次,指定具体的开始和结束的日期时间</li> * <li>1 运行一次,指定具体的开始和结束的日期时间</li>
* <li>2 按照星期配置,指定每周某天的开始和结束的时间</li> * <li>2 按照星期配置,指定每周某天的开始和结束的时间</li>
* <li>3 随机运行,指定运行时长</li> * <li>3 随机运行,指定运行时长</li>
* <li>4 全天执行,只能手动停止</li>
* *
* @author 谢明辉 * @author 谢明辉
* @date 2021/7/13 * @date 2021/7/13
......
...@@ -6,6 +6,8 @@ import com.viontech.fanxing.task.scheduling.model.TaskData; ...@@ -6,6 +6,8 @@ import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VATask; import com.viontech.fanxing.task.scheduling.model.vaserver.VATask;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo; import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
...@@ -233,16 +235,17 @@ public class VAServerHttpService { ...@@ -233,16 +235,17 @@ public class VAServerHttpService {
String path = "/api/vaserver/v1/get_algo_param_template"; String path = "/api/vaserver/v1/get_algo_param_template";
JSONObject jsonObject = new JSONObject(); JSONObject jsonObject = new JSONObject();
jsonObject.put("task_algo_type", taskAlgType); jsonObject.put("task_algo_type", taskAlgType);
Mono<JSONObject> mono = WebClient.create(vaServerInfo.getServiceBaseUrl()) Mono<String> mono = WebClient.create(vaServerInfo.getServiceBaseUrl())
.post() .post()
.uri(path) .uri(path)
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
.bodyValue(jsonObject) .bodyValue(jsonObject)
.retrieve() .retrieve()
.bodyToMono(JSONObject.class); .bodyToMono(String.class);
JSONObject block = mono.block(Duration.ofSeconds(20)); String block = mono.block(Duration.ofSeconds(20));
log.info("默认配置获取:{}", block); log.info("默认配置获取:{}", block);
return block; return JSON.parseObject(block);
} }
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!