Commit c142d2fc by xmh

<feat> 推送优化,视频推送

1 parent e3f76479
package com.viontech.fanxing.commons.utils;
/**
* .
*
* @author 谢明辉
* @date 2022/1/20
*/
public class Utils {
public static Long cast2Long(Object item) {
if (!(item instanceof Number)) {
throw new RuntimeException(item.toString() + "不是数字类型");
}
if (item instanceof Long) {
return (Long) item;
} else if (item instanceof Integer) {
return ((Integer) item).longValue();
}
return 0L;
}
}
...@@ -10,12 +10,14 @@ import lombok.extern.slf4j.Slf4j; ...@@ -10,12 +10,14 @@ import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.ItemWriter;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import java.time.Duration; import java.time.Duration;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
/** /**
* . * .
...@@ -60,18 +62,29 @@ public class ForwardWriter implements ItemWriter<ForwardContent> { ...@@ -60,18 +62,29 @@ public class ForwardWriter implements ItemWriter<ForwardContent> {
Mono<String> response = WebClientUtils.buildClient(null) Mono<String> response = WebClientUtils.buildClient(null)
.post() .post()
.uri(forward.getUrl()) .uri(forward.getUrl())
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(json) .bodyValue(json)
.retrieve() .retrieve()
.bodyToMono(String.class); .bodyToMono(String.class);
String block = response.block(Duration.ofSeconds(20)); String block = response.block(Duration.ofSeconds(20));
} catch (Exception e) { } catch (Exception e) {
failed += 1; failed += 1;
if (failed == 3) {
log.error("发送失败,对接名称[{}],url:[{}],error:[{}]", forward.getName(), forward.getUrl(), e.getMessage());
}
} }
RMap<String, Object> map = redissonClient.getMap(RedisKeys.getForwardResultMap(forward.getId())); RMap<String, Object> map = redissonClient.getMap(RedisKeys.getForwardResultMap(forward.getId()));
if (!map.isExists()) {
map.put("id", forward.getId());
map.addAndGet("total", forward.getTotal());
map.addAndGet("failed", forward.getFailed());
map.expire(10, TimeUnit.DAYS);
}
if (failed == 0) { if (failed == 0) {
map.put("id", forward.getId()); map.put("id", forward.getId());
map.addAndGet("total", 1); map.addAndGet("total", 1);
map.put("lastSendTime", new Date()); map.put("lastSendTime", new Date());
map.expire(10, TimeUnit.DAYS);
} else if (failed < 3) { } else if (failed < 3) {
ForwardApp.THREAD_POOL_EXECUTOR.submit(this); ForwardApp.THREAD_POOL_EXECUTOR.submit(this);
} else { } else {
...@@ -80,6 +93,7 @@ public class ForwardWriter implements ItemWriter<ForwardContent> { ...@@ -80,6 +93,7 @@ public class ForwardWriter implements ItemWriter<ForwardContent> {
map.addAndGet("total", 1); map.addAndGet("total", 1);
map.addAndGet("failed", 1); map.addAndGet("failed", 1);
map.put("lastSendTime", new Date()); map.put("lastSendTime", new Date());
map.expire(10, TimeUnit.DAYS);
} }
} }
} }
......
...@@ -4,19 +4,30 @@ import com.alibaba.fastjson.JSON; ...@@ -4,19 +4,30 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.config.VionConfig; import com.viontech.fanxing.commons.config.VionConfig;
import com.viontech.fanxing.commons.constant.RedisKeys; import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.commons.model.Forward;
import com.viontech.fanxing.commons.utils.WebClientUtils;
import com.viontech.fanxing.forward.ForwardApp;
import com.viontech.fanxing.forward.model.VideoResult; import com.viontech.fanxing.forward.model.VideoResult;
import com.viontech.fanxing.forward.util.CacheUtils;
import com.viontech.fanxing.forward.util.PicUtils; import com.viontech.fanxing.forward.util.PicUtils;
import com.viontech.keliu.util.JsonMessageUtil; import com.viontech.keliu.util.JsonMessageUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.redisson.api.RBlockingDeque; import org.redisson.api.RBlockingDeque;
import org.redisson.api.RedissonClient; import org.redisson.api.RedissonClient;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.MediaType;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.BodyInserters;
import reactor.core.publisher.Mono;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.File; import java.io.File;
import java.time.Duration;
import java.util.Date; import java.util.Date;
/** /**
...@@ -35,6 +46,8 @@ public class DataReceiveController { ...@@ -35,6 +46,8 @@ public class DataReceiveController {
private VionConfig vionConfig; private VionConfig vionConfig;
@Resource @Resource
private PicUtils picUtils; private PicUtils picUtils;
@Resource
private CacheUtils cacheUtils;
@PostMapping("/result") @PostMapping("/result")
public Object result(@RequestBody String analysisResultStr) { public Object result(@RequestBody String analysisResultStr) {
...@@ -79,6 +92,32 @@ public class DataReceiveController { ...@@ -79,6 +92,32 @@ public class DataReceiveController {
} }
FileUtils.copyToFile(videoResult.getFile().getInputStream(), file); FileUtils.copyToFile(videoResult.getFile().getInputStream(), file);
videoResult.setFile(null); videoResult.setFile(null);
for (Forward forward : cacheUtils.getAllForward()) {
if (forward.getStatus() == 1) {
ForwardApp.THREAD_POOL_EXECUTOR.submit(() -> {
try {
MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
param.add("refid", videoResult.getRefid());
param.add("format", videoResult.getFormat());
param.add("file", new FileSystemResource(file));
Mono<String> response = WebClientUtils.buildClient(null)
.post()
.uri(forward.getUrl())
.contentType(MediaType.MULTIPART_FORM_DATA)
.contentLength(file.length())
.body(BodyInserters.fromMultipartData(param))
.retrieve()
.bodyToMono(String.class);
String block = response.block(Duration.ofSeconds(10));
log.info("发送视频文件完成,结果:{}", block);
} catch (Exception e) {
log.error("发送视频文件失败", e);
}
});
}
}
return videoResult; return videoResult;
} }
} }
...@@ -2,6 +2,7 @@ package com.viontech.fanxing.forward.runner; ...@@ -2,6 +2,7 @@ package com.viontech.fanxing.forward.runner;
import com.viontech.fanxing.commons.feign.OpsClient; import com.viontech.fanxing.commons.feign.OpsClient;
import com.viontech.fanxing.commons.model.Forward; import com.viontech.fanxing.commons.model.Forward;
import com.viontech.fanxing.commons.utils.Utils;
import com.viontech.keliu.util.JsonMessageUtil; import com.viontech.keliu.util.JsonMessageUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RKeys; import org.redisson.api.RKeys;
...@@ -46,12 +47,12 @@ public class ForwardResultPersistenceRunner { ...@@ -46,12 +47,12 @@ public class ForwardResultPersistenceRunner {
Forward forward = new Forward(); Forward forward = new Forward();
RMap<String, Object> map = redissonClient.getMap(forwardResultKey); RMap<String, Object> map = redissonClient.getMap(forwardResultKey);
Object total = map.get("total"); Object total = map.get("total");
forward.setTotal((Long) total); forward.setTotal(Utils.cast2Long(total));
Object lastSendTime = map.get("lastSendTime"); Object lastSendTime = map.get("lastSendTime");
forward.setLastSendTime((Date) lastSendTime); forward.setLastSendTime((Date) lastSendTime);
Object failed = map.get("failed"); Object failed = map.get("failed");
forward.setFailed((Long) failed); forward.setFailed(Utils.cast2Long(failed));
Long id = (Long) map.get("id"); Long id = Utils.cast2Long(map.get("id"));
JsonMessageUtil.JsonMessage<Forward> forwardJsonMessage = opsClient.updateForwardById(id, forward); JsonMessageUtil.JsonMessage<Forward> forwardJsonMessage = opsClient.updateForwardById(id, forward);
} }
} catch (Exception e) { } catch (Exception e) {
......
...@@ -48,7 +48,7 @@ public class CacheUtils { ...@@ -48,7 +48,7 @@ public class CacheUtils {
} }
} }
@LocalCache(value = "forward_list", duration = 5) @LocalCache(value = "forward_list", duration = 30, timeunit = TimeUnit.SECONDS)
public synchronized List<Forward> getAllForward() { public synchronized List<Forward> getAllForward() {
JsonMessageUtil.JsonMessage<List<Forward>> response = null; JsonMessageUtil.JsonMessage<List<Forward>> response = null;
try { try {
......
...@@ -136,7 +136,6 @@ public class TaskDataService { ...@@ -136,7 +136,6 @@ public class TaskDataService {
} else if (vaServerInfo.getStatus() == 0) { } else if (vaServerInfo.getStatus() == 0) {
throw new FanXingException("设备离线"); throw new FanXingException("设备离线");
} else { } else {
TaskData taskData = buildTaskData(task); TaskData taskData = buildTaskData(task);
// 需要更新taskData,并且向vaServer更新任务信息 // 需要更新taskData,并且向vaServer更新任务信息
taskDataRedisRepository.addOrUpdateTaskData(taskData); taskDataRedisRepository.addOrUpdateTaskData(taskData);
......
...@@ -54,8 +54,7 @@ public enum TaskUtils { ...@@ -54,8 +54,7 @@ public enum TaskUtils {
return (!original.getRuntimeConf().equals(present.getRuntimeConf())) return (!original.getRuntimeConf().equals(present.getRuntimeConf()))
|| (!original.getStoreConfigId().equals(present.getStoreConfigId())) || (!original.getStoreConfigId().equals(present.getStoreConfigId()))
|| (!original.getResourceNeed().equals(present.getResourceNeed())) || (!original.getResourceNeed().equals(present.getResourceNeed()))
|| (!original.getVaType().equals(present.getVaType())) || (!original.getVaType().equals(present.getVaType()));
|| (!original.getScene().equals(present.getScene()));
} }
public void checkRuntimeConf(TaskData taskData, VAServerService vaServerService, TaskDataService taskDataService) { public void checkRuntimeConf(TaskData taskData, VAServerService vaServerService, TaskDataService taskDataService) {
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!