Commit 44f0e791 by xmh

<feat> gateway 为接口签名做准备

<feat> redisson 分布式锁改为看门狗式
<fix> DataStatisticUtil 对任务id做null判断
<fix> 任务调度部分改动
1 parent 4729fda6
...@@ -7,9 +7,7 @@ import org.redisson.spring.data.connection.RedissonConnectionFactory; ...@@ -7,9 +7,7 @@ import org.redisson.spring.data.connection.RedissonConnectionFactory;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
...@@ -32,6 +30,7 @@ public class RedissonConfig { ...@@ -32,6 +30,7 @@ public class RedissonConfig {
public RedissonClient redisson(@Value("${vion.redisson.path}") String path) throws IOException { public RedissonClient redisson(@Value("${vion.redisson.path}") String path) throws IOException {
try (FileInputStream inputStream = new FileInputStream(path)) { try (FileInputStream inputStream = new FileInputStream(path)) {
Config config = Config.fromYAML(inputStream); Config config = Config.fromYAML(inputStream);
config.setLockWatchdogTimeout(12000);
return Redisson.create(config); return Redisson.create(config);
} }
} }
......
...@@ -40,27 +40,18 @@ public class RedisService { ...@@ -40,27 +40,18 @@ public class RedisService {
public RLock getLockMust(String lockName) { public RLock getLockMust(String lockName) {
RLock lock = redissonClient.getLock(lockName); RLock lock = redissonClient.getLock(lockName);
boolean isLock = false; lock.lock();
while (!isLock) {
try {
isLock = lock.tryLock(30, 25, TimeUnit.SECONDS);
} catch (Exception ignore) {
}
}
return lock; return lock;
} }
public RLock getLockMust(String lockName, Integer waitTime, Integer leaseTime, TimeUnit timeUnit) { public RLock tryLock(String lockName) {
RLock lock = redissonClient.getLock(lockName); RLock lock = redissonClient.getLock(lockName);
boolean isLock = false; boolean success = false;
while (!isLock) { try {
try { success = lock.tryLock(30, TimeUnit.SECONDS);
isLock = lock.tryLock(waitTime, leaseTime, timeUnit); } catch (InterruptedException ignore) {
Thread.sleep(50L);
} catch (Exception ignore) {
}
} }
return lock; return success ? lock : null;
} }
public RedissonClient getClient() { public RedissonClient getClient() {
......
...@@ -62,6 +62,9 @@ public class DataStatisticUtil { ...@@ -62,6 +62,9 @@ public class DataStatisticUtil {
} }
public void loadDataStatistic(Long taskId, String taskName, LocalDateTime dateTime, String redisKey) { public void loadDataStatistic(Long taskId, String taskName, LocalDateTime dateTime, String redisKey) {
if (taskId == null) {
return;
}
LocalDate localDate = dateTime.toLocalDate(); LocalDate localDate = dateTime.toLocalDate();
int hour = dateTime.getHour(); int hour = dateTime.getHour();
List<DataStatistic> query = jdbcTemplate.query("select * from d_data_statistic where date=? and task_id=?", new BeanPropertyRowMapper<>(DataStatistic.class), localDate, hour); List<DataStatistic> query = jdbcTemplate.query("select * from d_data_statistic where date=? and task_id=?", new BeanPropertyRowMapper<>(DataStatistic.class), localDate, hour);
......
...@@ -11,6 +11,7 @@ import org.apache.commons.lang3.StringUtils; ...@@ -11,6 +11,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.ImmutablePair;
import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.*; import org.springframework.http.*;
import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.http.server.reactive.ServerHttpResponse;
...@@ -37,7 +38,7 @@ import java.util.concurrent.Future; ...@@ -37,7 +38,7 @@ import java.util.concurrent.Future;
@Slf4j @Slf4j
@Component @Component
public class AuthorizationFilter implements GlobalFilter { public class AuthorizationFilter implements GlobalFilter, Ordered {
ExecutorService executorService = Executors.newCachedThreadPool(); ExecutorService executorService = Executors.newCachedThreadPool();
@Resource @Resource
...@@ -161,4 +162,9 @@ public class AuthorizationFilter implements GlobalFilter { ...@@ -161,4 +162,9 @@ public class AuthorizationFilter implements GlobalFilter {
} }
return null; return null;
} }
@Override
public int getOrder() {
return HIGHEST_PRECEDENCE + 101;
}
} }
package com.viontech.fanxing.forward;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* .
*
* @author 谢明辉
* @date 2022/2/17
*/
//@Component
public class CacheBodyGatewayFilter implements Ordered, GlobalFilter {
public static final String CACHE_REQUEST_BODY_OBJECT_KEY = "cachedRequestBodyObject";
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
if (exchange.getRequest().getHeaders().getContentType() == null) {
return chain.filter(exchange);
} else {
return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
DataBufferUtils.retain(dataBuffer);
Flux<DataBuffer> cachedFlux = Flux.defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};
exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, cachedFlux);
return chain.filter(exchange.mutate().request(mutatedRequest).build());
});
}
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}
...@@ -43,7 +43,6 @@ import java.util.ArrayList; ...@@ -43,7 +43,6 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipEntry; import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream; import java.util.zip.ZipOutputStream;
...@@ -85,7 +84,10 @@ public class ExportDataJob { ...@@ -85,7 +84,10 @@ public class ExportDataJob {
@Scheduled(fixedDelay = 10000L) @Scheduled(fixedDelay = 10000L)
public void run() { public void run() {
log.info("执行数据导出定时任务开始"); log.info("执行数据导出定时任务开始");
RLock lock = redisService.getLockMust("exportDataJob", 60, 59, TimeUnit.MINUTES); RLock lock = redisService.tryLock("exportDataJob");
if (lock == null) {
return;
}
try { try {
List<ExportData> exportData = getExportDataNeedFinished(); List<ExportData> exportData = getExportDataNeedFinished();
for (ExportData item : exportData) { for (ExportData item : exportData) {
...@@ -213,8 +215,7 @@ public class ExportDataJob { ...@@ -213,8 +215,7 @@ public class ExportDataJob {
} }
private <K> ImmutableTriple<Boolean, String, Integer> export(ExportData item, Function<Integer, PageInfo<K>> getJsonData, Function<K, ExportBaseModel> build, boolean withPic, boolean withVideo, Integer nextPage, private <K> ImmutableTriple<Boolean, String, Integer> export(ExportData item, Function<Integer, PageInfo<K>> getJsonData, Function<K, ExportBaseModel> build, boolean withPic, boolean withVideo, Integer nextPage, Function<K, String> getPic, Function<K, String> getVideo) {
Function<K, String> getPic, Function<K, String> getVideo) {
if (needInterrupt(item.getId())) { if (needInterrupt(item.getId())) {
log.info("任务已被删除,打断任务"); log.info("任务已被删除,打断任务");
return ImmutableTriple.of(false, null, null); return ImmutableTriple.of(false, null, null);
...@@ -301,10 +302,7 @@ public class ExportDataJob { ...@@ -301,10 +302,7 @@ public class ExportDataJob {
int nextPage = 1; int nextPage = 1;
List<String> pathList = new ArrayList<>(); List<String> pathList = new ArrayList<>();
while (nextPage != 0) { while (nextPage != 0) {
ImmutableTriple<Boolean, String, Integer> export = export(item, ImmutableTriple<Boolean, String, Integer> export = export(item, (page) -> trafficService.getJsonData(example, page, pageSize), x -> buildTrafficData(type, x), withPic, withVideo, nextPage, TrafficVo::getPics, TrafficVo::getVideoName);
(page) -> trafficService.getJsonData(example, page, pageSize),
x -> buildTrafficData(type, x), withPic, withVideo, nextPage,
TrafficVo::getPics, TrafficVo::getVideoName);
if (export.left) { if (export.left) {
String path = export.middle; String path = export.middle;
pathList.add(path); pathList.add(path);
...@@ -334,10 +332,7 @@ public class ExportDataJob { ...@@ -334,10 +332,7 @@ public class ExportDataJob {
int nextPage = 1; int nextPage = 1;
List<String> pathList = new ArrayList<>(); List<String> pathList = new ArrayList<>();
while (nextPage != 0) { while (nextPage != 0) {
ImmutableTriple<Boolean, String, Integer> export = export(item, ImmutableTriple<Boolean, String, Integer> export = export(item, (page) -> behaviorService.getJsonData(example, page, pageSize), this::buildBehaviorModel, withPic, withVideo, nextPage, BehaviorVo::getPics, BehaviorVo::getVideo);
(page) -> behaviorService.getJsonData(example, page, pageSize),
this::buildBehaviorModel, withPic, withVideo, nextPage,
BehaviorVo::getPics, BehaviorVo::getVideo);
if (export.left) { if (export.left) {
String path = export.middle; String path = export.middle;
pathList.add(path); pathList.add(path);
......
...@@ -41,9 +41,12 @@ public class TaskRunner { ...@@ -41,9 +41,12 @@ public class TaskRunner {
@Resource @Resource
private TaskService taskService; private TaskService taskService;
@Scheduled(fixedDelay = 5000) @Scheduled(fixedDelay = 2000)
public void run() { public void run() {
RLock jobLock = redisService.getLockMust("lock:taskRunner"); RLock jobLock = redisService.tryLock("lock:taskRunner");
if (jobLock == null) {
return;
}
try { try {
terminatedTask(); terminatedTask();
executedTask(); executedTask();
...@@ -127,8 +130,8 @@ public class TaskRunner { ...@@ -127,8 +130,8 @@ public class TaskRunner {
try { try {
RScoredSortedSet<String> set = redisService.getToBeTerminatedTaskUnidSet(); RScoredSortedSet<String> set = redisService.getToBeTerminatedTaskUnidSet();
RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisService.getToBeExecutedTaskUnidSet(); RScoredSortedSet<String> toBeExecutedTaskUnidSet = redisService.getToBeExecutedTaskUnidSet();
// 提前 1 秒预读 // 提前 2 秒预读
Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis() + 1000, true); Collection<String> entryCollection = set.valueRange(0, true, System.currentTimeMillis() + 2000, true);
for (String taskUnid : entryCollection) { for (String taskUnid : entryCollection) {
TaskData taskData = taskDataService.getRepository().getTaskDataByUnid(taskUnid); TaskData taskData = taskDataService.getRepository().getTaskDataByUnid(taskUnid);
......
...@@ -114,6 +114,11 @@ public class VAServerService { ...@@ -114,6 +114,11 @@ public class VAServerService {
// 如果vaServerId不为空,需要终止任务 // 如果vaServerId不为空,需要终止任务
if (vaServerId != null) { if (vaServerId != null) {
VaServerInfo vaServerInfo = vaServerRedisRepository.getVAServerInfoById(vaServerId); VaServerInfo vaServerInfo = vaServerRedisRepository.getVAServerInfoById(vaServerId);
// 如果找不到对应的 vaServer , 相当于已经停止了任务
if (vaServerInfo == null) {
return true;
}
// 如果 vaServer 为离线, 那么无法停止
if (0 == vaServerInfo.getStatus()) { if (0 == vaServerInfo.getStatus()) {
return false; return false;
} }
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!