TaskDataService.java
6.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package com.viontech.fanxing.task.service;
import com.viontech.fanxing.commons.constant.ChannelType;
import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.runtime.RuntimeConfig;
import com.viontech.fanxing.task.repository.TaskDataRedisRepository;
import com.viontech.fanxing.task.service.adapter.TaskService;
import com.viontech.fanxing.task.utils.TaskUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
 * Service that manages the runtime data of tasks.
 *
 * <p>It stores {@link TaskData} through {@link TaskDataRedisRepository}, schedules tasks by
 * putting their unid into the "to be executed" / "to be terminated" scored sorted sets obtained
 * from {@link RedisService}, and reads or clears the task-to-VA-server mapping kept in the map
 * returned by {@link RedisService#getTaskVaServerMap()}.
 *
 * @author 谢明辉
 * @date 2021/7/13
 */
@Service
@Slf4j
public class TaskDataService {

    @Resource
    private RedisService redisService;
    @Resource
    private VAServerService vaServerService;
    @Resource
    private TaskDataRedisRepository taskDataRedisRepository;
    @Resource
    private OpsClientService opsClientService;
    @Resource
    private TaskService taskService;

    /**
     * Builds the {@link TaskData} for a task, validates its runtime configuration, stores it and
     * schedules it.
     *
     * @param task the task to add
     * @throws FanXingException if the task data cannot be built (see {@link #buildTaskData(Task)})
     *                          or if {@link #distributeTask(TaskData)} finds no execution time
     */
    public void addTask(Task task) {
        TaskData taskData = buildTaskData(task);
        TaskUtils.INSTANCE.checkRuntimeConf(taskData, vaServerService, this);
        taskDataRedisRepository.addOrUpdateTaskData(taskData);
        // Compute the run time and schedule the task.
        boolean success = distributeTask(taskData);
        if (!success) {
            throw new FanXingException("任务找不到可执行时间");
        }
    }

    /**
     * Schedules a task according to the next execution / termination time reported by its
     * runtime configuration.
     *
     * <p>If the task is already linked to a VA server nothing is scheduled. Otherwise the task
     * status is updated (STAY when the execution time is in the future, STARTING otherwise), the
     * task unid is added to the "to be executed" set scored by the execution time and, when a
     * termination time exists, to the "to be terminated" set scored by the termination time.
     *
     * @param taskData the task data to schedule
     * @return {@code true} if the task is already running or was scheduled; {@code false} if the
     *         runtime configuration yields no next execution time
     */
    public boolean distributeTask(TaskData taskData) {
        RuntimeConfig runtimeConfig = taskData.getRuntimeConfig();
        Task task = taskData.getTask();
        String taskUnid = task.getUnid();

        // Do not schedule again if the task is already running on a VA server.
        if (taskRunOn(taskUnid) != null) {
            return true;
        }

        ImmutablePair<Long, Long> nextTime = runtimeConfig.getNextTimeOfExecutionAndTerminal();
        log.info("部署任务[{}],运行时间:[{}]", task.getName(), nextTime);
        Long nextExecuteTime = nextTime.left;
        Long nextTerminateTime = nextTime.right;
        if (nextExecuteTime == null) {
            return false;
        }

        // Tasks that must start right away are marked STARTING; tasks that run later are
        // marked STAY (waiting for their time).
        if (nextExecuteTime > System.currentTimeMillis()) {
            taskService.updateStatus(task.getId(), TaskStatus.STAY.val);
        } else {
            taskService.updateStatus(task.getId(), TaskStatus.STARTING.val);
        }

        RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisService.getToBeExecutedTaskUnidSet();
        toBeExecutedTaskUnidSet.add(nextExecuteTime, taskUnid);
        if (nextTerminateTime != null) {
            RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redisService.getToBeTerminatedTaskUnidSet();
            toBeTerminatedTaskUnidSet.add(nextTerminateTime, taskUnid);
        }
        return true;
    }

    /**
     * Removes the task unid from both scheduling sets and removes its stored task data.
     *
     * @param taskUnid unid of the task
     */
    public void removeTaskDataAll(String taskUnid) {
        RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisService.getToBeExecutedTaskUnidSet();
        RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redisService.getToBeTerminatedTaskUnidSet();
        toBeExecutedTaskUnidSet.remove(taskUnid);
        toBeTerminatedTaskUnidSet.remove(taskUnid);
        taskDataRedisRepository.remove(taskUnid);
    }

    /**
     * Looks up the VA server a task is currently linked to.
     *
     * @param taskUnid unid of the task
     * @return the server info looked up by the mapped device id, or {@code null} when the task
     *         has no entry in the task-to-VA-server map
     */
    public VaServerInfo taskRunOn(String taskUnid) {
        RMap<String, String> taskVaServerMap = redisService.getTaskVaServerMap();
        String devId = taskVaServerMap.get(taskUnid);
        return devId == null ? null : vaServerService.getVaServerRedisRepository().getVAServerInfoById(devId);
    }

    /**
     * Removes the link between a task and its VA server.
     *
     * @param taskUnid unid of the task
     * @return a pair whose left is {@code taskUnid} and whose right is the device id the task was
     *         linked to ({@code null} if there was no link)
     */
    public ImmutablePair<String, String> unlinkTaskAndVaServer(String taskUnid) {
        RMap<String, String> taskVaServerMap = redisService.getTaskVaServerMap();
        // remove() returns the previous value, so the lookup and the removal happen in one call
        // instead of a separate get() followed by remove().
        String devId = taskVaServerMap.remove(taskUnid);
        return ImmutablePair.of(taskUnid, devId);
    }

    /**
     * Terminates the task through {@link VAServerService} and, on success, removes all of its
     * scheduling entries and stored data.
     *
     * @param taskUnid unid of the task
     * @throws FanXingException if {@code vaServerService.terminateTask} reports failure
     */
    public void deleteTask(String taskUnid) {
        boolean success = vaServerService.terminateTask(taskUnid);
        if (!success) {
            throw new FanXingException("失败");
        }
        removeTaskDataAll(taskUnid);
    }

    /**
     * Updates a task.
     *
     * <p>When the task is not linked to a VA server, or {@code rebuild} is requested, the task is
     * deleted and added again. Otherwise the stored task data is replaced and the update is
     * pushed to the VA server.
     *
     * @param task    the task carrying the new values
     * @param rebuild {@code true} to force delete-and-recreate even if the task is running
     * @throws FanXingException if the linked VA server has status {@code 0}, or if deleting,
     *                          re-adding or building the task data fails
     */
    public void updateTask(Task task, boolean rebuild) {
        String taskUnid = task.getUnid();
        VaServerInfo vaServerInfo = taskRunOn(taskUnid);
        // No VA server means the task is not running, so it can be deleted and re-created.
        if (vaServerInfo == null || rebuild) {
            deleteTask(taskUnid);
            addTask(task);
        } else if (vaServerInfo.getStatus() == 0) {
            throw new FanXingException("设备离线");
        } else {
            TaskData taskData = buildTaskData(task);
            // Update the stored task data and push the new task info to the VA server.
            taskDataRedisRepository.addOrUpdateTaskData(taskData);
            vaServerService.updateTask(taskData);
        }
    }

    /**
     * Creates the {@link TaskData} for a task and fills in its store configuration and, for
     * {@code STREAM_VIDEO_CLOUD} streams, the device unid of its channel.
     *
     * @param task the task to wrap
     * @return the populated task data
     * @throws FanXingException if the store configuration or, for video-cloud streams, the
     *                          channel cannot be obtained
     */
    private TaskData buildTaskData(Task task) {
        TaskData taskData = new TaskData(task);
        // Fetch the storage configuration.
        Long storeConfigId = task.getStoreConfigId();
        String config = opsClientService.getStoreConfigById(storeConfigId);
        if (config == null) {
            throw new FanXingException("无法获取对应的存储配置");
        }
        taskData.setStoreConfig(config);
        taskData.setStoreConfigId(storeConfigId);

        if (taskData.getTask().getStreamType().equals(ChannelType.STREAM_VIDEO_CLOUD.value)) {
            Channel channel = opsClientService.getChannelByChannelUnid(taskData.getTask().getChannelUnid());
            // Fail with a domain error, like the store-config lookup above, instead of a bare
            // NullPointerException when the channel lookup returns nothing.
            if (channel == null) {
                throw new FanXingException("无法获取对应的通道信息");
            }
            taskData.setDeviceUnid(channel.getDeviceUnid());
        }
        return taskData;
    }

    /**
     * @return the repository used to store task data
     */
    public TaskDataRedisRepository getRepository() {
        return taskDataRedisRepository;
    }

    /**
     * @return the task service used to update task status
     */
    public TaskService getTaskService() {
        return taskService;
    }
}