Commit f2dadbfe by xmh

first commit

0 parents
### Example user template template
### Example user template
# IntelliJ project files
.idea
*.iml
out
gen
target
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>retransmission</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>war</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.3.RELEASE</version>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
<dependency>
<groupId>org.aspectj</groupId >
<artifactId>aspectjweaver</artifactId >
<version>1.6.11</version >
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.40</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.18</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.11</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
</dependency>
</dependencies>
<build>
<finalName>retransmission_serv-latest</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-war-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<failOnMissingWebXml>false</failOnMissingWebXml>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package com.viontech;
import java.nio.charset.Charset;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.client.RestTemplate;
@SpringBootApplication
@ComponentScan(basePackages = {"com.viontech.*"})
@EnableScheduling
@EnableAsync(proxyTargetClass=true)
public class RetransmissionAplication extends SpringBootServletInitializer {
public static void main(String[] args) {
SpringApplication.run(RetransmissionAplication.class, args);
}
@Bean
public RestTemplate restTemplate(ClientHttpRequestFactory factory) {
RestTemplate restTemplate = new RestTemplate(factory);
restTemplate.getMessageConverters().set(1, new StringHttpMessageConverter(
Charset.forName("UTF-8")));
return restTemplate;
}
@Bean
public ClientHttpRequestFactory simpleClientHttpRequestFactory() {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
factory.setReadTimeout(30000);
factory.setConnectTimeout(5000);
return (ClientHttpRequestFactory)factory;
}
}
\ No newline at end of file
package com.viontech.bo;
public class PicBo {
private String id;
private Object pics;
public String getId() {
return this.id;
}
public void setId(String id) {
this.id = id;
}
public Object getPics() {
return this.pics;
}
public void setPics(Object pics) {
this.pics = pics;
}
}
package com.viontech.config;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
public class CorsConfig implements WebMvcConfigurer {
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowedOrigins(new String[] { "*" }).allowCredentials(true)
.allowedMethods(new String[] { "GET", "POST", "PUT", "DELETE", "OPTIONS" }).maxAge(3600L);
}
}
package com.viontech.config;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class ExecutorConfig {
private static final Logger log = LoggerFactory.getLogger(ExecutorConfig.class);
@Bean
public ThreadPoolTaskExecutor dataForwardServiceExecutor() {
log.info("start dataForwardServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(30);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("dataForward");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Bean
public ThreadPoolTaskExecutor videoForwardServiceExecutor() {
log.info("start videoForwardServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(15);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("videoForward");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
package com.viontech.config;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setValueSerializer((RedisSerializer)new StringRedisSerializer());
redisTemplate.setKeySerializer((RedisSerializer)new StringRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
package com.viontech.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import java.util.concurrent.Executors;
//@Configuration
public class ScheduleConfig implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
}
}
package com.viontech.controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping({"/api/v1/retransmission"})
public class BaseController {}
package com.viontech.controller;
import com.alibaba.fastjson.JSONObject;
import com.viontech.service.adapter.IRedisService;
import com.viontech.vo.VideoVo;
import java.util.HashMap;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class ReceiveDataController extends BaseController {
@Autowired
IRedisService iRedisService;
@PostMapping({"/recv/datas"})
@ResponseBody
public Object recvData(@RequestBody JSONObject data) {
Map<Object, Object> result = new HashMap<>(2);
this.iRedisService.recvData(data);
result.put("ecode", "200");
result.put("enote", "OK");
return result;
}
@PostMapping({"/recv/video"})
@ResponseBody
public Object recvVideo(VideoVo videoVo) {
Map<Object, Object> result = new HashMap<>(2);
try {
this.iRedisService.uploadVideo(videoVo);
} catch (Exception e) {
e.printStackTrace();
}
result.put("ecode", "200");
result.put("enote", "OK");
return result;
}
@PostMapping({"/recv/upload"})
@ResponseBody
public Object upload(VideoVo videoVo) {
Map<Object, Object> result = new HashMap<>(2);
result.put("ecode", "200");
result.put("enote", "OK");
return result;
}
}
package com.viontech.dto;
import java.util.List;
import java.util.Map;
public class AnalysisDataDTO {
private String task_type;
private String task_id;
private String event_type;
private String event_refid;
private String event_dt;
private String dev_unid;
private Map aux_dev_info;
private Map event_data;
private List pics;
private String event_cate;
private String vdev_unid;
private String vchan_duid;
private String vchan_refid;
private List video;
private String test_pic_file;
private String test_frame_no;
private String subtask_id;
private String source_type;
public String getTask_type() {
return this.task_type;
}
public void setTask_type(String task_type) {
this.task_type = task_type;
}
public String getTask_id() {
return this.task_id;
}
public void setTask_id(String task_id) {
this.task_id = task_id;
}
public String getEvent_type() {
return this.event_type;
}
public void setEvent_type(String event_type) {
this.event_type = event_type;
}
public String getEvent_refid() {
return this.event_refid;
}
public void setEvent_refid(String event_refid) {
this.event_refid = event_refid;
}
public String getEvent_dt() {
return this.event_dt;
}
public void setEvent_dt(String event_dt) {
this.event_dt = event_dt;
}
public String getDev_unid() {
return this.dev_unid;
}
public void setDev_unid(String dev_unid) {
this.dev_unid = dev_unid;
}
public Map getAux_dev_info() {
return this.aux_dev_info;
}
public void setAux_dev_info(Map aux_dev_info) {
this.aux_dev_info = aux_dev_info;
}
public Map getEvent_data() {
return this.event_data;
}
public void setEvent_data(Map event_data) {
this.event_data = event_data;
}
public List getPics() {
return this.pics;
}
public void setPics(List pics) {
this.pics = pics;
}
public String getEvent_cate() {
return this.event_cate;
}
public void setEvent_cate(String event_cate) {
this.event_cate = event_cate;
}
public String getVdev_unid() {
return this.vdev_unid;
}
public void setVdev_unid(String vdev_unid) {
this.vdev_unid = vdev_unid;
}
public String getVchan_duid() {
return this.vchan_duid;
}
public void setVchan_duid(String vchan_duid) {
this.vchan_duid = vchan_duid;
}
public String getVchan_refid() {
return this.vchan_refid;
}
public void setVchan_refid(String vchan_refid) {
this.vchan_refid = vchan_refid;
}
public List getVideo() {
return this.video;
}
public void setVideo(List video) {
this.video = video;
}
public String getTest_pic_file() {
return this.test_pic_file;
}
public void setTest_pic_file(String test_pic_file) {
this.test_pic_file = test_pic_file;
}
public String getTest_frame_no() {
return this.test_frame_no;
}
public void setTest_frame_no(String test_frame_no) {
this.test_frame_no = test_frame_no;
}
public String getSubtask_id() {
return this.subtask_id;
}
public void setSubtask_id(String subtask_id) {
this.subtask_id = subtask_id;
}
public String getSource_type() {
return this.source_type;
}
public void setSource_type(String source_type) {
this.source_type = source_type;
}
}
package com.viontech.service.adapter;
import com.alibaba.fastjson.JSONObject;
import com.viontech.vo.VideoVo;
public interface IReceiveDataService {
Object recvData(JSONObject paramJSONObject);
Object uploadVideo(VideoVo paramVideoVo);
boolean forwardUploadVideo(VideoVo paramVideoVo);
boolean dataRetransmission(String paramString);
}
package com.viontech.service.adapter;
import com.alibaba.fastjson.JSONObject;
import com.viontech.vo.VideoVo;
public interface IRedisService {
Object recvData(JSONObject paramJSONObject);
Object uploadVideo(VideoVo paramVideoVo);
}
package com.viontech.service.adapter;
import com.alibaba.fastjson.JSONObject;
import com.viontech.vo.VideoVo;
public interface IRocketmqService {
Object recvData(JSONObject paramJSONObject);
Object uploadVideo(VideoVo paramVideoVo);
boolean forwardUploadVideo(VideoVo paramVideoVo);
boolean dataRetransmission(String paramString);
}
package com.viontech.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.viontech.service.adapter.IRedisService;
import com.viontech.util.Base64PicUtil;
import com.viontech.util.VideoUtil;
import com.viontech.vo.VideoVo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.FileSystemResource;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.http.*;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Service
public class RedisServiceImpl implements IRedisService {
private static final Logger log = LoggerFactory.getLogger(RedisServiceImpl.class);
@Resource
RedisTemplate redisTemplate;
@Autowired
ThreadPoolTaskExecutor dataForwardServiceExecutor;
@Autowired
ThreadPoolTaskExecutor videoForwardServiceExecutor;
@Value("${data_forward_url}")
String data_forward_url;
@Value("${consumer_crash}")
Integer consumer_crash;
@Value("${video_forward_url}")
String video_forward_url;
@Value("${video_download_url}")
String video_download_url;
@Autowired
RestTemplate restTemplate;
@Autowired
StringRedisTemplate stringRedisTemplate;
HttpHeaders headers;
HttpHeaders videoheaders;
@Value("${pic.path}")
private String picPath;
@Value("${video.path}")
private String videoPath;
private CopyOnWriteArrayList<String> ssuccessCacheFiles;
private CopyOnWriteArrayList<String> errorCacheFiles;
public RedisServiceImpl() {
this.headers = new HttpHeaders();
videoheaders = new HttpHeaders();
this.headers.setContentType(MediaType.parseMediaType("application/json;charset=UTF-8"));
videoheaders.setContentType(MediaType.parseMediaType("multipart/form-data;charset=UTF-8"));
this.ssuccessCacheFiles = new CopyOnWriteArrayList<>();
this.errorCacheFiles = new CopyOnWriteArrayList<>();
}
public Object recvData(JSONObject data) {
try {
if (StringUtils.isEmpty(this.data_forward_url)) {
return null;
}
List<Map> pics = (List<Map>) data.get("pics");
if (!CollectionUtils.isEmpty(pics)) pics.stream().forEach(pic -> {
String picpath = Base64PicUtil.save(String.valueOf(pic.get("pic_base64")), StringUtils.isEmpty(data.getJSONObject("event_data").getString("ID")) ? (System.currentTimeMillis() + "" + (Math.random() * 100.0D) + ".") : (data.getJSONObject("event_data").getString("ID") + "-" + System.currentTimeMillis() + (Math.random() * 100.0D) + "." + ((pic.get("format") == null) ? "jpg" : (String) pic.get("format"))), this.picPath);
pic.put("src_url", picpath);
pic.remove("pic_base64");
});
data.put("forward_url", this.data_forward_url);
redisTemplate.opsForList().leftPush("data_forward", data.toJSONString());
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public Object uploadVideo(VideoVo videoVo) {
try {
if (StringUtils.isEmpty(this.video_forward_url)) return null;
MultipartFile videofile = videoVo.getFile();
if (videofile == null || videofile.isEmpty()) return true;
//String fileName = videoVo.getFile().getOriginalFilename();
//String tempFilePath = System.getProperty("java.io.tmpdir") + "/" + videoVo.getRefid() + ".mp4";
//System.out.println("tempFilePath=" + tempFilePath);
//File tempFile = new File(tempFilePath);
//videofile.transferTo(tempFile);
//videoVo.setTempFile(tempFile);
//boolean upload_result = upload(videoVo);
//if (!upload_result) {
String videopath = VideoUtil.save(videofile.getInputStream(), this.videoPath, videoVo.getRefid() + ".mp4");
videoVo.setFile(null);
videoVo.setVideo_path(videopath);
if (videoVo.getTempFile() != null) videoVo.getTempFile().delete();
videoVo.setTempFile(null);
redisTemplate.opsForList().leftPush("video_forward", JSONObject.toJSONString(videoVo));
//this.redisTemplate.opsForValue().set("video_forward_" + System.currentTimeMillis() + (Math.random() * 100.0D), JSONObject.toJSONString(videoVo), 3L, TimeUnit.DAYS);
//} else {
// this.ssuccessCacheFiles.add(videoVo.getVideo_path());
// }
if (videoVo.getTempFile() != null) videoVo.getTempFile().delete();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
@Scheduled(fixedDelay = 2010L)
@Async
public void forwardUploadVideo() {
try {
Long size = redisTemplate.opsForList().size("video_forward");
log.info("当前短时录像总数量为:{}", size);
if (size == null || size <= 0) {
if (videoForwardServiceExecutor.getActiveCount() == 0) {
try {
Runtime.getRuntime().exec("rm -rf " + this.videoPath).waitFor();
} catch (Exception e) {
e.printStackTrace();
}
}
return;
}
List<String> videodatas = redisTemplate.opsForList().range("video_forward", -50, -1);
;
/*Set<String> keys = this.redisTemplate.keys("*video_forward_*");
if (CollectionUtils.isEmpty(keys)) {
this.videoSending = 0;
try {
log.info("rm -rf {}", this.videoPath);
Runtime.getRuntime().exec("rm -rf " + this.videoPath).waitFor();
} catch (Exception e) {
e.printStackTrace();
}
return;
}*/
log.debug("视频重发开始");
MultipartFile multipartFile = null;
for (Iterator<String> it = videodatas.iterator(); it.hasNext(); ) {
try {
String data = it.next();
//System.out.println("videodata:"+data);
redisTemplate.opsForList().remove("video_forward", -1, data);
videoForwardServiceExecutor.execute(() -> {
VideoVo videoVo = JSONObject.parseObject(data, VideoVo.class);
MockMultipartFile mockMultipartFile;
FileSystemResource resource;
LinkedMultiValueMap linkedMultiValueMap = null;
HttpEntity<MultiValueMap<String, Object>> formEntity;
try {
if (!new File(videoVo.getVideo_path()).exists()) {
redisTemplate.opsForList().remove("video_forward", -1, data);
return;
}
Thread.sleep(5);
mockMultipartFile = new MockMultipartFile(videoVo.getRefid() + ".mp4", VideoUtil.getInputStream(videoVo.getVideo_path()));
videoVo.setFile(mockMultipartFile);
//boolean b = upload(videoVo);
linkedMultiValueMap = new LinkedMultiValueMap();
resource = new FileSystemResource(videoVo.getVideo_path());
linkedMultiValueMap.add("refid", videoVo.getRefid());
linkedMultiValueMap.add("format", videoVo.getFormat());
linkedMultiValueMap.add("location_id", videoVo.getLocation_id());
linkedMultiValueMap.add("file", resource);
formEntity = new HttpEntity(linkedMultiValueMap, videoheaders);
ResponseEntity<String> responseEntity = null;
try {
responseEntity = restTemplate.postForEntity(video_forward_url, formEntity, String.class);
} catch (RestClientException exception) {
log.error(exception.getMessage());
}
if (responseEntity != null && (200 == responseEntity.getStatusCode().value() && "200".equals(ecode(responseEntity.getBody())))) {
log.info("视频重发地址:{};refid:{};http:{};响应信息{}", video_forward_url, videoVo.getRefid(), responseEntity.getStatusCode().value(), responseEntity.getBody());
ssuccessCacheFiles.add(videoVo.getVideo_path());
redisTemplate.opsForList().remove("video_forward", -1, data);
} else {
videoVo.setFile(null);
if (videoVo.getTempFile() != null) videoVo.getTempFile().delete();
videoVo.setTempFile(null);
redisTemplate.opsForList().rightPush("video_forward", JSONObject.toJSONString(videoVo));
}
/*if(b){
//redisTemplate.opsForList().remove("video_forward",-1,data);
ssuccessCacheFiles.add(videoVo.getVideo_path());
}else {
videoVo.setFile(null);
if (videoVo.getTempFile() != null)
videoVo.getTempFile().delete();
videoVo.setTempFile(null);
redisTemplate.opsForList().rightPush("video_forward",JSONObject.toJSONString(videoVo));
}*/
} catch (Exception e) {
e.printStackTrace();
} finally {
if (linkedMultiValueMap != null) {
linkedMultiValueMap.clear();
}
videoVo.setFile(null);
if (videoVo.getTempFile() != null) {
videoVo.getTempFile().delete();
}
videoVo.setTempFile(null);
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Scheduled(fixedDelay = 2020L)
@Async
public void removeFiles() {
int num = 0;
if (!CollectionUtils.isEmpty(this.ssuccessCacheFiles)) {
Iterator<String> files = this.ssuccessCacheFiles.iterator();
while (files.hasNext() && num <= 200) {
String path = files.next();
try {
log.debug("rm -rf {}", path);
Runtime.getRuntime().exec("rm -rf " + path).waitFor();
//files.remove();
ssuccessCacheFiles.remove(path);
num++;
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
@Scheduled(fixedDelay = 2000L)
@Async
public void dataRetransmission() {
try {
while (true) {
Long size = redisTemplate.opsForList().size("data_forward");
log.info("当前分析数据总数量为:{}", size);
if (size == null || size <= 0) {
if (dataForwardServiceExecutor.getActiveCount() == 0) {
try {
Runtime.getRuntime().exec("rm -rf " + this.picPath).waitFor();
} catch (Exception e) {
e.printStackTrace();
}
}
return;
}
log.info("json数据重发开始");
for (int i = 0; i < 200; i++) {
Object item = redisTemplate.opsForList().rightPop("data_forward");
if (item == null) {
return;
}
String data = (String) item;
this.dataForwardServiceExecutor.submit(() -> {
JSONObject jSONObject = JSONObject.parseObject(data);
String forward_url = jSONObject.getString("forward_url");
String[] forwards = null;
if (!StringUtils.isEmpty(forward_url)) {
forwards = forward_url.split(";");
}
List<Map> pics;
StringBuilder failedUrl = new StringBuilder();
try {
pics = (List<Map>) jSONObject.get("pics");
RedisServiceImpl.this.putBase64ToPics(pics);
if (forwards != null && forwards.length > 0) {
String txt = jSONObject.toJSONString();
txt = txt.replace("\"{", "{").replace("}\"", "}");
for (String url : forwards) {
if (!sendData(txt, url)) {
failedUrl.append(url).append(";");
}
}
}
forward_url = failedUrl.toString();
if (StringUtils.isEmpty(forward_url)) {
if (!CollectionUtils.isEmpty(pics)) {
pics.forEach(pic -> {
try {
log.info("rm -rf {}", pic.get("src_url"));
ssuccessCacheFiles.add(String.valueOf(pic.get("src_url")));
Runtime.getRuntime().exec("rm -rf " + pic.get("src_url")).waitFor();
pic.remove("pic_base64");
} catch (Exception e) {
e.printStackTrace();
}
});
}
} else {
jSONObject.remove("pics");
if (!CollectionUtils.isEmpty(pics)) pics.forEach(pic -> {
try {
pic.remove("pic_base64");
} catch (Exception e) {
e.printStackTrace();
}
});
jSONObject.put("pics", pics);
jSONObject.put("forward_url", forward_url);
redisTemplate.opsForList().leftPush("data_forward", JSON.toJSON(jSONObject).toString());
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
while (dataForwardServiceExecutor.getActiveCount() > 0) {
Thread.sleep(1000L);
}
}
} catch (Exception e) {
log.error("数据重发异常:{}", e.getLocalizedMessage(), e);
}
}
// @Scheduled(fixedDelay = 2000L)
// public void dataRetransmission() {
// try {
// if (1 == this.dataSending)
// return;
// this.dataSending = 1;
// Set<String> keys = this.redisTemplate.keys("*data_forward_*");
// if (CollectionUtils.isEmpty(keys)) {
// this.dataSending = 0;
// if (CollectionUtils.isEmpty(this.errorCacheFiles))
// try {
// log.info("rm -rf {}", this.picPath);
// Runtime.getRuntime().exec("rm -rf " + this.picPath).waitFor();
// } catch (Exception e) {
// e.printStackTrace();
// }
// return;
// }
// //log.info(");
// Map.Entry<String, String> item = null;
// int num = 0;
// for (Iterator<String> it = keys.iterator(); it.hasNext(); ) {
// final String _key = it.next();
// this.dataForwardServiceExecutor.execute(new Runnable() {
// public void run() {
// String data = null;
// String key = "";
// JSONObject jSONObject = null;
// String forward_url = null;
// String[] forwards = null;
// List<Map> pics = null;
// StringBuffer sb = null;
// try {
// key = _key;
// data = (String)RedisServiceImpl.this.redisTemplate.opsForValue().get(key);
// if (RedisServiceImpl.this.redisTemplate.hasKey(key).booleanValue())
// RedisServiceImpl.this.redisTemplate.delete(key);
// if (StringUtils.isEmpty(data)) {
// data = null;
// key = "";
// return;
// }
// data = data.replace("\"{", "{").replace("}\"", "}");
// if (StringUtils.isEmpty(data)) {
// data = null;
// key = "";
// return;
// }
// jSONObject = JSONObject.parseObject(data);
// forward_url = jSONObject.getString("forward_url");
// if (!StringUtils.isEmpty(forward_url))
// forwards = forward_url.split(";");
// pics = (List<Map>)jSONObject.get("pics");
// RedisServiceImpl.this.putBase64ToPics(pics);
// if (forwards != null && forwards.length > 0) {
// sb = new StringBuffer();
// String txt = jSONObject.toJSONString();
// txt = txt.replace("\"{", "{").replace("}\"", "}");
// for (String url : forwards) {
// if (!RedisServiceImpl.this.sendData(txt, url))
// sb.append(url).append(";");
// }
// }
// if (sb == null || StringUtils.isEmpty(sb.toString())) {
// if (!CollectionUtils.isEmpty(pics))
// pics.stream().forEach(pic -> {
// try {
// RedisServiceImpl.log.info("rm -rf {}", pic.get("src_url"));
// RedisServiceImpl.this.errorCacheFiles.remove(String.valueOf(pic.get("src_url")));
// RedisServiceImpl.this.ssuccessCacheFiles.add(String.valueOf(pic.get("src_url")));
// Runtime.getRuntime().exec("rm -rf " + pic.get("src_url")).waitFor();
// pic.remove("pic_base64");
// } catch (Exception e) {
// e.printStackTrace();
// }
// });
// } else if (!StringUtils.isEmpty(sb.toString())) {
// forward_url = sb.toString();
// jSONObject.remove("pics");
// if (!CollectionUtils.isEmpty(pics))
// pics.stream().forEach(pic -> {
// try {
// pic.remove("pic_base64");
// } catch (Exception e) {
// e.printStackTrace();
// }
// });
// jSONObject.put("pics", pics);
// jSONObject.put("forward_url", forward_url);
// RedisServiceImpl.this.redisTemplate.opsForValue().set("data_forward_" + System.currentTimeMillis() + (Math.random() * 100.0D), JSON.toJSON(jSONObject).toString(), 3L, TimeUnit.DAYS);
// }
// } catch (Exception e) {
// e.printStackTrace();
// } finally {
// data = null;
// key = "";
// jSONObject = null;
// forward_url = null;
// forwards = null;
// pics = null;
// sb = null;
// }
// }
// });
// it.remove();
// if (num > 500)
// break;
// num++;
// }
// } catch (Exception e) {
// log.error("{}", e.getLocalizedMessage(), e);
// } finally {
// this.dataSending = 0;
// }
// }
private boolean sendData(String msg, String url) {
try {
HttpEntity<String> requestEntity = new HttpEntity(msg, (MultiValueMap) this.headers);
ResponseEntity<String> responseEntity = this.restTemplate.exchange(url, HttpMethod.POST, requestEntity, String.class, new Object[0]);
log.info("数据发送url{};http{};{}", new Object[]{url, Integer.valueOf(responseEntity.getStatusCode().value()), responseEntity.getBody()});
if (200 == responseEntity.getStatusCode().value() && "200".equals(ecode((String) responseEntity.getBody())))
return true;
return false;
} catch (Exception e) {
log.error("重发异常:{};{}", new Object[]{url, e.getLocalizedMessage(), e});
return false;
}
}
private void putBase64ToPics(List<Map> pics) {
if (!CollectionUtils.isEmpty(pics)) pics.stream().forEach(pic -> {
String pic_base64 = null;
try {
pic_base64 = Base64PicUtil.downlodPicForBase64(String.valueOf(pic.get("src_url")));
pic.put("pic_base64", pic_base64);
} catch (IOException e) {
e.printStackTrace();
}
pic_base64 = null;
});
}
private boolean upload(VideoVo videoVo) {
//File tempFile = null;
try {
/*MultipartFile videofile = videoVo.getFile();
if (videofile == null || videofile.isEmpty())
return true;
String fileName = videoVo.getFile().getOriginalFilename();
String tempFilePath = System.getProperty("java.io.tmpdir") + "/" + videoVo.getRefid() + ".mp4";
System.out.println("tempFilePath=" + tempFilePath);
tempFile = new File(tempFilePath);
videofile.transferTo(tempFile);
videoVo.setTempFile(tempFile);*/
//RestTemplate restTemplate = new RestTemplate();
LinkedMultiValueMap linkedMultiValueMap = new LinkedMultiValueMap();
FileSystemResource resource = new FileSystemResource(videoVo.getVideo_path());
linkedMultiValueMap.add("refid", videoVo.getRefid());
linkedMultiValueMap.add("format", videoVo.getFormat());
linkedMultiValueMap.add("location_id", videoVo.getLocation_id());
linkedMultiValueMap.add("file", resource);
HttpEntity<MultiValueMap<String, Object>> formEntity = new HttpEntity(linkedMultiValueMap, (MultiValueMap) videoheaders);
ResponseEntity<String> responseEntity = restTemplate.postForEntity(this.video_forward_url, formEntity, String.class, new Object[0]);
log.info("视频重发地址:{};refid:{};http:{};响应信息{}", new Object[]{this.video_forward_url, videoVo.getRefid(), Integer.valueOf(responseEntity.getStatusCode().value()), responseEntity.getBody()});
if (200 == responseEntity.getStatusCode().value() && "200".equals(ecode(responseEntity.getBody()))) {
//System.out.println("return true");
return true;
}
return false;
} catch (Exception e) {
log.error("异常{};{}", new Object[]{this.video_forward_url, e.getLocalizedMessage()}, e);
return false;
}
}
private String ecode(String result) {
Pattern pattern = Pattern.compile("\"code\":[ ]*\"[0-9a-zA-Z_]+\"");
Matcher matcher = pattern.matcher(result);
if (matcher.find()) {
String code = matcher.group().replace("\"", "").split(":")[1];
return code.trim();
}
pattern = Pattern.compile("\"code\":[ ]*[0-9a-zA-Z_]+");
matcher = pattern.matcher(result);
if (matcher.find()) {
String code = matcher.group().replace("\"", "").split(":")[1];
return code.trim();
}
pattern = Pattern.compile("\"ecode\":[ ]*\"[0-9a-zA-Z_]+\"");
matcher = pattern.matcher(result);
if (matcher.find()) {
String code = matcher.group().replace("\"", "").split(":")[1];
return code.trim();
}
pattern = Pattern.compile("\"ecode\":[ ]*[0-9a-zA-Z_]+");
matcher = pattern.matcher(result);
if (matcher.find()) {
String code = matcher.group().replace("\"", "").split(":")[1];
return code.trim();
}
return "";
}
}
package com.viontech.service.impl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.util.ErrorHandler;
@Service
public class VionActiveErrorHandler implements ErrorHandler {
private static final Logger log = LoggerFactory.getLogger(VionActiveErrorHandler.class);
public void handleError(Throwable throwable) {
log.error("Error in listener", throwable);
}
}
package com.viontech.util;
import org.apache.commons.codec.binary.Base64;
import sun.misc.BASE64Decoder;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
public class Base64PicUtil {
public static SimpleDateFormat FORMAT_YYYY_MM_DD = new SimpleDateFormat("yyyy-MM-dd");
public static String uplodPicByBase64(String base64, String fineName, String filePath) {
filePath = filePath + "/" + FORMAT_YYYY_MM_DD.format(new Date());
File file = new File(filePath);
if (!file.exists())
file.mkdirs();
try {
filePath = filePath + "/" + fineName;
byte[] buffer = (new BASE64Decoder()).decodeBuffer(base64);
FileOutputStream out = new FileOutputStream(filePath);
out.write(buffer);
out.close();
return filePath;
} catch (IOException e) {
e.printStackTrace();
return "";
}
}
public static String downlodPicForBase64(String filePath) throws IOException {
File file = new File(filePath);
FileInputStream inputFile = new FileInputStream(file);
byte[] buffer = new byte[(int) file.length()];
inputFile.read(buffer);
inputFile.close();
return Base64.encodeBase64String(buffer);
}
public static String save(String base64, String fileName, String picPath) {
picPath = picPath + "/" + FORMAT_YYYY_MM_DD.format(new Date());
String selfPath = picPath + "/" + fileName;
byte[] base64byte = Base64.decodeBase64(base64);
File file = new File(selfPath);
file.getParentFile().mkdirs();
try {
if (!file.exists()) {
file.createNewFile();
}
} catch (Exception e) {
e.printStackTrace();
}
try (FileOutputStream out = new FileOutputStream(file)) {
out.write(base64byte, 0, base64byte.length);
out.flush();
} catch (IOException e) {
e.printStackTrace();
}
return selfPath;
}
public static boolean makerDir(String filePath) {
try {
File dirFile = new File(filePath);
if (!dirFile.exists()) {
dirFile.setWritable(true, false);
dirFile.mkdirs();
}
File fp = new File(filePath);
dirFile = null;
if (fp.exists()) {
fp = null;
return true;
}
fp = null;
return false;
} catch (Exception e) {
return false;
}
}
}
package com.viontech.util;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class SendDataByHttpUtil {
public static String ecode(String result) {
Pattern pattern = Pattern.compile("\"code\":[ ]*\"[0-9a-zA-Z_]+\"");
Matcher matcher = pattern.matcher(result);
if (matcher.find()) {
System.out.println(" matcher.group().replace(\"\\\"\", \"\")=" + matcher.group().replace("\"", ""));
String code = matcher.group().replace("\"", "").split(":")[1];
System.out.println("code=" + code);
return code.trim();
}
pattern = Pattern.compile("\"code\":[ ]*[0-9a-zA-Z_]+");
matcher = pattern.matcher(result);
if (matcher.find()) {
System.out.println(" matcher.group().replace(\"\\\"\", \"\")=" + matcher.group().replace("\"", ""));
String code = matcher.group().replace("\"", "").split(":")[1];
System.out.println(("200" == code));
return code.trim();
}
pattern = Pattern.compile("\"ecode\":[ ]*\"[0-9a-zA-Z_]+\"");
matcher = pattern.matcher(result);
if (matcher.find()) {
System.out.println(" matcher.group().replace(\"\\\"\", \"\")=" + matcher.group().replace("\"", ""));
String code = matcher.group().replace("\"", "").split(":")[1];
System.out.println("ecode=" + code);
return code.trim();
}
return "";
}
}
package com.viontech.util;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.springframework.web.multipart.MultipartFile;
public class VideoUtil {
public static SimpleDateFormat FORMAT_YYYY_MM_DD = new SimpleDateFormat("yyyy-MM-dd");
public static String saveVideo(MultipartFile videofile, String videoPath, String videoName) throws IOException {
if (videofile == null || videofile.isEmpty())
return null;
videoPath = videoPath + "/" + FORMAT_YYYY_MM_DD.format(new Date());
File videoFile = null;
String fileName = videofile.getOriginalFilename();
String videoFilePath = videoPath + "/" + videoName;
makerDir(videoPath);
System.out.println("videoFilePath=" + videoFilePath);
InputStream is = videofile.getInputStream();
videoFile = new File(videoFilePath);
videofile.transferTo(videoFile);
return videoFilePath;
}
public static String save(InputStream videofile, String videoPath, String videoName) {
if (videofile == null) {
System.out.println("videofile == null");
return null;
}
videoPath = videoPath + "/" + FORMAT_YYYY_MM_DD.format(new Date());
String videoFilePath = videoPath + "/" + videoName;
makerDir(videoPath);
FileOutputStream fos = null;
FileChannel inChannel = null;
FileChannel outChannel = null;
try {
fos = new FileOutputStream(videoFilePath);
FileInputStream fis = (FileInputStream)videofile;
inChannel = fis.getChannel();
outChannel = fos.getChannel();
inChannel.transferTo(0L, inChannel.size(), outChannel);
inChannel.close();
outChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
return videoFilePath;
}
public static byte[] getBety(String filePath) throws IOException {
File file = new File(filePath);
FileInputStream inputFile = new FileInputStream(file);
byte[] buffer = new byte[(int)file.length()];
inputFile.read(buffer);
inputFile.close();
return buffer;
}
public static InputStream getInputStream(String filePath) throws IOException {
InputStream inputStream = new ByteArrayInputStream(getBety(filePath));
return inputStream;
}
public static boolean makerDir(String filePath) {
try {
File dirFile = new File(filePath);
if (!dirFile.exists()) {
dirFile.setWritable(true, false);
dirFile.mkdirs();
}
File fp = new File(filePath);
dirFile = null;
if (fp.exists()) {
fp = null;
return true;
}
fp = null;
return false;
} catch (Exception e) {
return false;
}
}
}
package com.viontech.vo;
import java.io.File;
import org.springframework.web.multipart.MultipartFile;
public class VideoVo {
private String refid;
private String format;
private MultipartFile file;
private String pic_refid;
private String location_id;
private String video_path;
private File tempFile;
public String getRefid() {
return this.refid;
}
public void setRefid(String refid) {
this.refid = refid;
}
public String getFormat() {
return this.format;
}
public void setFormat(String format) {
this.format = format;
}
public MultipartFile getFile() {
return this.file;
}
public void setFile(MultipartFile file) {
this.file = file;
}
public String getPic_refid() {
return this.pic_refid;
}
public void setPic_refid(String pic_refid) {
this.pic_refid = pic_refid;
}
public String getLocation_id() {
return this.location_id;
}
public void setLocation_id(String location_id) {
this.location_id = location_id;
}
public String getVideo_path() {
return this.video_path;
}
public void setVideo_path(String video_path) {
this.video_path = video_path;
}
public File getTempFile() {
return this.tempFile;
}
public void setTempFile(File tempFile) {
this.tempFile = tempFile;
}
}
# server
server.port=8080
#redis
# REDIS (RedisProperties)
# Redis
spring.redis.database=8
# Redis
spring.redis.host=192.168.9.233
# Redis
spring.redis.port=6379
# Redis
spring.redis.password=3c61f2e4c4d1877ef9d01319c3a0fccaeabb1518
#
spring.redis.jedis.pool.max-active=8
#
spring.redis.lettuce.pool.max-wait=-1
#
spring.redis.lettuce.pool.max-idle=8
#
spring.redis.lettuce.pool.min-idle=0
#
spring.redis.timeout=5000
#
data_forward_url=http://192.168.9.233:21182/api/v1/recv/events
#
consumer_crash=
#
video_forward_url=http://192.168.9.233:21182/api/v1/web/videos/upload
#
video_download_url=http://192.168.9.233:20080/api/v1/web/videos/%s
#
pic.path=/data/retransmission/pics
#
video.path=/data/retransmission/videos
#
spring.servlet.multipart.maxFileSize=500MB
spring.servlet.multipart.maxRequestSize=500MB
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!