Commit 5adeb0ca by xmh

任务调度部分

1 parent 93c2722f
......@@ -33,7 +33,7 @@
任务下发和任务结束两个动作会被定义为延时任务
拿到一个任务后计算下一次任务下发的时间,将时间戳作为分数,将任务作为内容存入redis的zset中。调度任务会有每秒执行一次并且使用分布式锁的定时任务,从zset中循环取出分数最低的一条判断是否应该被执行,如果需要被执行,将任务推入list队列中并从zset中移除。有另外的定时任务通过订阅队列获取到动作并执行,如果是任务下发动作,那么在执行完成后需要计算下一次任务结束的时间并加入zset中,如果是任务结束动作,同样需要计算下一次任务开始执行的时间并加入zset
拿到任务后,计算任务下一次下发时间和终止时间,将时间戳作为分数,将任务作为内容分别存入下发动作和终止动作的zset中。有两个定时任务每隔5秒执行一次,每次取出分数最小的动作判断是否应该被执行,如果需要被执行,并且是下发动作,寻找到可用的VAServer,执行下发动作,成功后将任务和VAServer进行关联,并将动作从zset中移除。如果是终止动作,寻找到与任务关联的VAServer,执行终止动作,成功后解除关联,并将动作从zset中移除,计算下一次任务下发时间和终止时间,加入zset中
5. 转发服务
......
......@@ -93,5 +93,10 @@
<artifactId>mybatis</artifactId>
<version>3.5.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.viontech.fanxing.commons.constant;
/**
* .
*
* @author 谢明辉
* @date 2021/7/12
*/
public class RedisKeys {
public static final String SCHEDULING_TO_BE_EXECUTED_TASK_UNID_SET = "scheduling:toBeExecutedTaskUnidSet";
public static final String SCHEDULING_TO_BE_TERMINATED_TASK_UNID_SET = "scheduling:toBeTerminatedTaskUnidSet";
public static final String SCHEDULING_TASK_DATA_MAP = "scheduling:taskDataMap";
public static final String SCHEDULING_VASERVER_MAP = "scheduling:vaServerMap";
public static final String SCHEDULING_TASK_VASERVER_MAP = "scheduling:taskVAServerMap";
public static String getVAServerKeepAliveKey(String devId) {
return devId == null ? "scheduling:keepalive" : "scheduling:keepalive" + ":" + devId;
}
}
......@@ -2,10 +2,15 @@ package com.viontech.fanxing.task.scheduling.controller;
import com.viontech.fanxing.commons.feing.TaskSchedulingTasksAdapter;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.service.TaskService;
import com.viontech.fanxing.task.scheduling.service.VAServerService;
import com.viontech.keliu.util.JsonMessageUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.springframework.web.bind.annotation.*;
import org.w3c.dom.html.HTMLTableRowElement;
import javax.annotation.Resource;
/**
* .
......@@ -18,21 +23,49 @@ import org.w3c.dom.html.HTMLTableRowElement;
@Slf4j
public class TaskController implements TaskSchedulingTasksAdapter {
@Resource
private VAServerService vaServerService;
@Resource
private TaskService taskService;
@Override
@PostMapping
public JsonMessageUtil.JsonMessage add(Task task) {
return null;
TaskData taskData = new TaskData(task);
String unid = task.getUnid();
ImmutablePair<Long, Long> nextTime = taskData.getRuntimeConfig().getNextTimeOfExecutionAndTerminal();
Long nextExecuteTime = nextTime.left;
Long nextTerminateTime = nextTime.right;
if (nextExecuteTime != null) {
taskService.addExecutableTaskData(unid, nextExecuteTime);
taskService.addTerminatableTaskData(unid, nextTerminateTime);
taskService.addTaskData(taskData);
return JsonMessageUtil.getSuccessJsonMsg("success");
} else {
return JsonMessageUtil.getErrorJsonMsg("任务找不到可执行时间");
}
}
@Override
@PutMapping
public JsonMessageUtil.JsonMessage update(Task task) {
return null;
JsonMessageUtil.JsonMessage delete = delete(task);
if (delete.isSuccess()) {
JsonMessageUtil.JsonMessage add = add(task);
return add;
} else {
return delete;
}
}
@Override
@DeleteMapping
public JsonMessageUtil.JsonMessage delete(Task task) {
return null;
String unid = task.getUnid();
taskService.removeTaskDataAll(unid);
vaServerService.terminateTask(task);
// todo
return JsonMessageUtil.getSuccessJsonMsg("success");
}
}
package com.viontech.fanxing.task.scheduling.controller;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.scheduling.service.VAServerService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* .
*
* @author 谢明辉
* @date 2021/7/13
*/
@RestController
public class VAServerController {
@Resource
private VAServerService vaServerService;
@PostMapping("/register")
public Object register(@RequestBody VaServerInfo vaServerInfo) {
vaServerService.registeVAServer(vaServerInfo);
return "success";
}
@PostMapping("/keepalive")
public Object keepalive(String devId) {
vaServerService.keepalive(devId);
return "success";
}
@PostMapping("/status")
public Object status() {
// todo
return null;
}
}
package com.viontech.fanxing.task.scheduling.model;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.model.Task;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.tuple.ImmutablePair;
import java.security.InvalidParameterException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
/**
* type
* <li>0 每天运行,指定开始和结束的时间</li>
* <li>1 运行一次,指定具体的开始和结束的日期时间</li>
* <li>2 按照星期配置,指定每周某天的开始和结束的时间</li>
* <li>3 随机运行,指定运行时长</li>
*
* @author 谢明辉
* @date 2021/7/13
*/
@Getter
@Setter
public class RuntimeConfig {
private static final String[] WEEK_ARR = new String[]{"sun", "mon", "tue", "wed", "thu", "fri", "sat"};
private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss");
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private int type;
private HashMap<Integer, Config> weekConfigMap;
private Config singleConfig;
public RuntimeConfig(Task task) {
String runtimeConf = task.getRuntimeConf();
JSONObject jsonObject = JSON.parseObject(runtimeConf);
Integer typeT = jsonObject.getInteger("type");
if (typeT == null) {
throw new InvalidParameterException("运行配置中type为空");
}
this.type = typeT;
JSONObject config = jsonObject.getJSONObject("config");
switch (this.type) {
case 0:
String start = config.getString("start");
String end = config.getString("end");
this.singleConfig = new Config().setStart(LocalTime.parse(start, TIME_FORMATTER)).setEnd(LocalTime.parse(end, TIME_FORMATTER));
break;
case 1:
String start1 = config.getString("start");
String end1 = config.getString("end");
this.singleConfig = new Config().setStartTime(LocalDateTime.parse(start1, DATE_TIME_FORMATTER)).setEndTime(LocalDateTime.parse(end1, DATE_TIME_FORMATTER));
break;
case 2:
this.weekConfigMap = new HashMap<>();
for (int i = 0; i < WEEK_ARR.length; i++) {
String week = WEEK_ARR[i];
JSONObject configMap = config.getJSONObject(week);
if (configMap == null) {
continue;
}
String weekStart = configMap.getString("start");
String weekEnd = configMap.getString("end");
Config weekConfig = new Config().setStart(LocalTime.parse(weekStart, TIME_FORMATTER)).setEnd(LocalTime.parse(weekEnd, TIME_FORMATTER));
this.weekConfigMap.put(i, weekConfig);
}
break;
case 3:
Long runningTime = config.getLong("runningTime");
this.singleConfig = new Config().setRunningTime(runningTime);
break;
default:
throw new InvalidParameterException("错误的type");
}
}
/**
* @return org.apache.commons.lang3.tuple.ImmutablePair<Long, Long>
* <li>left 下一次任务下发时间</li>
* <li>right 下一次任务终止时间</li>
*/
public ImmutablePair<Long, Long> getNextTimeOfExecutionAndTerminal() {
Long executeTimestamp = null;
Long terminateTimestamp = null;
LocalDateTime now = LocalDateTime.now();
LocalDateTime executeTime;
LocalDateTime terminateTime;
switch (this.type) {
case 0:
LocalTime start = singleConfig.getStart();
executeTime = LocalDateTime.of(LocalDate.now(), start);
LocalTime end = singleConfig.getEnd();
terminateTime = LocalDateTime.of(executeTime.toLocalDate(), end);
// 如果终止时间在现在的时间之前需要明天执行
if (terminateTime.isBefore(now)) {
executeTime = executeTime.plusDays(1);
terminateTime = terminateTime.plusDays(1);
}
executeTimestamp = executeTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
terminateTimestamp = terminateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
break;
case 1:
executeTime = singleConfig.getStartTime();
terminateTime = singleConfig.getEndTime();
if (terminateTime.isBefore(now)) {
return null;
}
executeTimestamp = executeTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
terminateTimestamp = terminateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
break;
case 2:
Calendar instance = Calendar.getInstance();
instance.setTime(new Date());
int dayOfWeek = instance.get(Calendar.DAY_OF_WEEK);
dayOfWeek -= 1;
LocalDate localDate = LocalDate.now();
Config config;
boolean success = false;
for (int i = 0; i <= 7; i++) {
config = this.weekConfigMap.get(dayOfWeek);
if (config != null) {
executeTime = LocalDateTime.of(localDate, config.getStart());
terminateTime = LocalDateTime.of(executeTime.toLocalDate(), config.getEnd());
if (terminateTime.isAfter(now)) {
executeTimestamp = executeTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
terminateTimestamp = terminateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
success = true;
break;
}
}
localDate = localDate.plusDays(1);
dayOfWeek = (dayOfWeek + 1) % 7;
}
if (!success) {
return null;
}
break;
case 3:
// 随机执行,待完善
break;
default:
return null;
}
return ImmutablePair.of(executeTimestamp, terminateTimestamp);
}
public static class Config {
private LocalTime start;
private LocalTime end;
private Long runningTime;
private LocalDateTime startTime;
private LocalDateTime endTime;
public LocalTime getStart() {
return start;
}
public Config setStart(LocalTime start) {
this.start = start;
return this;
}
public LocalTime getEnd() {
return end;
}
public Config setEnd(LocalTime end) {
this.end = end;
return this;
}
public Long getRunningTime() {
return runningTime;
}
public Config setRunningTime(Long runningTime) {
this.runningTime = runningTime;
return this;
}
public LocalDateTime getStartTime() {
return startTime;
}
public Config setStartTime(LocalDateTime startTime) {
this.startTime = startTime;
return this;
}
public LocalDateTime getEndTime() {
return endTime;
}
public Config setEndTime(LocalDateTime endTime) {
this.endTime = endTime;
return this;
}
}
}
package com.viontech.fanxing.task.scheduling.model;
import com.viontech.fanxing.commons.model.Task;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.io.Serializable;
/**
* .
*
* @author 谢明辉
* @date 2021/7/12
*/
@NoArgsConstructor
@Getter
@Setter
public class TaskData implements Serializable {
private Task task;
private RuntimeConfig runtimeConfig;
public TaskData(Task task) {
this.task = task;
runtimeConfig = new RuntimeConfig(task);
}
}
package com.viontech.fanxing.task.scheduling.model.vaserver;
import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
/**
* .
*
* @author 谢明辉
* @date 2021/7/13
*/
@Getter
@Setter
public class VaServerInfo implements Serializable {
private String devID;
private String serviceName;
private Float videoResource;
private String serviceBaseUrl;
private String proxy;
}
package com.viontech.fanxing.task.scheduling.runner;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.task.scheduling.model.RuntimeConfig;
import com.viontech.fanxing.task.scheduling.model.TaskData;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.scheduling.service.TaskService;
import com.viontech.fanxing.task.scheduling.service.VAServerService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.redisson.client.protocol.ScoredEntry;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
/**
* .
*
* @author 谢明辉
* @date 2021/7/13
*/
@Component
@Slf4j
public class TaskRunner {
@Resource
private RedissonClient redissonClient;
@Resource
private VAServerService vaServerService;
@Resource
private TaskService taskService;
@Scheduled(fixedDelay = 5000)
public void executedTaskListener() {
RLock lock = redissonClient.getLock("executedTaskListener");
boolean isLock;
try {
isLock = lock.tryLock(10, 8, TimeUnit.SECONDS);
} catch (InterruptedException e) {
return;
}
if (isLock) {
try {
RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_EXECUTED_TASK_UNID_SET);
RMap<String, TaskData> taskDataMap = redissonClient.getMap(RedisKeys.SCHEDULING_TASK_DATA_MAP);
RMap<String, VaServerInfo> vaServerMap = redissonClient.getMap(RedisKeys.SCHEDULING_VASERVER_MAP);
for (; ; ) {
Collection<ScoredEntry<String>> entryCollection = set.entryRange(0, 0);
Iterator<ScoredEntry<String>> iterator = entryCollection.iterator();
if (iterator.hasNext()) {
ScoredEntry<String> next = iterator.next();
Double score = next.getScore();
String taskUnid = next.getValue();
if (System.currentTimeMillis() < score) {
return;
} else {
TaskData taskData = taskDataMap.get(taskUnid);
Collection<VaServerInfo> vaServerInfos = vaServerMap.values();
// todo 获取可用的vaserver,执行任务下发动作,如果成功,关联taskData和vaServer,修改可用资源数,并且从zset中移除任务
vaServerService.executeTask(taskData.getTask(), "devId");
set.remove(taskUnid);
}
} else {
return;
}
}
} catch (Exception e) {
log.error("", e);
} finally {
try {
lock.unlock();
} catch (Exception ignore) {
}
}
}
}
@Scheduled(fixedDelay = 5000)
public void terminatedTaskListener() {
RLock lock = redissonClient.getLock("terminatedTaskListener");
boolean isLock;
try {
isLock = lock.tryLock(10, 8, TimeUnit.SECONDS);
} catch (InterruptedException e) {
return;
}
if (isLock) {
try {
RScoredSortedSet<String> set = redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_TERMINATED_TASK_UNID_SET);
RMap<String, TaskData> taskDataMap = redissonClient.getMap(RedisKeys.SCHEDULING_TASK_DATA_MAP);
for (; ; ) {
Collection<ScoredEntry<String>> entryCollection = set.entryRange(0, 0);
Iterator<ScoredEntry<String>> iterator = entryCollection.iterator();
if (iterator.hasNext()) {
ScoredEntry<String> next = iterator.next();
Double score = next.getScore();
String taskUnid = next.getValue();
if (System.currentTimeMillis() < score) {
return;
} else {
TaskData taskData = taskDataMap.get(taskUnid);
RuntimeConfig runtimeConfig = taskData.getRuntimeConfig();
boolean success = vaServerService.terminateTask(taskData.getTask());
// todo 获取可用的 vaserver ,执行任务终止动作,如果成功,解除 taskData和 vaServer 的关联,并且从 zset 中移除任务,恢复vaserver资源数,计算下次任务执行时间,放入zset中
if (success) {
ImmutablePair<Long, Long> nextTime = runtimeConfig.getNextTimeOfExecutionAndTerminal();
taskService.addExecutableTaskData(taskUnid, nextTime.left);
taskService.addTerminatableTaskData(taskUnid, nextTime.right);
}
set.remove(taskUnid);
}
} else {
return;
}
}
} catch (Exception e) {
log.error("", e);
} finally {
try {
lock.unlock();
} catch (Exception ignore) {
}
}
}
}
}
package com.viontech.fanxing.task.scheduling.service;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.task.scheduling.model.TaskData;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* .
*
* @author 谢明辉
* @date 2021/7/13
*/
@Service
public class TaskService {
@Resource
private RedissonClient redissonClient;
public boolean addExecutableTaskData(String taskUnid, Long timestamp) {
RScoredSortedSet<String> toBeExecutedTaskUnidSet = redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_EXECUTED_TASK_UNID_SET);
return toBeExecutedTaskUnidSet.add(timestamp, taskUnid);
}
public boolean addTerminatableTaskData(String taskUnid, Long timestamp) {
RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_TERMINATED_TASK_UNID_SET);
return toBeTerminatedTaskUnidSet.add(timestamp, taskUnid);
}
public void addTaskData(TaskData taskData) {
RMap<String, TaskData> taskDataMap = redissonClient.getMap(RedisKeys.SCHEDULING_TASK_DATA_MAP);
taskDataMap.put(taskData.getTask().getUnid(), taskData);
}
public void removeTaskDataAll(String taskUnid) {
RScoredSortedSet<String> toBeExecutedTaskUnidSet = redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_EXECUTED_TASK_UNID_SET);
RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redissonClient.getScoredSortedSet(RedisKeys.SCHEDULING_TO_BE_TERMINATED_TASK_UNID_SET);
RMap<String, TaskData> taskDataMap = redissonClient.getMap(RedisKeys.SCHEDULING_TASK_DATA_MAP);
toBeExecutedTaskUnidSet.remove(taskUnid);
toBeTerminatedTaskUnidSet.remove(taskUnid);
taskDataMap.remove(taskUnid);
}
}
package com.viontech.fanxing.task.scheduling.service;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.task.scheduling.model.vaserver.VaServerInfo;
import org.redisson.api.RBucket;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
* 与 视频分析服务 交互的 service
*
* @author 谢明辉
* @date 2021/7/13
*/
@Service
public class VAServerService {
@Resource
private RestTemplateBuilder restTemplateBuilder;
@Resource
private RedissonClient redissonClient;
public void registeVAServer(VaServerInfo vaServerInfo) {
String devID = vaServerInfo.getDevID();
RMap<String, VaServerInfo> map = redissonClient.getMap(RedisKeys.SCHEDULING_VASERVER_MAP);
RBucket<Object> bucket = redissonClient.getBucket(RedisKeys.getVAServerKeepAliveKey(devID));
bucket.set(1);
bucket.expire(3, TimeUnit.MINUTES);
map.put(devID, vaServerInfo);
}
public void keepalive(String devId) {
RBucket<Object> bucket = redissonClient.getBucket(RedisKeys.getVAServerKeepAliveKey(devId));
bucket.set(1);
bucket.expire(3, TimeUnit.MINUTES);
}
public void linkTaskAndVaServer(String taskUnid, String devId) {
RMap<String, String> map = redissonClient.getMap(RedisKeys.SCHEDULING_TASK_VASERVER_MAP);
map.put(taskUnid, devId);
}
public void unlinkTaskAndVaServer(String taskUnid) {
RMap<String, String> map = redissonClient.getMap(RedisKeys.SCHEDULING_TASK_VASERVER_MAP);
map.remove(taskUnid);
}
/**
* 下发任务
*/
public boolean executeTask(Task task, String devId) {
linkTaskAndVaServer(task.getUnid(), devId);
return false;
}
/**
* 删除任务
*/
public boolean terminateTask(Task task) {
unlinkTaskAndVaServer(task.getUnid());
return false;
}
/**
* 截图
*/
public Object snapshot(Task task) {
return null;
}
/**
* 获取rtsp流点播地址
*/
public Object getRtspPath() {
return null;
}
/**
* 主动推送rtmp流
*/
public Object rtmp() {
return null;
}
/**
* 录像上传
*/
public Object uploadVideo() {
return null;
}
}
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!