Commit 9bec8eb7 by xmh

<fix> 数据库初始化脚本修改

<fix> WebClient 缓冲区调整
<fix> 通过文件头判断文件类型调整
<fix> 数据概览,已删除的任务也可以看到
<fix> 数据统计时间修正
<fix> 修复更新任务的时候存储配置没有加存储配置的问题
<fix> 任务状态及任务运行逻辑调整
<feat> 可用资源数以任务服务为准,不再通过视频分析服务纠正
<fix> 调整下发任务时 deviceUnid 的逻辑
<feat> 版本号内置
1 parent 3f864f46
Showing 34 changed files with 204 additions and 142 deletions
......@@ -12,8 +12,10 @@ public enum TaskStatus {
AWAIT(0),
/** 运行中 */
RUNNING(1),
/** 待时 */
STAY(2),
/** 暂停 */
PAUSE(2),
STOP(3),
/** 无法运行 */
CAN_NOT_RUN(4);
public int val;
......
......@@ -17,7 +17,7 @@ import org.springframework.web.bind.annotation.RequestParam;
public interface OSDConfigClient {
@GetMapping("/storageConfigs")
JSONObject listAll(@RequestParam Integer type);
JSONObject listAll(@RequestParam(required = false) Integer type);
@GetMapping("/storageConfigs/{id}")
JSONObject getById(@PathVariable("id") Long id);
......
......@@ -156,6 +156,7 @@ public class PicKeepRunner {
}
private void addLog(String yyyyMMdd) {
try {
String year = yyyyMMdd.substring(0, 4);
String month = yyyyMMdd.substring(4, 6);
String day = yyyyMMdd.substring(6, 8);
......@@ -164,6 +165,9 @@ public class PicKeepRunner {
logVo.setLogType(LogType.SYSTEM.value);
logVo.setContent("清除数据 执行时间: " + DateUtil.format(DateUtil.FORMAT_LONG, new Date()) + " 数据日期: " + year + "-" + month + "-" + day);
opsClient.addLog(logVo);
} catch (Exception e) {
log.info("日志发送失败", e);
}
}
}
......
spring.cloud.consul.discovery.metadata.version=3.0-SNAPSHOT
\ No newline at end of file
......@@ -27,8 +27,6 @@ spring:
# 服务注册标识,格式为:应用名称:服务器IP:端口
instance-id: ${spring.application.name}:${spring.cloud.consul.discovery.ip-address}:${server.port}
ip-address: 192.168.9.146
metadata:
version: 3.0.1-SNAPSHOT
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.9.233:3306/fanxing3?serverTimezone=Asia/Shanghai
......
spring.cloud.consul.discovery.metadata.version=3.0-SNAPSHOT
\ No newline at end of file
......@@ -49,8 +49,6 @@ spring:
# 服务注册标识,格式为:应用名称:服务器IP:端口
instance-id: ${spring.application.name}:${spring.cloud.consul.discovery.ip-address}:${server.port}
ip-address: 192.168.9.146
metadata:
version: 3.0.1-SNAPSHOT
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.9.233:3306/fanxing3
......
......@@ -29,6 +29,11 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-config</artifactId>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
<version>5.2.4</version>
</dependency>
</dependencies>
<build>
......
......@@ -23,8 +23,6 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static com.viontech.keliu.util.JsonMessageUtil.getSuccessJsonMsg;
@RestController
@RequestMapping("/channels")
@Slf4j
......@@ -55,7 +53,7 @@ public class ChannelController extends ChannelBaseController {
Assert.notNull(channelVo.getType(), "类型不能为空");
try {
return JsonMessageUtil.getSuccessJsonMsg(channelService.add(channelVo));
return success(channelService.add(channelVo));
} catch (DuplicateKeyException e) {
log.error("", e);
return JsonMessageUtil.getErrorJsonMsg("设备编号重复");
......@@ -76,7 +74,7 @@ public class ChannelController extends ChannelBaseController {
Assert.notNull(channelVo.getStreamPath(), "视频流地址不能为空");
try {
ChannelVo update = channelService.update(id, channelVo);
return JsonMessageUtil.getSuccessJsonMsg(MESSAGE_UPDATE_SUCCESS, update);
return success(update);
} catch (DuplicateKeyException e) {
log.error("", e);
return JsonMessageUtil.getErrorJsonMsg("设备名称或设备编号重复");
......@@ -138,20 +136,20 @@ public class ChannelController extends ChannelBaseController {
channel.setTags(tags);
}
}
return getSuccessJsonMsg(MESSAGE_SELECT_SUCCESS, response);
return success(response);
}
}
@PostMapping("/nvs3000")
public Object nvs3000(@RequestBody JSONObject jsonObject) {
JSONObject result = channelService.nvs3000(jsonObject.getString("nvsUrl"), jsonObject.getString("nvsRegex"));
return JsonMessageUtil.getSuccessJsonMsg(result);
return success(result);
}
@GetMapping("/videoCloud/pull")
public Object pullFromVideoCloud() {
channelService.pullFromVideoCloud();
return getSuccessJsonMsg(MESSAGE_SELECT_SUCCESS);
return success();
}
}
\ No newline at end of file
......@@ -35,6 +35,7 @@ public class StoreConfigController extends StoreConfigBaseController {
@RequestMapping(value = "", method = RequestMethod.GET)
@Override
public Object page(StoreConfigVo storeConfigVo, @RequestParam(value = "page", defaultValue = "-1") int page, @RequestParam(value = "pageSize", defaultValue = "100") int pageSize, String sortName, String sortOrder) {
// todo 应该取所有,传 null
return osdConfigClient.listAll(1);
}
......
......@@ -331,7 +331,10 @@ public class ChannelServiceImpl extends BaseServiceImpl<Channel> implements Chan
// 第一次请求可能会出错,不想查了
for (int i = 0; i < 2; i++) {
try {
response = WebClient.create(vionConfig.getVideoCloud().getUrl())
response = WebClient.builder()
.codecs(clientCodecConfigurer -> clientCodecConfigurer.defaultCodecs().maxInMemorySize(-1))
.baseUrl(vionConfig.getVideoCloud().getUrl())
.build()
.get()
.uri(uriBuilder -> uriBuilder.path("/api/device/getAllDeviceList").queryParam("userid", vionConfig.getVideoCloud().getId()).build())
.retrieve()
......
......@@ -117,12 +117,15 @@ public class VideoService {
private void checkVideoType(File file) throws IOException {
try (FileInputStream fi = new FileInputStream(file)) {
byte[] bytes = IoUtil.readBytes(fi, 32);
String s = IoUtil.readHex(fi, 4, true);
String s1 = new String(bytes).toLowerCase();
if (!"00000001".equals(s) && !s1.contains("avi") && !s1.contains("mp4")) {
throw new FanXingException("视频格式不支持");
String s = IoUtil.readHex(fi, 32, true);
// h264,avi,mp4
if (s.contains("00000001")
|| (s.contains("52494646") && s.contains("41564920"))
|| s.contains("6674797069736f6d")) {
return;
}
throw new FanXingException("视频格式不支持");
}
}
......
spring.cloud.consul.discovery.metadata.version=3.0-SNAPSHOT
\ No newline at end of file
......@@ -27,8 +27,6 @@ spring:
# 服务注册标识,格式为:应用名称:服务器IP:端口
instance-id: ${spring.application.name}:${spring.cloud.consul.discovery.ip-address}:${server.port}
ip-address: 192.168.9.146
metadata:
version: 3.0.1-SNAPSHOT
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.9.233:3306/fanxing3
......@@ -47,6 +45,10 @@ spring:
scheduling:
pool:
size: 10
flyway:
locations: classpath:/db
enabled: true
baseline-on-migrate: true
logging:
config: classpath:logback-${spring.profiles.active}.xml
mybatis:
......
This diff could not be displayed because it is too large.
package com.viontech.fanxing.ops;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.feign.OSDConfigClient;
import com.viontech.fanxing.commons.feign.TaskFeignClient;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.vo.ChannelVo;
......@@ -30,16 +32,13 @@ public class MainTest {
private ChannelService channelService;
@Resource
private TaskFeignClient taskFeignClient;
@Resource
private OSDConfigClient osdConfigClient;
@Test
public void nvs3000CodeTest() {
ChannelVo channelVo = new ChannelVo();
channelVo.setChannelUnid("123");
ArrayList<ChannelVo> objects = new ArrayList<>();
objects.add(channelVo);
Map<String, List<ChannelVo>> addressUnid_channel_map = objects.stream().map(ChannelVo::new).collect(Collectors.groupingBy(ChannelVo::getAddressUnid, Collectors.toList()));
System.out.println(addressUnid_channel_map);
public void test() {
JSONObject jsonObject = osdConfigClient.listAll(null);
System.out.println(jsonObject.toString());
}
......
......@@ -103,7 +103,7 @@ public class TrafficServiceImpl extends BaseServiceImpl<Traffic> implements Traf
DataOverViewModel dov = resultMap.computeIfAbsent(t.getTaskId(), x -> {
DataOverViewModel temp = new DataOverViewModel();
temp.setTaskId(x);
temp.setTaskName(taskMap.get(x).getName());
temp.setTaskName(taskMap.get(x) == null ? "任务已删除" : taskMap.get(x).getName());
return temp;
});
dov.setTraffic(t.getCount());
......@@ -124,7 +124,7 @@ public class TrafficServiceImpl extends BaseServiceImpl<Traffic> implements Traf
DataOverViewModel dov = resultMap.computeIfAbsent(f.getTaskId(), x -> {
DataOverViewModel temp = new DataOverViewModel();
temp.setTaskId(x);
temp.setTaskName(taskMap.get(x).getName());
temp.setTaskName(taskMap.get(x) == null ? "任务已删除" : taskMap.get(x).getName());
return temp;
});
dov.setFlow(f.getCount());
......@@ -145,7 +145,7 @@ public class TrafficServiceImpl extends BaseServiceImpl<Traffic> implements Traf
DataOverViewModel dov = resultMap.computeIfAbsent(b.getTaskId(), x -> {
DataOverViewModel temp = new DataOverViewModel();
temp.setTaskId(x);
temp.setTaskName(taskMap.get(x).getName());
temp.setTaskName(taskMap.get(x) == null ? "任务已删除" : taskMap.get(x).getName());
return temp;
});
dov.setBehavior(b.getCount());
......@@ -252,18 +252,18 @@ public class TrafficServiceImpl extends BaseServiceImpl<Traffic> implements Traf
}
@Override
@LocalCache(value = "traffic_statics_123fds4", duration = 1)
@LocalCache(value = "traffic_statics_123fds4", duration = 5)
public JSONArray statistics() {
JSONArray jsonArray = new JSONArray();
jdbcTemplate.query("select '过车数据' as dataType ,max(event_time) maxTime,min(event_time) minTime,count(pics) as picCount,count(video_name) as videoCount from d_traffic where event_type='vehicle' and illegal_state=0\n" +
jdbcTemplate.query("select '过车数据' as dataType ,adddate(max(event_time), interval 8 hour) maxTime,adddate(min(event_time),interval 8 hour) minTime,count(pics) as picCount,count(video_name) as videoCount from d_traffic where event_type='vehicle' and illegal_state=0\n" +
"union\n" +
"select '违法数据' as dataType ,max(event_time) maxTime,min(event_time) minTime,count(pics) as picCount,count(video_name) as videoCount from d_traffic where event_type='vehicle' and illegal_state=1\n" +
"select '违法数据' as dataType ,adddate(max(event_time), interval 8 hour) maxTime,adddate(min(event_time),interval 8 hour) minTime,count(pics) as picCount,count(video_name) as videoCount from d_traffic where event_type='vehicle' and illegal_state=1\n" +
"union\n" +
"select '非机动车数据' as dataType ,max(event_time) maxTime,min(event_time) minTime,count(pics) as picCount,count(video_name) as videoCount from d_traffic where event_type='xcycle'\n" +
"select '非机动车数据' as dataType ,adddate(max(event_time), interval 8 hour) maxTime,adddate(min(event_time),interval 8 hour) minTime,count(pics) as picCount,count(video_name) as videoCount from d_traffic where event_type='xcycle'\n" +
"union\n" +
"select '行人数据' as dataType ,max(event_time) maxTime,min(event_time) minTime,count(pics) as picCount,count(video_name) as videoCount from d_traffic where event_type='pedestrian'\n" +
"select '行人数据' as dataType ,adddate(max(event_time), interval 8 hour) maxTime,adddate(min(event_time),interval 8 hour) minTime,count(pics) as picCount,count(video_name) as videoCount from d_traffic where event_type='pedestrian'\n" +
"union\n" +
"select '事件数据' as dataType ,max(event_time) maxTime,min(event_time) minTime,count(pics) as picCount,count(video) as videoCount from d_behavior", rs -> {
"select '事件数据' as dataType ,adddate(max(event_time), interval 8 hour) maxTime,adddate(min(event_time),interval 8 hour) minTime,count(pics) as picCount,count(video) as videoCount from d_behavior", rs -> {
JSONObject jsonObject = new JSONObject();
jsonObject.put("dataType", rs.getString("dataType"));
jsonObject.put("maxTime", rs.getDate("maxTime"));
......
spring.cloud.consul.discovery.metadata.version=3.0-SNAPSHOT
\ No newline at end of file
......@@ -27,8 +27,6 @@ spring:
# 服务注册标识,格式为:应用名称:服务器IP:端口
instance-id: ${spring.application.name}:${spring.cloud.consul.discovery.ip-address}:${server.port}
ip-address: 192.168.9.146
metadata:
version: 3.0.1-SNAPSHOT
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.9.233:3306/fanxing3
......
......@@ -80,6 +80,10 @@ public class TaskController extends TaskBaseController {
@Override
public Object add(@RequestBody TaskVo taskVo) {
Assert.notNull(taskVo.getRuntimeType(), "运行配置不能为空");
Assert.notNull(taskVo.getResourceNeed(), "资源占用不能为空");
Assert.notNull(taskVo.getStoreConfigId(), "存储配置不能为空");
Assert.notNull(taskVo.getName(), "任务名称不能为空");
Assert.notNull(taskVo.getAlgType(), "算法类型不能为空");
try {
taskVo = taskService.addTask(taskVo.getModel());
opsClientService.addLog("添加任务:" + taskVo.getName());
......
......@@ -25,6 +25,8 @@ public class TaskData implements Serializable {
private String storeConfig;
private String deviceUnid;
public TaskData(Task task) {
this.task = task;
......
......@@ -44,5 +44,6 @@ public class VATask {
this.stream_type = task.getStreamType();
this.resource_use = task.getResourceNeed();
this.scene = JSON.parseArray(task.getScene());
this.device_unid = taskData.getDeviceUnid();
}
}
package com.viontech.fanxing.task.repository;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.commons.service.RedisService;
import org.redisson.api.RBucket;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
......@@ -34,7 +34,7 @@ public class VAServerRedisRepository {
if (vaServerInfo == null) {
return null;
}
if (!online(devId)) {
if (!online(devId) && vaServerInfo.getStatus() != 0) {
vaServerInfo.setStatus(0);
addOrUpdate(devId, vaServerInfo);
}
......@@ -44,10 +44,10 @@ public class VAServerRedisRepository {
public RMap<String, VaServerInfo> getVaServerInfoMap() {
for (Map.Entry<String, VaServerInfo> entry : vaServerMap.entrySet()) {
String devId = entry.getKey();
if (!online(devId)) {
VaServerInfo value = entry.getValue();
value.setStatus(0);
addOrUpdate(devId, value);
VaServerInfo vaServerInfo = entry.getValue();
if (!online(devId) && vaServerInfo.getStatus() != 0) {
vaServerInfo.setStatus(0);
addOrUpdate(devId, vaServerInfo);
}
}
return vaServerMap;
......
......@@ -6,8 +6,8 @@ import com.viontech.fanxing.commons.model.TaskExample;
import com.viontech.fanxing.task.mapper.TaskMapper;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.service.TaskDataService;
import com.viontech.fanxing.task.utils.TaskUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
......@@ -39,12 +39,8 @@ public class TaskInitRunner implements CommandLineRunner {
List<Task> tasks = taskMapper.selectByExampleWithBLOBs(new TaskExample());
for (Task task : tasks) {
// 随机任务只有 启动接口 可以启动,暂停状态说明运行完了
if (task.getRuntimeType().equals(3) && task.getStatus() == TaskStatus.PAUSE.val) {
continue;
}
// scene和storage不为空,不处于未部署状态
if (StringUtils.isNotBlank(task.getScene()) && task.getStoreConfigId() != null && !TaskStatus.AWAIT.valEqual(task.getStatus())) {
if (TaskUtils.INSTANCE.canRun(task)) {
try {
TaskData taskData = taskDataService.getRepository().getTaskDataByUnid(task.getUnid());
if (taskData == null) {
......
......@@ -139,12 +139,12 @@ public class TaskRunner {
set.remove(taskUnid);
// 防止任务持续无法运行导致超过运行时段
toBeExecutedTaskUnidSet.remove(taskUnid);
// 随机任务不进行部署,并且状态需要改成未部署
if (taskData.getTask().getRuntimeType() != 3) {
taskService.updateStatus(taskData.getTask().getId(), TaskStatus.PAUSE.val);
boolean b = taskDataService.distributeTask(taskData);
// 随机任务不进行部署,并且状态需要改成暂停
if (taskData.getTask().getRuntimeType() == 3) {
taskService.updateStatus(taskData.getTask().getId(), TaskStatus.STOP.val);
} else {
taskService.updateStatus(taskData.getTask().getId(), TaskStatus.AWAIT.val);
taskService.updateStatus(taskData.getTask().getId(), TaskStatus.STAY.val);
boolean b = taskDataService.distributeTask(taskData);
}
}
......
......@@ -38,10 +38,6 @@ public class VaServerCheckRunner {
@Scheduled(cron = "3 * * * * ? ")
public void check() {
RLock lock = redissonClient.getLock("lock:taskRunner");
if (lock.isLocked()) {
return;
}
try {
STREAM_INFO_MAP.clear();
RMap<String, VaServerInfo> vaServerInfoMap = vaServerService.getVaServerRedisRepository().getVaServerInfoMap();
......@@ -54,13 +50,14 @@ public class VaServerCheckRunner {
if (vaServerInfo.getStatus() == 1) {
devLock = vaServerService.getVaServerRedisRepository().getDevLock(devId);
JSONObject status = vaServerService.getStatus(devId);
JSONObject resource = status.getJSONObject("resource");
JSONObject brief = resource.getJSONObject("brief");
float videoResource = brief.getFloatValue("video_total");
float availableResource = brief.getFloatValue("video_free");
vaServerInfo.setAvailableResources(availableResource);
vaServerInfo.setVideoResource(videoResource);
vaServerService.getVaServerRedisRepository().addOrUpdate(devId, vaServerInfo);
// todo 任务状态检测
// JSONObject resource = status.getJSONObject("resource");
// JSONObject brief = resource.getJSONObject("brief");
// float videoResource = brief.getFloatValue("video_total");
// float availableResource = brief.getFloatValue("video_free");
// vaServerInfo.setAvailableResources(availableResource);
// vaServerInfo.setVideoResource(videoResource);
// vaServerService.getVaServerRedisRepository().addOrUpdate(devId, vaServerInfo);
// 统计帧率和视频源状态
if (!status.containsKey("tasks")) {
......
package com.viontech.fanxing.task.service;
import com.viontech.fanxing.commons.constant.ChannelType;
import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.commons.service.RedisService;
......@@ -39,14 +41,7 @@ public class TaskDataService {
private TaskService taskService;
public void addTask(Task task) {
TaskData taskData = new TaskData(task);
// 获取存储配置
Long storeConfigId = task.getStoreConfigId();
String config = opsClientService.getStoreConfigById(storeConfigId);
if (config == null) {
throw new FanXingException("无法获取对应的存储配置");
}
taskData.setStoreConfig(config);
TaskData taskData = buildTaskData(task);
taskDataRedisRepository.addOrUpdateTaskData(taskData);
// 计算运行时间并生成任务
boolean success = distributeTask(taskData);
......@@ -64,7 +59,7 @@ public class TaskDataService {
if (vaServerInfo != null) {
return true;
}
taskService.updateStatus(taskData.getTask().getId(), TaskStatus.PAUSE.val);
taskService.updateStatus(taskData.getTask().getId(), TaskStatus.STAY.val);
ImmutablePair<Long, Long> nextTime = runtimeConfig.getNextTimeOfExecutionAndTerminal();
log.info("部署任务[{}],运行时间:[{}]", taskData.getTask().getName(), nextTime.toString());
Long nextExecuteTime = nextTime.left;
......@@ -117,7 +112,7 @@ public class TaskDataService {
if (success) {
removeTaskDataAll(taskUnid);
} else {
throw new FanXingException("failed");
throw new FanXingException("失败");
}
}
......@@ -133,13 +128,30 @@ public class TaskDataService {
throw new FanXingException("设备离线");
} else {
TaskData taskData = new TaskData(task);
TaskData taskData = buildTaskData(task);
// 需要更新taskData,并且向vaServer更新任务信息
taskDataRedisRepository.addOrUpdateTaskData(taskData);
vaServerService.updateTask(taskData);
}
}
private TaskData buildTaskData(Task task) {
TaskData taskData = new TaskData(task);
// 获取存储配置
Long storeConfigId = task.getStoreConfigId();
String config = opsClientService.getStoreConfigById(storeConfigId);
if (config == null) {
throw new FanXingException("无法获取对应的存储配置");
}
taskData.setStoreConfig(config);
if (taskData.getTask().getStreamType().equals(ChannelType.STREAM_VIDEO_CLOUD.value)) {
Channel channel = opsClientService.getChannelByChannelUnid(taskData.getTask().getChannelUnid());
String deviceUnid = channel.getDeviceUnid();
taskData.setDeviceUnid(deviceUnid);
}
return taskData;
}
public TaskDataRedisRepository getRepository() {
return taskDataRedisRepository;
}
......
......@@ -2,13 +2,10 @@ package com.viontech.fanxing.task.service;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ecwid.consul.v1.ConsulClient;
import com.viontech.fanxing.commons.constant.ChannelType;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.vaserver.VATask;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.springframework.http.MediaType;
......@@ -16,7 +13,6 @@ import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.time.Duration;
/**
......@@ -30,22 +26,11 @@ import java.time.Duration;
@Slf4j
public class VAServerHttpService {
@Resource
private OpsClientService opsClientService;
@Resource
private ConsulClient consulClient;
/**
* 下发任务
*/
public JSONObject addTask(TaskData taskData, VaServerInfo vaServerInfo) {
VATask vaTask = new VATask(taskData);
if (vaTask.getStream_type().equals(ChannelType.STREAM_VIDEO_CLOUD.value)) {
Channel channel = opsClientService.getChannelByChannelUnid(vaTask.getChannel_unid());
String deviceUnid = channel.getDeviceUnid();
vaTask.setDevice_unid(deviceUnid);
}
String path = "/api/vaserver/v1/task";
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
.post()
......@@ -55,7 +40,12 @@ public class VAServerHttpService {
.bodyToMono(String.class);
String response = getResponse(stringMono, Duration.ofSeconds(60));
log.info("下发任务结果:{}", response);
return JSON.parseObject(response);
JSONObject jsonObject = JSON.parseObject(response);
Integer code = jsonObject.getInteger("code");
if (code != 200) {
throw new FanXingException(jsonObject.getString("msg"));
}
return jsonObject;
}
/**
......@@ -64,11 +54,6 @@ public class VAServerHttpService {
public JSONObject updateTask(TaskData taskData, VaServerInfo vaServerInfo) {
String path = "/api/vaserver/v1/task";
VATask vaTask = new VATask(taskData);
if (vaTask.getStream_type().equals(ChannelType.STREAM_VIDEO_CLOUD.value)) {
Channel channel = opsClientService.getChannelByChannelUnid(vaTask.getChannel_unid());
String deviceUnid = channel.getDeviceUnid();
vaTask.setDevice_unid(deviceUnid);
}
Mono<String> stringMono = WebClient.create(vaServerInfo.getServiceBaseUrl())
.put()
.uri(uriBuilder -> uriBuilder.path(path).build())
......@@ -77,7 +62,12 @@ public class VAServerHttpService {
.bodyToMono(String.class);
String response = getResponse(stringMono, Duration.ofSeconds(60));
log.info("更新任务结果:{}", response);
return JSON.parseObject(response);
JSONObject jsonObject = JSON.parseObject(response);
Integer code = jsonObject.getInteger("code");
if (code != 200) {
throw new FanXingException(jsonObject.getString("msg"));
}
return jsonObject;
}
/**
......
......@@ -54,6 +54,9 @@ public class VAServerService {
*/
public void registerVAServer(VaServerInfo vaServerInfo) {
String devId = vaServerInfo.getDevID();
VaServerInfo vaServer = vaServerRedisRepository.getVAServerInfoById(devId);
Float availableResources = vaServer == null ? vaServerInfo.getVideoResource() : vaServer.getAvailableResources();
vaServerInfo.setAvailableResources(availableResources);
vaServerRedisRepository.addOrUpdate(devId, vaServerInfo);
keepalive(devId);
}
......@@ -132,7 +135,11 @@ public class VAServerService {
if (vaServerInfo == null) {
return;
}
vaServerInfo.setAvailableResources(vaServerInfo.getAvailableResources() + param);
float v = vaServerInfo.getAvailableResources() + param;
if (v > vaServerInfo.getVideoResource()) {
v = vaServerInfo.getVideoResource();
}
vaServerInfo.setAvailableResources(v);
vaServerRedisRepository.addOrUpdate(devId, vaServerInfo);
} finally {
vaServerLock.forceUnlock();
......@@ -305,11 +312,11 @@ public class VAServerService {
ip = matcher.group();
}
JSONObject status = new JSONObject();
JSONObject brief = null;
try {
if (vaServer.getStatus() == 1) {
status = vaServerHttpService.status(vaServer);
brief = status.getJSONObject("resource").getJSONObject("brief");
status.remove("resource");
}
} catch (Exception e) {
log.error("", e);
}
......@@ -325,9 +332,7 @@ public class VAServerService {
VaServerOverViewModel model = map.computeIfAbsent(ip, x -> new VaServerOverViewModel());
model.setIp(ip).addInfo(status);
if (brief != null) {
model.addTotal(brief.getFloat("video_total")).addUsed(brief.getFloat("video_busy"));
}
model.addTotal(vaServer.getVideoResource()).addUsed(vaServer.getVideoResource() - vaServer.getAvailableResources());
}
return map.values();
......
......@@ -8,7 +8,6 @@ import com.github.pagehelper.PageInfo;
import com.viontech.fanxing.commons.base.BaseExample;
import com.viontech.fanxing.commons.base.BaseMapper;
import com.viontech.fanxing.commons.base.BaseServiceImpl;
import com.viontech.fanxing.commons.base.LocalCache;
import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Channel;
......@@ -25,6 +24,7 @@ import com.viontech.fanxing.task.service.TaskDataService;
import com.viontech.fanxing.task.service.VAServerService;
import com.viontech.fanxing.task.service.adapter.TaskService;
import com.viontech.fanxing.task.utils.SceneUtils;
import com.viontech.fanxing.task.utils.TaskUtils;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
......@@ -36,7 +36,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Service
......@@ -180,14 +179,10 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
throw new FanXingException("任务不存在");
}
// 具有存储配置,具有场景配置,并且不是未部署状态的才去更新
if (StringUtils.isNotBlank(task.getScene())
&& task.getStoreConfigId() != null
&& !TaskStatus.AWAIT.valEqual(task.getStatus())) {
// 具有存储配置,具有场景配置,并且不是未部署和停止状态的才去更新
if (TaskUtils.INSTANCE.canRun(task)) {
// 改变了运行时段或者存储配置需要重新部署
boolean rebuild = (!originalTask.getRuntimeConf().equals(task.getRuntimeConf()))
|| (!originalTask.getStoreConfigId().equals(task.getStoreConfigId()));
boolean rebuild = TaskUtils.INSTANCE.needRebuild(originalTask, task);
taskDataService.updateTask(task, rebuild);
}
return new TaskVo(task);
......@@ -234,7 +229,8 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
RLock taskLock = taskDataService.getRepository().getTaskLock(task.getUnid());
try {
updateStatus(id, TaskStatus.PAUSE.val);
// xxx 如果没问题可以删除这行代码
// updateStatus(id, TaskStatus.STAY.val);
taskDataService.addTask(task);
opsClientService.addLog("启动任务:" + task.getName());
} finally {
......@@ -253,7 +249,7 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
RLock taskLock = taskDataService.getRepository().getTaskLock(task.getUnid());
try {
taskDataService.deleteTask(task.getUnid());
updateStatus(id, TaskStatus.AWAIT.val);
updateStatus(id, TaskStatus.STOP.val);
opsClientService.addLog("停止任务:" + task.getName());
} finally {
taskLock.forceUnlock();
......@@ -261,7 +257,6 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
}
@Override
@LocalCache(value = "task_overview", duration = 10, timeunit = TimeUnit.SECONDS)
public JSONObject overview() {
List<Task> tasks = selectByExample(new TaskExample());
float resourceCount = 0;
......
package com.viontech.fanxing.task.utils;
import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.model.Task;
import org.apache.commons.lang3.StringUtils;
/**
* .
*
* @author 谢明辉
* @date 2021/12/21
*/
@SuppressWarnings("ALL")
public enum TaskUtils {
INSTANCE;
/**
* 判断任务是否可以执行
* 没有场景存储配置,状态是停止或未部署的都不能运行
*/
public boolean canRun(Task task) {
return StringUtils.isNotBlank(task.getScene())
&& task.getStoreConfigId() != null
&& !TaskStatus.AWAIT.valEqual(task.getStatus())
&& !TaskStatus.STOP.valEqual(task.getStatus());
}
/**
* 判断任务是否需要被重新构建
*
* @param original 之前的任务信息
* @param present 现在的任务信息
*/
public boolean needRebuild(Task original, Task present) {
return (!original.getRuntimeConf().equals(present.getRuntimeConf()))
|| (!original.getStoreConfigId().equals(present.getStoreConfigId()));
}
}
spring.cloud.consul.discovery.metadata.version=3.0-SNAPSHOT
\ No newline at end of file
......@@ -27,8 +27,6 @@ spring:
# 服务注册标识,格式为:应用名称:服务器IP:端口
instance-id: ${spring.application.name}:${spring.cloud.consul.discovery.ip-address}:${server.port}
ip-address: 192.168.9.146
metadata:
version: 3.0.1-SNAPSHOT
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.9.233:3306/fanxing3
......@@ -47,6 +45,8 @@ spring:
scheduling:
pool:
size: 10
codec:
max-in-memory-size: -1
mybatis:
type-aliases-package: com.viontech.fanxing.commons.model
mapper-locations: classpath:com/viontech/fanxing/task/mapping/*.xml
......
......@@ -335,21 +335,27 @@ CREATE TABLE IF NOT exists d_export_data
-- 分表
alter TABLE d_traffic PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_traffic20210701 VALUES less than (UNIX_TIMESTAMP('yyyy-MM-dd'))
);
alter TABLE d_traffic_face PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_traffic_face20210701 VALUES less than (UNIX_TIMESTAMP('yyyy-MM-dd'))
);
alter TABLE d_flow_event PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_flow_event20210701 VALUES less than (UNIX_TIMESTAMP('yyyy-MM-dd'))
);
alter TABLE d_flow_data PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_flow_data20210701 VALUES less than (UNIX_TIMESTAMP('yyyy-MM-dd'))
);
alter TABLE d_behavior PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_behavior20210701 VALUES less than (UNIX_TIMESTAMP('yyyy-MM-dd'))
);
set @sql=concat('alter TABLE d_traffic PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_traffic',date_format(adddate(current_date,interval -1 day ),'%Y%m%d') ,' VALUES less than (',UNIX_TIMESTAMP(current_date),'))');
prepare stmt from @sql;
execute stmt;
set @sql=concat('alter TABLE d_traffic_face PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_traffic_face',date_format(adddate(current_date,interval -1 day ),'%Y%m%d') ,' VALUES less than (',UNIX_TIMESTAMP(current_date),'))');
prepare stmt from @sql;
execute stmt;
set @sql=concat('alter TABLE d_flow_event PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_flow_event',date_format(adddate(current_date,interval -1 day ),'%Y%m%d') ,' VALUES less than (',UNIX_TIMESTAMP(current_date),'))');
prepare stmt from @sql;
execute stmt;
set @sql=concat('alter TABLE d_flow_data PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_flow_data',date_format(adddate(current_date,interval -1 day ),'%Y%m%d') ,' VALUES less than (',UNIX_TIMESTAMP(current_date),'))');
prepare stmt from @sql;
execute stmt;
set @sql=concat('alter TABLE d_behavior PARTITION by RANGE (UNIX_TIMESTAMP(event_time))(
PARTITION d_behavior',date_format(adddate(current_date,interval -1 day ),'%Y%m%d') ,' VALUES less than (',UNIX_TIMESTAMP(current_date),'))');
prepare stmt from @sql;
execute stmt;
DEALLOCATE PREPARE stmt;
delimiter $$
DROP PROCEDURE IF EXISTS timestamp_partition
......@@ -396,10 +402,10 @@ create event auto_pt
every 1 day
do
BEGIN
call timestamp_partition('d_traffic', '{schema}');
call timestamp_partition('d_behavior', '{schema}');
call timestamp_partition('d_flow_event', '{schema}');
call timestamp_partition('d_flow_data', '{schema}');
call timestamp_partition('d_traffic_face', '{schema}');
call timestamp_partition('d_traffic', 'fanxing3');
call timestamp_partition('d_behavior', 'fanxing3');
call timestamp_partition('d_flow_event', 'fanxing3');
call timestamp_partition('d_flow_data', 'fanxing3');
call timestamp_partition('d_traffic_face', 'fanxing3');
END$$
delimiter ;
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!