TaskDataService.java
5.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package com.viontech.fanxing.task.service;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.StoreConfig;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.feign.OpsClient;
import com.viontech.fanxing.task.model.RuntimeConfig;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import com.viontech.fanxing.task.repository.TaskDataRedisRepository;
import com.viontech.keliu.util.JsonMessageUtil;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
 * Service that keeps task data and task scheduling state up to date.
 *
 * <p>Task data is persisted through {@link TaskDataRedisRepository}. Scheduling is
 * expressed by placing task unids into two scored sorted sets obtained from
 * {@link RedisService} (to-be-executed and to-be-terminated), scored by the next
 * execution / termination time. The task-to-device binding is read from the map
 * returned by {@code redisService.getTaskVaServerMap()}.
 *
 * @author 谢明辉
 * @date 2021/7/13
 */
@Service
public class TaskDataService {

    @Resource
    private RedisService redisService;
    @Resource
    private VAServerService vaServerService;
    @Resource
    private TaskDataRedisRepository taskDataRedisRepository;
    @Resource
    private OpsClient opsClient;

    /**
     * Registers a new task: resolves its store configuration, persists the task data
     * and schedules it.
     *
     * <p>Note that the task data is persisted before scheduling is attempted, and that
     * {@link #distributeTask(TaskData)} also returns {@code false} when the task is
     * already bound to a VA server, so the "no executable time" exception is raised in
     * that case as well.
     *
     * @param task the task to add
     * @throws FanXingException if the store configuration cannot be obtained, or if
     *                          {@link #distributeTask(TaskData)} reports failure
     */
    public void addTask(Task task) {
        TaskData taskData = new TaskData(task);

        // Fetch the store configuration referenced by the task.
        Long storeConfigId = task.getStoreConfigId();
        JsonMessageUtil.JsonMessage<StoreConfig> storeConfigRes = opsClient.getStoreConfigById(storeConfigId);
        // A missing response is treated exactly like a response without data, so the
        // caller gets the domain exception instead of a NullPointerException.
        StoreConfig storeConfigVo = storeConfigRes == null ? null : (StoreConfig) storeConfigRes.getData();
        if (storeConfigVo == null) {
            throw new FanXingException("无法获取对应的存储配置");
        }
        taskData.setStoreConfig(storeConfigVo.getContent());
        taskDataRedisRepository.addOrUpdateTaskData(taskData);

        // Compute the run time and schedule the task.
        if (!distributeTask(taskData)) {
            throw new FanXingException("任务找不到可执行时间");
        }
    }

    /**
     * Schedules a task by adding its unid to the to-be-executed set (and, when a
     * termination time exists, to the to-be-terminated set).
     *
     * @param taskData the task data holding the task and its runtime configuration
     * @return {@code true} if the task was scheduled; {@code false} if the task is
     *         currently bound to a VA server or has no next execution time
     */
    public boolean distributeTask(TaskData taskData) {
        RuntimeConfig runtimeConfig = taskData.getRuntimeConfig();
        String taskUnid = taskData.getTask().getUnid();

        // A task that is already running must not be distributed again.
        if (taskRunOn(taskUnid) != null) {
            return false;
        }

        ImmutablePair<Long, Long> nextTime = runtimeConfig.getNextTimeOfExecutionAndTerminal();
        Long nextExecuteTime = nextTime.left;
        Long nextTerminateTime = nextTime.right;
        if (nextExecuteTime == null) {
            return false;
        }

        RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisService.getToBeExecutedTaskUnidSet();
        toBeExecutedTaskUnidSet.add(nextExecuteTime, taskUnid);
        // The termination time is optional; only register it when present.
        if (nextTerminateTime != null) {
            RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redisService.getToBeTerminatedTaskUnidSet();
            toBeTerminatedTaskUnidSet.add(nextTerminateTime, taskUnid);
        }
        return true;
    }

    /**
     * Removes the task from both scheduling sets and deletes its stored task data.
     *
     * @param taskUnid the unid of the task to remove
     */
    public void removeTaskDataAll(String taskUnid) {
        RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisService.getToBeExecutedTaskUnidSet();
        RScoredSortedSet<String> toBeTerminatedTaskUnidSet = redisService.getToBeTerminatedTaskUnidSet();
        toBeExecutedTaskUnidSet.remove(taskUnid);
        toBeTerminatedTaskUnidSet.remove(taskUnid);
        taskDataRedisRepository.remove(taskUnid);
    }

    /**
     * Looks up the VA server the task is currently bound to.
     *
     * @param taskUnid the unid of the task
     * @return the server info looked up by the bound device id, or {@code null} when
     *         the task has no entry in the task-to-server map
     */
    public VaServerInfo taskRunOn(String taskUnid) {
        RMap<String, String> taskVaServerMap = redisService.getTaskVaServerMap();
        String devId = taskVaServerMap.get(taskUnid);
        if (devId == null) {
            return null;
        }
        return vaServerService.getVaServerRedisRepository().getVAServerInfoById(devId);
    }

    /**
     * Removes the association between a task and its device.
     *
     * <p>The mapping is removed with a single {@code remove} call whose return value is
     * the removed device id, so the reported {@code devId} is always the value that was
     * actually unlinked.
     *
     * @param taskUnid the unid of the task to unlink
     * @return ImmutablePair&lt;String, String&gt;
     * <li>left <code>taskUnid</code></li>
     * <li>right <code>devId</code> ({@code null} if the task was not linked)</li>
     */
    public ImmutablePair<String, String> unlinkTaskAndVaServer(String taskUnid) {
        RMap<String, String> taskVaServerMap = redisService.getTaskVaServerMap();
        String devId = taskVaServerMap.remove(taskUnid);
        return ImmutablePair.of(taskUnid, devId);
    }

    /**
     * Terminates the task through {@code vaServerService} and, on success, removes all
     * of its scheduling entries and stored data.
     *
     * @param taskUnid the unid of the task to delete
     * @throws FanXingException if {@code vaServerService.terminateTask} returns {@code false}
     */
    public void deleteTask(String taskUnid) {
        if (!vaServerService.terminateTask(taskUnid)) {
            throw new FanXingException("failed");
        }
        removeTaskDataAll(taskUnid);
    }

    /**
     * Updates a task. A task that is not bound to a VA server is deleted and re-added;
     * a bound task has its stored data refreshed and the update pushed to the server.
     *
     * @param task the task carrying the new definition
     * @throws FanXingException propagated from {@link #deleteTask(String)} or
     *                          {@link #addTask(Task)} on the delete-and-re-add path
     */
    public void updateTask(Task task) {
        String taskUnid = task.getUnid();
        VaServerInfo vaServerInfo = taskRunOn(taskUnid);
        if (vaServerInfo == null) {
            // No VA server is bound, meaning the task is not running, so it can be
            // deleted first and then re-created as a new task.
            // NOTE(review): deleteTask still calls terminateTask for a task that is not
            // running — presumably that returns true in this case; confirm.
            deleteTask(taskUnid);
            addTask(task);
        } else {
            // The task is running: refresh the stored task data and push the new task
            // information to the VA server.
            TaskData taskData = new TaskData(task);
            taskDataRedisRepository.addOrUpdateTaskData(taskData);
            vaServerService.updateTask(taskData);
        }
    }

    /**
     * Exposes the underlying task data repository.
     *
     * @return the injected {@link TaskDataRedisRepository}
     */
    public TaskDataRedisRepository getRepository() {
        return taskDataRedisRepository;
    }
}