Commit bc41bdbc by xmh

<feat> OSD 改为从 osd-server获取

<feat> VaServer 注册时添加容器名
<feat> 更新视频资源时检查文件名和设备编号
<fix> 视频云返回的数据结构变更,修改解析逻辑
<feat> VaServer 升级完成初始部分
<feat> 上传视频文件,通过文件头判断文件类型
<feat> 数据概览添加有效分析时长和异常次数
<fix> 优化任务更新逻辑
1 parent 20401a58
Showing 24 changed files with 365 additions and 67 deletions
package com.viontech.fanxing.commons.feign;
import com.alibaba.fastjson.JSONObject;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
/**
* .
*
* @author 谢明辉
* @date 2021/12/11
*/
@FeignClient(value = "osd-server")
public interface OSDConfigClient {
@GetMapping("/storageConfigs")
JSONObject listAll(@RequestParam Integer type);
@GetMapping("/storageConfigs/{id}")
JSONObject getById(@PathVariable("id") Long id);
}
package com.viontech.fanxing.commons.feign;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.model.*;
import com.viontech.fanxing.commons.model.main.ImageKeepConfig;
import com.viontech.fanxing.commons.vo.LogVo;
......@@ -25,7 +26,7 @@ public interface OpsClient {
* 获取存储配置
*/
@GetMapping("/storeConfigs/{id}")
JsonMessageUtil.JsonMessage<StoreConfig> getStoreConfigById(@PathVariable("id") Long storeConfigId);
JSONObject getStoreConfigById(@PathVariable("id") Long storeConfigId);
/**
* 获取文本内容
......
package com.viontech.fanxing.commons.feign;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.commons.vo.TaskVo;
import com.viontech.keliu.util.JsonMessageUtil;
import org.springframework.cloud.openfeign.FeignClient;
......@@ -20,13 +22,31 @@ import java.util.List;
@Service
public interface TaskFeignClient {
/**
* 获取所有任务
*/
@GetMapping("/tasks")
JsonMessageUtil.JsonMessage<List<Task>> getAllTask();
/**
* 通过通道号获取任务
*/
@GetMapping("/tasks")
JsonMessageUtil.JsonMessage<List<Task>> getTaskByChannelUnid(@RequestParam("channelUnid") String channelUnid);
/**
* 更新任务
*/
@PostMapping("/tasks/{id}")
JsonMessageUtil.JsonMessage updateById(@PathVariable("id") Long id, @RequestBody TaskVo taskVo);
/**
* 获取全部的VaServer信息
*/
@GetMapping("/vaServerInfo")
JsonMessageUtil.JsonMessage<List<VaServerInfo>> vaServerInfo();
@GetMapping("/status")
JSONObject getVaServerStatusByDevId(@RequestParam String devId);
}
package com.viontech.fanxing.task.model.vaserver;
package com.viontech.fanxing.commons.model.main;
import lombok.Getter;
import lombok.Setter;
......@@ -29,6 +29,8 @@ public class VaServerInfo implements Serializable {
private String softVersion;
/** 算法版本 */
private String algoVersion;
/** 容器名称 */
private String dockerContainerName;
/** 状态 1在线,0离线 */
private int status = 1;
......
......@@ -74,8 +74,13 @@ public class ChannelController extends ChannelBaseController {
Assert.notNull(channelVo.getUsername(), "用户名不能为空");
Assert.notNull(channelVo.getPassword(), "密码不能为空");
Assert.notNull(channelVo.getStreamPath(), "视频流地址不能为空");
try {
ChannelVo update = channelService.update(id, channelVo);
return JsonMessageUtil.getSuccessJsonMsg(MESSAGE_UPDATE_SUCCESS, update);
} catch (DuplicateKeyException e) {
log.error("", e);
return JsonMessageUtil.getErrorJsonMsg("设备名称或设备编号重复");
}
}
@Override
......
package com.viontech.fanxing.ops.controller.web;
import cn.hutool.core.codec.Base64;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.base.BaseExample;
import com.viontech.fanxing.commons.feign.OSDConfigClient;
import com.viontech.fanxing.commons.model.StoreConfigExample;
import com.viontech.fanxing.commons.vo.StoreConfigVo;
import com.viontech.fanxing.ops.controller.base.StoreConfigBaseController;
import com.viontech.keliu.util.JsonMessageUtil;
import lombok.SneakyThrows;
import org.springframework.stereotype.Controller;
import org.springframework.util.StreamUtils;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Resource;
import java.io.IOException;
import java.nio.charset.Charset;
......@@ -19,12 +22,37 @@ import java.nio.charset.Charset;
@RequestMapping("/storeConfigs")
public class StoreConfigController extends StoreConfigBaseController {
@Resource
private OSDConfigClient osdConfigClient;
@Override
protected BaseExample getExample(StoreConfigVo storeConfigVo, int type) {
StoreConfigExample storeConfigExample = (StoreConfigExample) super.getExample(storeConfigVo, type);
return storeConfigExample;
}
@RequestMapping(value = "", method = RequestMethod.GET)
@Override
public Object page(StoreConfigVo storeConfigVo, @RequestParam(value = "page", defaultValue = "-1") int page, @RequestParam(value = "pageSize", defaultValue = "100") int pageSize, String sortName, String sortOrder) {
return osdConfigClient.listAll(1);
}
@SneakyThrows
@RequestMapping(value = "/{id}", method = RequestMethod.GET)
@Override
public Object selOne(@PathVariable(value = "id") Long id) {
JSONObject item = osdConfigClient.getById(id);
JSONObject data = item.getJSONObject("data");
if (data != null) {
String base64 = data.getString("config");
byte[] decode = Base64.decode(base64);
String config = new String(decode, "GBK");
data.put("config", config);
}
return item;
}
@Override
@PostMapping
@ResponseBody
......
......@@ -3,6 +3,9 @@ package com.viontech.fanxing.ops.service.impl;
import cn.hutool.core.util.RandomUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.viontech.fanxing.commons.base.BaseExample;
import com.viontech.fanxing.commons.base.BaseMapper;
import com.viontech.fanxing.commons.base.BaseServiceImpl;
......@@ -68,6 +71,20 @@ public class ChannelServiceImpl extends BaseServiceImpl<Channel> implements Chan
}
@Override
public PageInfo<Channel> pagedQuery(BaseExample example, int pageNum, int pageSize) {
if (pageSize > 0) {
PageHelper.startPage(pageNum, pageSize);
List<Channel> channels = selectByExample(example);
return new PageInfo<>(channels);
} else {
List<Channel> result = selectByExample(example);
Page<Channel> p = new Page<>();
p.addAll(result);
return new PageInfo<>(p);
}
}
@Override
public Channel add(ChannelVo channelVo) {
logService.addLog(LogType.VIDEO_RESOURCE.value, null, "添加视频资源:" + channelVo.getName());
return insertSelective(channelVo);
......@@ -298,40 +315,71 @@ public class ChannelServiceImpl extends BaseServiceImpl<Channel> implements Chan
}
@Override
@Transactional(rollbackFor = Exception.class)
public Object pullFromVideoCloud() {
Assert.notNull(vionConfig.getVideoCloud(), "视频云配置为空");
Assert.hasText(vionConfig.getVideoCloud().getId(), "视频云对接id为空");
Assert.hasText(vionConfig.getVideoCloud().getUrl(), "视频云对接地址为空");
JSONObject response = WebClient.create(vionConfig.getVideoCloud().getUrl())
log.info("开始从视频云平台拉取视频资源信息");
JSONObject response = null;
// 第一次请求可能会出错,不想查了
for (int i = 0; i < 2; i++) {
try {
response = WebClient.create(vionConfig.getVideoCloud().getUrl())
.get()
.uri(uriBuilder -> uriBuilder.path("/api/device/getAllDeviceList").queryParam("userid", vionConfig.getVideoCloud().getId()).build())
.retrieve()
.bodyToMono(JSONObject.class)
.block(Duration.ofSeconds(10));
} catch (Exception e) {
if (i != 0) {
log.error("请求失败:", e);
throw new FanXingException(e);
}
}
}
log.info("拉取视频云平台视频资源信息完成:{}", response);
DictCode videoCloud = dictcodeService.getOrCreateOrgCode("video_cloud", "视频云");
log.info("开始清除原有数据");
clear(videoCloud);
log.info("清除原有数据完成,解析数据开始");
if (response != null && response.containsKey("data")) {
JSONArray data = response.getJSONArray("data");
if (data.size() > 0) {
analyseVideoCloud(data, videoCloud, videoCloud.getCateId());
}
}
log.info("解析数据完成");
return null;
}
@Transactional(rollbackFor = Exception.class)
private void clear(DictCode videoCloud) {
Long cateId = videoCloud.getId();
DictCodeExample dictCodeExample = new DictCodeExample();
dictCodeExample.createCriteria().andNoteEqualTo("video_cloud");
dictcodeService.deleteByExample(dictCodeExample);
ChannelExample channelExample = new ChannelExample();
channelExample.createCriteria().andStreamTypeEqualTo(ChannelType.STREAM_VIDEO_CLOUD.value);
deleteByExample(channelExample);
}
protected void analyseVideoCloud(JSONArray data, DictCode parent, Long cateId) {
for (int i = 0; i < data.size(); i++) {
JSONObject item = data.getJSONObject(i);
String name = item.getString("orzName");
JSONArray orzs = item.getJSONArray("orzs");
JSONArray devices = item.getJSONArray("devices");
if ((orzs.size() > 0 && StringUtils.isNotEmpty(name)) || devices.size() > 0) {
DictCode dictCode = dictcodeService.saveAndGet(name, item.getString("orzId"), "video_cloud", cateId, parent.getId());
// orzs 的长度大于零, 说明是组织机构节点,需要添加到字典表中
if (orzs.size() > 0) {
DictCode dictCode = dictcodeService.saveAndGet(item.getString("orzName"), item.getString("orzId"), "video_cloud", cateId, parent.getId());
analyseVideoCloud(orzs, dictCode, cateId);
} else {
}
// 解析视频资源,devices->channels
JSONArray devices = item.getJSONArray("devices");
if (devices.size() > 0) {
for (int j = 0; j < devices.size(); j++) {
JSONObject deviceJson = devices.getJSONObject(j);
......@@ -343,13 +391,12 @@ public class ChannelServiceImpl extends BaseServiceImpl<Channel> implements Chan
String channelName = channelJson.getString("channelName");
String channelId = channelJson.getString("channelId");
Integer hasPtz = channelJson.getInteger("hasPtz");
saveAndGet(channelName, deviceId, channelId, ChannelType.THIRD_PART.value, ChannelType.STREAM_VIDEO_CLOUD.value, null, parent.getUnid());
saveAndGet(channelName, deviceId, channelId, ChannelType.THIRD_PART.value, ChannelType.STREAM_VIDEO_CLOUD.value, null, dictCode.getUnid());
}
}
}
}
}
}
private Channel saveAndGet(String name, String deviceUnid, String channelUnid, Integer type, Integer streamType, String deviceType, String addressUnid) {
......
package com.viontech.fanxing.ops.service.main;
import cn.hutool.core.util.RandomUtil;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.config.VionConfig;
import com.viontech.fanxing.commons.feign.TaskFeignClient;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.ops.model.OpsServer;
import com.viontech.keliu.util.JsonMessageUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBucket;
import org.redisson.api.RKeys;
import org.redisson.api.RedissonClient;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.reactive.function.client.WebClient;
import javax.annotation.Resource;
import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* .
......@@ -26,12 +36,17 @@ import java.util.List;
@Service
@Slf4j
public class OpsServerService {
protected final static Pattern IP_PATTERN = Pattern.compile("((?:1[0-9][0-9]\\.|2[0-4][0-9]\\.|25[0-5]\\.|[1-9][0-9]\\.|[0-9]\\.){3}(?:1[0-9][0-9]|2[0-4][0-9]|25[0-5]|[1-9][0-9]|[0-9]))");
@Resource
private RedissonClient redissonClient;
@Resource
private VionConfig vionConfig;
@Resource
private TaskFeignClient taskFeignClient;
public void register(OpsServer opsServer) {
log.info("收到运维注册信息:{}",opsServer.toString());
log.info("收到运维注册信息:{}", opsServer.toString());
opsServer.setLastHeartBeat(System.currentTimeMillis());
addOrUpdateOpsServer(opsServer);
}
......@@ -66,14 +81,15 @@ public class OpsServerService {
String ip = opsServer.getIp();
Integer port = opsServer.getPort();
JSONObject nvsResponse = WebClient.create()
String nvsResponse = WebClient.create()
.post()
.uri(uriBuilder -> uriBuilder.scheme("http").host(ip).port(port).path("/api/v1/isg/timing").build())
.bodyValue(jsonObject.toJSONString())
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(jsonObject)
.retrieve()
.bodyToMono(JSONObject.class)
.block(Duration.ofSeconds(20));
.bodyToMono(String.class)
.block(Duration.ofSeconds(10));
log.info("下发校时配置结果:{}", nvsResponse);
} catch (Exception e) {
log.error("下发校时配置失败", e);
}
......@@ -82,11 +98,84 @@ public class OpsServerService {
public void updateVaServer(String[] devIdArr, MultipartFile file) {
// todo 保存文件到本地,根据 devIdArr 获取对应分析服务,再根据分析服务的 ip 获取对应的运维服务,下发文件地址和升级信息
log.info("收到升级请求,目标服务:{},文件:{}", Arrays.toString(devIdArr), file.getName());
File local = new File(vionConfig.getImage().getPath() + "/temp/" + RandomUtil.randomString(6) + "_" + file.getOriginalFilename());
local.getParentFile().mkdirs();
String imageUrl = vionConfig.getImage().getUrlPrefix() + "/temp/" + local.getName();
Map<String, List<String>> ipContainerNameMap = getIpContainerNameMap(devIdArr);
List<CompletableFuture<String>> futures = new ArrayList<>();
for (Map.Entry<String, List<String>> entry : ipContainerNameMap.entrySet()) {
String ip = entry.getKey();
OpsServer opsServerByIp = getOpsServerByIp(ip);
if (opsServerByIp == null) {
continue;
}
JSONObject jsonObject = new JSONObject();
jsonObject.put("pkgAddress", imageUrl);
jsonObject.put("containerNames", entry.getValue());
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
return WebClient.create()
.post()
.uri(uriBuilder -> uriBuilder.scheme("http").host(ip).port(opsServerByIp.getPort()).path("/api/v1/isg/upgrade/vaserver").build())
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(jsonObject)
.retrieve()
.bodyToMono(String.class)
.block(Duration.ofSeconds(10));
} catch (Exception e) {
log.error("下发升级任务失败:", e);
return null;
}
});
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})).join();
List<String> taskIds = futures.stream().map(stringCompletableFuture -> {
try {
return stringCompletableFuture.get();
} catch (Exception e) {
log.error("", e);
return null;
}
}).collect(Collectors.toList());
}
private void addOrUpdateOpsServer(OpsServer opsServer) {
RBucket<OpsServer> bucket = redissonClient.getBucket("ops:server:" + opsServer.getIp());
bucket.set(opsServer);
bucket.expire(1, TimeUnit.MINUTES);
}
private OpsServer getOpsServerByIp(String ip) {
RBucket<OpsServer> bucket = redissonClient.getBucket("ops:server:" + ip);
return bucket.get();
}
private Map<String, List<String>> getIpContainerNameMap(String[] devIdArr) {
HashMap<String, List<String>> result = new HashMap<>();
HashSet<String> devIdSet = new HashSet<>(Arrays.asList(devIdArr));
JsonMessageUtil.JsonMessage<List<VaServerInfo>> message = taskFeignClient.vaServerInfo();
List<VaServerInfo> data = message.getData();
for (VaServerInfo item : data) {
if (devIdSet.contains(item.getDevID())) {
String ip = getIp(item.getServiceBaseUrl());
if (ip == null) {
continue;
}
List<String> ContainerNames = result.computeIfAbsent(ip, x -> new ArrayList<>());
ContainerNames.add(item.getDockerContainerName());
}
}
return result;
}
private String getIp(String url) {
Matcher matcher = IP_PATTERN.matcher(url);
return matcher.find() ? matcher.group() : null;
}
......
package com.viontech.fanxing.ops.service.main;
import cn.hutool.core.io.IoUtil;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.base.LocalCache;
import com.viontech.fanxing.commons.config.VionConfig;
......@@ -22,6 +23,7 @@ import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Resource;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.*;
......@@ -81,8 +83,12 @@ public class VideoService {
video.getParentFile().mkdirs();
try {
FileUtils.copyToFile(file.getInputStream(), video);
checkVideoType(video);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
video.delete();
video.deleteOnExit();
}
double videoLength = (double) video.length();
double mbSize = videoLength / 1024 / 1024;
......@@ -110,6 +116,17 @@ public class VideoService {
}
}
private void checkVideoType(File file) throws IOException {
try (FileInputStream fi = new FileInputStream(file)) {
byte[] bytes = IoUtil.readBytes(fi, 32);
String s = IoUtil.readHex(fi, 4, true);
String s1 = new String(bytes).toLowerCase();
if (!"00000001".equals(s) && !s1.contains("avi") && !s1.contains("mp4")) {
throw new FanXingException("视频格式不支持");
}
}
}
/**
* 获取录像文件头部概览
*/
......
......@@ -139,8 +139,23 @@ public class TrafficServiceImpl extends BaseServiceImpl<Traffic> implements Traf
dov.setBehavior(b.getCount());
}
}
Map<Long, String> task_id_unid_map = taskMap.values().stream().collect(Collectors.toMap(Task::getId, Task::getUnid, (x, y) -> x));
Map<String, JSONObject> taskStateMap = taskClientService.taskStateMap();
Collection<DataOverViewModel> values = resultMap.values();
List<DataOverViewModel> collect = values.stream().filter(x -> x.getTaskId() != null).sorted(Comparator.comparingLong(DataOverViewModel::getTaskId)).collect(Collectors.toList());
List<DataOverViewModel> collect = values.stream()
.filter(x -> x.getTaskId() != null)
.peek(x -> {
Long id = x.getTaskId();
String unid = task_id_unid_map.get(id);
JSONObject jsonObject = taskStateMap.get(unid);
if (jsonObject != null) {
x.setEffectiveAnalysisTime(jsonObject.getInteger("effective_running_time"));
x.setExceptionNum(jsonObject.getInteger("exception_times"));
}
})
.sorted(Comparator.comparingLong(DataOverViewModel::getTaskId))
.collect(Collectors.toList());
List<List<DataOverViewModel>> partition = ListUtil.partition(collect, pageSize);
JSONObject jsonObject = new JSONObject();
......
package com.viontech.fanxing.query.service.main;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.base.LocalCache;
import com.viontech.fanxing.commons.constant.LogType;
import com.viontech.fanxing.commons.exception.FanXingException;
......@@ -35,12 +36,17 @@ public class OpsClientService {
@Resource
private OpsClient opsClient;
public StoreConfig getStoreConfigById(Long id) {
JsonMessageUtil.JsonMessage<StoreConfig> res = opsClient.getStoreConfigById(id);
if (res.getCode() != 200) {
throw new FanXingException(res.getMsg());
public String getStoreConfigById(Long id) {
JSONObject res = opsClient.getStoreConfigById(id);
if (!res.getBoolean("success")) {
throw new FanXingException(res.getString("msg"));
} else {
return res.getData();
JSONObject data = res.getJSONObject("data");
if (data == null) {
return null;
} else {
return data.getString("config");
}
}
}
......
package com.viontech.fanxing.query.service.main;
import com.viontech.fanxing.commons.model.Task;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.feign.TaskFeignClient;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.keliu.util.JsonMessageUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
......@@ -31,4 +35,28 @@ public class TaskClientService {
return allTask.getData().stream().collect(Collectors.toMap(Task::getId, x -> x, (a, b) -> a));
}
public Map<String, JSONObject> taskStateMap() {
HashMap<String, JSONObject> result = new HashMap<>();
JsonMessageUtil.JsonMessage<List<VaServerInfo>> info = taskFeignClient.vaServerInfo();
if (info.isSuccess()) {
List<VaServerInfo> data = info.getData();
for (VaServerInfo item : data) {
if (item.getStatus() == 1) {
try {
JSONObject jsonObject = taskFeignClient.getVaServerStatusByDevId(item.getDevID());
JSONArray tasks = jsonObject.getJSONArray("tasks");
for (int i = 0; i < tasks.size(); i++) {
JSONObject obj = tasks.getJSONObject(i);
String taskUnid = obj.getString("task_unid");
result.put(taskUnid, obj);
}
} catch (Exception e) {
log.error("", e);
}
}
}
}
return result;
}
}
......@@ -3,7 +3,7 @@ package com.viontech.fanxing.task.controller;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.config.VionConfig;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.task.model.vaserver.VaServerOverViewModel;
import com.viontech.fanxing.task.service.VAServerService;
import com.viontech.keliu.util.JsonMessageUtil;
......
......@@ -31,6 +31,7 @@ import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
......@@ -233,7 +234,7 @@ public class TaskController extends TaskBaseController {
String channelName = channel == null ? "未知" : channel.getName();
response.setContentType(MediaType.APPLICATION_OCTET_STREAM_VALUE);
response.setHeader("Content-Disposition",
"attachment;filename=" + channelName + "_" + sceneUnid + ".json");
"attachment;filename=" + URLEncoder.encode(channelName + "_" + sceneUnid + ".json", "utf8"));
IOUtils.write(bytes, response.getOutputStream());
}
......
......@@ -2,7 +2,7 @@ package com.viontech.fanxing.task.repository;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import org.redisson.api.RBucket;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
......
......@@ -4,7 +4,7 @@ import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.task.service.TaskDataService;
import com.viontech.fanxing.task.service.VAServerService;
import com.viontech.fanxing.task.service.adapter.TaskService;
......
......@@ -3,7 +3,7 @@ package com.viontech.fanxing.task.runner;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.model.main.StreamInfo;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.task.service.VAServerService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
......
package com.viontech.fanxing.task.service;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.constant.LogType;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.feign.OpsClient;
import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.commons.model.Content;
import com.viontech.fanxing.commons.model.DictCode;
import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.vo.LogVo;
import com.viontech.keliu.util.JsonMessageUtil;
import lombok.extern.slf4j.Slf4j;
......@@ -34,12 +34,17 @@ public class OpsClientService {
@Resource
private OpsClient opsClient;
public StoreConfig getStoreConfigById(Long id) {
JsonMessageUtil.JsonMessage<StoreConfig> res = opsClient.getStoreConfigById(id);
if (res.getCode() != 200) {
throw new FanXingException(res.getMsg());
public String getStoreConfigById(Long id) {
JSONObject res = opsClient.getStoreConfigById(id);
if (!res.getBoolean("success")) {
throw new FanXingException(res.getString("msg"));
} else {
return res.getData();
JSONObject data = res.getJSONObject("data");
if (data == null) {
return null;
} else {
return data.getString("config");
}
}
}
......
package com.viontech.fanxing.task.service;
import com.mysql.cj.log.Log;
import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.runtime.RuntimeConfig;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.repository.TaskDataRedisRepository;
import com.viontech.fanxing.task.service.adapter.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.redisson.api.RMap;
......@@ -35,16 +35,18 @@ public class TaskDataService {
private TaskDataRedisRepository taskDataRedisRepository;
@Resource
private OpsClientService opsClientService;
@Resource
private TaskService taskService;
public void addTask(Task task) {
TaskData taskData = new TaskData(task);
// 获取存储配置
Long storeConfigId = task.getStoreConfigId();
StoreConfig storeConfig = opsClientService.getStoreConfigById(storeConfigId);
if (storeConfig == null) {
String config = opsClientService.getStoreConfigById(storeConfigId);
if (config == null) {
throw new FanXingException("无法获取对应的存储配置");
}
taskData.setStoreConfig(storeConfig.getContent());
taskData.setStoreConfig(config);
taskDataRedisRepository.addOrUpdateTaskData(taskData);
// 计算运行时间并生成任务
boolean success = distributeTask(taskData);
......@@ -62,7 +64,7 @@ public class TaskDataService {
if (vaServerInfo != null) {
return true;
}
taskService.updateStatus(taskData.getTask().getId(), TaskStatus.PAUSE.val);
ImmutablePair<Long, Long> nextTime = runtimeConfig.getNextTimeOfExecutionAndTerminal();
log.info("部署任务[{}],运行时间:[{}]", taskData.getTask().getName(), nextTime.toString());
Long nextExecuteTime = nextTime.left;
......@@ -120,11 +122,11 @@ public class TaskDataService {
}
public void updateTask(Task task) {
public void updateTask(Task task, boolean rebuild) {
String taskUnid = task.getUnid();
VaServerInfo vaServerInfo = taskRunOn(taskUnid);
// vaServerId 为空说明任务未执行可以先删除再建立新任务
if (vaServerInfo == null) {
if (vaServerInfo == null || rebuild) {
deleteTask(taskUnid);
addTask(task);
} else if (vaServerInfo.getStatus() == 0) {
......
......@@ -8,7 +8,7 @@ import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.vaserver.VATask;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.springframework.http.MediaType;
......
......@@ -6,9 +6,9 @@ import com.viontech.fanxing.commons.config.VionConfig;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.model.vaserver.VaServerOverViewModel;
import com.viontech.fanxing.task.repository.VAServerRedisRepository;
import lombok.extern.slf4j.Slf4j;
......@@ -278,17 +278,16 @@ public class VAServerService {
public Object getDefaultAlgorithmConfig(String taskAlgType) {
RMap<String, VaServerInfo> map = vaServerRedisRepository.getVaServerInfoMap();
VaServerInfo temp = null;
for (VaServerInfo item : map.readAllValues()) {
if (item.getStatus() == 1) {
temp = item;
break;
try {
return vaServerHttpService.getDefaultAlgorithmConfig(item, taskAlgType);
} catch (Exception e) {
log.error(item.getDevID() + "访问失败", e);
}
}
if (temp == null) {
throw new FanXingException("没有在线的VAServer");
}
return vaServerHttpService.getDefaultAlgorithmConfig(temp, taskAlgType);
throw new FanXingException("无法获取到默认配置");
}
/**
......@@ -306,7 +305,7 @@ public class VAServerService {
ip = matcher.group();
}
JSONObject status = new JSONObject();
JSONObject brief = new JSONObject();
JSONObject brief = null;
try {
status = vaServerHttpService.status(vaServer);
brief = status.getJSONObject("resource").getJSONObject("brief");
......@@ -316,7 +315,7 @@ public class VAServerService {
}
status.put("devId", vaServer.getDevID());
status.put("serviceName", vaServer.getServiceName());
status.put("serviceName", vaServer.getDockerContainerName());
status.put("status", vaServer.getStatus());
status.put("platType", vaServer.getPlatType());
status.put("softVersion", vaServer.getSoftVersion());
......@@ -325,7 +324,10 @@ public class VAServerService {
status.put("videoResource", vaServer.getVideoResource());
VaServerOverViewModel model = map.computeIfAbsent(ip, x -> new VaServerOverViewModel());
model.setIp(ip).addTotal(brief.getFloat("video_total")).addUsed(brief.getFloat("video_busy")).addInfo(status);
model.setIp(ip).addInfo(status);
if (brief != null) {
model.addTotal(brief.getFloat("video_total")).addUsed(brief.getFloat("video_busy"));
}
}
return map.values();
......
......@@ -15,11 +15,11 @@ import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.commons.model.DictCode;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.TaskExample;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.commons.vo.TaskVo;
import com.viontech.fanxing.task.mapper.TaskMapper;
import com.viontech.fanxing.task.model.ConfigBuilder;
import com.viontech.fanxing.task.model.Scene;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.service.OpsClientService;
import com.viontech.fanxing.task.service.TaskDataService;
import com.viontech.fanxing.task.service.VAServerService;
......@@ -173,18 +173,22 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
throw new FanXingException("场景配置参数校验不通过");
}
}
Task originalTask = selectByPrimaryKey(task.getId());
updateByPrimaryKeySelective(task);
task = selectByPrimaryKey(task.getId());
if (task == null) {
throw new FanXingException("任务不存在");
}
// 具有存储配置,具有场景配置,并且不是未部署状态的才去更新
if (StringUtils.isNotBlank(task.getScene())
&& task.getStoreConfigId() != null
&& !TaskStatus.AWAIT.valEqual(task.getStatus())) {
taskDataService.updateTask(task);
// 改变了运行时段或者存储配置需要重新部署
boolean rebuild = (!originalTask.getRuntimeConf().equals(task.getRuntimeConf()))
|| (!originalTask.getStoreConfigId().equals(task.getStoreConfigId()));
taskDataService.updateTask(task, rebuild);
}
return new TaskVo(task);
}
......
......@@ -5,7 +5,7 @@ import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.commons.model.DictCode;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.runtime.DailyRuntimeConfig;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.task.service.OpsClientService;
import com.viontech.fanxing.task.service.TaskDataService;
import com.viontech.fanxing.task.service.VAServerHttpService;
......
......@@ -6,7 +6,7 @@ import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.task.repository.TaskDataRedisRepository;
import com.viontech.fanxing.task.repository.VAServerRedisRepository;
import com.viontech.fanxing.task.service.adapter.TaskService;
......@@ -93,8 +93,8 @@ class VAServerHttpServiceTest {
@Test
void storeConfig() {
StoreConfig storeConfig = opsClientService.getStoreConfigById(6L);
System.out.println(JSON.toJSONString(storeConfig));
String storeConfig = opsClientService.getStoreConfigById(6L);
System.out.println(storeConfig);
}
@Test
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!