Commit bbef25e5 by xmh

<fix> 如果设备使用中,则不允许修改设备编号

<fix> 修正上传视频录像格式不正确时的删除逻辑
<fix> 分析结果数据概览列出每个任务的分析数据情况
<fix> 添加任务时检测运行配置
<refactor> 优化任务下发和停止逻辑优化
<fix> 资源使用概览不再统计离线服务的资源使用情况
<feat> 在任务下发或任务停止的定时任务执行时不进行服务资源检测
1 parent bc41bdbc
......@@ -106,6 +106,13 @@ public class ChannelServiceImpl extends BaseServiceImpl<Channel> implements Chan
if (channel == null) {
throw new RuntimeException("资源不存在");
}
// 如果有任务存在并且改了 channelUnid 则报异常
JsonMessageUtil.JsonMessage<List<Task>> res = taskFeignClient.getTaskByChannelUnid(channel.getChannelUnid());
List<Task> data = res.getData();
if (data != null && data.size() > 0 && !channel.getChannelUnid().equals(vo.getChannelUnid())) {
throw new RuntimeException("设备使用中,不能更改设备编号");
}
vo.setId(id);
vo.setUnid(channel.getUnid());
vo.setType(channel.getType());
......@@ -117,8 +124,6 @@ public class ChannelServiceImpl extends BaseServiceImpl<Channel> implements Chan
// 如果对 streamPath 进行更新,需要同时更新任务信息
if (vo.getStreamPath() != null && !channel.getStreamPath().equals(vo.getChannelUnid())) {
log.info("streamPath 变更,需要更新任务信息");
JsonMessageUtil.JsonMessage<List<Task>> res = taskFeignClient.getTaskByChannelUnid(channel.getChannelUnid());
List<Task> data = res.getData();
if (data != null && data.size() > 0) {
log.info("需要更新的任务数量:{}", data.size());
for (Task item : data) {
......
......@@ -60,7 +60,7 @@ public class ContentServiceImpl extends BaseServiceImpl<Content> implements Cont
ContentExample contentExample = new ContentExample();
contentExample.createCriteria().andTypeEqualTo(TYPE_PLATFORM_CONFIG).andNameEqualTo(NAME_TIMING_CONFIG);
addOrUpdate(TYPE_PLATFORM_CONFIG, NAME_TIMING_CONFIG, jsonObject.toJSONString());
// todo 发给运维服务
// 发给运维服务
List<OpsServer> opsServers = opsServerService.listAll();
for (OpsServer opsServer : opsServers) {
opsServerService.distributeTimingConfig(opsServer, jsonObject);
......
package com.viontech.fanxing.ops.service.main;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.RandomUtil;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.config.VionConfig;
......@@ -7,6 +8,7 @@ import com.viontech.fanxing.commons.feign.TaskFeignClient;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.ops.model.OpsServer;
import com.viontech.keliu.util.JsonMessageUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBucket;
import org.redisson.api.RKeys;
......@@ -95,12 +97,14 @@ public class OpsServerService {
}
}
@SneakyThrows
public void updateVaServer(String[] devIdArr, MultipartFile file) {
// todo 保存文件到本地,根据 devIdArr 获取对应分析服务,再根据分析服务的 ip 获取对应的运维服务,下发文件地址和升级信息
// 保存文件到本地,根据 devIdArr 获取对应分析服务,再根据分析服务的 ip 获取对应的运维服务,下发文件地址和升级信息
log.info("收到升级请求,目标服务:{},文件:{}", Arrays.toString(devIdArr), file.getName());
File local = new File(vionConfig.getImage().getPath() + "/temp/" + RandomUtil.randomString(6) + "_" + file.getOriginalFilename());
local.getParentFile().mkdirs();
String imageUrl = vionConfig.getImage().getUrlPrefix() + "/temp/" + local.getName();
String imageUrl = "http://RegAddress/api/images/temp/" + local.getName();
FileUtil.writeFromStream(file.getInputStream(), local);
Map<String, List<String>> ipContainerNameMap = getIpContainerNameMap(devIdArr);
List<CompletableFuture<String>> futures = new ArrayList<>();
for (Map.Entry<String, List<String>> entry : ipContainerNameMap.entrySet()) {
......@@ -138,10 +142,8 @@ public class OpsServerService {
return null;
}
}).collect(Collectors.toList());
// todo 收到任务id后的逻辑还没有
log.info("升级任务id:{}", taskIds);
}
private void addOrUpdateOpsServer(OpsServer opsServer) {
......
......@@ -85,10 +85,9 @@ public class VideoService {
FileUtils.copyToFile(file.getInputStream(), video);
checkVideoType(video);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
video.delete();
video.deleteOnExit();
throw new RuntimeException(e);
}
double videoLength = (double) video.length();
double mbSize = videoLength / 1024 / 1024;
......
......@@ -70,12 +70,24 @@ public class TrafficServiceImpl extends BaseServiceImpl<Traffic> implements Traf
@Override
public JSONObject dataOverview(Date date, Long taskId, Integer page, Integer pageSize) {
// todo 还需要加有效分析时长(effectiveAnalysisTime) 和 异常次数(exceptionNum)
Map<Long, Task> taskMap = taskClientService.taskMap();
Date min = DateUtil.setDayMinTime(date);
Date max = DateUtil.setDayMaxTime(date);
HashMap<Long, DataOverViewModel> resultMap = new HashMap<>();
// 每个任务都给要列出来
for (Map.Entry<Long, Task> entry : taskMap.entrySet()) {
if (taskId != null && !entry.getKey().equals(taskId)) {
continue;
}
resultMap.computeIfAbsent(entry.getKey(), x -> {
DataOverViewModel temp = new DataOverViewModel();
temp.setTaskId(x);
temp.setTaskName(entry.getValue().getName());
return temp;
});
}
TrafficExample trafficExample = new TrafficExample();
TrafficExample.Criteria criteria = trafficExample.createCriteria().andEventTimeGreaterThanOrEqualTo(min).andEventTimeLessThanOrEqualTo(max);
if (taskId != null) {
......@@ -142,6 +154,7 @@ public class TrafficServiceImpl extends BaseServiceImpl<Traffic> implements Traf
Map<Long, String> task_id_unid_map = taskMap.values().stream().collect(Collectors.toMap(Task::getId, Task::getUnid, (x, y) -> x));
Map<String, JSONObject> taskStateMap = taskClientService.taskStateMap();
// 加上有效分析时长和异常次数
Collection<DataOverViewModel> values = resultMap.values();
List<DataOverViewModel> collect = values.stream()
.filter(x -> x.getTaskId() != null)
......@@ -162,7 +175,9 @@ public class TrafficServiceImpl extends BaseServiceImpl<Traffic> implements Traf
jsonObject.put("total", collect.size());
jsonObject.put("page", page);
jsonObject.put("pageSize", pageSize);
jsonObject.put("data", partition.get(page - 1));
if (partition != null && partition.size() > 0) {
jsonObject.put("data", partition.get(page - 1));
}
return jsonObject;
}
......
......@@ -79,6 +79,7 @@ public class TaskController extends TaskBaseController {
@PostMapping
@Override
public Object add(@RequestBody TaskVo taskVo) {
Assert.notNull(taskVo.getRuntimeType(), "运行配置不能为空");
try {
taskVo = taskService.addTask(taskVo.getModel());
opsClientService.addLog("添加任务:" + taskVo.getName());
......
......@@ -2,9 +2,9 @@ package com.viontech.fanxing.task.runner;
import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.task.service.TaskDataService;
import com.viontech.fanxing.task.service.VAServerService;
import com.viontech.fanxing.task.service.adapter.TaskService;
......@@ -44,7 +44,6 @@ public class TaskRunner {
@Scheduled(fixedDelay = 5000)
public void executedTaskListener() {
RLock jobLock = redisService.getLockMust("lock:taskRunner");
RLock devLock = null;
try {
RScoredSortedSet<String> set = redisService.getToBeExecutedTaskUnidSet();
RMap<String, VaServerInfo> vaServerMap = vaServerService.getVaServerRedisRepository().getVaServerInfoMap();
......@@ -80,25 +79,16 @@ public class TaskRunner {
}
}
if (temp.getAvailableResources() >= resourceNeed) {
devLock = vaServerService.getVaServerRedisRepository().getDevLock(devId);
temp = vaServerMap.get(devId);
if (temp.getAvailableResources() >= resourceNeed) {
server = temp;
break;
} else {
devLock.forceUnlock();
devLock = null;
}
server = temp;
}
}
// 找不到可以用来执行的设备,需要修改状态
if (server == null) {
log.debug("找不到可用的 VAServer,跳过:{}", taskUnid);
taskService.updateStatus(task.getId(), TaskStatus.CAN_NOT_RUN.val);
continue;
}
RLock devLock = vaServerService.getVaServerRedisRepository().getDevLock(server.getDevID());
try {
log.info("开始下发任务:[{}]", taskData.getTask().getName());
vaServerService.executeTask(taskData, server);
......@@ -106,6 +96,8 @@ public class TaskRunner {
log.error("下发任务失败", e);
taskService.updateStatus(task.getId(), TaskStatus.CAN_NOT_RUN.val);
continue;
} finally {
devLock.forceUnlock();
}
// 修改任务状态
......@@ -119,9 +111,6 @@ public class TaskRunner {
} catch (Exception e) {
log.error("", e);
} finally {
if (devLock != null) {
devLock.forceUnlock();
}
jobLock.forceUnlock();
}
......@@ -150,10 +139,12 @@ public class TaskRunner {
set.remove(taskUnid);
// 防止任务持续无法运行导致超过运行时段
toBeExecutedTaskUnidSet.remove(taskUnid);
taskService.updateStatus(taskData.getTask().getId(), TaskStatus.PAUSE.val);
// 随机任务不进行部署
// 随机任务不进行部署,并且状态需要改成未部署
if (taskData.getTask().getRuntimeType() != 3) {
taskService.updateStatus(taskData.getTask().getId(), TaskStatus.PAUSE.val);
boolean b = taskDataService.distributeTask(taskData);
} else {
taskService.updateStatus(taskData.getTask().getId(), TaskStatus.AWAIT.val);
}
}
......
......@@ -8,6 +8,7 @@ import com.viontech.fanxing.task.service.VAServerService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
......@@ -31,11 +32,18 @@ public class VaServerCheckRunner {
public static final ConcurrentHashMap<String, StreamInfo> STREAM_INFO_MAP = new ConcurrentHashMap<>();
@Resource
private RedissonClient redissonClient;
@Resource
private VAServerService vaServerService;
@Scheduled(cron = "3 * * * * ? ")
public void check() {
RLock lock = redissonClient.getLock("lock:taskRunner");
if (lock.isLocked()) {
return;
}
try {
STREAM_INFO_MAP.clear();
RMap<String, VaServerInfo> vaServerInfoMap = vaServerService.getVaServerRedisRepository().getVaServerInfoMap();
Set<Map.Entry<String, VaServerInfo>> entries = vaServerInfoMap.readAllEntrySet();
for (Map.Entry<String, VaServerInfo> entry : entries) {
......
......@@ -264,16 +264,18 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
@LocalCache(value = "task_overview", duration = 10, timeunit = TimeUnit.SECONDS)
public JSONObject overview() {
List<Task> tasks = selectByExample(new TaskExample());
int resourceCount = 0;
int usedResourceCount = 0;
float resourceCount = 0;
float usedResourceCount = 0;
long taskCount = tasks.size();
long runningTaskCount = tasks.stream().filter(x -> TaskStatus.RUNNING.valEqual(x.getStatus())).count();
RMap<String, VaServerInfo> map = vaServerService.getVaServerRedisRepository().getVaServerInfoMap();
Collection<VaServerInfo> vaServerInfos = map.readAllValues();
for (VaServerInfo info : vaServerInfos) {
resourceCount += info.getVideoResource();
usedResourceCount += (info.getVideoResource() - info.getAvailableResources());
if (info.getStatus() == 1) {
resourceCount += info.getVideoResource();
usedResourceCount += (info.getVideoResource() - info.getAvailableResources());
}
}
JSONObject result = new JSONObject();
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!