Commit ed354d01 by xmh

1. <refactor> 代码优化

2. <feat> 视频源来自视频云平台的任务下发时逻辑调整
1 parent 640ab8c2
......@@ -24,7 +24,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
......@@ -98,7 +97,7 @@ public class AuthorizationFilter implements GlobalFilter {
response.setStatusCode(HttpStatus.UNAUTHORIZED);
return response.writeWith(Mono.just(msg).map(x -> {
JsonMessageUtil.JsonMessage errorJsonMsg = JsonMessageUtil.getErrorJsonMsg(msg);
JsonMessageUtil.JsonMessage errorJsonMsg = JsonMessageUtil.getErrorJsonMsg(401, "authorization校验失败");
String s = JSON.toJSONString(errorJsonMsg);
byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
return response.bufferFactory().wrap(bytes);
......
......@@ -143,9 +143,9 @@ public class ChannelController extends ChannelBaseController {
return JsonMessageUtil.getSuccessJsonMsg(result);
}
@GetMapping("/sip28181/pull")
public Object pullFromSip28181() {
channelService.pullFromSip28181();
@GetMapping("/videoCloud/pull")
public Object pullFromVideoCloud() {
channelService.pullFromVideoCloud();
return JsonMessageUtil.getSuccessJsonMsg();
}
......
......@@ -18,5 +18,5 @@ public interface ChannelService extends BaseService<Channel> {
ChannelVo update(long id, ChannelVo vo);
Object pullFromSip28181();
Object pullFromVideoCloud();
}
\ No newline at end of file
......@@ -13,7 +13,7 @@ public interface DictCodeService extends BaseService<DictCode> {
ImmutablePair<Map<Long, DictCodeVo>, List<DictCodeVo>> getTreeCode(List<DictCode> dictCodes);
DictCode getOrCreateOrgCode(String code);
DictCode getOrCreateOrgCode(String code,String name);
DictCode saveAndGet(String name, String code, String note, Long cateId, Long parentId);
}
\ No newline at end of file
......@@ -218,7 +218,7 @@ public class ChannelServiceImpl extends BaseServiceImpl<Channel> implements Chan
*/
@Override
public JSONObject nvs3000(String nvsUrl, String nvsRegex) {
DictCode nvs3000Code = dictcodeService.getOrCreateOrgCode("nvs3000");
DictCode nvs3000Code = dictcodeService.getOrCreateOrgCode("nvs3000", "nvs3000");
String addressUnid = nvs3000Code.getUnid();
ChannelExample channelExample = new ChannelExample();
......@@ -281,10 +281,10 @@ public class ChannelServiceImpl extends BaseServiceImpl<Channel> implements Chan
}
@Override
public Object pullFromSip28181() {
Assert.notNull(vionConfig.getSip(), "sip28181 配置为空");
Assert.hasText(vionConfig.getSip().getId(), "sip28181 对接id为空");
Assert.hasText(vionConfig.getSip().getUrl(), "sip28181 对接地址为空");
public Object pullFromVideoCloud() {
Assert.notNull(vionConfig.getSip(), "视频云配置为空");
Assert.hasText(vionConfig.getSip().getId(), "视频云对接id为空");
Assert.hasText(vionConfig.getSip().getUrl(), "视频云对接地址为空");
JSONObject response = WebClient.create(vionConfig.getSip().getUrl())
.get()
......@@ -293,25 +293,25 @@ public class ChannelServiceImpl extends BaseServiceImpl<Channel> implements Chan
.bodyToMono(JSONObject.class)
.block(Duration.ofSeconds(10));
DictCode sip28181 = dictcodeService.getOrCreateOrgCode("sip28181");
DictCode sip28181 = dictcodeService.getOrCreateOrgCode("video_cloud", "视频云");
if (response != null && response.containsKey("data")) {
JSONArray data = response.getJSONArray("data");
if (data.size() > 0) {
analyseSip28181(data, sip28181, sip28181.getCateId());
analyseVideoCloud(data, sip28181, sip28181.getCateId());
}
}
return null;
}
@Transactional(rollbackFor = Exception.class)
protected void analyseSip28181(JSONArray data, DictCode parent, Long cateId) {
protected void analyseVideoCloud(JSONArray data, DictCode parent, Long cateId) {
for (int i = 0; i < data.size(); i++) {
JSONObject item = data.getJSONObject(i);
JSONArray orzs = item.getJSONArray("orzs");
// orzs 的长度大于零, 说明是组织机构节点,需要添加到字典表中
if (orzs.size() > 0) {
DictCode dictCode = dictcodeService.saveAndGet(item.getString("orzName"), item.getString("orzId"), "sip28181", cateId, parent.getId());
analyseSip28181(orzs, dictCode, cateId);
DictCode dictCode = dictcodeService.saveAndGet(item.getString("orzName"), item.getString("orzId"), "video_cloud", cateId, parent.getId());
analyseVideoCloud(orzs, dictCode, cateId);
} else {
// 解析视频资源,devices->channels
JSONArray devices = item.getJSONArray("devices");
......
......@@ -55,12 +55,12 @@ public class DictCodeServiceImpl extends BaseServiceImpl<DictCode> implements Di
}
@Override
public DictCode getOrCreateOrgCode(String code) {
public DictCode getOrCreateOrgCode(String code, String name) {
DictCate videoOrgCate = dictCateService.getVideoOrgCate();
Long cateId = videoOrgCate.getId();
DictCodeExample dictCodeExample = new DictCodeExample();
dictCodeExample.createCriteria().andCateIdEqualTo(cateId);
dictCodeExample.createCriteria().andCateIdEqualTo(cateId).andCodeEqualTo(code);
List<DictCode> dictCodes = selectByExample(dictCodeExample);
DictCode orgCode = null;
for (DictCode dictCode : dictCodes) {
......@@ -72,12 +72,12 @@ public class DictCodeServiceImpl extends BaseServiceImpl<DictCode> implements Di
if (orgCode == null) {
orgCode = new DictCode();
orgCode.setCode(code);
orgCode.setName(code);
orgCode.setName(name);
orgCode.setCateId(cateId);
orgCode.setNote(code);
orgCode.setNote("");
insertSelective(orgCode);
return getOrCreateOrgCode(code);
return getOrCreateOrgCode(code, name);
}
return orgCode;
}
......
......@@ -27,6 +27,7 @@ public class VATask {
private Integer alg_type;
private String store_config;
private String channel_unid;
private String device_unid;
private String stream_path;
private Integer stream_type;
private Float resource_use;
......
......@@ -131,6 +131,7 @@ public class TaskRunner {
RLock jobLock = redisService.getLockMust("lock:taskRunner");
try {
RScoredSortedSet<String> set = redisService.getToBeTerminatedTaskUnidSet();
RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisService.getToBeExecutedTaskUnidSet();
Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true);
......@@ -146,6 +147,8 @@ public class TaskRunner {
boolean success = vaServerService.terminateTask(taskUnid);
if (success) {
set.remove(taskUnid);
// 防止任务持续无法运行导致超过运行时段
toBeExecutedTaskUnidSet.remove(taskUnid);
taskService.updateStatus(taskData.getTask().getId(), TaskStatus.PAUSE.val);
// 随机任务不进行部署
if (taskData.getTask().getRuntimeType() != 3) {
......
......@@ -2,6 +2,9 @@ package com.viontech.fanxing.task.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ecwid.consul.v1.ConsulClient;
import com.viontech.fanxing.commons.constant.ChannelType;
import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.vaserver.VATask;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
......@@ -12,6 +15,7 @@ import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.time.Duration;
/**
......@@ -25,11 +29,22 @@ import java.time.Duration;
@Slf4j
public class VAServerHttpService {
@Resource
private OpsClientService opsClientService;
@Resource
private ConsulClient consulClient;
/**
* 下发任务
*/
public JSONObject addTask(TaskData taskData, VaServerInfo vaServerInfo) {
VATask vaTask = new VATask(taskData);
if (vaTask.getStream_type().equals(ChannelType.STREAM_SIP.value)) {
Channel channel = opsClientService.getChannelByChannelUnid(vaTask.getChannel_unid());
String deviceUnid = channel.getDeviceUnid();
vaTask.setDevice_unid(deviceUnid);
}
String path = "/api/vaserver/v1/task";
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
.post()
......@@ -48,7 +63,11 @@ public class VAServerHttpService {
public JSONObject updateTask(TaskData taskData, VaServerInfo vaServerInfo) {
String path = "/api/vaserver/v1/task";
VATask vaTask = new VATask(taskData);
if (vaTask.getStream_type().equals(ChannelType.STREAM_SIP.value)) {
Channel channel = opsClientService.getChannelByChannelUnid(vaTask.getChannel_unid());
String deviceUnid = channel.getDeviceUnid();
vaTask.setDevice_unid(deviceUnid);
}
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
.put()
.uri(uriBuilder -> uriBuilder.path(path).build())
......
......@@ -33,6 +33,7 @@ import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
......@@ -97,8 +98,9 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
Channel channel = opsClientService.getChannelByChannelUnid(channelUnid);
String streamPath = task.getStreamPath();
if (streamPath == null) {
streamPath = channel.getStreamPath();
streamPath = Optional.ofNullable(channel.getStreamPath()).orElse("");
task.setStreamPath(channel.getStreamPath());
task.setStreamType(channel.getStreamType());
}
JSONArray sceneArray = new JSONArray();
String algType = task.getAlgType();
......
......@@ -31,11 +31,13 @@ server {
location /api/video-server/ {
rewrite ^/api/video-server/(.*) /$1 break;
add_header Access-Control-Allow-Methods '*';
add_header Access-Control-Allow-Headers 'Origin, X-Requested-With, Content-Type, Accept, Authorization';
proxy_pass http://192.168.9.233:10350/;
}
location /api/sip28181/ {
rewrite ^/api/sip28181/(.*) /$1 break;
location /api/video-cloud/ {
rewrite ^/api/video-cloud/(.*) /$1 break;
proxy_pass http://192.168.9.233:8888/;
}
}
\ No newline at end of file
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!