Commit 7ef951c1 by xmh

commons:

1. <feat> 所有 vion 开头的配置都放在 VionConfig 中
2. <feat> 添加 视频类型和视频流类型 的枚举类

转发服务:
1. <feat> 分析结果与任务有关的提出一个公用的 TaskInfoProcessor
2. <feat> 任务执行使用配置的线程池
3. <fix> 修复 traffic 写入时没有写入任务id
4. <fix> 修复 TaskFeignClient.getAllTask 返回值解析错误

网关:
1. <fix> 修复 WebConfig 引起的无法启动的错误

运维服务:
1. <feat> 添加第一版的录像上传接口,之后会添加 createTime 和 tag

任务服务:
1. <fix> 修复更新任务时调用了添加任务的接口
1 parent fab50ddf
Showing 36 changed files with 329 additions and 164 deletions
......@@ -111,5 +111,9 @@
<artifactId>caffeine</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.viontech.fanxing.commons.config;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Configuration;
import java.util.List;
/**
* .
*
* @author 谢明辉
* @date 2021/9/8
*/
@RefreshScope
@Configuration
@Getter
@Setter
@ConfigurationProperties(prefix = "vion")
public class VionConfig {
private Image image;
private boolean enableForward;
private boolean enableAuthorization;
private Gateway gateway;
private List<String> supportedVideoFormats;
public @Getter
@Setter
static class Image {
private String path;
private Integer keep;
}
public @Getter
@Setter
static class Gateway {
private String ip;
private String port;
}
}
package com.viontech.fanxing.commons.constant;
/**
* .
*
* @author 谢明辉
* @date 2021/9/8
*/
public enum ChannelType {
/** 手动添加 */
MANUALLY(0),
/** 第三方对接 */
THIRD_PART(1),
/** 上传的文件 */
FILE(2),
/** stream type rtsp */
STREAM_RTSP(0),
/** stream type file */
STREAM_FILE(2),
;
public int value;
ChannelType(int value) {
this.value = value;
}
}
package com.viontech.fanxing.commons.feing;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.keliu.util.JsonMessageUtil;
/**
* .
*
* @author 谢明辉
* @date 2021/7/12
*/
public interface TaskSchedulingTasksAdapter {
JsonMessageUtil.JsonMessage add(Task task);
JsonMessageUtil.JsonMessage update(Task task);
JsonMessageUtil.JsonMessage delete(String taskUnid);
}
......@@ -2,9 +2,13 @@ package com.viontech.fanxing.commons.vo;
import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.commons.vobase.ChannelVoBase;
import net.minidev.json.annotate.JsonIgnore;
import org.springframework.web.multipart.MultipartFile;
public class ChannelVo extends ChannelVoBase {
@JsonIgnore
private MultipartFile file;
private Boolean tree;
public ChannelVo() {
......@@ -23,4 +27,13 @@ public class ChannelVo extends ChannelVoBase {
this.tree = tree;
return this;
}
public MultipartFile getFile() {
return file;
}
public ChannelVo setFile(MultipartFile file) {
this.file = file;
return this;
}
}
\ No newline at end of file
......@@ -11,7 +11,6 @@ import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
......@@ -29,7 +28,6 @@ import java.util.concurrent.TimeUnit;
@EnableFeignClients
@EnableTransactionManagement
@EnableBatchProcessing
@EnableWebSocket
@Slf4j
public class ForwardApp {
......@@ -48,6 +46,7 @@ public class ForwardApp {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setMaxPoolSize(30);
threadPoolTaskExecutor.setCorePoolSize(10);
threadPoolTaskExecutor.setThreadNamePrefix("fanxing-forward-process-");
return threadPoolTaskExecutor;
}
......
......@@ -9,6 +9,7 @@ import com.viontech.fanxing.forward.batch.listener.WorkQueueClearChunkListener;
import com.viontech.fanxing.forward.batch.processor.BehaviorProcessor;
import com.viontech.fanxing.forward.batch.processor.CompositeItemStreamProcessor;
import com.viontech.fanxing.forward.batch.processor.PicProcessor;
import com.viontech.fanxing.forward.batch.processor.TaskInfoProcessor;
import com.viontech.fanxing.forward.batch.reader.ConcurrencyReader;
import com.viontech.fanxing.forward.batch.reader.RedisPopReader;
import lombok.extern.slf4j.Slf4j;
......@@ -16,7 +17,6 @@ import org.redisson.api.RedissonClient;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
......@@ -100,9 +100,10 @@ public class BehaviorJobConfiguration {
}
@Bean(name = "behaviorProcessorCompose")
public ItemProcessor<JSONObject, Behavior> behaviorProcessorCompose(BehaviorProcessor behaviorProcessor, PicProcessor picProcessor) {
public ItemProcessor<JSONObject, Behavior> behaviorProcessorCompose(BehaviorProcessor behaviorProcessor, PicProcessor picProcessor, TaskInfoProcessor taskInfoProcessor) {
LinkedList delegates = new LinkedList<>();
delegates.add(taskInfoProcessor);
delegates.add(picProcessor);
delegates.add(behaviorProcessor);
......
......@@ -7,6 +7,7 @@ import com.viontech.fanxing.forward.batch.listener.JobRestartListener;
import com.viontech.fanxing.forward.batch.listener.WorkQueueClearChunkListener;
import com.viontech.fanxing.forward.batch.processor.CompositeItemStreamProcessor;
import com.viontech.fanxing.forward.batch.processor.PicProcessor;
import com.viontech.fanxing.forward.batch.processor.TaskInfoProcessor;
import com.viontech.fanxing.forward.batch.processor.TrafficFlowProcessor;
import com.viontech.fanxing.forward.batch.reader.ConcurrencyReader;
import com.viontech.fanxing.forward.batch.reader.RedisPopReader;
......@@ -16,7 +17,6 @@ import org.redisson.api.RedissonClient;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
......@@ -99,9 +99,10 @@ public class TrafficFlowJobConfiguration {
}
@Bean(name = "trafficFlowProcessorCompose")
public ItemProcessor<JSONObject, TrafficFlowContent> trafficFlowProcessorCompose(TrafficFlowProcessor trafficFlowProcessor, PicProcessor picProcessor) {
public ItemProcessor<JSONObject, TrafficFlowContent> trafficFlowProcessorCompose(TrafficFlowProcessor trafficFlowProcessor, PicProcessor picProcessor, TaskInfoProcessor taskInfoProcessor) {
LinkedList delegates = new LinkedList<>();
delegates.add(taskInfoProcessor);
delegates.add(picProcessor);
delegates.add(trafficFlowProcessor);
......
......@@ -7,6 +7,7 @@ import com.viontech.fanxing.forward.batch.listener.JobRestartListener;
import com.viontech.fanxing.forward.batch.listener.WorkQueueClearChunkListener;
import com.viontech.fanxing.forward.batch.processor.CompositeItemStreamProcessor;
import com.viontech.fanxing.forward.batch.processor.PicProcessor;
import com.viontech.fanxing.forward.batch.processor.TaskInfoProcessor;
import com.viontech.fanxing.forward.batch.processor.TrafficProcessor;
import com.viontech.fanxing.forward.batch.reader.ConcurrencyReader;
import com.viontech.fanxing.forward.batch.reader.RedisPopReader;
......@@ -16,7 +17,6 @@ import org.redisson.api.RedissonClient;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
......@@ -100,9 +100,10 @@ public class TrafficJobConfiguration {
}
@Bean(name = "trafficProcessorCompose")
public ItemProcessor<JSONObject, TrafficContent> trafficFlowProcessorCompose(TrafficProcessor trafficProcessor, PicProcessor picProcessor) {
public ItemProcessor<JSONObject, TrafficContent> trafficFlowProcessorCompose(TrafficProcessor trafficProcessor, PicProcessor picProcessor, TaskInfoProcessor taskInfoProcessor) {
LinkedList delegates = new LinkedList<>();
delegates.add(taskInfoProcessor);
delegates.add(picProcessor);
delegates.add(trafficProcessor);
......
......@@ -9,14 +9,16 @@ import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* Created by vion on 2018/8/6.
*/
@Component("jobRestartListener")
public class JobRestartListener implements JobExecutionListener,ApplicationContextAware {
public class JobRestartListener implements JobExecutionListener, ApplicationContextAware {
private Logger logger = LoggerFactory.getLogger(JobRestartListener.class);
......@@ -25,21 +27,24 @@ public class JobRestartListener implements JobExecutionListener,ApplicationConte
@Autowired
private JobRepository jobRepository;
@Resource
private TaskExecutor taskExecutor;
@Override
public void beforeJob(JobExecution jobExecution) {
logger.info(">>>>>>>>>>开始执行{}",jobExecution.getJobInstance().getJobName());
logger.info(">>>>>>>>>>开始执行{}", jobExecution.getJobInstance().getJobName());
}
@Override
public void afterJob(JobExecution jobExecution) {
// 如果任务失败 那么打印下失败信息
if(!jobExecution.getExitStatus().getExitCode().equals(ExitStatus.COMPLETED.getExitCode())){
logger.error("{}任务结束,开始尝试进行重启,结束原因为{}",jobExecution.getJobInstance().getJobName(),jobExecution.getExitStatus().getExitDescription());
if (!jobExecution.getExitStatus().getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
logger.error("{}任务结束,开始尝试进行重启,结束原因为{}", jobExecution.getJobInstance().getJobName(), jobExecution.getExitStatus().getExitDescription());
}
ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
while(threadGroup.getParent() != null){
while (threadGroup.getParent() != null) {
threadGroup = threadGroup.getParent();
}
......@@ -47,14 +52,14 @@ public class JobRestartListener implements JobExecutionListener,ApplicationConte
try {
SimpleJobLauncher sjl = new SimpleJobLauncher();
sjl.setJobRepository(jobRepository);
sjl.setTaskExecutor(new SimpleAsyncTaskExecutor());
sjl.setTaskExecutor(taskExecutor);
sjl.afterPropertiesSet();
JobParametersBuilder jpb = new JobParametersBuilder();
jpb.addLong("date", System.currentTimeMillis());
sjl.run(job, jpb.toJobParameters());
} catch (Exception e) {
logger.error("任务"+jobExecution.getJobInstance().getJobName()+"重启失败",e);
logger.error("任务" + jobExecution.getJobInstance().getJobName() + "重启失败", e);
}
}
......
......@@ -3,7 +3,6 @@ package com.viontech.fanxing.forward.batch.processor;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.model.Behavior;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.forward.util.CacheUtils;
import com.viontech.keliu.util.DateUtil;
import lombok.extern.slf4j.Slf4j;
......@@ -16,7 +15,6 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import java.util.Map;
/**
* .
......@@ -32,36 +30,27 @@ public class BehaviorProcessor implements ItemStream, ItemProcessor<JSONObject,
@Override
public Behavior process(JSONObject item) throws Exception {
log.info("收到 behavior 消息 , eventId:{}", item.getString("event_refid"));
final Map<String, Task> taskMap = cacheUtils.getTaskMap();
String taskUnid = item.getString("subtask_id");
final Task task = taskMap.get(taskUnid);
Long taskId;
if (task == null) {
log.info("无法找到对应task:{}", item.toJSONString());
taskId = null;
} else {
taskId = task.getId();
JSONObject eventData = item.getJSONObject("event_data");
String eventRefid = item.getString("event_refid");
if (StringUtils.isEmpty(eventRefid)) {
eventRefid = eventData.getString("ID");
}
log.info("收到 behavior 消息 , eventId:{}", eventRefid);
Behavior behavior = new Behavior();
behavior.setTaskId(taskId);
Long taskId = item.getLong("taskId");
String eventType = item.getString("event_type");
String eventRefid = item.getString("event_refid");
String eventDt = item.getString("event_dt");
String eventCate = item.getString("event_cate");
String channelUnid = item.getString("vchan_refid");
String taskName = item.getString("task_name");
Date eventTime = DateUtil.parse(DateUtil.FORMAT_FULL, eventDt);
JSONObject eventData = item.getJSONObject("event_data");
JSONArray video = item.getJSONArray("video");
String picArray = item.getString("pic_path_array");
if (StringUtils.isEmpty(eventRefid)) {
eventRefid = eventData.getString("ID");
}
behavior.setTaskId(taskId);
behavior.setEventType(eventType);
behavior.setEventRefid(eventRefid);
behavior.setEventTime(eventTime);
......
package com.viontech.fanxing.forward.batch.processor;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.forward.util.CacheUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Map;
/**
* .
*
* @author 谢明辉
* @date 2021/9/7
*/
@Component
@Slf4j
public class TaskInfoProcessor implements ItemProcessor<JSONObject, JSONObject>, ItemStream {
@Resource
private CacheUtils cacheUtils;
@Override
public JSONObject process(JSONObject item) throws Exception {
final Map<String, Task> taskMap = cacheUtils.getTaskMap();
String taskUnid = item.getString("subtask_id");
final Task task = taskMap.get(taskUnid);
if (task != null) {
Long taskId = task.getId();
String channelUnid = task.getChannelUnid();
item.put("taskId", taskId);
item.put("vchan_refid", channelUnid);
}
return item;
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
}
@Override
public void close() throws ItemStreamException {
}
}
......@@ -4,7 +4,6 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.model.FlowData;
import com.viontech.fanxing.commons.model.FlowEvent;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.forward.model.TrafficFlowContent;
import com.viontech.fanxing.forward.util.CacheUtils;
import com.viontech.keliu.util.DateUtil;
......@@ -20,7 +19,6 @@ import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* .
......@@ -37,18 +35,12 @@ public class TrafficFlowProcessor implements ItemProcessor<JSONObject, TrafficFl
@Override
public TrafficFlowContent process(JSONObject item) throws Exception {
log.info("收到 flow 消息 , eventId:{}", item.getString("event_refid"));
final Map<String, Task> taskMap = cacheUtils.getTaskMap();
String taskUnid = item.getString("subtask_id");
final Task task = taskMap.get(taskUnid);
Long taskId;
if (task == null) {
log.info("无法找到对应task:{}", item.toJSONString());
taskId = null;
} else {
taskId = task.getId();
JSONObject eventData = item.getJSONObject("event_data");
String eventRefid = item.getString("event_refid");
if (StringUtils.isEmpty(eventRefid)) {
eventRefid = eventData.getString("ID");
}
log.info("收到 flow 消息 , eventId:{}", eventRefid);
TrafficFlowContent content = new TrafficFlowContent();
FlowEvent flowEvent = new FlowEvent();
......@@ -57,15 +49,12 @@ public class TrafficFlowProcessor implements ItemProcessor<JSONObject, TrafficFl
ArrayList<FlowData> flowDataList = new ArrayList<>();
content.setFlowData(flowDataList);
Long taskId = item.getLong("taskId");
String eventType = item.getString("event_type");
String eventRefid = item.getString("event_refid");
String eventDt = item.getString("event_dt");
String picArray = item.getString("pic_path_array");
Date eventTime = DateUtil.parse(DateUtil.FORMAT_FULL, eventDt);
JSONObject eventData = item.getJSONObject("event_data");
if (StringUtils.isEmpty(eventRefid)) {
eventRefid = eventData.getString("ID");
}
flowEvent.setEventType(eventType);
flowEvent.setEventId(eventRefid);
......
......@@ -2,7 +2,6 @@ package com.viontech.fanxing.forward.batch.processor;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.Traffic;
import com.viontech.fanxing.commons.model.TrafficFace;
import com.viontech.fanxing.forward.model.TrafficContent;
......@@ -19,7 +18,6 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Date;
import java.util.Map;
/**
* .
......@@ -37,18 +35,13 @@ public class TrafficProcessor implements ItemProcessor<JSONObject, TrafficConten
@Override
public TrafficContent process(JSONObject item) throws Exception {
log.info("收到 traffic 消息 , eventId:{}", item.getString("event_refid"));
final Map<String, Task> taskMap = cacheUtils.getTaskMap();
String taskUnid = item.getString("subtask_id");
final Task task = taskMap.get(taskUnid);
Long taskId;
if (task == null) {
log.info("无法找到对应task:{}", item.toJSONString());
taskId = null;
} else {
taskId = task.getId();
String eventRefid = item.getString("event_refid");
JSONObject eventData = item.getJSONObject("event_data");
if (StringUtils.isEmpty(eventRefid)) {
eventRefid = eventData.getString("ID");
}
log.info("收到 traffic 消息 , eventId:{}", eventRefid);
TrafficContent content = new TrafficContent();
Traffic traffic = new Traffic();
......@@ -59,17 +52,13 @@ public class TrafficProcessor implements ItemProcessor<JSONObject, TrafficConten
String eventType = item.getString("event_type");
String eventCate = item.getString("event_cate");
String eventRefid = item.getString("event_refid");
Long taskId = item.getLong("taskId");
String eventDt = item.getString("event_dt");
Date eventTime = DateUtil.parse(DateUtil.FORMAT_FULL, eventDt);
String channelUnid = item.getString("vchan_refid");
String picArray = item.getString("pic_path_array");
JSONObject eventData = item.getJSONObject("event_data");
if (StringUtils.isEmpty(eventRefid)) {
eventRefid = eventData.getString("ID");
}
traffic.setEventCate(eventCate);
traffic.setEventType(eventType);
......
......@@ -26,8 +26,8 @@ import java.util.stream.Collectors;
public class TrafficWriter implements ItemWriter<TrafficContent> {
private final static String INSERT_TRAFFIC = "insert into d_traffic (" +
"event_cate,event_type,event_time,channel_unid,plate_color,plate_number,location_code,location_name,lane_code,direction_code,vehicle_type,vehicle_color,vehicle_logo,illegal_code,illegal_state,feature_annual_inspection_mark,feature_pendant,feature_decoration,feature_sun_shield,xcycle_type,event_id,special_type,with_helmet,json_data,pics,video_name) values(" +
":eventCate,:eventType,:eventTime,:channelUnid,:plateColor,:plateNumber,:locationCode,:locationName,:laneCode,:directionCode,:vehicleType,:vehicleColor,:vehicleLogo,:illegalCode,:illegalState,:featureAnnualInspectionMark,:featurePendant,:featureDecoration,:featureSunShield,:xcycleType,:eventId,:specialType,:withHelmet,:jsonData,:pics,:videoName)";
"task_id,event_cate,event_type,event_time,channel_unid,plate_color,plate_number,location_code,location_name,lane_code,direction_code,vehicle_type,vehicle_color,vehicle_logo,illegal_code,illegal_state,feature_annual_inspection_mark,feature_pendant,feature_decoration,feature_sun_shield,xcycle_type,event_id,special_type,with_helmet,json_data,pics,video_name) values(" +
":taskId,:eventCate,:eventType,:eventTime,:channelUnid,:plateColor,:plateNumber,:locationCode,:locationName,:laneCode,:directionCode,:vehicleType,:vehicleColor,:vehicleLogo,:illegalCode,:illegalState,:featureAnnualInspectionMark,:featurePendant,:featureDecoration,:featureSunShield,:xcycleType,:eventId,:specialType,:withHelmet,:jsonData,:pics,:videoName)";
private final static String INSERT_TRAFFIC_FACE = "insert into d_traffic_face (" +
"traffic_id,state,sex,upper_color,lower_color,event_time) values (:trafficId,:state,:sex,:upperColor,:lowerColor,:eventTime)";
......
......@@ -2,6 +2,7 @@ package com.viontech.fanxing.forward.controller;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.config.VionConfig;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.forward.model.VideoResult;
import com.viontech.fanxing.forward.util.PicUtils;
......@@ -10,7 +11,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
......@@ -31,8 +31,8 @@ import java.util.Date;
public class DataReceiveController {
@Resource
private RedissonClient redissonClient;
@Value("${vion.forward.enable:false}")
private Boolean enableForward;
@Resource
private VionConfig vionConfig;
@Resource
private PicUtils picUtils;
......@@ -61,7 +61,7 @@ public class DataReceiveController {
queue.offerFirst(jsonObject);
}
if (enableForward) {
if (vionConfig.isEnableForward()) {
forwardQueue.offer(jsonObject);
}
......
......@@ -15,9 +15,9 @@ import java.util.List;
* @date 2021/7/16
*/
@FeignClient(value = "fanxing-task-manager")
@FeignClient(value = "fanxing-task")
@Service
public interface TaskManagerFeignClient {
public interface TaskFeignClient {
@GetMapping("/tasks")
JsonMessageUtil.JsonMessage<List<Task>> getAllTask();
......
package com.viontech.fanxing.forward.runner;
import com.viontech.fanxing.commons.config.VionConfig;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
......@@ -26,9 +25,11 @@ import java.util.UUID;
@Order(11)
public class JobStartRunner implements CommandLineRunner {
@Autowired
@Resource
JobRepository jobRepository;
@Resource
private TaskExecutor taskExecutor;
@Resource
private Job trafficJob;
@Resource
private Job forwardJob;
......@@ -36,14 +37,14 @@ public class JobStartRunner implements CommandLineRunner {
private Job trafficFlowJob;
@Resource
private Job behaviorJob;
@Value("${vion.forward.enable:false}")
private Boolean enableForward;
@Resource
private VionConfig vionConfig;
@Override
public void run(String... args) throws Exception {
SimpleJobLauncher sjl = new SimpleJobLauncher();
sjl.setJobRepository(jobRepository);
sjl.setTaskExecutor(new SimpleAsyncTaskExecutor());
sjl.setTaskExecutor(taskExecutor);
sjl.afterPropertiesSet();
JobParameters jobParameters;
for (int i = 0; i < 5; i++) {
......@@ -61,7 +62,7 @@ public class JobStartRunner implements CommandLineRunner {
sjl.run(behaviorJob, jobParameters);
}
if (enableForward) {
if (vionConfig.isEnableForward()) {
for (int i = 0; i < 5; i++) {
jobParameters = new JobParametersBuilder().addString("uuid", UUID.randomUUID().toString()).toJobParameters();
sjl.run(forwardJob, jobParameters);
......
package com.viontech.fanxing.forward.runner;
import com.viontech.fanxing.commons.config.VionConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.File;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
......@@ -20,28 +21,26 @@ import java.time.format.DateTimeFormatter;
@Slf4j
public class PicKeepRunner {
@Value("${vion.pic.keep:7}")
private Integer keep;
@Value("${vion.pic.path:/images}")
private String basePath;
@Resource
private VionConfig vionConfig;
@Scheduled(cron = "0 0 08 * * ?")
public void run() {
if (keep == -1) {
if (vionConfig.getImage().getKeep() == -1) {
return;
}
try {
long nowDay = LocalDate.now().toEpochDay();
DateTimeFormatter yyyyMMdd = DateTimeFormatter.ofPattern("yyyyMMdd");
File baseDir = new File(basePath);
File baseDir = new File(vionConfig.getImage().getPath());
File[] files = baseDir.listFiles((dir, name) -> name.matches("[1-9]\\d{7}"));
for (File file : files) {
String name = file.getName();
LocalDate day = LocalDate.parse(name, yyyyMMdd);
long l = day.toEpochDay();
if (nowDay - l > keep) {
if (nowDay - l > vionConfig.getImage().getKeep()) {
log.info("删除文件夹:{}", name);
FileUtils.deleteDirectory(file);
}
......
......@@ -5,7 +5,7 @@ import com.google.common.cache.CacheBuilder;
import com.viontech.fanxing.commons.model.Forward;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.forward.feign.OpsFeignClient;
import com.viontech.fanxing.forward.feign.TaskManagerFeignClient;
import com.viontech.fanxing.forward.feign.TaskFeignClient;
import com.viontech.keliu.util.JsonMessageUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
......@@ -32,7 +32,7 @@ public class CacheUtils {
private static final Cache<Object, Object> CACHE = CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(2)).build();
@Resource
private TaskManagerFeignClient taskManagerFeignClient;
private TaskFeignClient taskFeignClient;
@Resource
private OpsFeignClient opsFeignClient;
......@@ -42,7 +42,7 @@ public class CacheUtils {
result = (Map<String, Task>) CACHE.get("task_map", () -> {
JsonMessageUtil.JsonMessage<List<Task>> response = null;
try {
response = taskManagerFeignClient.getAllTask();
response = taskFeignClient.getAllTask();
} catch (Exception e) {
log.info("获取 task_map 失败:", e);
}
......
......@@ -21,7 +21,7 @@ import java.util.UUID;
@Component
public class PicUtils {
@Value("${vion.pic.path:/images}")
@Value("${vion.image.path:/images}")
private String basePath;
public String savePic(String unid, Date date, String format, byte[] data) throws IOException {
......
spring:
servlet:
multipart:
max-file-size: 500MB
max-request-size: 512MB
cloud:
loadbalancer:
ribbon:
......@@ -40,10 +44,6 @@ spring:
batch:
job:
enabled: false
servlet:
multipart:
max-file-size: 10MB
max-request-size: 100MB
logging:
config: classpath:logback-${spring.profiles.active}.xml
mybatis:
......@@ -55,11 +55,10 @@ pagehelper:
supportMethodsArguments: true
params: count=countByExample
vion:
pic:
image:
path: G:\data
keep: 1
forward:
enable: false
enable-forward: true
redisson:
path: F:\myIDEAworkspace\jt\fanxing3\fanxing-commons\src\main\resources\redisson.yml
debug: true
\ No newline at end of file
......@@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.forward.feign.TaskManagerFeignClient;
import com.viontech.fanxing.forward.feign.TaskFeignClient;
import com.viontech.keliu.util.JsonMessageUtil;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -30,7 +30,7 @@ public class Test0 {
@Resource
private RedissonClient redissonClient;
@Resource
private TaskManagerFeignClient taskManagerFeignClient;
private TaskFeignClient taskFeignClient;
@Test
public void test() {
......@@ -41,7 +41,7 @@ public class Test0 {
@Test
public void allTask() {
JsonMessageUtil.JsonMessage<List<Task>> allTask = taskManagerFeignClient.getAllTask();
JsonMessageUtil.JsonMessage<List<Task>> allTask = taskFeignClient.getAllTask();
System.out.println(JSON.toJSONString(allTask));
}
......
package com.viontech.fanxing.forward;
import com.viontech.fanxing.commons.config.VionConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
......@@ -10,9 +10,9 @@ import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
/**
......@@ -24,15 +24,14 @@ import java.nio.charset.StandardCharsets;
@Slf4j
@Component
@RefreshScope
public class AuthorizationFilter implements GlobalFilter {
@Value("${vion.authorization.enable}")
private Boolean enable;
@Resource
private VionConfig vionConfig;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
if (enable) {
if (vionConfig.isEnableAuthorization()) {
// todo authorize
return unAuthorized(exchange.getResponse(), "authorization failed");
}
......
......@@ -5,6 +5,8 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
......@@ -13,7 +15,10 @@ import org.springframework.scheduling.annotation.EnableScheduling;
* @author 谢明辉
* @date 2021/6/11
*/
@SpringBootApplication(scanBasePackages = "com.viontech.fanxing")
@SpringBootApplication
@ComponentScan(basePackages = "com.viontech.fanxing",excludeFilters = {
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = com.viontech.fanxing.commons.config.WebConfig.class)
})
@EnableDiscoveryClient
@EnableScheduling
@EnableFeignClients
......
......@@ -55,7 +55,6 @@ spring:
logging:
config: classpath:logback-${spring.profiles.active}.xml
vion:
authorization:
enable: false
enable-authorization: false
redisson:
path: F:\myIDEAworkspace\jt\fanxing3\fanxing-commons\src\main\resources\redisson.yml
\ No newline at end of file
......@@ -45,4 +45,10 @@ public class ChannelController extends ChannelBaseController {
JSONObject result = channelService.nvs3000(jsonObject.getString("nvsUrl"), jsonObject.getString("nvsRegex"));
return JsonMessageUtil.getSuccessJsonMsg(result);
}
@PostMapping("/video/upload")
public Object uploadVideo(ChannelVo channelVo) {
Channel channel = channelService.uploadVideo(channelVo);
return JsonMessageUtil.getSuccessJsonMsg(channel);
}
}
\ No newline at end of file
......@@ -3,6 +3,7 @@ package com.viontech.fanxing.ops.service.adapter;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.base.BaseService;
import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.commons.vo.ChannelVo;
import com.viontech.fanxing.commons.vo.DictCodeVo;
import java.util.List;
......@@ -13,4 +14,6 @@ public interface ChannelService extends BaseService<Channel> {
List<DictCodeVo> channelOrg(List<Channel> channels);
Channel uploadVideo(ChannelVo channelVo);
}
\ No newline at end of file
......@@ -4,6 +4,9 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.base.BaseMapper;
import com.viontech.fanxing.commons.base.BaseServiceImpl;
import com.viontech.fanxing.commons.config.VionConfig;
import com.viontech.fanxing.commons.constant.ChannelType;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.*;
import com.viontech.fanxing.commons.vo.ChannelVo;
import com.viontech.fanxing.commons.vo.DictCodeVo;
......@@ -12,17 +15,19 @@ import com.viontech.fanxing.ops.service.adapter.ChannelService;
import com.viontech.fanxing.ops.service.adapter.DictCateService;
import com.viontech.fanxing.ops.service.adapter.DictCodeService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.reactive.function.client.WebClient;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
@Service
......@@ -34,6 +39,8 @@ public class ChannelServiceImpl extends BaseServiceImpl<Channel> implements Chan
private DictCodeService dictcodeService;
@Resource
private DictCateService dictCateService;
@Resource
private VionConfig vionConfig;
@Override
public BaseMapper<Channel> getMapper() {
......@@ -94,9 +101,7 @@ public class ChannelServiceImpl extends BaseServiceImpl<Channel> implements Chan
dictCodeVo.getChildren().removeIf(this::needRemoveFromTree);
}
if (dictCodeVo.getChannels() == null) {
if (dictCodeVo.getChildren() == null || dictCodeVo.getChildren().size() == 0) {
return true;
}
return dictCodeVo.getChildren() == null || dictCodeVo.getChildren().size() == 0;
}
return false;
}
......@@ -153,9 +158,9 @@ public class ChannelServiceImpl extends BaseServiceImpl<Channel> implements Chan
channel.setChannelUnid(id);
channel.setName(name);
channel.setAddressUnid(addressUnid);
channel.setType(1);
channel.setType(ChannelType.THIRD_PART.value);
channel.setExpand("nvs3000");
channel.setStreamType(0);
channel.setStreamType(ChannelType.STREAM_RTSP.value);
channel.setStreamPath(nvsRegex + id);
insertSelective(channel);
success++;
......@@ -167,4 +172,40 @@ public class ChannelServiceImpl extends BaseServiceImpl<Channel> implements Chan
log.info("拉取nvs3000 : {}", jsonObject);
return jsonObject;
}
/**
* 上传存储视频文件
*/
@Override
public Channel uploadVideo(ChannelVo channelVo) {
MultipartFile file = channelVo.getFile();
String originalFilename = file.getOriginalFilename();
String basePath = vionConfig.getImage().getPath() + File.separator + "uploadVideo" + File.separator;
String extension = FilenameUtils.getExtension(originalFilename);
if (StringUtils.isBlank(extension) || !vionConfig.getSupportedVideoFormats().contains(extension.toLowerCase())) {
throw new FanXingException("不支持的视频格式:" + extension);
}
String unid = UUID.randomUUID().toString();
String filename = unid + "." + extension;
File video = new File(basePath + filename);
video.getParentFile().mkdirs();
try {
FileUtils.copyToFile(file.getInputStream(), video);
} catch (IOException e) {
throw new RuntimeException(e);
}
channelVo.setUnid(unid);
channelVo.setChannelUnid(unid);
channelVo.setDeviceUnid(unid);
channelVo.setName(originalFilename);
channelVo.setStreamPath(video.getPath());
channelVo.setType(ChannelType.FILE.value);
channelVo.setStreamType(ChannelType.STREAM_FILE.value);
Channel channel = this.insertSelective(channelVo);
channel = selectByPrimaryKey(channel.getId());
return channel;
}
}
\ No newline at end of file
spring:
servlet:
multipart:
max-file-size: 500MB
max-request-size: 512MB
cloud:
loadbalancer:
ribbon:
......@@ -48,5 +52,12 @@ pagehelper:
supportMethodsArguments: true
params: count=countByExample
vion:
image:
path: G:\data
redisson:
path: F:\myIDEAworkspace\jt\fanxing3\fanxing-commons\src\main\resources\redisson.yml
\ No newline at end of file
path: F:\myIDEAworkspace\jt\fanxing3\fanxing-commons\src\main\resources\redisson.yml
supported-video-formats:
- mp4
- avi
- h264
- nsf
\ No newline at end of file
spring:
servlet:
multipart:
max-file-size: 500MB
max-request-size: 512MB
cloud:
loadbalancer:
ribbon:
......
......@@ -35,7 +35,7 @@
<build>
<finalName>fanxing-task-scheduling</finalName>
<finalName>fanxing-task</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
......
......@@ -2,12 +2,12 @@ package com.viontech.fanxing.task.controller;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.config.VionConfig;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.service.VAServerService;
import com.viontech.keliu.util.JsonMessageUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RMap;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
......@@ -26,27 +26,23 @@ public class VAServerController {
@Resource
private VAServerService vaServerService;
@Value("${vion.gateway.ip}")
private String vionGatewayIp;
@Value("${vion.gateway.port}")
private String vionGatewayPort;
@Resource
private VionConfig vionConfig;
/**
* 注册
*/
@PostMapping("/register")
public Object register(@RequestBody VaServerInfo vaServerInfo) {
String ip = vionConfig.getGateway().getIp();
String port = vionConfig.getGateway().getPort();
log.info("收到注册消息:{}", JSON.toJSONString(vaServerInfo));
vaServerService.registerVAServer(vaServerInfo);
HashMap<String, Object> result = new HashMap<>();
result.put("code", 200);
result.put("msg", "success");
result.put("resultRecvUrl", "http://" + vionGatewayIp + ":" + vionGatewayPort + "/fanxing-forward/result");
result.put("videoUploadUrl", "http://" + vionGatewayIp + ":" + vionGatewayPort + "/fanxing-forward/result/video");
result.put("resultRecvUrl", "http://" + ip + ":" + port + "/fanxing-forward/result");
result.put("videoUploadUrl", "http://" + ip + ":" + port + "/fanxing-forward/result/video");
return result;
}
......
......@@ -56,7 +56,7 @@ public class VAServerHttpService {
.retrieve()
.bodyToMono(String.class);
String response = stringMono.block(Duration.ofSeconds(20));
log.info("下发任务结果:{}", response);
log.info("更新任务结果:{}", response);
return JSON.parseObject(response);
}
......
......@@ -45,7 +45,7 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
task = selectByPrimaryKey(task.getId());
if (StringUtils.isNotBlank(task.getScene()) && task.getStoreConfigId() != null) {
taskDataService.addTask(task);
taskDataService.updateTask(task);
}
return new TaskVo(task);
}
......
spring:
servlet:
multipart:
max-file-size: 500MB
max-request-size: 512MB
cloud:
loadbalancer:
ribbon:
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!