Commit 5fcdb1ca by xmh

<feat> 构建 scene 时, 地点信息和编号使用设备信息和编号

<feat> 任务运行配置修改, 优化任务运行时间的计算
<feat> 下发任务时, 需要携带 存储配置id
<feat> 优化任务下发和停止的流程,每次定时任务先执行停止任务操作后执行下发任务操作
<feat> 启动和更新任务时对可用资源进行计算(后面可能需要优化)
<feat> 设备资源可以重名
<feat> 添加清除临时文件的定时任务
<feat> 校时配置对 ntpServer, port 和 定时配置 进行校验
<feat> 开放所有的存储配置而不是只取上传的
<feat> 添加任务状态: 启动中
<feat> 数据库初始化脚本优化
1 parent 9bec8eb7
Showing 24 changed files with 378 additions and 163 deletions
......@@ -17,8 +17,10 @@ public enum TaskStatus {
/** 暂停 */
STOP(3),
/** 无法运行 */
CAN_NOT_RUN(4);
public int val;
CAN_NOT_RUN(4),
/** 启动中 */
STARTING(5);
public final int val;
TaskStatus(int val) {
this.val = val;
......
package com.viontech.fanxing.commons.utils;
import org.springframework.web.reactive.function.client.WebClient;
/**
* .
*
* @author 谢明辉
* @date 2022/1/4
*/
public class WebClientUtils {
public static WebClient buildClient(String baseUrl) {
WebClient.Builder builder = WebClient.builder();
if (baseUrl != null) {
builder = builder.baseUrl(baseUrl);
}
return builder.codecs(clientCodecConfigurer -> clientCodecConfigurer.defaultCodecs().maxInMemorySize(-1))
.build();
}
}
......@@ -3,6 +3,7 @@ package com.viontech.fanxing.forward.batch.writer;
import com.viontech.fanxing.commons.config.ApplicationContextProvider;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.commons.model.Forward;
import com.viontech.fanxing.commons.utils.WebClientUtils;
import com.viontech.fanxing.forward.ForwardApp;
import com.viontech.fanxing.forward.model.ForwardContent;
import lombok.extern.slf4j.Slf4j;
......@@ -10,7 +11,6 @@ import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.time.Duration;
......@@ -57,13 +57,12 @@ public class ForwardWriter implements ItemWriter<ForwardContent> {
@Override
public void run() {
try {
Mono<String> response = WebClient.create()
Mono<String> response = WebClientUtils.buildClient(null)
.post()
.uri(forward.getUrl())
.bodyValue(json)
.retrieve()
.bodyToMono(String.class);
String block = response.block(Duration.ofSeconds(20));
} catch (Exception e) {
failed += 1;
......
......@@ -35,8 +35,8 @@ public class StoreConfigController extends StoreConfigBaseController {
@RequestMapping(value = "", method = RequestMethod.GET)
@Override
public Object page(StoreConfigVo storeConfigVo, @RequestParam(value = "page", defaultValue = "-1") int page, @RequestParam(value = "pageSize", defaultValue = "100") int pageSize, String sortName, String sortOrder) {
// todo 应该取所有,传 null
return osdConfigClient.listAll(1);
// 应该取所有,传 null
return osdConfigClient.listAll(null);
}
@SneakyThrows
......
package com.viontech.fanxing.ops.runner;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.ArrayUtil;
import com.viontech.fanxing.commons.config.VionConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.File;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;
/**
* .
*
* @author 谢明辉
* @date 2022/1/4
*/
@Component
@Slf4j
public class CleanJob {
@Resource
private VionConfig vionConfig;
@Scheduled(cron = "0 0 9 * * ?")
public void cleanTemp() {
try {
log.info("开始清理2天前的临时文件");
long today = LocalDate.now().toEpochDay();
File tempDir = new File(vionConfig.getImage().getPath() + "/temp)");
File[] files = tempDir.listFiles();
if (ArrayUtil.isNotEmpty(files)) {
for (File file : files) {
long l = file.lastModified();
LocalDate localDate = Instant.ofEpochMilli(l).atOffset(ZoneOffset.of(ZoneId.systemDefault().getId())).toLocalDate();
if (today - localDate.toEpochDay() >= 2) {
log.info("删除文件:{}", file.getName());
FileUtil.del(file);
}
}
}
log.info("完成清理2天前的临时文件");
} catch (Exception e) {
log.error("", e);
}
}
}
package com.viontech.fanxing.ops.service.impl;
import cn.hutool.core.util.RandomUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.Page;
......@@ -15,6 +14,7 @@ import com.viontech.fanxing.commons.constant.LogType;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.feign.TaskFeignClient;
import com.viontech.fanxing.commons.model.*;
import com.viontech.fanxing.commons.utils.WebClientUtils;
import com.viontech.fanxing.commons.vo.ChannelVo;
import com.viontech.fanxing.commons.vo.DictCodeVo;
import com.viontech.fanxing.commons.vo.TaskVo;
......@@ -92,10 +92,6 @@ public class ChannelServiceImpl extends BaseServiceImpl<Channel> implements Chan
@Override
public Channel insertSelective(Channel record) {
String name = record.getName();
while (duplicateName(record.getName())) {
record.setName(RandomUtil.randomString(6) + name);
}
return super.insertSelective(record);
}
......@@ -245,13 +241,6 @@ public class ChannelServiceImpl extends BaseServiceImpl<Channel> implements Chan
return false;
}
private boolean duplicateName(String name) {
ChannelExample channelExample = new ChannelExample();
channelExample.createCriteria().andNameEqualTo(name);
List<Channel> channels = selectByExample(channelExample);
return channels.size() > 0;
}
/**
* 拉取 nvs3000 视频资源
*/
......@@ -269,7 +258,7 @@ public class ChannelServiceImpl extends BaseServiceImpl<Channel> implements Chan
JSONObject nvsPostData = new JSONObject();
nvsPostData.put("fromindex", 0);
nvsPostData.put("toindex", -1);
JSONObject nvsResponse = WebClient.create()
JSONObject nvsResponse = WebClientUtils.buildClient(null)
.post()
.uri(nvsUrl + "/nvsthird/getcamlist")
.bodyValue(nvsPostData)
......
package com.viontech.fanxing.ops.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.base.BaseMapper;
import com.viontech.fanxing.commons.base.BaseServiceImpl;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Content;
import com.viontech.fanxing.commons.model.ContentExample;
import com.viontech.fanxing.commons.model.main.ImageKeepConfig;
......@@ -57,6 +59,28 @@ public class ContentServiceImpl extends BaseServiceImpl<Content> implements Cont
@Override
public void addOrUpdateTimingConfig(JSONObject jsonObject) {
JSONObject ntpMode = jsonObject.getJSONObject("ntpMode");
String ntpServer = ntpMode.getString("ntpServer");
Integer ntpPort = ntpMode.getInteger("ntpPort");
JSONObject timingFrequency = ntpMode.getJSONObject("timingFrequency");
if (!ntpServer.matches("[1-9](\\d{1,2})?\\.(0|([1-9](\\d{1,2})?))\\.(0|([1-9](\\d{1,2})?))\\.(0|([1-9](\\d{1,2})?))") && !ntpServer.matches("^(https?|ftp|file)://[-a-zA-Z0-9+&@#/%?=~_|!:,.;]*[-a-zA-Z0-9+&@#/%=~_|]")) {
throw new FanXingException("服务器地址校验不通过");
}
if (ntpPort <= 0 || ntpPort >= 65536) {
throw new FanXingException("端口超出可用范围");
}
if (timingFrequency.containsKey("timingTimes")) {
JSONArray timingTimes = timingFrequency.getJSONArray("timingTimes");
for (int i = 0; i < timingTimes.size() - 1; i++) {
for (int j = i + 1; j < timingTimes.size(); j++) {
String s1 = timingTimes.getString(i);
String s2 = timingTimes.getString(j);
if (s1.equals(s2)) {
throw new FanXingException("定时校时不能重复");
}
}
}
}
ContentExample contentExample = new ContentExample();
contentExample.createCriteria().andTypeEqualTo(TYPE_PLATFORM_CONFIG).andNameEqualTo(NAME_TIMING_CONFIG);
addOrUpdate(TYPE_PLATFORM_CONFIG, NAME_TIMING_CONFIG, jsonObject.toJSONString());
......
......@@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.config.VionConfig;
import com.viontech.fanxing.commons.feign.TaskFeignClient;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.commons.utils.WebClientUtils;
import com.viontech.fanxing.ops.model.OpsServer;
import com.viontech.keliu.util.JsonMessageUtil;
import lombok.SneakyThrows;
......@@ -16,7 +17,6 @@ import org.redisson.api.RedissonClient;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.reactive.function.client.WebClient;
import javax.annotation.Resource;
import java.io.File;
......@@ -83,7 +83,7 @@ public class OpsServerService {
String ip = opsServer.getIp();
Integer port = opsServer.getPort();
String nvsResponse = WebClient.create()
String nvsResponse = WebClientUtils.buildClient(null)
.post()
.uri(uriBuilder -> uriBuilder.scheme("http").host(ip).port(port).path("/api/v1/isg/timing").build())
.contentType(MediaType.APPLICATION_JSON)
......@@ -118,7 +118,7 @@ public class OpsServerService {
jsonObject.put("containerNames", entry.getValue());
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
return WebClient.create()
return WebClientUtils.buildClient(null)
.post()
.uri(uriBuilder -> uriBuilder.scheme("http").host(ip).port(opsServerByIp.getPort()).path("/api/v1/isg/upgrade/vaserver").build())
.contentType(MediaType.APPLICATION_JSON)
......
package com.viontech.fanxing.ops.service.main;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.io.IoUtil;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.base.LocalCache;
......@@ -85,9 +86,8 @@ public class VideoService {
FileUtils.copyToFile(file.getInputStream(), video);
checkVideoType(video);
} catch (IOException e) {
video.delete();
video.deleteOnExit();
throw new RuntimeException(e);
FileUtil.del(video);
throw new FanXingException(e);
}
double videoLength = (double) video.length();
double mbSize = videoLength / 1024 / 1024;
......
......@@ -10,7 +10,6 @@ import com.viontech.fanxing.commons.base.BaseExample;
import com.viontech.fanxing.commons.base.BaseMapper;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.commons.model.DictCode;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.TaskExample;
import com.viontech.fanxing.commons.vo.TaskVo;
......@@ -187,14 +186,8 @@ public class TaskController extends TaskBaseController {
String xml = config.getString("xml");
ConfigBuilder configBuilder = new ConfigBuilder();
Channel channel = opsClientService.getChannelByChannelUnid(task.getChannelUnid());
String addressUnid = channel.getAddressUnid();
String addressName = null;
if (addressUnid != null) {
DictCode dictCode = opsClientService.getDictCodeByUnid(addressUnid);
addressName = dictCode.getName();
}
String c = configBuilder.buildDefaultConfig(xml)
.buildVchanInfo(channel.getName(), channel.getChannelUnid(), addressUnid, addressName)
.buildVchanInfo(channel.getName(), channel.getChannelUnid())
.build();
config.put("xml", c);
}
......@@ -252,7 +245,7 @@ public class TaskController extends TaskBaseController {
throw new FanXingException("无效操作类型:" + type);
}
// 1 启动, 2暂停, 3删除
ListMultimap<String, Long> build = MultimapBuilder.treeKeys().linkedListValues().build();
ListMultimap<String, String> build = MultimapBuilder.treeKeys().linkedListValues().build();
boolean error = false;
for (Long taskId : taskIdArr) {
try {
......@@ -269,11 +262,11 @@ public class TaskController extends TaskBaseController {
default:
break;
}
build.put("success", taskId);
build.put("success", String.valueOf(taskId));
} catch (Exception e) {
log.info("", e);
error = true;
build.put("error", taskId);
build.put("error", e.getMessage());
}
}
if (error) {
......
......@@ -82,7 +82,7 @@ public class ConfigBuilder {
return this;
}
public ConfigBuilder buildVchanInfo(String name, String channelUnid, String addressUnid, String addressName) {
public ConfigBuilder buildVchanInfo(String name, String channelUnid) {
if (vChanInfoBuilt) {
return this;
}
......@@ -97,10 +97,8 @@ public class ConfigBuilder {
setText(deviceConfig, "设备名称", name);
setText(deviceConfig, "设备编号", channelUnid);
if (addressUnid != null) {
setText(deviceConfig, "地点名称", addressUnid);
setText(deviceConfig, "地点编号", addressName);
}
setText(deviceConfig, "地点名称", name);
setText(deviceConfig, "地点编号", channelUnid);
this.vChanInfoBuilt = true;
} catch (Exception e) {
log.error("", e);
......
......@@ -24,6 +24,7 @@ public class TaskData implements Serializable {
private RuntimeConfig runtimeConfig;
private String storeConfig;
private Long storeConfigId;
private String deviceUnid;
......
......@@ -44,27 +44,20 @@ public class DailyRuntimeConfig implements RuntimeConfig {
Config config = new Config().setStart(LocalTime.parse(start, TIME_FORMATTER)).setEnd(LocalTime.parse(end, TIME_FORMATTER));
multiConfig.add(config);
}
// 判断是否有重复的时间段
for (int i = 0; i < multiConfig.size(); i++) {
for (int j = 1; j < multiConfig.size(); j++) {
if (i == j) {
continue;
}
Config configI = multiConfig.get(i);
Config configJ = multiConfig.get(j);
// 判断时间段是否有交叉
if ((configJ.getStart().isAfter(configI.getStart()) && configJ.getStart().isBefore(configI.getEnd())
|| (configJ.getEnd().isAfter(configI.getStart()) && configJ.getEnd().isBefore(configI.getEnd())))) {
throw new FanXingException("配置有误请检查配置");
}
}
}
// 根据开始时间排序
multiConfig.sort((o1, o2) -> {
LocalTime start1 = o1.getStart();
LocalTime start2 = o2.getStart();
return start1.isBefore(start2) ? -1 : start1.isAfter(start2) ? 1 : 0;
});
// 判断是否有重复的时间段
for (int i = 0; i < multiConfig.size() - 1; i++) {
Config config1 = multiConfig.get(i);
Config config2 = multiConfig.get(i + 1);
if (config2.getStart().isBefore(config1.getEnd())) {
throw new FanXingException("配置有误请检查配置");
}
}
}
@Override
......@@ -78,7 +71,7 @@ public class DailyRuntimeConfig implements RuntimeConfig {
LocalTime end = config.getEnd();
terminateTime = LocalDateTime.of(executeTime.toLocalDate(), end);
// 如果终止时间在现在的时间之前需要明天执行
if (!terminateTime.isBefore(LocalDateTime.now())) {
if (terminateTime.isAfter(LocalDateTime.now())) {
tomorrow = false;
break;
}
......
......@@ -22,7 +22,6 @@ public class ManualRuntimeConfig implements RuntimeConfig {
@Override
public ImmutablePair<Long, Long> getNextTimeOfExecutionAndTerminal() {
long l = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
return ImmutablePair.of(l, null);
return ImmutablePair.of(System.currentTimeMillis(), null);
}
}
......@@ -2,14 +2,11 @@ package com.viontech.fanxing.task.model.runtime;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.viontech.keliu.util.DateUtil;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.commons.lang3.tuple.ImmutablePair;
import java.util.Date;
import java.util.SplittableRandom;
import java.util.concurrent.TimeUnit;
/**
......@@ -35,17 +32,9 @@ public class RandomRuntimeConfig implements RuntimeConfig {
@Override
public ImmutablePair<Long, Long> getNextTimeOfExecutionAndTerminal() {
// todo 目前逻辑还未定,只有手动启动才会执行一次
// 只有手动启动才会执行一次
long running = TimeUnit.MINUTES.toMillis(runningTime);
Date date = DateUtil.setDayMinTime(new Date());
long time = date.getTime() - running;
if (time < System.currentTimeMillis()) {
long l = System.currentTimeMillis();
return ImmutablePair.of(l, l + running);
}
SplittableRandom splittableRandom = new SplittableRandom();
long l = splittableRandom.nextLong(System.currentTimeMillis(), time);
return ImmutablePair.of(l, l + running);
}
}
......@@ -84,6 +84,22 @@ public interface RuntimeConfig extends Serializable {
@JsonDeserialize(using = LocalTimeDeserializer.class)
@JsonSerialize(using = LocalTimeSerializer.class)
private LocalTime end;
public Config setStart(LocalTime start) {
if (end != null && start.isAfter(end)) {
throw new FanXingException("配置有误");
}
this.start = start;
return this;
}
public Config setEnd(LocalTime end) {
if (start != null && end.isBefore(start)) {
throw new FanXingException("配置有误");
}
this.end = end;
return this;
}
}
}
......@@ -26,6 +26,7 @@ public class VATask {
private String task_name;
private Integer alg_type;
private String store_config;
private Long store_config_id;
private String channel_unid;
private String device_unid;
private String stream_path;
......@@ -39,6 +40,7 @@ public class VATask {
this.task_name = task.getName();
this.alg_type = Integer.parseInt(task.getAlgType());
this.store_config = taskData.getStoreConfig();
this.store_config_id = taskData.getStoreConfigId();
this.channel_unid = task.getChannelUnid();
this.stream_path = task.getStreamPath();
this.stream_type = task.getStreamType();
......
......@@ -42,9 +42,18 @@ public class TaskRunner {
private TaskService taskService;
@Scheduled(fixedDelay = 5000)
public void executedTaskListener() {
public void run() {
RLock jobLock = redisService.getLockMust("lock:taskRunner");
try {
terminatedTask();
executedTask();
} finally {
jobLock.forceUnlock();
}
}
public void executedTask() {
try {
RScoredSortedSet<String> set = redisService.getToBeExecutedTaskUnidSet();
RMap<String, VaServerInfo> vaServerMap = vaServerService.getVaServerRedisRepository().getVaServerInfoMap();
......@@ -110,19 +119,13 @@ public class TaskRunner {
}
} catch (Exception e) {
log.error("", e);
} finally {
jobLock.forceUnlock();
}
}
@Scheduled(fixedDelay = 5000)
public void terminatedTaskListener() {
RLock jobLock = redisService.getLockMust("lock:taskRunner");
public void terminatedTask() {
try {
RScoredSortedSet<String> set = redisService.getToBeTerminatedTaskUnidSet();
RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisService.getToBeExecutedTaskUnidSet();
Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true);
for (String taskUnid : entryCollection) {
......@@ -151,8 +154,6 @@ public class TaskRunner {
}
} catch (Exception e) {
log.error("", e);
} finally {
jobLock.forceUnlock();
}
}
......
......@@ -11,6 +11,7 @@ import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.runtime.RuntimeConfig;
import com.viontech.fanxing.task.repository.TaskDataRedisRepository;
import com.viontech.fanxing.task.service.adapter.TaskService;
import com.viontech.fanxing.task.utils.TaskUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.redisson.api.RMap;
......@@ -42,6 +43,7 @@ public class TaskDataService {
public void addTask(Task task) {
TaskData taskData = buildTaskData(task);
TaskUtils.INSTANCE.checkRuntimeConf(taskData, vaServerService, this);
taskDataRedisRepository.addOrUpdateTaskData(taskData);
// 计算运行时间并生成任务
boolean success = distributeTask(taskData);
......@@ -59,12 +61,19 @@ public class TaskDataService {
if (vaServerInfo != null) {
return true;
}
taskService.updateStatus(taskData.getTask().getId(), TaskStatus.STAY.val);
ImmutablePair<Long, Long> nextTime = runtimeConfig.getNextTimeOfExecutionAndTerminal();
log.info("部署任务[{}],运行时间:[{}]", taskData.getTask().getName(), nextTime.toString());
Long nextExecuteTime = nextTime.left;
Long nextTerminateTime = nextTime.right;
if (nextExecuteTime != null) {
// 需要立即启动的修改状态为 启动中, 需要过一段时间运行的修改状态为 待时
if (nextExecuteTime > System.currentTimeMillis()) {
taskService.updateStatus(taskData.getTask().getId(), TaskStatus.STAY.val);
} else {
taskService.updateStatus(taskData.getTask().getId(), TaskStatus.STARTING.val);
}
RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisService.getToBeExecutedTaskUnidSet();
toBeExecutedTaskUnidSet.add(nextExecuteTime, taskUnid);
if (nextTerminateTime != null) {
......@@ -144,6 +153,7 @@ public class TaskDataService {
throw new FanXingException("无法获取对应的存储配置");
}
taskData.setStoreConfig(config);
taskData.setStoreConfigId(task.getStoreConfigId());
if (taskData.getTask().getStreamType().equals(ChannelType.STREAM_VIDEO_CLOUD.value)) {
Channel channel = opsClientService.getChannelByChannelUnid(taskData.getTask().getChannelUnid());
String deviceUnid = channel.getDeviceUnid();
......@@ -155,4 +165,8 @@ public class TaskDataService {
public TaskDataRedisRepository getRepository() {
return taskDataRedisRepository;
}
public TaskService getTaskService() {
return taskService;
}
}
......@@ -4,13 +4,13 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.commons.utils.WebClientUtils;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.vaserver.VATask;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.time.Duration;
......@@ -32,7 +32,7 @@ public class VAServerHttpService {
public JSONObject addTask(TaskData taskData, VaServerInfo vaServerInfo) {
VATask vaTask = new VATask(taskData);
String path = "/api/vaserver/v1/task";
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
Mono<String> stringMono = WebClientUtils.buildClient(vaServerInfo.getServiceBaseUrl())
.post()
.uri(uriBuilder -> uriBuilder.path(path).build())
.bodyValue(vaTask)
......@@ -54,7 +54,7 @@ public class VAServerHttpService {
public JSONObject updateTask(TaskData taskData, VaServerInfo vaServerInfo) {
String path = "/api/vaserver/v1/task";
VATask vaTask = new VATask(taskData);
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
Mono<String> stringMono = WebClientUtils.buildClient(vaServerInfo.getServiceBaseUrl())
.put()
.uri(uriBuilder -> uriBuilder.path(path).build())
.bodyValue(vaTask)
......@@ -79,7 +79,7 @@ public class VAServerHttpService {
JSONObject jsonObject = new JSONObject();
jsonObject.put("task_unid", taskUnid);
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
Mono<String> stringMono = WebClientUtils.buildClient(vaServerInfo.getServiceBaseUrl())
.post()
.uri(uriBuilder -> uriBuilder.path(path).build())
.bodyValue(jsonObject.toString())
......@@ -99,7 +99,7 @@ public class VAServerHttpService {
JSONObject jsonObject = new JSONObject();
jsonObject.put("task_unid", taskUnid);
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
Mono<String> stringMono = WebClientUtils.buildClient(vaServerInfo.getServiceBaseUrl())
.post()
.uri(uriBuilder -> uriBuilder.path(path).build())
.bodyValue(jsonObject.toString())
......@@ -119,7 +119,7 @@ public class VAServerHttpService {
JSONObject jsonObject = new JSONObject();
jsonObject.put("task_unid", taskUnid);
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
Mono<String> stringMono = WebClientUtils.buildClient(vaServerInfo.getServiceBaseUrl())
.post()
.uri(uriBuilder -> uriBuilder.path(path).build())
.bodyValue(jsonObject.toString())
......@@ -142,7 +142,7 @@ public class VAServerHttpService {
jsonObject.put("isDrawRect", 1);
jsonObject.put("mediaServerPushUrl", url);
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
Mono<String> stringMono = WebClientUtils.buildClient(vaServerInfo.getServiceBaseUrl())
.post()
.uri(uriBuilder -> uriBuilder.path(path).build())
.bodyValue(jsonObject.toString())
......@@ -164,7 +164,7 @@ public class VAServerHttpService {
jsonObject.put("task_unid", taskUnid);
jsonObject.put("sceneID", sceneId);
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
Mono<String> stringMono = WebClientUtils.buildClient(vaServerInfo.getServiceBaseUrl())
.post()
.uri(uriBuilder -> uriBuilder.path(path).build())
.bodyValue(jsonObject.toString())
......@@ -186,7 +186,7 @@ public class VAServerHttpService {
jsonObject.put("task_unid", taskUnid);
jsonObject.put("alternateStatus", rotationStatus.toString());
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
Mono<String> stringMono = WebClientUtils.buildClient(vaServerInfo.getServiceBaseUrl())
.post()
.uri(uriBuilder -> uriBuilder.path(path).build())
.bodyValue(jsonObject.toString())
......@@ -207,7 +207,7 @@ public class VAServerHttpService {
JSONObject jsonObject = new JSONObject();
jsonObject.put("task_unid", taskUnid);
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
Mono<String> stringMono = WebClientUtils.buildClient(vaServerInfo.getServiceBaseUrl())
.post()
.uri(uriBuilder -> uriBuilder.path(path).build())
.bodyValue(jsonObject.toString())
......@@ -225,7 +225,7 @@ public class VAServerHttpService {
public JSONObject status(VaServerInfo vaServerInfo) {
String path = "/api/vaserver/v1/status";
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
Mono<String> stringMono = WebClientUtils.buildClient(vaServerInfo.getServiceBaseUrl())
.get()
.uri(uriBuilder -> uriBuilder
.path(path)
......@@ -245,7 +245,7 @@ public class VAServerHttpService {
String path = "/api/vaserver/v1/runtime_config";
JSONObject jsonObject = new JSONObject();
jsonObject.put("task_algo_type", taskAlgType);
Mono<String> mono = WebClient.create(vaServerInfo.getServiceBaseUrl())
Mono<String> mono = WebClientUtils.buildClient(vaServerInfo.getServiceBaseUrl())
.post()
.uri(path)
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
......@@ -266,7 +266,7 @@ public class VAServerHttpService {
String path = "/api/vaserver/v1/get_current_scene";
JSONObject obj = new JSONObject();
obj.put("task_unid", taskUnid);
Mono<String> mono = WebClient.create(vaServerInfo.getServiceBaseUrl())
Mono<String> mono = WebClientUtils.buildClient(vaServerInfo.getServiceBaseUrl())
.post()
.uri(path)
.header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
......
......@@ -11,7 +11,6 @@ import com.viontech.fanxing.commons.base.BaseServiceImpl;
import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.commons.model.DictCode;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.TaskExample;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
......@@ -117,12 +116,6 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
}
JSONArray sceneArray = new JSONArray();
String algType = task.getAlgType();
String addressName = null;
String addressUnid = channel.getAddressUnid();
if (addressUnid != null) {
DictCode dictCode = opsClientService.getDictCodeByUnid(addressUnid);
addressName = dictCode.getName();
}
int sceneNum;
String content;
if ("3".equals(algType)) {
......@@ -132,7 +125,7 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
content = opsClientService.getContentByName("defaultConfig");
sceneNum = 1;
}
ConfigBuilder configBuilder = new ConfigBuilder().buildVchanInfo(channel.getName(), channelUnid, addressUnid, addressName);
ConfigBuilder configBuilder = new ConfigBuilder().buildVchanInfo(channel.getName(), channelUnid);
if (content != null) {
configBuilder.buildDefaultConfig(content);
}
......@@ -229,8 +222,6 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
RLock taskLock = taskDataService.getRepository().getTaskLock(task.getUnid());
try {
// xxx 如果没问题可以删除这行代码
// updateStatus(id, TaskStatus.STAY.val);
taskDataService.addTask(task);
opsClientService.addLog("启动任务:" + task.getName());
} finally {
......
package com.viontech.fanxing.task.utils;
import cn.hutool.core.collection.ListUtil;
import com.google.common.collect.ArrayListMultimap;
import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.TaskExample;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.runtime.DailyRuntimeConfig;
import com.viontech.fanxing.task.model.runtime.RandomRuntimeConfig;
import com.viontech.fanxing.task.model.runtime.RuntimeConfig;
import com.viontech.fanxing.task.service.TaskDataService;
import com.viontech.fanxing.task.service.VAServerService;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RMap;
import java.time.LocalTime;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* .
......@@ -19,10 +39,7 @@ public enum TaskUtils {
* 没有场景存储配置,状态是停止或未部署的都不能运行
*/
public boolean canRun(Task task) {
return StringUtils.isNotBlank(task.getScene())
&& task.getStoreConfigId() != null
&& !TaskStatus.AWAIT.valEqual(task.getStatus())
&& !TaskStatus.STOP.valEqual(task.getStatus());
return StringUtils.isNotBlank(task.getScene()) && task.getStoreConfigId() != null && !TaskStatus.AWAIT.valEqual(task.getStatus()) && !TaskStatus.STOP.valEqual(task.getStatus());
}
/**
......@@ -32,7 +49,137 @@ public enum TaskUtils {
* @param present 现在的任务信息
*/
public boolean needRebuild(Task original, Task present) {
return (!original.getRuntimeConf().equals(present.getRuntimeConf()))
|| (!original.getStoreConfigId().equals(present.getStoreConfigId()));
return (!original.getRuntimeConf().equals(present.getRuntimeConf())) || (!original.getStoreConfigId().equals(present.getStoreConfigId()));
}
public void checkRuntimeConf(TaskData taskData, VAServerService vaServerService, TaskDataService taskDataService) {
Integer runtimeType = taskData.getTask().getRuntimeType();
TaskExample taskExample = new TaskExample();
taskExample.createCriteria().andStatusIn(ListUtil.of(TaskStatus.STARTING.val, TaskStatus.STAY.val, TaskStatus.RUNNING.val, TaskStatus.CAN_NOT_RUN.val));
List<Task> tasks = taskDataService.getTaskService().selectByExample(taskExample);
RMap<String, VaServerInfo> map = vaServerService.getVaServerRedisRepository().getVaServerInfoMap();
// 初始化资源检查对象
final TaskChecking taskChecking = new TaskChecking();
map.values().stream().forEach(x -> taskChecking.addServer(x));
tasks.stream().filter(x -> x.getId() != taskData.getTask().getId()).map(x -> new TaskData(x)).forEach(x -> taskChecking.addTask(x));
taskChecking.addTask(taskData);
}
private @Getter
@Setter
static class TaskChecking {
private ArrayListMultimap<String, ResourceNode> serverResourceMap = ArrayListMultimap.create();
public void addTask(TaskData taskData) {
String vaType = taskData.getTask().getVaType();
Integer runtimeType = taskData.getTask().getRuntimeType();
RuntimeConfig runtimeConfig = taskData.getRuntimeConfig();
Float resourceNeed = taskData.getTask().getResourceNeed();
Collection<ResourceNode> nodes;
if (vaType == null) {
nodes = serverResourceMap.values();
} else {
nodes = serverResourceMap.get(vaType);
}
boolean success = false;
switch (runtimeType) {
case 4:
if (reduce(nodes, resourceNeed, 0, ResourceNode.MAX_SECOND)) {
return;
}
break;
case 0:
DailyRuntimeConfig dConfig = (DailyRuntimeConfig) runtimeConfig;
List<RuntimeConfig.Config> multiConfig = dConfig.getMultiConfig();
for (RuntimeConfig.Config config : multiConfig) {
LocalTime start = config.getStart();
LocalTime end = config.getEnd();
if (reduce(nodes, resourceNeed, start.toSecondOfDay(), end.toSecondOfDay())) {
success = true;
} else {
break;
}
}
if (success) {
return;
} else {
break;
}
case 3:
RandomRuntimeConfig rConfig = (RandomRuntimeConfig) runtimeConfig;
Long runningTime = rConfig.getRunningTime();
long runSecond = (int) TimeUnit.MINUTES.toSeconds(runningTime);
int nowSecond = LocalTime.now().toSecondOfDay();
if (runSecond > 86400) {
success = reduce(nodes, resourceNeed, 0, ResourceNode.MAX_SECOND);
} else if ((nowSecond + runSecond) > ResourceNode.MAX_SECOND) {
success = reduce(nodes, resourceNeed, nowSecond, ResourceNode.MAX_SECOND);
if (success) {
reduce(nodes, resourceNeed, 0, (int) ((nowSecond + runSecond) - ResourceNode.MAX_SECOND));
}
} else {
success = reduce(nodes, resourceNeed, nowSecond, (int) (nowSecond + runSecond));
}
if (success) {
return;
}
break;
}
throw new FanXingException("任务资源信息异常");
}
private boolean reduce(Collection<ResourceNode> resourceNodes, Float resourceNeed, int start, int end) {
for (ResourceNode node : resourceNodes) {
if (node.reduce(resourceNeed, start, end)) {
return true;
}
}
return false;
}
public void addServer(VaServerInfo vaServerInfo) {
ResourceNode resourceNode = new ResourceNode(vaServerInfo);
serverResourceMap.put(vaServerInfo.getPlatType(), resourceNode);
}
}
private @Getter
@Setter
@NoArgsConstructor
static class ResourceNode {
public static final Integer MAX_SECOND = 24 * 60 * 60 - 1;
/** min index 0; max index 86399 */
private Float[] resource;
private Float max;
private String devId;
public ResourceNode(VaServerInfo vaServerInfo) {
Float availableResources = vaServerInfo.getVideoResource();
this.devId = vaServerInfo.getDevID();
resource = new Float[86400];
for (int i = 0; i < resource.length; i++) {
resource[i] = availableResources;
}
this.max = availableResources;
}
/**
* @param start include
* @param end exclude
*/
public boolean reduce(Float value, int start, int end) {
for (int i = start; i < end; i++) {
if (!(resource[i] >= value)) {
return false;
}
}
for (int i = start; i < end; i++) {
resource[i] = resource[i] - value;
}
return true;
}
}
}
......@@ -8,13 +8,13 @@ server {
client_max_body_size 10G;
location / {
alias /xmh/fanxing3/page/;
alias /data/page/dist/;
index index.html;
}
location /srs/ {
rewrite ^/srs/(.*) /$1 break;
proxy_pass http://127.0.0.1:8080;
proxy_pass http://127.0.0.1:18080;
}
location /api/ {
......@@ -40,50 +40,25 @@ server {
rewrite ^/api/video-cloud/(.*) /$1 break;
proxy_pass http://127.0.0.1:8888/;
}
}
# grafana
server {
listen 33000;
location / {
proxy_pass http://127.0.0.1:3000;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_hide_header 'Access-Control-Allow-Origin';
add_header Access-Control-Allow-Origin $http_origin;
add_header Access-Control-Allow-Credentials 'true';
add_header Access-Control-Allow-Methods '*';
add_header Access-Control-Allow-Headers 'Origin,X-Requested-With,Content-Type,Accept,Authorization,Referer,User-Agent,x-grafana-org-id';
location /api/images {
if ($request_uri ~* "\.\.\/|\.\/"){
return 500;
}
alias /images;
}
location /api/agent/ {
rewrite ^/api/agent/(.*) /v1/agent/$1 break;
proxy_pass http://127.0.0.1:8500/;
if ( $request_method = 'OPTIONS' ) {
return 200;
}
}
}
# prometheus
server {
listen 39090;
location / {
proxy_pass http://127.0.0.1:9090;
}
}
# consul
server {
listen 38500;
location / {
proxy_pass http://127.0.0.1:8500;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_hide_header 'Access-Control-Allow-Origin';
add_header Access-Control-Allow-Origin $http_origin;
add_header Access-Control-Allow-Credentials 'true';
add_header Access-Control-Allow-Methods '*';
add_header Access-Control-Allow-Headers 'Origin,X-Requested-With,Content-Type,Accept,Authorization,Referer,User-Agent,x-grafana-org-id';
location /api/prometheus/ {
rewrite ^/api/prometheus/(.*) /api/v1/$1 break;
proxy_pass http://192.168.9.233:9090/;
if ( $request_method = 'OPTIONS' ) {
return 200;
}
......
......@@ -251,7 +251,7 @@ CREATE TABLE IF NOT EXISTS s_channel
direction varchar(64) COMMENT '方向',
address_unid VARCHAR(36) COMMENT '绑定的组织结构,组织结构在字典表中',
-- 设备名称不能重复
name VARCHAR(128) UNIQUE COMMENT '名称',
name VARCHAR(128) COMMENT '名称',
ip VARCHAR(36) COMMENT 'ip地址',
port INT COMMENT '端口号',
expand VARCHAR(128) COMMENT '扩展字段,保存一些不重要的信息',
......@@ -335,24 +335,29 @@ CREATE TABLE IF NOT exists d_export_data
-- 分表
set @sql=concat('alter TABLE d_traffic PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_traffic',date_format(adddate(current_date,interval -1 day ),'%Y%m%d') ,' VALUES less than (',UNIX_TIMESTAMP(current_date),'))');
set @sql = concat('alter TABLE d_traffic PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_traffic', date_format(adddate(current_date, interval -1 day), '%Y%m%d'), ' VALUES less than (',
UNIX_TIMESTAMP(current_date), '))');
prepare stmt from @sql;
execute stmt;
set @sql=concat('alter TABLE d_traffic_face PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_traffic_face',date_format(adddate(current_date,interval -1 day ),'%Y%m%d') ,' VALUES less than (',UNIX_TIMESTAMP(current_date),'))');
set @sql = concat('alter TABLE d_traffic_face PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_traffic_face', date_format(adddate(current_date, interval -1 day), '%Y%m%d'), ' VALUES less than (',
UNIX_TIMESTAMP(current_date), '))');
prepare stmt from @sql;
execute stmt;
set @sql=concat('alter TABLE d_flow_event PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_flow_event',date_format(adddate(current_date,interval -1 day ),'%Y%m%d') ,' VALUES less than (',UNIX_TIMESTAMP(current_date),'))');
set @sql = concat('alter TABLE d_flow_event PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_flow_event', date_format(adddate(current_date, interval -1 day), '%Y%m%d'), ' VALUES less than (',
UNIX_TIMESTAMP(current_date), '))');
prepare stmt from @sql;
execute stmt;
set @sql=concat('alter TABLE d_flow_data PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_flow_data',date_format(adddate(current_date,interval -1 day ),'%Y%m%d') ,' VALUES less than (',UNIX_TIMESTAMP(current_date),'))');
set @sql = concat('alter TABLE d_flow_data PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_flow_data', date_format(adddate(current_date, interval -1 day), '%Y%m%d'), ' VALUES less than (',
UNIX_TIMESTAMP(current_date), '))');
prepare stmt from @sql;
execute stmt;
set @sql=concat('alter TABLE d_behavior PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_behavior',date_format(adddate(current_date,interval -1 day ),'%Y%m%d') ,' VALUES less than (',UNIX_TIMESTAMP(current_date),'))');
set @sql = concat('alter TABLE d_behavior PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_behavior', date_format(adddate(current_date, interval -1 day), '%Y%m%d'), ' VALUES less than (',
UNIX_TIMESTAMP(current_date), '))');
prepare stmt from @sql;
execute stmt;
DEALLOCATE PREPARE stmt;
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!