Commit 2b39aaf6 by xmh

<fix> 任务处理添加分布式锁,防止互相干扰

1 parent 3778ad59
package com.viontech.fanxing.task.repository; package com.viontech.fanxing.task.repository;
import com.viontech.fanxing.commons.constant.RedisKeys; import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.commons.service.RedisService; import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.model.TaskData;
import org.redisson.api.RLock;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
...@@ -37,4 +38,8 @@ public class TaskDataRedisRepository { ...@@ -37,4 +38,8 @@ public class TaskDataRedisRepository {
return taskDataRMap.remove(unid); return taskDataRMap.remove(unid);
} }
public RLock getTaskLock(String unid) {
return redisService.getLockMust("lock:task:" + unid);
}
} }
...@@ -54,66 +54,70 @@ public class TaskRunner { ...@@ -54,66 +54,70 @@ public class TaskRunner {
Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true); Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis(), true);
for (String taskUnid : entryCollection) { for (String taskUnid : entryCollection) {
TaskData taskData = taskDataService.getRepository().getTaskDataByUnid(taskUnid); RLock taskLock = taskDataService.getRepository().getTaskLock(taskUnid);
if (taskData == null) { try {
log.info("找不到对应任务,移除所有:{}", taskUnid); TaskData taskData = taskDataService.getRepository().getTaskDataByUnid(taskUnid);
taskDataService.removeTaskDataAll(taskUnid); if (taskData == null) {
continue; log.info("找不到对应任务,移除所有:{}", taskUnid);
} taskDataService.removeTaskDataAll(taskUnid);
log.info("开始任务 [{}] [{}]", taskData.getTask().getName(), taskUnid);
Task task = taskData.getTask();
String taskVaType = task.getVaType();
Float resourceNeed = task.getResourceNeed();
Collection<String> vaServerIdSet = vaServerMap.keySet();
// todo 暂时先找有可用资源的vaserver,以后再进行算法优化
VaServerInfo server = null;
for (String devId : vaServerIdSet) {
VaServerInfo temp = vaServerMap.get(devId);
// 不在线
if (temp.getStatus() == 0) {
continue; continue;
} }
// 指定了VAServer log.info("开始任务 [{}] [{}]", taskData.getTask().getName(), taskUnid);
if (StringUtils.isNotBlank(taskVaType)) { Task task = taskData.getTask();
if (!taskVaType.equals(temp.getPlatType())) { String taskVaType = task.getVaType();
Float resourceNeed = task.getResourceNeed();
Collection<String> vaServerIdSet = vaServerMap.keySet();
// todo 暂时先找有可用资源的vaserver,以后再进行算法优化
VaServerInfo server = null;
for (String devId : vaServerIdSet) {
VaServerInfo temp = vaServerMap.get(devId);
// 不在线
if (temp.getStatus() == 0) {
continue; continue;
} }
} // 指定了VAServer
if (temp.getAvailableResources() >= resourceNeed) { if (StringUtils.isNotBlank(taskVaType)) {
devLock = vaServerService.getVaServerRedisRepository().getDevLock(devId); if (!taskVaType.equals(temp.getPlatType())) {
temp = vaServerMap.get(devId); continue;
}
}
if (temp.getAvailableResources() >= resourceNeed) { if (temp.getAvailableResources() >= resourceNeed) {
server = temp; devLock = vaServerService.getVaServerRedisRepository().getDevLock(devId);
break; temp = vaServerMap.get(devId);
} else { if (temp.getAvailableResources() >= resourceNeed) {
devLock.forceUnlock(); server = temp;
devLock = null; break;
} else {
devLock.forceUnlock();
devLock = null;
}
} }
} }
}
// 找不到可以用来执行的设备,需要修改状态
if (server == null) {
log.debug("找不到可用的 VAServer,跳过:{}", taskUnid);
taskService.updateStatus(task.getId(), TaskStatus.CAN_NOT_RUN.val);
continue;
}
try { // 找不到可以用来执行的设备,需要修改状态
log.info("开始下发任务:[{}]", taskData.getTask().getName()); if (server == null) {
vaServerService.executeTask(taskData, server); log.debug("找不到可用的 VAServer,跳过:{}", taskUnid);
} catch (Exception e) { taskService.updateStatus(task.getId(), TaskStatus.CAN_NOT_RUN.val);
log.error("下发任务失败", e); continue;
taskService.updateStatus(task.getId(), TaskStatus.CAN_NOT_RUN.val); }
continue;
}
// 修改任务状态 try {
taskService.updateStatus(task.getId(), TaskStatus.RUNNING.val); log.info("开始下发任务:[{}]", taskData.getTask().getName());
// 移除任务 vaServerService.executeTask(taskData, server);
set.remove(taskUnid); } catch (Exception e) {
log.error("下发任务失败", e);
taskService.updateStatus(task.getId(), TaskStatus.CAN_NOT_RUN.val);
continue;
}
// 修改任务状态
taskService.updateStatus(task.getId(), TaskStatus.RUNNING.val);
// 移除任务
set.remove(taskUnid);
} finally {
taskLock.forceUnlock();
}
} }
} catch (Exception e) { } catch (Exception e) {
log.error("", e); log.error("", e);
......
...@@ -3,6 +3,7 @@ package com.viontech.fanxing.task.service; ...@@ -3,6 +3,7 @@ package com.viontech.fanxing.task.service;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.config.VionConfig; import com.viontech.fanxing.commons.config.VionConfig;
import com.viontech.fanxing.commons.constant.RedisKeys; import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.exception.FanXingException; import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Task; import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.service.RedisService; import com.viontech.fanxing.commons.service.RedisService;
...@@ -177,6 +178,9 @@ public class VAServerService { ...@@ -177,6 +178,9 @@ public class VAServerService {
if (taskData == null) { if (taskData == null) {
throw new FanXingException("找不到对应的任务"); throw new FanXingException("找不到对应的任务");
} }
if (!TaskStatus.RUNNING.valEqual(taskData.getTask().getStatus())) {
throw new FanXingException("任务不在运行状态");
}
VaServerInfo vaServerInfo = taskDataService.taskRunOn(taskUnid); VaServerInfo vaServerInfo = taskDataService.taskRunOn(taskUnid);
if (vaServerInfo != null) { if (vaServerInfo != null) {
if (vaServerInfo.getStatus() == 0) { if (vaServerInfo.getStatus() == 0) {
......
...@@ -26,6 +26,7 @@ import com.viontech.fanxing.task.service.VAServerService; ...@@ -26,6 +26,7 @@ import com.viontech.fanxing.task.service.VAServerService;
import com.viontech.fanxing.task.service.adapter.TaskService; import com.viontech.fanxing.task.service.adapter.TaskService;
import com.viontech.fanxing.task.utils.SceneUtils; import com.viontech.fanxing.task.utils.SceneUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RLock;
import org.redisson.api.RMap; import org.redisson.api.RMap;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
...@@ -179,8 +180,13 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic ...@@ -179,8 +180,13 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public void removeTask(Long id) { public void removeTask(Long id) {
Task task = selectByPrimaryKey(id); Task task = selectByPrimaryKey(id);
taskDataService.deleteTask(task.getUnid()); RLock taskLock = taskDataService.getRepository().getTaskLock(task.getUnid());
deleteByPrimaryKey(id); try {
taskDataService.deleteTask(task.getUnid());
deleteByPrimaryKey(id);
} finally {
taskLock.forceUnlock();
}
} }
/** /**
...@@ -208,9 +214,16 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic ...@@ -208,9 +214,16 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
if (task.getStoreConfigId() == null) { if (task.getStoreConfigId() == null) {
throw new IllegalArgumentException("存储配置为空,无法执行"); throw new IllegalArgumentException("存储配置为空,无法执行");
} }
updateStatus(id, TaskStatus.PAUSE.val);
taskDataService.addTask(task); RLock taskLock = taskDataService.getRepository().getTaskLock(task.getUnid());
opsClientService.addLog("启动任务:" + task.getName()); try {
updateStatus(id, TaskStatus.PAUSE.val);
taskDataService.addTask(task);
opsClientService.addLog("启动任务:" + task.getName());
} finally {
taskLock.forceUnlock();
}
} }
/** /**
...@@ -220,9 +233,14 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic ...@@ -220,9 +233,14 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
@Override @Override
public void stopTask(Long id) { public void stopTask(Long id) {
Task task = selectByPrimaryKey(id); Task task = selectByPrimaryKey(id);
taskDataService.deleteTask(task.getUnid()); RLock taskLock = taskDataService.getRepository().getTaskLock(task.getUnid());
updateStatus(id, TaskStatus.PAUSE.val); try {
opsClientService.addLog("停止任务:" + task.getName()); taskDataService.deleteTask(task.getUnid());
updateStatus(id, TaskStatus.PAUSE.val);
opsClientService.addLog("停止任务:" + task.getName());
} finally {
taskLock.forceUnlock();
}
} }
@Override @Override
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!