Commit 34f9d5d6 by xmh

1. <feat> 添加 nginx 配置

2. <refactor> 修改srs的http地址,使其通过nginx进行转发
3. <refactor> 扩大 forward 服务的线程池
4. <feat> 数据实时推送加锁
5. <fix> gateway 服务排除 tomcat 相关依赖, 添加日志, 移除视频转发配置(交给nginx)
6. <feat> 手动添加修改相机资源时必要字段进行校验, 更新时全量更新, 如果 streamPath 修改, 则更新对应任务
7. <feat> 添加场景时需要做检查, scene_unid 和 position_num 不能重复
1 parent 08cf7603
Showing 23 changed files with 255 additions and 47 deletions
...@@ -70,8 +70,8 @@ public class VionConfig { ...@@ -70,8 +70,8 @@ public class VionConfig {
} }
public String getHttpUrl(String taskUnid) { public String getHttpUrl(String taskUnid) {
String url = "http://36.112.68.214:30008/live/" + taskUnid + ".flv"; String url = "http://36.112.68.214:30010/srs/live/" + taskUnid + ".flv";
// String url = "http://" + ip + ":" + httpPort + "/" + taskUnid + ".flv"; // String url = "http://" + ip + ":" + httpPort + "/srs/live/" + taskUnid + ".flv";
log.debug(url); log.debug(url);
return url; return url;
} }
......
...@@ -44,9 +44,11 @@ public class ForwardApp { ...@@ -44,9 +44,11 @@ public class ForwardApp {
@Bean @Bean
public TaskExecutor taskExecutor() { public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setMaxPoolSize(50); threadPoolTaskExecutor.setMaxPoolSize(100);
threadPoolTaskExecutor.setCorePoolSize(20); threadPoolTaskExecutor.setCorePoolSize(50);
threadPoolTaskExecutor.setThreadNamePrefix("fanxing-forward-process-"); threadPoolTaskExecutor.setThreadNamePrefix("fanxing-forward-process-");
threadPoolTaskExecutor.setQueueCapacity(1000);
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return threadPoolTaskExecutor; return threadPoolTaskExecutor;
} }
......
...@@ -26,6 +26,7 @@ import java.util.UUID; ...@@ -26,6 +26,7 @@ import java.util.UUID;
@Order(11) @Order(11)
@Profile("!test") @Profile("!test")
public class JobStartRunner implements CommandLineRunner { public class JobStartRunner implements CommandLineRunner {
private static final int WORKER_NUM = 10;
@Resource @Resource
JobRepository jobRepository; JobRepository jobRepository;
...@@ -49,23 +50,24 @@ public class JobStartRunner implements CommandLineRunner { ...@@ -49,23 +50,24 @@ public class JobStartRunner implements CommandLineRunner {
sjl.setTaskExecutor(taskExecutor); sjl.setTaskExecutor(taskExecutor);
sjl.afterPropertiesSet(); sjl.afterPropertiesSet();
JobParameters jobParameters; JobParameters jobParameters;
for (int i = 0; i < 5; i++) {
for (int i = 0; i < WORKER_NUM; i++) {
jobParameters = new JobParametersBuilder().addString("uuid", UUID.randomUUID().toString()).toJobParameters(); jobParameters = new JobParametersBuilder().addString("uuid", UUID.randomUUID().toString()).toJobParameters();
sjl.run(trafficJob, jobParameters); sjl.run(trafficJob, jobParameters);
} }
for (int i = 0; i < 5; i++) { for (int i = 0; i < WORKER_NUM; i++) {
jobParameters = new JobParametersBuilder().addString("uuid", UUID.randomUUID().toString()).toJobParameters(); jobParameters = new JobParametersBuilder().addString("uuid", UUID.randomUUID().toString()).toJobParameters();
sjl.run(trafficFlowJob, jobParameters); sjl.run(trafficFlowJob, jobParameters);
} }
for (int i = 0; i < 5; i++) { for (int i = 0; i < WORKER_NUM; i++) {
jobParameters = new JobParametersBuilder().addString("uuid", UUID.randomUUID().toString()).toJobParameters(); jobParameters = new JobParametersBuilder().addString("uuid", UUID.randomUUID().toString()).toJobParameters();
sjl.run(behaviorJob, jobParameters); sjl.run(behaviorJob, jobParameters);
} }
if (vionConfig.isEnableForward()) { if (vionConfig.isEnableForward()) {
for (int i = 0; i < 5; i++) { for (int i = 0; i < WORKER_NUM; i++) {
jobParameters = new JobParametersBuilder().addString("uuid", UUID.randomUUID().toString()).toJobParameters(); jobParameters = new JobParametersBuilder().addString("uuid", UUID.randomUUID().toString()).toJobParameters();
sjl.run(forwardJob, jobParameters); sjl.run(forwardJob, jobParameters);
} }
......
...@@ -26,14 +26,17 @@ import java.util.List; ...@@ -26,14 +26,17 @@ import java.util.List;
@Slf4j @Slf4j
public class PicRealTimeEndpoint { public class PicRealTimeEndpoint {
private final static ListMultimap<String, Session> SESSION_MAP = MultimapBuilder.hashKeys().linkedListValues().build(); private final static ListMultimap<String, Session> SESSION_MAP = MultimapBuilder.hashKeys().linkedListValues().build();
private final static Object lock = new Object();
public static void sendInfo(String channelUnid, JSONObject info) { public static void sendInfo(String channelUnid, JSONObject info) {
try { try {
List<Session> sessions = SESSION_MAP.get(channelUnid); synchronized (lock) {
for (Session session : sessions) { List<Session> sessions = SESSION_MAP.get(channelUnid);
try { for (Session session : sessions) {
sendInfo(session, info.toJSONString()); try {
} catch (Exception ignore) { sendInfo(session, info.toJSONString());
} catch (Exception ignore) {
}
} }
} }
} catch (Exception e) { } catch (Exception e) {
...@@ -48,13 +51,17 @@ public class PicRealTimeEndpoint { ...@@ -48,13 +51,17 @@ public class PicRealTimeEndpoint {
@OnOpen @OnOpen
public void onOpen(Session session, @PathParam("channelUnid") String channelUnid) { public void onOpen(Session session, @PathParam("channelUnid") String channelUnid) {
log.info("websocket 上线, channelUnid:{}, sessionId:{}", channelUnid, session.getId()); log.info("websocket 上线, channelUnid:{}, sessionId:{}", channelUnid, session.getId());
SESSION_MAP.put(channelUnid, session); synchronized (lock) {
SESSION_MAP.put(channelUnid, session);
}
} }
@OnClose @OnClose
public void onClose(Session session, @PathParam("channelUnid") String channelUnid) { public void onClose(Session session, @PathParam("channelUnid") String channelUnid) {
log.info("websocket 下线, channelUnid:{}, sessionId:{}", channelUnid, session.getId()); log.info("websocket 下线, channelUnid:{}, sessionId:{}", channelUnid, session.getId());
SESSION_MAP.remove(channelUnid, session); synchronized (lock) {
SESSION_MAP.remove(channelUnid, session);
}
} }
@OnError @OnError
......
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<parent> <parent>
...@@ -16,6 +17,13 @@ ...@@ -16,6 +17,13 @@
<groupId>com.viontech</groupId> <groupId>com.viontech</groupId>
<artifactId>fanxing-commons</artifactId> <artifactId>fanxing-commons</artifactId>
<version>${parent.version}</version> <version>${parent.version}</version>
<exclusions>
<!-- gateway 不支持tomcat -->
<exclusion>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.cloud</groupId> <groupId>org.springframework.cloud</groupId>
......
...@@ -53,6 +53,7 @@ public class AuthorizationFilter implements GlobalFilter { ...@@ -53,6 +53,7 @@ public class AuthorizationFilter implements GlobalFilter {
ImmutablePair<Boolean, String> checkResult = checkToken(token); ImmutablePair<Boolean, String> checkResult = checkToken(token);
Boolean success = checkResult.left; Boolean success = checkResult.left;
if (!success) { if (!success) {
log.info("token校验未通过:[{}][{}]", checkResult.right, request.getPath());
return unAuthorized(exchange.getResponse(), checkResult.right); return unAuthorized(exchange.getResponse(), checkResult.right);
} }
log.info("token 校验成功:{}", token); log.info("token 校验成功:{}", token);
...@@ -82,7 +83,6 @@ public class AuthorizationFilter implements GlobalFilter { ...@@ -82,7 +83,6 @@ public class AuthorizationFilter implements GlobalFilter {
private Mono<Void> unAuthorized(ServerHttpResponse response, String msg) { private Mono<Void> unAuthorized(ServerHttpResponse response, String msg) {
log.info("token校验未通过:{}", msg);
response.getHeaders().setContentType(MediaType.APPLICATION_JSON); response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
response.setStatusCode(HttpStatus.UNAUTHORIZED); response.setStatusCode(HttpStatus.UNAUTHORIZED);
......
...@@ -29,7 +29,8 @@ import java.util.List; ...@@ -29,7 +29,8 @@ import java.util.List;
*/ */
@SpringBootApplication @SpringBootApplication
@ComponentScan(basePackages = "com.viontech.fanxing", excludeFilters = { @ComponentScan(basePackages = "com.viontech.fanxing", excludeFilters = {
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = com.viontech.fanxing.commons.config.WebConfig.class) @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = com.viontech.fanxing.commons.config.WebConfig.class),
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = com.viontech.fanxing.commons.base.GlobalExceptionHandler.class)
}) })
@EnableDiscoveryClient @EnableDiscoveryClient
@EnableScheduling @EnableScheduling
......
...@@ -29,12 +29,12 @@ spring: ...@@ -29,12 +29,12 @@ spring:
uri: http://192.168.9.233:30007/ uri: http://192.168.9.233:30007/
predicates: predicates:
- Path=/images/** - Path=/images/**
- id: videoService # - id: videoService
uri: http://192.168.9.233:10350/ # uri: http://192.168.9.233:10350/
predicates: # predicates:
- Path=/video-server/** # - Path=/video-server/**
filters: # filters:
- RewritePath=/video-server/?(?<segment>.*),/$\{segment} # - RewritePath=/video-server/?(?<segment>.*),/$\{segment}
consul: consul:
# 服务发现配置 # 服务发现配置
discovery: discovery:
......
...@@ -41,6 +41,23 @@ public class ChannelController extends ChannelBaseController { ...@@ -41,6 +41,23 @@ public class ChannelController extends ChannelBaseController {
} }
@Override @Override
@RequestMapping(value = "/{id}", method = RequestMethod.POST)
@ResponseBody
public Object update(@PathVariable(value = "id") Long id, @RequestBody ChannelVo channelVo) {
Assert.notNull(channelVo.getChannelUnid(), "设备编号不能为空");
Assert.notNull(channelVo.getName(), "设备名称不能为空");
Assert.notNull(channelVo.getIp(), "IP不能为空");
Assert.notNull(channelVo.getPort(), "端口不能为空");
Assert.notNull(channelVo.getStreamType(), "视频流类型不能为空");
Assert.notNull(channelVo.getUsername(), "用户名不能为空");
Assert.notNull(channelVo.getPassword(), "密码不能为空");
Assert.notNull(channelVo.getStreamPath(), "视频流地址不能为空");
Assert.notNull(channelVo.getType(), "类型不能为空");
ChannelVo update = channelService.update(id, channelVo);
return JsonMessageUtil.getSuccessJsonMsg(MESSAGE_UPDATE_SUCCESS, update);
}
@Override
@GetMapping() @GetMapping()
public Object page(ChannelVo channelVo, public Object page(ChannelVo channelVo,
@RequestParam(value = "page", defaultValue = "-1") int page, @RequestParam(value = "page", defaultValue = "-1") int page,
...@@ -111,7 +128,6 @@ public class ChannelController extends ChannelBaseController { ...@@ -111,7 +128,6 @@ public class ChannelController extends ChannelBaseController {
public Object add(@RequestBody ChannelVo channelVo) { public Object add(@RequestBody ChannelVo channelVo) {
Assert.notNull(channelVo.getChannelUnid(), "设备编号不能为空"); Assert.notNull(channelVo.getChannelUnid(), "设备编号不能为空");
Assert.notNull(channelVo.getName(), "设备名称不能为空"); Assert.notNull(channelVo.getName(), "设备名称不能为空");
Assert.notNull(channelVo.getBrand(), "厂家不能为空");
Assert.notNull(channelVo.getIp(), "IP不能为空"); Assert.notNull(channelVo.getIp(), "IP不能为空");
Assert.notNull(channelVo.getPort(), "端口不能为空"); Assert.notNull(channelVo.getPort(), "端口不能为空");
Assert.notNull(channelVo.getStreamType(), "视频流类型不能为空"); Assert.notNull(channelVo.getStreamType(), "视频流类型不能为空");
......
package com.viontech.fanxing.ops.feign; package com.viontech.fanxing.ops.feign;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.vo.TaskVo;
import com.viontech.keliu.util.JsonMessageUtil; import com.viontech.keliu.util.JsonMessageUtil;
import org.springframework.cloud.openfeign.FeignClient; import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.RequestParam;
import java.util.List; import java.util.List;
...@@ -26,4 +26,7 @@ public interface TaskFeignClient { ...@@ -26,4 +26,7 @@ public interface TaskFeignClient {
@GetMapping("/tasks") @GetMapping("/tasks")
JsonMessageUtil.JsonMessage<List<Task>> getTaskByChannelUnid(@RequestParam("channelUnid") String channelUnid); JsonMessageUtil.JsonMessage<List<Task>> getTaskByChannelUnid(@RequestParam("channelUnid") String channelUnid);
@PostMapping("/tasks/{id}")
JsonMessageUtil.JsonMessage updateById(@PathVariable("id") Long id, @RequestBody TaskVo taskVo);
} }
...@@ -3,6 +3,7 @@ package com.viontech.fanxing.ops.service.adapter; ...@@ -3,6 +3,7 @@ package com.viontech.fanxing.ops.service.adapter;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.base.BaseService; import com.viontech.fanxing.commons.base.BaseService;
import com.viontech.fanxing.commons.model.Channel; import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.commons.vo.ChannelVo;
import com.viontech.fanxing.commons.vo.DictCodeVo; import com.viontech.fanxing.commons.vo.DictCodeVo;
import java.util.List; import java.util.List;
...@@ -13,4 +14,6 @@ public interface ChannelService extends BaseService<Channel> { ...@@ -13,4 +14,6 @@ public interface ChannelService extends BaseService<Channel> {
List<DictCodeVo> channelOrg(List<Channel> channels); List<DictCodeVo> channelOrg(List<Channel> channels);
ChannelVo update(long id, ChannelVo vo);
} }
\ No newline at end of file \ No newline at end of file
...@@ -11,6 +11,7 @@ import com.viontech.fanxing.commons.exception.FanXingException; ...@@ -11,6 +11,7 @@ import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.*; import com.viontech.fanxing.commons.model.*;
import com.viontech.fanxing.commons.vo.ChannelVo; import com.viontech.fanxing.commons.vo.ChannelVo;
import com.viontech.fanxing.commons.vo.DictCodeVo; import com.viontech.fanxing.commons.vo.DictCodeVo;
import com.viontech.fanxing.commons.vo.TaskVo;
import com.viontech.fanxing.ops.feign.TaskFeignClient; import com.viontech.fanxing.ops.feign.TaskFeignClient;
import com.viontech.fanxing.ops.mapper.ChannelMapper; import com.viontech.fanxing.ops.mapper.ChannelMapper;
import com.viontech.fanxing.ops.service.adapter.ChannelService; import com.viontech.fanxing.ops.service.adapter.ChannelService;
...@@ -22,6 +23,7 @@ import lombok.extern.slf4j.Slf4j; ...@@ -22,6 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
import javax.annotation.Resource; import javax.annotation.Resource;
...@@ -63,10 +65,42 @@ public class ChannelServiceImpl extends BaseServiceImpl<Channel> implements Chan ...@@ -63,10 +65,42 @@ public class ChannelServiceImpl extends BaseServiceImpl<Channel> implements Chan
return channels; return channels;
} }
@Transactional(rollbackFor = Exception.class)
@Override
public ChannelVo update(long id, ChannelVo vo) {
Channel channel = selectByPrimaryKey(id);
if (channel == null) {
throw new RuntimeException("资源不存在");
}
vo.setId(id);
updateByPrimaryKey(vo);
// 如果对 streamPath 进行更新,需要同时更新任务信息
if (vo.getStreamPath() != null && !channel.getStreamPath().equals(vo.getChannelUnid())) {
log.info("streamPath 变更,需要更新任务信息");
JsonMessageUtil.JsonMessage<List<Task>> res = taskFeignClient.getTaskByChannelUnid(channel.getChannelUnid());
List<Task> data = res.getData();
if (data != null && data.size() > 0) {
log.info("需要更新的任务数量:{}", data.size());
for (Task item : data) {
TaskVo taskVo = new TaskVo();
taskVo.setStreamPath(vo.getStreamPath());
JsonMessageUtil.JsonMessage jsonMessage = taskFeignClient.updateById(item.getId(), taskVo);
if (jsonMessage.isSuccess()) {
log.info("任务[{}]更新成功", item.getName());
} else {
log.info("任务[{}]更新失败:[{}]", item.getName(), jsonMessage.getMsg());
}
}
}
}
return vo;
}
@Override @Override
public Channel selectByPrimaryKey(Object id) { public Channel selectByPrimaryKey(Object id) {
Channel channel = super.selectByPrimaryKey(id); Channel channel = super.selectByPrimaryKey(id);
if (ChannelType.FILE.value == channel.getType()) { if (channel != null && ChannelType.FILE.value == channel.getType()) {
channel.setStreamPath(vionConfig.getImage().getUrlPrefix() + channel.getStreamPath()); channel.setStreamPath(vionConfig.getImage().getUrlPrefix() + channel.getStreamPath());
} }
return channel; return channel;
......
...@@ -79,7 +79,7 @@ public class TaskController extends TaskBaseController { ...@@ -79,7 +79,7 @@ public class TaskController extends TaskBaseController {
} catch (DuplicateKeyException e) { } catch (DuplicateKeyException e) {
return JsonMessageUtil.getErrorJsonMsg("任务名称重复:" + taskVo.getName()); return JsonMessageUtil.getErrorJsonMsg("任务名称重复:" + taskVo.getName());
} }
return JsonMessageUtil.getSuccessJsonMsg(taskVo); return JsonMessageUtil.getSuccessJsonMsg("success", taskVo);
} }
@Override @Override
...@@ -153,7 +153,13 @@ public class TaskController extends TaskBaseController { ...@@ -153,7 +153,13 @@ public class TaskController extends TaskBaseController {
} }
byte[] bytes = file.getBytes(); byte[] bytes = file.getBytes();
String s = new String(bytes, StandardCharsets.UTF_8); String s = new String(bytes, StandardCharsets.UTF_8);
JSONObject source = JSON.parseObject(s); JSONObject source;
try {
source = JSON.parseObject(s);
} catch (Exception e) {
log.error("场景配置文件解析失败", e);
return JsonMessageUtil.getErrorJsonMsg("场景配置文件解析失败");
}
// 合并配置 // 合并配置
SceneUtils.INSTANCE.mergeScene(source, targetScene); SceneUtils.INSTANCE.mergeScene(source, targetScene);
// 合并之后,因为需要设备信息的配置,所以需要重新构建 // 合并之后,因为需要设备信息的配置,所以需要重新构建
...@@ -206,7 +212,7 @@ public class TaskController extends TaskBaseController { ...@@ -206,7 +212,7 @@ public class TaskController extends TaskBaseController {
JSONObject result = new JSONObject(); JSONObject result = new JSONObject();
result.put("config", sceneNeed.getJSONObject("config")); result.put("config", sceneNeed.getJSONObject("config"));
result.put("rois", sceneNeed.getJSONArray("rois")); result.put("rois", sceneNeed.getJSONArray("rois"));
result.put("calibration", sceneNeed.getJSONArray("calibration")); result.put("calibration", sceneNeed.getJSONObject("calibration"));
String s = result.toString(); String s = result.toString();
byte[] bytes = s.getBytes(StandardCharsets.UTF_8); byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
...@@ -226,6 +232,7 @@ public class TaskController extends TaskBaseController { ...@@ -226,6 +232,7 @@ public class TaskController extends TaskBaseController {
} }
// 1 启动, 2暂停, 3删除 // 1 启动, 2暂停, 3删除
ListMultimap<String, Long> build = MultimapBuilder.treeKeys().linkedListValues().build(); ListMultimap<String, Long> build = MultimapBuilder.treeKeys().linkedListValues().build();
boolean error = false;
for (Long taskId : taskIdArr) { for (Long taskId : taskIdArr) {
try { try {
switch (type) { switch (type) {
...@@ -244,9 +251,17 @@ public class TaskController extends TaskBaseController { ...@@ -244,9 +251,17 @@ public class TaskController extends TaskBaseController {
build.put("success", taskId); build.put("success", taskId);
} catch (Exception e) { } catch (Exception e) {
log.info("", e); log.info("", e);
error = true;
build.put("error", taskId); build.put("error", taskId);
} }
} }
return JsonMessageUtil.getSuccessJsonMsg("success", build.asMap()); if (error) {
JsonMessageUtil.JsonMessage message = JsonMessageUtil.getErrorJsonMsg("有任务操作失败");
message.setData(build);
return message;
} else {
return JsonMessageUtil.getSuccessJsonMsg("success");
}
} }
} }
\ No newline at end of file \ No newline at end of file
...@@ -2,8 +2,8 @@ package com.viontech.fanxing.task.model.runtime; ...@@ -2,8 +2,8 @@ package com.viontech.fanxing.task.model.runtime;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;
import com.viontech.fanxing.commons.exception.FanXingException; import com.viontech.fanxing.commons.exception.FanXingException;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
...@@ -78,11 +78,11 @@ public interface RuntimeConfig extends Serializable { ...@@ -78,11 +78,11 @@ public interface RuntimeConfig extends Serializable {
@Setter @Setter
@Accessors(chain = true) @Accessors(chain = true)
class Config implements Serializable { class Config implements Serializable {
@JsonDeserialize(using = LocalDateTimeDeserializer.class) @JsonDeserialize(using = LocalTimeDeserializer.class)
@JsonSerialize(using = LocalDateTimeSerializer.class) @JsonSerialize(using = LocalTimeSerializer.class)
private LocalTime start; private LocalTime start;
@JsonDeserialize(using = LocalDateTimeDeserializer.class) @JsonDeserialize(using = LocalTimeDeserializer.class)
@JsonSerialize(using = LocalDateTimeSerializer.class) @JsonSerialize(using = LocalTimeSerializer.class)
private LocalTime end; private LocalTime end;
} }
......
...@@ -14,6 +14,7 @@ import org.apache.commons.lang3.StringUtils; ...@@ -14,6 +14,7 @@ import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock; import org.redisson.api.RLock;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet; import org.redisson.api.RScoredSortedSet;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -29,6 +30,7 @@ import java.util.Collection; ...@@ -29,6 +30,7 @@ import java.util.Collection;
@Component @Component
@Slf4j @Slf4j
@Profile("!test")
public class TaskRunner { public class TaskRunner {
@Resource @Resource
...@@ -52,13 +54,13 @@ public class TaskRunner { ...@@ -52,13 +54,13 @@ public class TaskRunner {
Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true); Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true);
for (String taskUnid : entryCollection) { for (String taskUnid : entryCollection) {
log.info("开始任务 : {}", taskUnid);
TaskData taskData = taskDataService.getRepository().getTaskDataByUnid(taskUnid); TaskData taskData = taskDataService.getRepository().getTaskDataByUnid(taskUnid);
if (taskData == null) { if (taskData == null) {
log.info("找不到对应任务,移除所有:{}", taskUnid); log.info("找不到对应任务,移除所有:{}", taskUnid);
taskDataService.removeTaskDataAll(taskUnid); taskDataService.removeTaskDataAll(taskUnid);
continue; continue;
} }
log.info("开始任务 [{}] [{}]", taskData.getTask().getName(), taskUnid);
Task task = taskData.getTask(); Task task = taskData.getTask();
String taskVaType = task.getVaType(); String taskVaType = task.getVaType();
Float resourceNeed = task.getResourceNeed(); Float resourceNeed = task.getResourceNeed();
...@@ -98,7 +100,14 @@ public class TaskRunner { ...@@ -98,7 +100,14 @@ public class TaskRunner {
continue; continue;
} }
boolean success = vaServerService.executeTask(taskData, server); try {
log.info("开始下发任务:[{}]", taskData.getTask().getName());
vaServerService.executeTask(taskData, server);
} catch (Exception e) {
log.error("下发任务失败", e);
taskService.updateStatus(task.getId(), TaskStatus.CAN_NOT_RUN.val);
continue;
}
// 修改任务状态 // 修改任务状态
taskService.updateStatus(task.getId(), TaskStatus.RUNNING.val); taskService.updateStatus(task.getId(), TaskStatus.RUNNING.val);
...@@ -126,23 +135,24 @@ public class TaskRunner { ...@@ -126,23 +135,24 @@ public class TaskRunner {
Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true); Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true);
for (String taskUnid : entryCollection) { for (String taskUnid : entryCollection) {
log.info("停止任务 : {}", taskUnid);
TaskData taskData = taskDataService.getRepository().getTaskDataByUnid(taskUnid); TaskData taskData = taskDataService.getRepository().getTaskDataByUnid(taskUnid);
if (taskData == null) { if (taskData == null) {
log.info("找不到对应任务,移除所有:{}", taskUnid); log.info("找不到对应任务,移除所有:{}", taskUnid);
taskDataService.removeTaskDataAll(taskUnid); taskDataService.removeTaskDataAll(taskUnid);
continue; continue;
} }
log.info("停止任务 [{}] [{}]", taskData.getTask().getName(), taskUnid);
// 获取可用的 vaserver ,执行任务终止动作,如果成功,解除 taskData和 vaServer 的关联,并且从 zset 中移除任务,恢复vaserver资源数,计算下次任务执行时间,放入zset中 // 获取可用的 vaserver ,执行任务终止动作,如果成功,解除 taskData和 vaServer 的关联,并且从 zset 中移除任务,恢复vaserver资源数,计算下次任务执行时间,放入zset中
boolean success = vaServerService.terminateTask(taskUnid); boolean success = vaServerService.terminateTask(taskUnid);
if (success) { if (success) {
set.remove(taskUnid);
taskService.updateStatus(taskData.getTask().getId(), TaskStatus.PAUSE.val); taskService.updateStatus(taskData.getTask().getId(), TaskStatus.PAUSE.val);
// 随机任务不进行部署 // 随机任务不进行部署
if (taskData.getTask().getRuntimeType() != TaskStatus.PAUSE.val) { if (taskData.getTask().getRuntimeType() != 3) {
boolean b = taskDataService.distributeTask(taskData); boolean b = taskDataService.distributeTask(taskData);
} }
} }
set.remove(taskUnid);
} }
} catch (Exception e) { } catch (Exception e) {
log.error("", e); log.error("", e);
......
...@@ -6,6 +6,7 @@ import com.viontech.fanxing.task.service.VAServerService; ...@@ -6,6 +6,7 @@ import com.viontech.fanxing.task.service.VAServerService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock; import org.redisson.api.RLock;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -22,12 +23,13 @@ import java.util.Set; ...@@ -22,12 +23,13 @@ import java.util.Set;
@Component @Component
@Slf4j @Slf4j
@Profile("!test")
public class VaServerCheckRunner { public class VaServerCheckRunner {
@Resource @Resource
private VAServerService vaServerService; private VAServerService vaServerService;
@Scheduled(cron = "3 0/5 * * * ? ") @Scheduled(cron = "3 * * * * ? ")
public void check() { public void check() {
try { try {
RMap<String, VaServerInfo> vaServerInfoMap = vaServerService.getVaServerRedisRepository().getVaServerInfoMap(); RMap<String, VaServerInfo> vaServerInfoMap = vaServerService.getVaServerRedisRepository().getVaServerInfoMap();
...@@ -53,6 +55,7 @@ public class VaServerCheckRunner { ...@@ -53,6 +55,7 @@ public class VaServerCheckRunner {
} }
} catch (Exception e) { } catch (Exception e) {
log.info("", e); log.info("", e);
} finally {
if (devLock != null) { if (devLock != null) {
devLock.forceUnlock(); devLock.forceUnlock();
} }
......
package com.viontech.fanxing.task.service; package com.viontech.fanxing.task.service;
import com.mysql.cj.log.Log;
import com.viontech.fanxing.commons.exception.FanXingException; import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.StoreConfig; import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
...@@ -8,6 +9,7 @@ import com.viontech.fanxing.task.model.TaskData; ...@@ -8,6 +9,7 @@ import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.runtime.RuntimeConfig; import com.viontech.fanxing.task.model.runtime.RuntimeConfig;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo; import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.repository.TaskDataRedisRepository; import com.viontech.fanxing.task.repository.TaskDataRedisRepository;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet; import org.redisson.api.RScoredSortedSet;
...@@ -22,6 +24,7 @@ import javax.annotation.Resource; ...@@ -22,6 +24,7 @@ import javax.annotation.Resource;
* @date 2021/7/13 * @date 2021/7/13
*/ */
@Service @Service
@Slf4j
public class TaskDataService { public class TaskDataService {
@Resource @Resource
...@@ -61,6 +64,7 @@ public class TaskDataService { ...@@ -61,6 +64,7 @@ public class TaskDataService {
} }
ImmutablePair<Long, Long> nextTime = runtimeConfig.getNextTimeOfExecutionAndTerminal(); ImmutablePair<Long, Long> nextTime = runtimeConfig.getNextTimeOfExecutionAndTerminal();
log.info("部署任务[{}],运行时间:[{}]", taskData.getTask().getName(), nextTime.toString());
Long nextExecuteTime = nextTime.left; Long nextExecuteTime = nextTime.left;
Long nextTerminateTime = nextTime.right; Long nextTerminateTime = nextTime.right;
if (nextExecuteTime != null) { if (nextExecuteTime != null) {
...@@ -68,7 +72,7 @@ public class TaskDataService { ...@@ -68,7 +72,7 @@ public class TaskDataService {
toBeExecutedTaskUnidSet.add(nextExecuteTime, taskUnid); toBeExecutedTaskUnidSet.add(nextExecuteTime, taskUnid);
if (nextTerminateTime != null) { if (nextTerminateTime != null) {
RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redisService.getToBeTerminatedTaskUnidSet(); RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redisService.getToBeTerminatedTaskUnidSet();
toBeTerminatedTaskUnidSet.add(nextTerminateTime, taskUnid); boolean add = toBeTerminatedTaskUnidSet.add(nextTerminateTime, taskUnid);
} }
return true; return true;
} else { } else {
......
...@@ -37,7 +37,7 @@ public class VAServerHttpService { ...@@ -37,7 +37,7 @@ public class VAServerHttpService {
.bodyValue(vaTask) .bodyValue(vaTask)
.retrieve() .retrieve()
.bodyToMono(String.class); .bodyToMono(String.class);
String response = stringMono.block(Duration.ofSeconds(20)); String response = stringMono.block(Duration.ofSeconds(60));
log.info("下发任务结果:{}", response); log.info("下发任务结果:{}", response);
return JSON.parseObject(response); return JSON.parseObject(response);
} }
......
...@@ -10,6 +10,7 @@ import com.viontech.fanxing.commons.base.BaseMapper; ...@@ -10,6 +10,7 @@ import com.viontech.fanxing.commons.base.BaseMapper;
import com.viontech.fanxing.commons.base.BaseServiceImpl; import com.viontech.fanxing.commons.base.BaseServiceImpl;
import com.viontech.fanxing.commons.base.LocalCache; import com.viontech.fanxing.commons.base.LocalCache;
import com.viontech.fanxing.commons.constant.TaskStatus; import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Channel; import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.commons.model.DictCode; import com.viontech.fanxing.commons.model.DictCode;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
...@@ -23,6 +24,7 @@ import com.viontech.fanxing.task.service.OpsClientService; ...@@ -23,6 +24,7 @@ import com.viontech.fanxing.task.service.OpsClientService;
import com.viontech.fanxing.task.service.TaskDataService; import com.viontech.fanxing.task.service.TaskDataService;
import com.viontech.fanxing.task.service.VAServerService; import com.viontech.fanxing.task.service.VAServerService;
import com.viontech.fanxing.task.service.adapter.TaskService; import com.viontech.fanxing.task.service.adapter.TaskService;
import com.viontech.fanxing.task.utils.SceneUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -149,8 +151,18 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic ...@@ -149,8 +151,18 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
@Override @Override
public TaskVo updateTask(Task task) { public TaskVo updateTask(Task task) {
if (StringUtils.isNotBlank(task.getScene())) {
boolean b = SceneUtils.INSTANCE.checkScenes(task.getScene());
if (!b) {
throw new FanXingException("场景配置参数校验不通过");
}
}
updateByPrimaryKeySelective(task); updateByPrimaryKeySelective(task);
task = selectByPrimaryKey(task.getId()); task = selectByPrimaryKey(task.getId());
if (task == null) {
throw new FanXingException("任务不存在");
}
if (StringUtils.isNotBlank(task.getScene()) if (StringUtils.isNotBlank(task.getScene())
&& task.getStoreConfigId() != null && task.getStoreConfigId() != null
......
package com.viontech.fanxing.task.utils; package com.viontech.fanxing.task.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.task.model.Scene;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/** /**
* . * .
...@@ -31,4 +37,24 @@ public enum SceneUtils { ...@@ -31,4 +37,24 @@ public enum SceneUtils {
target.put("calibration", calibration); target.put("calibration", calibration);
} }
} }
/**
* 对scene的信息进行校验
*/
public boolean checkScenes(String sceneStr) {
List<Scene> scenes = JSON.parseArray(sceneStr, Scene.class);
Set<String> unidSet = new HashSet<>();
Set<String> positionSet = new HashSet<>();
// scene_unid 和 position_num 不能重复
for (Scene item : scenes) {
if (unidSet.contains(item.getScene_unid()) || positionSet.contains(item.getPosition_num())) {
return false;
} else {
unidSet.add(item.getScene_unid());
positionSet.add(item.getPosition_num());
}
}
return true;
}
} }
...@@ -57,7 +57,7 @@ vion: ...@@ -57,7 +57,7 @@ vion:
srs: srs:
ip: 192.168.9.233 ip: 192.168.9.233
rtmp-port: 1935 rtmp-port: 1935
http-port: 8080 http-port: 30010
gateway: gateway:
ip: 192.168.9.233 ip: 192.168.9.233
port: 30000 port: 30000
\ No newline at end of file \ No newline at end of file
...@@ -3,8 +3,13 @@ package com.viontech.fanxing.task; ...@@ -3,8 +3,13 @@ package com.viontech.fanxing.task;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.viontech.fanxing.commons.model.Channel; import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.commons.model.DictCode; import com.viontech.fanxing.commons.model.DictCode;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.runtime.DailyRuntimeConfig; import com.viontech.fanxing.task.model.runtime.DailyRuntimeConfig;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.service.OpsClientService; import com.viontech.fanxing.task.service.OpsClientService;
import com.viontech.fanxing.task.service.TaskDataService;
import com.viontech.fanxing.task.service.VAServerHttpService;
import com.viontech.fanxing.task.service.VAServerService;
import com.viontech.fanxing.task.service.impl.TaskServiceImpl; import com.viontech.fanxing.task.service.impl.TaskServiceImpl;
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.Test; import org.junit.Test;
...@@ -32,6 +37,12 @@ public class MainTest { ...@@ -32,6 +37,12 @@ public class MainTest {
private OpsClientService opsClientService; private OpsClientService opsClientService;
@Resource @Resource
private TaskServiceImpl taskService; private TaskServiceImpl taskService;
@Resource
private TaskDataService taskDataService;
@Resource
private VAServerService vaServerService;
@Resource
private VAServerHttpService vaServerHttpService;
@Test @Test
public void opsClientTest() { public void opsClientTest() {
...@@ -53,4 +64,21 @@ public class MainTest { ...@@ -53,4 +64,21 @@ public class MainTest {
System.out.println(new Date(n.left)); System.out.println(new Date(n.left));
System.out.println(new Date(n.right)); System.out.println(new Date(n.right));
} }
@Test
public void nextTimeTest() {
TaskData taskData = taskDataService.getRepository().getTaskDataByUnid("612f56aa-3c6c-11ec-a7e6-0242ac110007");
ImmutablePair<Long, Long> next = taskData.getRuntimeConfig().getNextTimeOfExecutionAndTerminal();
System.out.println(next);
// assert next.left == 1635906774000L;
// assert next.right == 1635953574000L;
}
@Test
public void stopTask() {
String taskUnid = "090677cc-3c6c-11ec-a7e6-0242ac110007";
VaServerInfo vaServerInfo = taskDataService.taskRunOn(taskUnid);
vaServerHttpService.rmTask(taskUnid, vaServerInfo);
}
} }
upstream gateway {
server 127.0.0.1:30000;
server 192.168.9.233:30000;
server 192.168.9.133:30000;
}
server {
listen 30010;
location / {
alias /xmh/fanxing3/page/;
index index.html;
}
location /srs/ {
rewrite ^/srs/(.*) /$1 break;
proxy_pass http://192.168.9.233:8080;
}
location /api/ {
rewrite ^/api/(.*) /$1 break;
proxy_pass http://gateway;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
location /api/video-server/ {
rewrite ^/api/video-server/(.*) /$1 break;
proxy_pass http://192.168.9.233:10350/;
}
}
\ No newline at end of file \ No newline at end of file
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!