Commit e0d8a6b4 by xmh

重构,移除task-manager模块和task-scheduling 模块,合并为 task 模块,存储配置功能合并到 ops 模块

1 parent 2d0dca77
Showing 42 changed files with 231 additions and 599 deletions
package com.viontech.fanxing.task.manager.controller.base; package com.viontech.fanxing.ops.controller.base;
import com.viontech.fanxing.commons.base.BaseController; import com.viontech.fanxing.commons.base.BaseController;
import com.viontech.fanxing.commons.base.BaseExample; import com.viontech.fanxing.commons.base.BaseExample;
import com.viontech.fanxing.commons.base.BaseMapper;
import com.viontech.fanxing.commons.base.BaseService; import com.viontech.fanxing.commons.base.BaseService;
import com.viontech.fanxing.task.manager.mapper.StoreConfigMapper;
import com.viontech.fanxing.commons.model.StoreConfig; import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.model.StoreConfigExample; import com.viontech.fanxing.commons.model.StoreConfigExample;
import com.viontech.fanxing.task.manager.service.adapter.StoreConfigService;
import com.viontech.fanxing.commons.vo.StoreConfigVo; import com.viontech.fanxing.commons.vo.StoreConfigVo;
import com.viontech.fanxing.ops.service.adapter.StoreConfigService;
import javax.annotation.Resource; import javax.annotation.Resource;
public abstract class StoreConfigBaseController extends BaseController<StoreConfig, StoreConfigVo> { public abstract class StoreConfigBaseController extends BaseController<StoreConfig, StoreConfigVo> {
......
package com.viontech.fanxing.task.manager.controller.web; package com.viontech.fanxing.ops.controller.web;
import com.viontech.fanxing.commons.base.BaseExample; import com.viontech.fanxing.commons.base.BaseExample;
import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.model.StoreConfigExample; import com.viontech.fanxing.commons.model.StoreConfigExample;
import com.viontech.fanxing.commons.vo.StoreConfigVo; import com.viontech.fanxing.commons.vo.StoreConfigVo;
import com.viontech.fanxing.task.manager.controller.base.StoreConfigBaseController; import com.viontech.fanxing.ops.controller.base.StoreConfigBaseController;
import com.viontech.keliu.util.JsonMessageUtil; import com.viontech.keliu.util.JsonMessageUtil;
import org.springframework.stereotype.Controller; import org.springframework.stereotype.Controller;
import org.springframework.util.StreamUtils; import org.springframework.util.StreamUtils;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartFile;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@Controller @Controller
@RequestMapping("/storeConfigs") @RequestMapping("/storeConfigs")
......
package com.viontech.fanxing.task.manager.mapper; package com.viontech.fanxing.ops.mapper;
import com.viontech.fanxing.commons.base.BaseMapper; import com.viontech.fanxing.commons.base.BaseMapper;
import com.viontech.fanxing.commons.model.StoreConfig; import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.model.StoreConfigExample; import com.viontech.fanxing.commons.model.StoreConfigExample;
import java.util.List;
import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Param;
import java.util.List;
public interface StoreConfigMapper extends BaseMapper { public interface StoreConfigMapper extends BaseMapper {
int countByExample(StoreConfigExample example); int countByExample(StoreConfigExample example);
......
<?xml version="1.0" encoding="UTF-8" ?> <?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.viontech.fanxing.task.manager.mapper.StoreConfigMapper" > <mapper namespace="com.viontech.fanxing.ops.mapper.StoreConfigMapper" >
<resultMap id="BaseResultMapRoot" type="com.viontech.fanxing.commons.model.StoreConfig" > <resultMap id="BaseResultMapRoot" type="com.viontech.fanxing.commons.model.StoreConfig" >
<id column="storeConfig_id" property="id" /> <id column="storeConfig_id" property="id" />
<result column="storeConfig_unid" property="unid" /> <result column="storeConfig_unid" property="unid" />
...@@ -78,7 +78,7 @@ ...@@ -78,7 +78,7 @@
</sql> </sql>
<sql id="Base_Column_List" > <sql id="Base_Column_List" >
<if test="!(_parameter.getClass().getSimpleName() == 'StoreConfigExample')" > <if test="!(_parameter.getClass().getSimpleName() == 'StoreConfigExample')" >
<include refid="com.viontech.fanxing.task.manager.mapper.StoreConfigMapper.Base_Column_List_Root" /> <include refid="com.viontech.fanxing.ops.mapper.StoreConfigMapper.Base_Column_List_Root" />
</if> </if>
<if test="_parameter.getClass().getSimpleName() == 'StoreConfigExample'" > <if test="_parameter.getClass().getSimpleName() == 'StoreConfigExample'" >
<foreach collection="columnContainerSet" item="columns" separator="," > <foreach collection="columnContainerSet" item="columns" separator="," >
...@@ -88,7 +88,7 @@ ...@@ -88,7 +88,7 @@
${columns.columnContainerStr} ${columns.columnContainerStr}
</if> </if>
<if test="!columns.valid" > <if test="!columns.valid" >
<include refid="com.viontech.fanxing.task.manager.mapper.StoreConfigMapper.Base_Column_List_Root" /> <include refid="com.viontech.fanxing.ops.mapper.StoreConfigMapper.Base_Column_List_Root" />
</if> </if>
</when> </when>
</choose> </choose>
......
package com.viontech.fanxing.task.manager.service.adapter; package com.viontech.fanxing.ops.service.adapter;
import com.viontech.fanxing.commons.base.BaseService; import com.viontech.fanxing.commons.base.BaseService;
import com.viontech.fanxing.commons.model.StoreConfig; import com.viontech.fanxing.commons.model.StoreConfig;
......
package com.viontech.fanxing.task.manager.service.impl; package com.viontech.fanxing.ops.service.impl;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.viontech.fanxing.commons.base.BaseExample;
import com.viontech.fanxing.commons.base.BaseMapper; import com.viontech.fanxing.commons.base.BaseMapper;
import com.viontech.fanxing.commons.base.BaseServiceImpl; import com.viontech.fanxing.commons.base.BaseServiceImpl;
import com.viontech.fanxing.commons.model.StoreConfigExample;
import com.viontech.fanxing.task.manager.mapper.StoreConfigMapper;
import com.viontech.fanxing.commons.model.StoreConfig; import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.task.manager.service.adapter.StoreConfigService; import com.viontech.fanxing.ops.mapper.StoreConfigMapper;
import javax.annotation.Resource; import com.viontech.fanxing.ops.service.adapter.StoreConfigService;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.List; import javax.annotation.Resource;
@Service @Service
public class StoreConfigServiceImpl extends BaseServiceImpl<StoreConfig> implements StoreConfigService { public class StoreConfigServiceImpl extends BaseServiceImpl<StoreConfig> implements StoreConfigService {
......
package com.viontech.fanxing.task.manager.feign;
import com.viontech.fanxing.commons.feing.TaskSchedulingTasksAdapter;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.keliu.util.JsonMessageUtil;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.*;
/**
* .
*
* @author 谢明辉
* @date 2021/7/12
*/
@Component
@FeignClient(value = "fanxing-task-scheduling")
public interface TaskSchedulingClient extends TaskSchedulingTasksAdapter {
@Override
@PostMapping("/tasks")
JsonMessageUtil.JsonMessage add(@RequestBody Task task);
@Override
@PutMapping("/tasks")
JsonMessageUtil.JsonMessage update(@RequestBody Task task);
@Override
@DeleteMapping("/tasks")
JsonMessageUtil.JsonMessage delete(@RequestParam("taskUnid") String taskUnid);
}
server:
port: 30002
spring:
profiles:
active:
${PROFILE}
application:
name: fanxing-task-manager
cloud:
consul:
host: 192.168.9.233
port: 8500
discovery:
service-name: ${spring.application.name}
# config 在 consul > key/value 中命名规则: prefix/default-context,profiles.active/data-key
config:
enabled: true
format: YAML
prefix: fanxing
default-context: ${spring.application.name}
data-key: config
watch:
enabled: true
delay: 10000
wait-time: 30
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.viontech</groupId>
<artifactId>fanxing3</artifactId>
<version>3.0.0-SNAPSHOT</version>
</parent>
<!-- 任务调度服务 -->
<artifactId>fanxing-task-scheduling</artifactId>
<version>${parent.version}</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.viontech</groupId>
<artifactId>fanxing-commons</artifactId>
<version>${parent.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-config</artifactId>
</dependency>
</dependencies>
<build>
<finalName>fanxing-task-scheduling</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<excludes>
<exclude>application.yml</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file \ No newline at end of file
package com.viontech.fanxing.task.scheduling;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* .
*
* @author 谢明辉
* @date 2021/6/11
*/
@EnableDiscoveryClient
@EnableScheduling
@SpringBootApplication(scanBasePackages = "com.viontech.fanxing")
@EnableFeignClients
@Slf4j
public class TaskSchedulingApp {
public static void main(String[] args) {
try {
SpringApplication.run(TaskSchedulingApp.class, args);
} catch (Exception e) {
log.error("taskScheduling app start error", e);
}
}
}
package com.viontech.fanxing.task.scheduling.controller;
import com.viontech.fanxing.commons.feing.TaskSchedulingTasksAdapter;
import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.task.scheduling.feign.TaskClient;
import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.scheduling.service.TaskService;
import com.viontech.fanxing.task.scheduling.service.VAServerService;
import com.viontech.keliu.util.JsonMessageUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
/**
* .
*
* @author 谢明辉
* @date 2021/7/12
*/
@RestController
@RequestMapping("/tasks")
@Slf4j
public class TaskController implements TaskSchedulingTasksAdapter {
@Resource
private VAServerService vaServerService;
@Resource
private TaskService taskService;
@Resource
private TaskClient taskClient;
@Override
@PostMapping
public JsonMessageUtil.JsonMessage add(@RequestBody Task task) {
TaskData taskData = new TaskData(task);
// 获取存储配置
Long storeConfigId = task.getStoreConfigId();
JsonMessageUtil.JsonMessage<StoreConfig> storeConfigRes = taskClient.getStoreConfigById(storeConfigId);
StoreConfig storeConfigVo = (StoreConfig) storeConfigRes.getData();
if (storeConfigVo == null) {
return JsonMessageUtil.getErrorJsonMsg("无法获取对应的存储配置");
}
taskData.setStoreConfig(storeConfigVo.getContent());
// 计算运行时间并生成任务
boolean success = taskService.distributeTask(taskData);
if (success) {
taskService.getRepository().addOrUpdateTaskData(taskData);
}
return success ? JsonMessageUtil.getSuccessJsonMsg("success") : JsonMessageUtil.getErrorJsonMsg("任务找不到可执行时间");
}
@Override
@PutMapping
public JsonMessageUtil.JsonMessage update(@RequestBody Task task) {
String taskUnid = task.getUnid();
VaServerInfo vaServerInfo = taskService.taskRunOn(taskUnid);
// vaServerId 为空说明任务未执行可以先删除再建立新任务
if (vaServerInfo == null) {
JsonMessageUtil.JsonMessage delete = delete(taskUnid);
if (delete.isSuccess()) {
JsonMessageUtil.JsonMessage add = add(task);
return add;
} else {
return delete;
}
} else {
taskService.updateTask(task);
return JsonMessageUtil.getSuccessJsonMsg("success");
}
}
@Override
@DeleteMapping
public JsonMessageUtil.JsonMessage delete(@RequestParam("taskUnid") String taskUnid) {
boolean success = vaServerService.terminateTask(taskUnid);
if (success) {
taskService.removeTaskDataAll(taskUnid);
return JsonMessageUtil.getSuccessJsonMsg("success");
} else {
return JsonMessageUtil.getErrorJsonMsg("failed");
}
}
}
spring:
cloud:
loadbalancer:
ribbon:
enabled: false
consul:
# 服务发现配置
discovery:
# 启用服务发现
enabled: true
# 启用服务注册
register: true
# 服务停止时取消注册
deregister: true
# 表示注册时使用IP而不是hostname
prefer-ip-address: true
# 执行监控检查的频率
health-check-interval: 10s
# 设置健康检查失败多长时间后,取消注册
health-check-critical-timeout: 30s
# 健康检查的路径
health-check-path: /actuator/info
# 服务注册标识,格式为:应用名称:服务器IP:端口
instance-id: ${spring.application.name}:${spring.cloud.consul.discovery.ip-address}:${server.port}
ip-address: 192.168.9.146
redis:
host: 192.168.9.233
port: 6379
password: 3c61f2e4c4d1877ef9d01319c3a0fccaeabb1518
database: 15
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
autoconfigure:
exclude: org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
logging:
config: classpath:logback-${spring.profiles.active}.xml
vion:
redisson:
path: F:\myIDEAworkspace\jt\fanxing3\fanxing-commons\src\main\resources\redisson.yml
gateway:
ip: 192.168.9.233
port: 30000
\ No newline at end of file \ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="10 seconds">
<contextName>logback</contextName>
<property name="log.path" value="logs"/>
<property name="pattern" value="[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%-5level] [%thread] %logger{50} - %msg%n"/>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>info</level>
</filter>
<encoder>
<Pattern>${pattern}</Pattern>
</encoder>
</appender>
<root level="debug">
<appender-ref ref="CONSOLE"/>
</root>
</configuration>
\ No newline at end of file \ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!-- 日志级别从低到高分为TRACE < DEBUG < INFO < WARN < ERROR < FATAL,如果设置为WARN,则低于WARN的信息都不会输出 -->
<!-- scan:当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true -->
<!-- scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒。当scan为true时,此属性生效。默认的时间间隔为1分钟。 -->
<!-- debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。 -->
<configuration scan="true" scanPeriod="10 seconds">
<!--<include resource="org/springframework/boot/logging/logback/base.xml" />-->
<contextName>logback</contextName>
<!-- name的值是变量的名称,value的值时变量定义的值。通过定义的值会被插入到logger上下文中。定义变量后,可以使“${}”来使用变量。 -->
<property name="log.path" value="logs"/>
<property name="pattern" value="[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%-5level] [%thread] %logger{50} - %msg%n"/>
<!--输出到控制台-->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<!--此日志appender是为开发使用,只配置最底级别,控制台输出的日志级别是大于或等于此级别的日志信息-->
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>info</level>
</filter>
<encoder>
<Pattern>${pattern}</Pattern>
<!-- 设置字符集 -->
</encoder>
</appender>
<!--输出到文件-->
<!-- 时间滚动输出 level为 DEBUG 日志 -->
<appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${log.path}/log_debug.log</file>
<!--日志文件输出格式-->
<encoder>
<Pattern>${pattern}</Pattern>
<charset>UTF-8</charset> <!-- 设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志归档 -->
<fileNamePattern>${log.path}/debug/log-debug-%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>15</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录debug级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>debug</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 时间滚动输出 level为 INFO 日志 -->
<appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${log.path}/log_info.log</file>
<!--日志文件输出格式-->
<encoder>
<Pattern>${pattern}</Pattern>
<charset>UTF-8</charset>
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 每天日志归档路径以及格式 -->
<fileNamePattern>${log.path}/info/log-info-%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>15</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录info级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>info</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>NEUTRAL</onMismatch>
</filter>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>warn</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 时间滚动输出 level为 WARN 日志 -->
<appender name="WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${log.path}/log_warn.log</file>
<!--日志文件输出格式-->
<encoder>
<Pattern>${pattern}</Pattern>
<charset>UTF-8</charset> <!-- 此处设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${log.path}/warn/log-warn-%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>5</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录warn级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>warn</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- 时间滚动输出 level为 ERROR 日志 -->
<appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<!-- 正在记录的日志文件的路径及文件名 -->
<file>${log.path}/log_error.log</file>
<!--日志文件输出格式-->
<encoder>
<Pattern>${pattern}</Pattern>
<charset>UTF-8</charset> <!-- 此处设置字符集 -->
</encoder>
<!-- 日志记录器的滚动策略,按日期,按大小记录 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${log.path}/error/log-error-%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>100MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
<!--日志文件保留天数-->
<maxHistory>15</maxHistory>
</rollingPolicy>
<!-- 此日志文件只记录ERROR级别的 -->
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>ERROR</level>
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<logger name="com.viontech" level="debug">
<appender-ref ref="DEBUG_FILE"/>
</logger>
<root level="info">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="INFO_FILE"/>
<appender-ref ref="WARN_FILE"/>
<appender-ref ref="ERROR_FILE"/>
</root>
</configuration>
\ No newline at end of file \ No newline at end of file
...@@ -9,8 +9,8 @@ ...@@ -9,8 +9,8 @@
<artifactId>fanxing3</artifactId> <artifactId>fanxing3</artifactId>
<version>3.0.0-SNAPSHOT</version> <version>3.0.0-SNAPSHOT</version>
</parent> </parent>
<!-- 任务管理服务 --> <!-- 任务调度服务 -->
<artifactId>fanxing-task-manager</artifactId> <artifactId>fanxing-task</artifactId>
<version>${parent.version}</version> <version>${parent.version}</version>
<dependencies> <dependencies>
...@@ -35,7 +35,7 @@ ...@@ -35,7 +35,7 @@
<build> <build>
<finalName>fanxing-task-manager</finalName> <finalName>fanxing-task-scheduling</finalName>
<plugins> <plugins>
<plugin> <plugin>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
...@@ -60,5 +60,4 @@ ...@@ -60,5 +60,4 @@
</resource> </resource>
</resources> </resources>
</build> </build>
</project> </project>
\ No newline at end of file \ No newline at end of file
package com.viontech.fanxing.task.manager; package com.viontech.fanxing.task;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan; import org.mybatis.spring.annotation.MapperScan;
...@@ -19,14 +19,14 @@ import org.springframework.scheduling.annotation.EnableScheduling; ...@@ -19,14 +19,14 @@ import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication(scanBasePackages = "com.viontech.fanxing") @SpringBootApplication(scanBasePackages = "com.viontech.fanxing")
@EnableFeignClients @EnableFeignClients
@Slf4j @Slf4j
@MapperScan(basePackages = "com.viontech.fanxing.task.manager.mapper") @MapperScan(basePackages = "com.viontech.fanxing.task.mapper")
public class TaskManagerApp { public class TaskApp {
public static void main(String[] args) { public static void main(String[] args) {
try { try {
SpringApplication.run(TaskManagerApp.class, args); SpringApplication.run(TaskApp.class, args);
} catch (Exception e) { } catch (Exception e) {
log.error("taskManager app start error", e); log.error("task app start error", e);
} }
} }
} }
package com.viontech.fanxing.task.scheduling.controller; package com.viontech.fanxing.task.controller;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo; import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.scheduling.service.VAServerService; import com.viontech.fanxing.task.service.VAServerService;
import com.viontech.keliu.util.JsonMessageUtil; import com.viontech.keliu.util.JsonMessageUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RMap; import org.redisson.api.RMap;
......
package com.viontech.fanxing.task.manager.controller.base; package com.viontech.fanxing.task.controller.base;
import com.viontech.fanxing.commons.base.BaseController; import com.viontech.fanxing.commons.base.BaseController;
import com.viontech.fanxing.commons.base.BaseExample; import com.viontech.fanxing.commons.base.BaseExample;
import com.viontech.fanxing.commons.base.BaseMapper;
import com.viontech.fanxing.commons.base.BaseService; import com.viontech.fanxing.commons.base.BaseService;
import com.viontech.fanxing.task.manager.mapper.TaskMapper;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.TaskExample; import com.viontech.fanxing.commons.model.TaskExample;
import com.viontech.fanxing.task.manager.service.adapter.TaskService;
import com.viontech.fanxing.commons.vo.TaskVo; import com.viontech.fanxing.commons.vo.TaskVo;
import com.viontech.fanxing.task.service.adapter.TaskService;
import javax.annotation.Resource; import javax.annotation.Resource;
public abstract class TaskBaseController extends BaseController<Task, TaskVo> { public abstract class TaskBaseController extends BaseController<Task, TaskVo> {
......
package com.viontech.fanxing.task.manager.controller.web; package com.viontech.fanxing.task.controller.web;
import com.viontech.fanxing.commons.base.BaseExample; import com.viontech.fanxing.commons.base.BaseExample;
import com.viontech.fanxing.commons.model.TaskExample; import com.viontech.fanxing.commons.model.TaskExample;
import com.viontech.fanxing.commons.vo.TaskVo; import com.viontech.fanxing.commons.vo.TaskVo;
import com.viontech.fanxing.task.manager.controller.base.TaskBaseController; import com.viontech.fanxing.task.controller.base.TaskBaseController;
import com.viontech.keliu.util.JsonMessageUtil; import com.viontech.keliu.util.JsonMessageUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
...@@ -43,11 +43,6 @@ public class TaskController extends TaskBaseController { ...@@ -43,11 +43,6 @@ public class TaskController extends TaskBaseController {
return JsonMessageUtil.getSuccessJsonMsg("success"); return JsonMessageUtil.getSuccessJsonMsg("success");
} }
@PutMapping("/{id}")
public JsonMessageUtil.JsonMessage<TaskVo> updateStatus(@PathVariable("id") Long id, @RequestParam Integer status) {
taskService.updateStatus(id, status);
return JsonMessageUtil.getSuccessJsonMsg("success");
}
@GetMapping("/startTask/{id}") @GetMapping("/startTask/{id}")
public JsonMessageUtil.JsonMessage<TaskVo> startTask(@PathVariable("id") Long id) { public JsonMessageUtil.JsonMessage<TaskVo> startTask(@PathVariable("id") Long id) {
......
package com.viontech.fanxing.task.scheduling.feign; package com.viontech.fanxing.task.feign;
import com.viontech.fanxing.commons.model.StoreConfig; import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.vo.TaskVo;
import com.viontech.keliu.util.JsonMessageUtil; import com.viontech.keliu.util.JsonMessageUtil;
import org.springframework.cloud.openfeign.FeignClient; import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
/** /**
* . * .
...@@ -15,11 +15,8 @@ import org.springframework.web.bind.annotation.*; ...@@ -15,11 +15,8 @@ import org.springframework.web.bind.annotation.*;
*/ */
@Component @Component
@FeignClient(value = "fanxing-task-manager") @FeignClient(value = "fanxing-ops")
public interface TaskClient { public interface OpsClient {
@PutMapping("/tasks/{id}")
JsonMessageUtil.JsonMessage<TaskVo> updateTaskStatus(@PathVariable("id") Long taskId, @RequestParam Integer status);
@GetMapping("/storeConfigs/{id}") @GetMapping("/storeConfigs/{id}")
JsonMessageUtil.JsonMessage<StoreConfig> getStoreConfigById(@PathVariable("id") Long storeConfigId); JsonMessageUtil.JsonMessage<StoreConfig> getStoreConfigById(@PathVariable("id") Long storeConfigId);
......
package com.viontech.fanxing.task.manager.mapper; package com.viontech.fanxing.task.mapper;
import com.viontech.fanxing.commons.base.BaseMapper; import com.viontech.fanxing.commons.base.BaseMapper;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
......
<?xml version="1.0" encoding="UTF-8" ?> <?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.viontech.fanxing.task.manager.mapper.TaskMapper" > <mapper namespace="com.viontech.fanxing.task.mapper.TaskMapper" >
<resultMap id="BaseResultMapRoot" type="com.viontech.fanxing.commons.model.Task" > <resultMap id="BaseResultMapRoot" type="com.viontech.fanxing.commons.model.Task" >
<id column="task_id" property="id" /> <id column="task_id" property="id" />
<result column="task_unid" property="unid" /> <result column="task_unid" property="unid" />
...@@ -90,7 +90,7 @@ ...@@ -90,7 +90,7 @@
</sql> </sql>
<sql id="Base_Column_List" > <sql id="Base_Column_List" >
<if test="!(_parameter.getClass().getSimpleName() == 'TaskExample')" > <if test="!(_parameter.getClass().getSimpleName() == 'TaskExample')" >
<include refid="com.viontech.fanxing.task.manager.mapper.TaskMapper.Base_Column_List_Root" /> <include refid="com.viontech.fanxing.task.mapper.TaskMapper.Base_Column_List_Root" />
</if> </if>
<if test="_parameter.getClass().getSimpleName() == 'TaskExample'" > <if test="_parameter.getClass().getSimpleName() == 'TaskExample'" >
<foreach collection="columnContainerSet" item="columns" separator="," > <foreach collection="columnContainerSet" item="columns" separator="," >
...@@ -100,7 +100,7 @@ ...@@ -100,7 +100,7 @@
${columns.columnContainerStr} ${columns.columnContainerStr}
</if> </if>
<if test="!columns.valid" > <if test="!columns.valid" >
<include refid="com.viontech.fanxing.task.manager.mapper.TaskMapper.Base_Column_List_Root" /> <include refid="com.viontech.fanxing.task.mapper.TaskMapper.Base_Column_List_Root" />
</if> </if>
</when> </when>
</choose> </choose>
......
package com.viontech.fanxing.task.scheduling.model; package com.viontech.fanxing.task.model;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
......
package com.viontech.fanxing.task.scheduling.model; package com.viontech.fanxing.task.model;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
import lombok.Getter; import lombok.Getter;
......
package com.viontech.fanxing.task.scheduling.model.vaserver; package com.viontech.fanxing.task.model.vaserver;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.task.scheduling.model.TaskData; import com.viontech.fanxing.task.model.TaskData;
import lombok.Getter; import lombok.Getter;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.Setter; import lombok.Setter;
......
package com.viontech.fanxing.task.scheduling.model.vaserver; package com.viontech.fanxing.task.model.vaserver;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
......
package com.viontech.fanxing.task.scheduling.repository; package com.viontech.fanxing.task.repository;
import com.viontech.fanxing.commons.constant.RedisKeys; import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.task.scheduling.model.TaskData; import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.commons.service.RedisService; import com.viontech.fanxing.commons.service.RedisService;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
......
package com.viontech.fanxing.task.scheduling.repository; package com.viontech.fanxing.task.repository;
import com.viontech.fanxing.commons.constant.RedisKeys; import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo; import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.commons.service.RedisService; import com.viontech.fanxing.commons.service.RedisService;
import org.redisson.api.RBucket; import org.redisson.api.RBucket;
import org.redisson.api.RMap; import org.redisson.api.RMap;
......
package com.viontech.fanxing.task.runner;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.TaskExample;
import com.viontech.fanxing.task.feign.OpsClient;
import com.viontech.fanxing.task.service.TaskDataService;
import com.viontech.fanxing.task.service.adapter.TaskService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
* .
*
* @author 谢明辉
* @date 2021/9/2
*/
@Component
@Slf4j
public class TaskInitRunner implements CommandLineRunner {
@Resource
private TaskService taskService;
@Resource
private TaskDataService taskDataService;
@Resource
private OpsClient opsClient;
@Override
public void run(String... args) throws Exception {
log.info("===================任务初始化开始===================");
List<Task> tasks = taskService.selectByExample(new TaskExample());
for (Task task : tasks) {
if (StringUtils.isNotBlank(task.getScene()) && task.getStoreConfigId() != null) {
try {
taskDataService.addTask(task);
} catch (Exception e) {
log.info("初始化任务失败,任务unid:{},失败信息:{}", task.getUnid(), e.getMessage());
}
}
}
log.info("===================任务初始化结束===================");
}
}
package com.viontech.fanxing.task.scheduling.runner; package com.viontech.fanxing.task.runner;
import com.viontech.fanxing.commons.constant.TaskStatus; import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.service.RedisService; import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.scheduling.feign.TaskClient; import com.viontech.fanxing.task.feign.OpsClient;
import com.viontech.fanxing.task.scheduling.model.TaskData; import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo; import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.scheduling.service.TaskService; import com.viontech.fanxing.task.service.TaskDataService;
import com.viontech.fanxing.task.scheduling.service.VAServerService; import com.viontech.fanxing.task.service.VAServerService;
import com.viontech.fanxing.task.service.adapter.TaskService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock; import org.redisson.api.RLock;
import org.redisson.api.RMap; import org.redisson.api.RMap;
...@@ -34,9 +35,11 @@ public class TaskRunner { ...@@ -34,9 +35,11 @@ public class TaskRunner {
@Resource @Resource
private VAServerService vaServerService; private VAServerService vaServerService;
@Resource @Resource
private TaskService taskService; private TaskDataService taskDataService;
@Resource
private OpsClient opsClient;
@Resource @Resource
private TaskClient taskClient; private TaskService taskService;
@Scheduled(fixedDelay = 5000) @Scheduled(fixedDelay = 5000)
public void executedTaskListener() { public void executedTaskListener() {
...@@ -49,10 +52,10 @@ public class TaskRunner { ...@@ -49,10 +52,10 @@ public class TaskRunner {
Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true); Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true);
for (String taskUnid : entryCollection) { for (String taskUnid : entryCollection) {
log.info("开始任务 : {}", taskUnid); log.info("开始任务 : {}", taskUnid);
TaskData taskData = taskService.getRepository().getTaskDataByUnid(taskUnid); TaskData taskData = taskDataService.getRepository().getTaskDataByUnid(taskUnid);
if (taskData == null) { if (taskData == null) {
log.info("找不到对应任务,移除所有:{}", taskUnid); log.info("找不到对应任务,移除所有:{}", taskUnid);
taskService.removeTaskDataAll(taskUnid); taskDataService.removeTaskDataAll(taskUnid);
continue; continue;
} }
Task task = taskData.getTask(); Task task = taskData.getTask();
...@@ -84,14 +87,14 @@ public class TaskRunner { ...@@ -84,14 +87,14 @@ public class TaskRunner {
// 找不到可以用来执行的设备,需要修改状态 // 找不到可以用来执行的设备,需要修改状态
if (server == null) { if (server == null) {
log.info("找不到可用的 VAServer,跳过:{}", taskUnid); log.info("找不到可用的 VAServer,跳过:{}", taskUnid);
taskClient.updateTaskStatus(task.getId(), TaskStatus.CAN_NOT_RUN.val); taskService.updateStatus(task.getId(), TaskStatus.CAN_NOT_RUN.val);
continue; continue;
} }
boolean success = vaServerService.executeTask(taskData, server); boolean success = vaServerService.executeTask(taskData, server);
// 修改任务状态 // 修改任务状态
taskClient.updateTaskStatus(task.getId(), TaskStatus.RUNNING.val); taskService.updateStatus(task.getId(), TaskStatus.RUNNING.val);
// 移除任务 // 移除任务
set.remove(taskUnid); set.remove(taskUnid);
...@@ -117,17 +120,17 @@ public class TaskRunner { ...@@ -117,17 +120,17 @@ public class TaskRunner {
for (String taskUnid : entryCollection) { for (String taskUnid : entryCollection) {
log.info("停止任务 : {}", taskUnid); log.info("停止任务 : {}", taskUnid);
TaskData taskData = taskService.getRepository().getTaskDataByUnid(taskUnid); TaskData taskData = taskDataService.getRepository().getTaskDataByUnid(taskUnid);
if (taskData == null) { if (taskData == null) {
log.info("找不到对应任务,移除所有:{}", taskUnid); log.info("找不到对应任务,移除所有:{}", taskUnid);
taskService.removeTaskDataAll(taskUnid); taskDataService.removeTaskDataAll(taskUnid);
continue; continue;
} }
// 获取可用的 vaserver ,执行任务终止动作,如果成功,解除 taskData和 vaServer 的关联,并且从 zset 中移除任务,恢复vaserver资源数,计算下次任务执行时间,放入zset中 // 获取可用的 vaserver ,执行任务终止动作,如果成功,解除 taskData和 vaServer 的关联,并且从 zset 中移除任务,恢复vaserver资源数,计算下次任务执行时间,放入zset中
boolean success = vaServerService.terminateTask(taskUnid); boolean success = vaServerService.terminateTask(taskUnid);
if (success) { if (success) {
taskClient.updateTaskStatus(taskData.getTask().getId(), TaskStatus.PAUSE.val); taskService.updateStatus(taskData.getTask().getId(), TaskStatus.PAUSE.val);
boolean b = taskService.distributeTask(taskData); boolean b = taskDataService.distributeTask(taskData);
} }
set.remove(taskUnid); set.remove(taskUnid);
} }
......
package com.viontech.fanxing.task.scheduling.service; package com.viontech.fanxing.task.service;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.service.RedisService; import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.scheduling.model.RuntimeConfig; import com.viontech.fanxing.task.feign.OpsClient;
import com.viontech.fanxing.task.scheduling.model.TaskData; import com.viontech.fanxing.task.model.RuntimeConfig;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo; import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.scheduling.repository.TaskDataRedisRepository; import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.repository.TaskDataRedisRepository;
import com.viontech.keliu.util.JsonMessageUtil;
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet; import org.redisson.api.RScoredSortedSet;
...@@ -20,7 +24,7 @@ import javax.annotation.Resource; ...@@ -20,7 +24,7 @@ import javax.annotation.Resource;
* @date 2021/7/13 * @date 2021/7/13
*/ */
@Service @Service
public class TaskService { public class TaskDataService {
@Resource @Resource
private RedisService redisService; private RedisService redisService;
...@@ -28,10 +32,39 @@ public class TaskService { ...@@ -28,10 +32,39 @@ public class TaskService {
private VAServerService vaServerService; private VAServerService vaServerService;
@Resource @Resource
private TaskDataRedisRepository taskDataRedisRepository; private TaskDataRedisRepository taskDataRedisRepository;
@Resource
private OpsClient opsClient;
public void addTask(Task task) {
TaskData taskData = new TaskData(task);
// 获取存储配置
Long storeConfigId = task.getStoreConfigId();
JsonMessageUtil.JsonMessage<StoreConfig> storeConfigRes = opsClient.getStoreConfigById(storeConfigId);
StoreConfig storeConfigVo = (StoreConfig) storeConfigRes.getData();
if (storeConfigVo == null) {
throw new FanXingException("无法获取对应的存储配置");
}
taskData.setStoreConfig(storeConfigVo.getContent());
// 计算运行时间并生成任务
boolean success = distributeTask(taskData);
if (success) {
taskDataRedisRepository.addOrUpdateTaskData(taskData);
} else {
throw new FanXingException("任务找不到可执行时间");
}
}
public boolean distributeTask(TaskData taskData) { public boolean distributeTask(TaskData taskData) {
RuntimeConfig runtimeConfig = taskData.getRuntimeConfig(); RuntimeConfig runtimeConfig = taskData.getRuntimeConfig();
String taskUnid = taskData.getTask().getUnid(); String taskUnid = taskData.getTask().getUnid();
// 如果任务正在执行则不进行分配
VaServerInfo vaServerInfo = taskRunOn(taskUnid);
if (vaServerInfo != null) {
return false;
}
ImmutablePair<Long, Long> nextTime = runtimeConfig.getNextTimeOfExecutionAndTerminal(); ImmutablePair<Long, Long> nextTime = runtimeConfig.getNextTimeOfExecutionAndTerminal();
Long nextExecuteTime = nextTime.left; Long nextExecuteTime = nextTime.left;
Long nextTerminateTime = nextTime.right; Long nextTerminateTime = nextTime.right;
...@@ -78,12 +111,30 @@ public class TaskService { ...@@ -78,12 +111,30 @@ public class TaskService {
return ImmutablePair.of(taskUnid, devId); return ImmutablePair.of(taskUnid, devId);
} }
public void deleteTask(String taskUnid) {
boolean success = vaServerService.terminateTask(taskUnid);
if (success) {
removeTaskDataAll(taskUnid);
} else {
throw new FanXingException("failed");
}
}
public void updateTask(Task task) { public void updateTask(Task task) {
TaskData taskData = new TaskData(task); String taskUnid = task.getUnid();
// 需要更新taskData,并且向vaServer更新任务信息 VaServerInfo vaServerInfo = taskRunOn(taskUnid);
taskDataRedisRepository.addOrUpdateTaskData(taskData);
vaServerService.updateTask(taskData); // vaServerId 为空说明任务未执行可以先删除再建立新任务
if (vaServerInfo == null) {
deleteTask(taskUnid);
addTask(task);
} else {
TaskData taskData = new TaskData(task);
// 需要更新taskData,并且向vaServer更新任务信息
taskDataRedisRepository.addOrUpdateTaskData(taskData);
vaServerService.updateTask(taskData);
}
} }
public TaskDataRedisRepository getRepository() { public TaskDataRedisRepository getRepository() {
......
package com.viontech.fanxing.task.scheduling.service; package com.viontech.fanxing.task.service;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.task.scheduling.model.TaskData; import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VATask; import com.viontech.fanxing.task.model.vaserver.VATask;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo; import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders; import org.apache.http.HttpHeaders;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
......
package com.viontech.fanxing.task.scheduling.service; package com.viontech.fanxing.task.service;
import com.viontech.fanxing.commons.constant.RedisKeys; import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.commons.exception.FanXingException; import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.service.RedisService; import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.scheduling.model.TaskData; import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo; import com.viontech.fanxing.task.repository.VAServerRedisRepository;
import com.viontech.fanxing.task.scheduling.repository.VAServerRedisRepository; import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import org.redisson.api.RBucket; import org.redisson.api.RBucket;
import org.redisson.api.RLock; import org.redisson.api.RLock;
import org.redisson.api.RMap; import org.redisson.api.RMap;
...@@ -31,7 +31,7 @@ public class VAServerService { ...@@ -31,7 +31,7 @@ public class VAServerService {
@Resource @Resource
private VAServerRedisRepository vaServerRedisRepository; private VAServerRedisRepository vaServerRedisRepository;
@Resource @Resource
private TaskService taskService; private TaskDataService taskDataService;
@Resource @Resource
private VAServerHttpService vaServerHttpService; private VAServerHttpService vaServerHttpService;
...@@ -84,7 +84,7 @@ public class VAServerService { ...@@ -84,7 +84,7 @@ public class VAServerService {
* 删除任务 * 删除任务
*/ */
public boolean terminateTask(String taskUnid) { public boolean terminateTask(String taskUnid) {
TaskData taskData = taskService.getRepository().getTaskDataByUnid(taskUnid); TaskData taskData = taskDataService.getRepository().getTaskDataByUnid(taskUnid);
if (taskData == null) { if (taskData == null) {
return false; return false;
} }
...@@ -128,7 +128,7 @@ public class VAServerService { ...@@ -128,7 +128,7 @@ public class VAServerService {
* 修改任务 * 修改任务
*/ */
public boolean updateTask(TaskData taskData) { public boolean updateTask(TaskData taskData) {
VaServerInfo vaServerInfo = taskService.taskRunOn(taskData.getTask().getUnid()); VaServerInfo vaServerInfo = taskDataService.taskRunOn(taskData.getTask().getUnid());
vaServerHttpService.updateTask(taskData, vaServerInfo); vaServerHttpService.updateTask(taskData, vaServerInfo);
return true; return true;
} }
...@@ -137,7 +137,7 @@ public class VAServerService { ...@@ -137,7 +137,7 @@ public class VAServerService {
* 截图 * 截图
*/ */
public Object snapshot(String taskUnid) { public Object snapshot(String taskUnid) {
VaServerInfo vaServerInfo = taskService.taskRunOn(taskUnid); VaServerInfo vaServerInfo = taskDataService.taskRunOn(taskUnid);
if (vaServerInfo != null) { if (vaServerInfo != null) {
return vaServerHttpService.snapshot(taskUnid, vaServerInfo); return vaServerHttpService.snapshot(taskUnid, vaServerInfo);
} else { } else {
...@@ -149,7 +149,7 @@ public class VAServerService { ...@@ -149,7 +149,7 @@ public class VAServerService {
* 获取点播地址 * 获取点播地址
*/ */
public Object getAnalyzeStream(String taskUnid) { public Object getAnalyzeStream(String taskUnid) {
VaServerInfo vaServerInfo = taskService.taskRunOn(taskUnid); VaServerInfo vaServerInfo = taskDataService.taskRunOn(taskUnid);
if (vaServerInfo != null) { if (vaServerInfo != null) {
return vaServerHttpService.getAnalyzeStream(taskUnid, vaServerInfo); return vaServerHttpService.getAnalyzeStream(taskUnid, vaServerInfo);
} else { } else {
...@@ -161,7 +161,7 @@ public class VAServerService { ...@@ -161,7 +161,7 @@ public class VAServerService {
* 输出分析流 * 输出分析流
*/ */
public Object startAnalyzeStream(String taskUnid, String url) { public Object startAnalyzeStream(String taskUnid, String url) {
VaServerInfo vaServerInfo = taskService.taskRunOn(taskUnid); VaServerInfo vaServerInfo = taskDataService.taskRunOn(taskUnid);
if (vaServerInfo != null) { if (vaServerInfo != null) {
return vaServerHttpService.startAnalyzeStream(taskUnid, vaServerInfo, url); return vaServerHttpService.startAnalyzeStream(taskUnid, vaServerInfo, url);
} else { } else {
...@@ -193,7 +193,7 @@ public class VAServerService { ...@@ -193,7 +193,7 @@ public class VAServerService {
* 场景切换 * 场景切换
*/ */
public Object switchScene(String taskUnid, String sceneId) { public Object switchScene(String taskUnid, String sceneId) {
VaServerInfo vaServerInfo = taskService.taskRunOn(taskUnid); VaServerInfo vaServerInfo = taskDataService.taskRunOn(taskUnid);
if (vaServerInfo != null) { if (vaServerInfo != null) {
return vaServerHttpService.switchScene(taskUnid, vaServerInfo, sceneId); return vaServerHttpService.switchScene(taskUnid, vaServerInfo, sceneId);
} else { } else {
...@@ -205,7 +205,7 @@ public class VAServerService { ...@@ -205,7 +205,7 @@ public class VAServerService {
* 任务轮训状态切换 * 任务轮训状态切换
*/ */
public Object updateRotationStatus(String taskUnid, Integer rotationStatus) { public Object updateRotationStatus(String taskUnid, Integer rotationStatus) {
VaServerInfo vaServerInfo = taskService.taskRunOn(taskUnid); VaServerInfo vaServerInfo = taskDataService.taskRunOn(taskUnid);
if (vaServerInfo != null) { if (vaServerInfo != null) {
return vaServerHttpService.updateRotationStatus(taskUnid, rotationStatus, vaServerInfo); return vaServerHttpService.updateRotationStatus(taskUnid, rotationStatus, vaServerInfo);
} else { } else {
...@@ -217,7 +217,7 @@ public class VAServerService { ...@@ -217,7 +217,7 @@ public class VAServerService {
* 任务轮训状态查询 * 任务轮训状态查询
*/ */
public Object getRotationStatus(String taskUnid) { public Object getRotationStatus(String taskUnid) {
VaServerInfo vaServerInfo = taskService.taskRunOn(taskUnid); VaServerInfo vaServerInfo = taskDataService.taskRunOn(taskUnid);
if (vaServerInfo != null) { if (vaServerInfo != null) {
return vaServerHttpService.getRotationStatus(taskUnid, vaServerInfo); return vaServerHttpService.getRotationStatus(taskUnid, vaServerInfo);
} else { } else {
......
package com.viontech.fanxing.task.manager.service.adapter; package com.viontech.fanxing.task.service.adapter;
import com.viontech.fanxing.commons.base.BaseService; import com.viontech.fanxing.commons.base.BaseService;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
......
package com.viontech.fanxing.task.manager.service.impl; package com.viontech.fanxing.task.service.impl;
import com.viontech.fanxing.commons.base.BaseMapper; import com.viontech.fanxing.commons.base.BaseMapper;
import com.viontech.fanxing.commons.base.BaseServiceImpl; import com.viontech.fanxing.commons.base.BaseServiceImpl;
import com.viontech.fanxing.commons.constant.TaskStatus; import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.vo.TaskVo; import com.viontech.fanxing.commons.vo.TaskVo;
import com.viontech.fanxing.task.manager.feign.TaskSchedulingClient; import com.viontech.fanxing.task.mapper.TaskMapper;
import com.viontech.fanxing.task.manager.mapper.TaskMapper; import com.viontech.fanxing.task.service.TaskDataService;
import com.viontech.fanxing.task.manager.service.adapter.TaskService; import com.viontech.fanxing.task.service.adapter.TaskService;
import com.viontech.keliu.util.JsonMessageUtil;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
...@@ -20,7 +19,7 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic ...@@ -20,7 +19,7 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
@Resource @Resource
private TaskMapper taskMapper; private TaskMapper taskMapper;
@Resource @Resource
private TaskSchedulingClient taskSchedulingClient; private TaskDataService taskDataService;
@Override @Override
public BaseMapper<Task> getMapper() { public BaseMapper<Task> getMapper() {
...@@ -34,10 +33,7 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic ...@@ -34,10 +33,7 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
task = selectByPrimaryKey(task.getId()); task = selectByPrimaryKey(task.getId());
if (StringUtils.isNotBlank(task.getScene()) && task.getStoreConfigId() != null) { if (StringUtils.isNotBlank(task.getScene()) && task.getStoreConfigId() != null) {
JsonMessageUtil.JsonMessage add = taskSchedulingClient.add(task); taskDataService.addTask(task);
if (!add.isSuccess()) {
throw new RuntimeException(add.getMsg());
}
} }
return new TaskVo(task); return new TaskVo(task);
} }
...@@ -49,10 +45,7 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic ...@@ -49,10 +45,7 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
task = selectByPrimaryKey(task.getId()); task = selectByPrimaryKey(task.getId());
if (StringUtils.isNotBlank(task.getScene()) && task.getStoreConfigId() != null) { if (StringUtils.isNotBlank(task.getScene()) && task.getStoreConfigId() != null) {
JsonMessageUtil.JsonMessage add = taskSchedulingClient.add(task); taskDataService.addTask(task);
if (!add.isSuccess()) {
throw new RuntimeException(add.getMsg());
}
} }
return new TaskVo(task); return new TaskVo(task);
} }
...@@ -61,12 +54,8 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic ...@@ -61,12 +54,8 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public void removeTask(Long id) { public void removeTask(Long id) {
Task task = selectByPrimaryKey(id); Task task = selectByPrimaryKey(id);
JsonMessageUtil.JsonMessage delete = taskSchedulingClient.delete(task.getUnid()); taskDataService.deleteTask(task.getUnid());
if (delete.isSuccess()) { deleteByPrimaryKey(id);
deleteByPrimaryKey(id);
} else {
throw new RuntimeException(delete.getMsg());
}
} }
@Override @Override
...@@ -94,9 +83,6 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic ...@@ -94,9 +83,6 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
taskVo.setId(id); taskVo.setId(id);
updateByPrimaryKeySelective(taskVo); updateByPrimaryKeySelective(taskVo);
JsonMessageUtil.JsonMessage add = taskSchedulingClient.add(task); taskDataService.addTask(task);
if (!add.isSuccess()) {
throw new RuntimeException(add.getMsg());
}
} }
} }
\ No newline at end of file \ No newline at end of file
...@@ -37,16 +37,19 @@ spring: ...@@ -37,16 +37,19 @@ spring:
date-format: yyyy-MM-dd HH:mm:ss date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8 time-zone: GMT+8
default-property-inclusion: non_null default-property-inclusion: non_null
logging:
config: classpath:logback-${spring.profiles.active}.xml
mybatis: mybatis:
type-aliases-package: com.viontech.fanxing.commons.model type-aliases-package: com.viontech.fanxing.commons.model
mapper-locations: classpath:com/viontech/fanxing/task/manager/mapping/*.xml mapper-locations: classpath:com/viontech/fanxing/task/mapping/*.xml
pagehelper: pagehelper:
helper-dialect: mysql helper-dialect: mysql
reasonable: true reasonable: true
supportMethodsArguments: true supportMethodsArguments: true
params: count=countByExample params: count=countByExample
logging:
config: classpath:logback-${spring.profiles.active}.xml
vion: vion:
redisson: redisson:
path: F:\myIDEAworkspace\jt\fanxing3\fanxing-commons\src\main\resources\redisson.yml path: F:\myIDEAworkspace\jt\fanxing3\fanxing-commons\src\main\resources\redisson.yml
gateway:
ip: 192.168.9.233
port: 30000
\ No newline at end of file \ No newline at end of file
...@@ -5,7 +5,7 @@ spring: ...@@ -5,7 +5,7 @@ spring:
active: active:
${PROFILE} ${PROFILE}
application: application:
name: fanxing-task-scheduling name: fanxing-task
cloud: cloud:
consul: consul:
host: 192.168.9.233 host: 192.168.9.233
......
package com.viontech.fanxing.task.scheduling.service; package com.viontech.fanxing.task.service;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.viontech.fanxing.commons.constant.TaskStatus; import com.viontech.fanxing.commons.constant.TaskStatus;
...@@ -6,11 +6,12 @@ import com.viontech.fanxing.commons.model.StoreConfig; ...@@ -6,11 +6,12 @@ import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.service.RedisService; import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.commons.vo.TaskVo; import com.viontech.fanxing.commons.vo.TaskVo;
import com.viontech.fanxing.task.scheduling.feign.TaskClient; import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.TaskData; import com.viontech.fanxing.task.feign.OpsClient;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo; import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.scheduling.repository.TaskDataRedisRepository; import com.viontech.fanxing.task.repository.TaskDataRedisRepository;
import com.viontech.fanxing.task.scheduling.repository.VAServerRedisRepository; import com.viontech.fanxing.task.repository.VAServerRedisRepository;
import com.viontech.fanxing.task.service.adapter.TaskService;
import com.viontech.keliu.util.JsonMessageUtil; import com.viontech.keliu.util.JsonMessageUtil;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
...@@ -41,9 +42,11 @@ class VAServerHttpServiceTest { ...@@ -41,9 +42,11 @@ class VAServerHttpServiceTest {
private VAServerRedisRepository vaServerRedisRepository; private VAServerRedisRepository vaServerRedisRepository;
@Resource @Resource
private TaskClient taskClient; private OpsClient opsClient;
private VaServerInfo vaServerInfo; private VaServerInfo vaServerInfo;
private TaskData taskData; private TaskData taskData;
@Resource
private TaskService taskService;
@BeforeEach @BeforeEach
public void before() { public void before() {
...@@ -92,7 +95,7 @@ class VAServerHttpServiceTest { ...@@ -92,7 +95,7 @@ class VAServerHttpServiceTest {
@Test @Test
void storeConfig() { void storeConfig() {
JsonMessageUtil.JsonMessage<StoreConfig> storeConfigById = taskClient.getStoreConfigById(6L); JsonMessageUtil.JsonMessage<StoreConfig> storeConfigById = opsClient.getStoreConfigById(6L);
System.out.println(JSON.toJSONString(storeConfigById.getData())); System.out.println(JSON.toJSONString(storeConfigById.getData()));
} }
...@@ -108,8 +111,7 @@ class VAServerHttpServiceTest { ...@@ -108,8 +111,7 @@ class VAServerHttpServiceTest {
void test() throws Exception { void test() throws Exception {
// RMap<String, String> taskVaServerMap = redisService.getTaskVaServerMap(); // RMap<String, String> taskVaServerMap = redisService.getTaskVaServerMap();
// taskVaServerMap.put("b0c20c4a-fffd-11eb-a74d-0242ac11001d", DEV_ID); // taskVaServerMap.put("b0c20c4a-fffd-11eb-a74d-0242ac11001d", DEV_ID);
JsonMessageUtil.JsonMessage<TaskVo> taskVoJsonMessage = taskClient.updateTaskStatus(19L, TaskStatus.RUNNING.val); taskService.updateStatus(19L, TaskStatus.RUNNING.val);
System.out.println(JSON.toJSONString(taskVoJsonMessage.getData()));
// RScoredSortedSet<String> set = redisService.getClient().getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_EXECUTED_TASK_UNID_SET); // RScoredSortedSet<String> set = redisService.getClient().getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_EXECUTED_TASK_UNID_SET);
// RScoredSortedSet<String> set2 = redisService.getClient().getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_TERMINATED_TASK_UNID_SET); // RScoredSortedSet<String> set2 = redisService.getClient().getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_TERMINATED_TASK_UNID_SET);
......
...@@ -16,8 +16,7 @@ ...@@ -16,8 +16,7 @@
<modules> <modules>
<module>fanxing-commons</module> <module>fanxing-commons</module>
<module>fanxing-gateway</module> <module>fanxing-gateway</module>
<module>fanxing-task-manager</module> <module>fanxing-task</module>
<module>fanxing-task-scheduling</module>
<module>fanxing-forward</module> <module>fanxing-forward</module>
<module>fanxing-ops</module> <module>fanxing-ops</module>
<module>fanxing-query</module> <module>fanxing-query</module>
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!