Commit 492b628b by xmh

任务管理服务:

1. <refactor> 使用 LocalCache 缓存

运维服务:
1. <feat> 添加 VideoController.overview 接口

转发服务:
1. 1. <refactor> 使用 LocalCache 缓存

commons:
1. <feat> 添加通用缓存类 CacheUtil
2. <feat> 添加缓存 aop
1 parent 6b32114f
package com.viontech.fanxing.commons.aop;
import com.viontech.fanxing.commons.base.LocalCache;
import com.viontech.fanxing.commons.utils.CacheUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
/**
* .
*
* @author 谢明辉
* @date 2021/9/9
*/
@Component
@Aspect
@Slf4j
public class LocalCacheAspect {
@Around("@annotation(localCache)")
public Object around(ProceedingJoinPoint point, LocalCache localCache) throws Throwable {
try {
ImmutablePair<Boolean, Object> cacheResult = CacheUtil.get(localCache.value());
Boolean exists = cacheResult.left;
Object o;
if (exists && cacheResult.right != null) {
o = cacheResult.right;
} else {
o = point.proceed();
CacheUtil.cache(localCache.value(), o, localCache.timeunit(), localCache.duration());
}
return o;
} catch (RuntimeException e) {
log.error("缓存调用方法:{}", point.toLongString());
log.error("缓存出错", e);
}
return point.proceed();
}
}
package com.viontech.fanxing.commons.base;
import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;
/**
* .
*
* @author 谢明辉
* @date 2021/9/9
*/
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Target(ElementType.METHOD)
public @interface LocalCache {
/**
* 指定 cache 数据的 key
*/
String value();
/**
* 指定 cache 的过期时间单位
*/
TimeUnit timeunit() default TimeUnit.MINUTES;
/**
* 指定 cache 的过期时间
*/
long duration() default -1;
}
package com.viontech.fanxing.commons.utils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.ImmutablePair;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* .
*
* @author 谢明辉
* @date 2021/9/9
*/
@Slf4j
public class CacheUtil {
private static final ConcurrentHashMap<String, CacheInfo> CACHE = new ConcurrentHashMap<>();
public static void cache(String key, Object o) throws Exception {
cache(key, o, TimeUnit.MINUTES, 5L);
}
public static void cache(String key, Object o, TimeUnit timeUnit, long duration) throws Exception {
cache(key, o, timeUnit, duration, null, false);
}
public static void cache(String key, Object o, TimeUnit timeUnit, long duration, Callable callable, boolean accessExpireOnAccess) throws Exception {
log.info("对数据进行缓存,key:{},缓存时长:{},单位:{}", key, duration, timeUnit.toString());
if (o == null && callable == null) {
throw new RuntimeException("数据和load方法不能都为空");
}
long expire;
if (duration == -1) {
expire = Long.MAX_VALUE;
} else {
expire = timeUnit.toMillis(duration) + System.currentTimeMillis();
}
CacheInfo cacheInfo = new CacheInfo(key, o, timeUnit, duration, callable, accessExpireOnAccess, expire);
CACHE.put(key, cacheInfo);
}
/**
* @return <li>left : 是否存在</li><li>right : 值</li>
*/
public static <T> ImmutablePair<Boolean, T> get(String key) {
// key 不存在
if (!CACHE.containsKey(key)) {
return ImmutablePair.of(false, null);
}
CacheInfo cacheInfo = CACHE.get(key);
Object o = null;
try {
o = cacheInfo.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
return ImmutablePair.of(true, o == null ? null : (T) o);
}
private static class CacheInfo {
/** 缓存Loader */
public final Callable callable;
/** 访问时是否刷新过期时间 */
public final boolean updateExpireOnAccess;
private final String key;
/** 缓存时长的单位 */
private final TimeUnit timeUnit;
/** 缓存时长 */
private final Long duration;
/** 过期时间戳 */
public Long expire;
/** 缓存数据 */
public Object data;
public CacheInfo(String key, Object data, TimeUnit timeUnit, Long duration, Callable callable, boolean updateExpireOnAccess, Long expire) {
this.key = key;
this.data = data;
this.timeUnit = timeUnit;
this.duration = duration;
this.callable = callable;
this.updateExpireOnAccess = updateExpireOnAccess;
this.expire = expire;
}
public Object get() throws Exception {
// 没过期
if (System.currentTimeMillis() < expire) {
// 数据为空 callable 不为空,则初始化
if (data == null && callable != null) {
data = callable.call();
}
// 续期处理 duration 为 -1 不用续期
if (updateExpireOnAccess && duration != -1) {
expire = System.currentTimeMillis() + timeUnit.toMillis(duration);
}
log.debug("缓存命中:{}", key);
return data;
}
// 过期处理
else {
if (callable != null) {
log.debug("{} 缓存过期,调用 Loader 重新缓存", key);
data = callable.call();
expire = System.currentTimeMillis() + timeUnit.toMillis(duration);
return data;
} else {
return null;
}
}
}
}
}
......@@ -37,6 +37,10 @@
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
......
package com.viontech.fanxing.forward.util;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.viontech.fanxing.commons.base.LocalCache;
import com.viontech.fanxing.commons.model.Forward;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.forward.feign.OpsFeignClient;
......@@ -11,11 +10,9 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
/**
......@@ -29,58 +26,40 @@ import java.util.stream.Collectors;
@Slf4j
public class CacheUtils {
private static final Cache<Object, Object> CACHE = CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(2)).build();
@Resource
private TaskFeignClient taskFeignClient;
@Resource
private OpsFeignClient opsFeignClient;
@LocalCache(value = "task_map", duration = 1)
public synchronized Map<String, Task> getTaskMap() {
Map<String, Task> result;
JsonMessageUtil.JsonMessage<List<Task>> response = null;
try {
result = (Map<String, Task>) CACHE.get("task_map", () -> {
JsonMessageUtil.JsonMessage<List<Task>> response = null;
try {
response = taskFeignClient.getAllTask();
} catch (Exception e) {
log.info("获取 task_map 失败:", e);
}
if (response != null && response.getData() != null) {
List<Task> data = (List<Task>) response.getData();
return data.stream().collect(Collectors.toMap(Task::getUnid, x -> x, (x, y) -> x));
} else {
return Collections.emptyMap();
}
});
} catch (ExecutionException e) {
log.error("", e);
result = Collections.emptyMap();
response = taskFeignClient.getAllTask();
} catch (Exception e) {
log.info("获取 task_map 失败:", e);
}
if (response != null && response.getData() != null) {
List<Task> data = (List<Task>) response.getData();
return data.stream().collect(Collectors.toMap(Task::getUnid, x -> x, (x, y) -> x));
} else {
return Collections.emptyMap();
}
return result;
}
@LocalCache(value = "forward_list", duration = 5)
public synchronized List<Forward> getAllForward() {
List<Forward> result;
JsonMessageUtil.JsonMessage<Forward> response = null;
try {
result = (List<Forward>) CACHE.get("forward_list", () -> {
JsonMessageUtil.JsonMessage<Forward> response = null;
try {
response = opsFeignClient.getForwards();
} catch (Exception e) {
log.info("获取 forward_list 失败:", e);
}
if (response != null && response.getData() != null) {
return response.getData();
} else {
return Collections.emptyList();
}
});
} catch (ExecutionException e) {
log.error("", e);
result = Collections.emptyList();
response = opsFeignClient.getForwards();
} catch (Exception e) {
log.info("获取 forward_list 失败:", e);
}
if (response != null && response.getData() != null) {
return (List<Forward>) response.getData();
} else {
return Collections.emptyList();
}
return result;
}
......
package com.viontech.fanxing.ops.controller.main;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.ops.service.main.VideoService;
import com.viontech.keliu.util.JsonMessageUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Resource;
......@@ -33,4 +31,10 @@ public class VideoController {
return JsonMessageUtil.getSuccessJsonMsg("success");
}
@GetMapping("/overview")
public Object overview() {
JSONObject res = videoService.overview();
return JsonMessageUtil.getSuccessJsonMsg(res);
}
}
package com.viontech.fanxing.ops.service.main;
import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.base.LocalCache;
import com.viontech.fanxing.commons.config.VionConfig;
import com.viontech.fanxing.commons.constant.ChannelType;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.commons.model.ChannelExample;
import com.viontech.fanxing.commons.model.ChannelTag;
import com.viontech.fanxing.ops.service.adapter.ChannelService;
import com.viontech.fanxing.ops.service.adapter.ChannelTagService;
......@@ -18,8 +21,10 @@ import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
/**
* .
......@@ -83,4 +88,22 @@ public class VideoService {
}
}
}
/**
* 获取录像文件头部概览
*
* @return
*/
@LocalCache(value = "video_overView", duration = 5)
public JSONObject overview() {
ChannelExample channelExample = new ChannelExample();
channelExample.createCriteria().andTypeEqualTo(ChannelType.FILE.value);
channelExample.createColumns().hasPortColumn();
List<Channel> channels = channelService.selectByExample(channelExample);
IntSummaryStatistics summary = channels.stream().map(Channel::getPort).collect(Collectors.summarizingInt(x -> x));
JSONObject res = new JSONObject();
res.put("fileCount", summary.getCount());
res.put("size", summary.getSum());
return res;
}
}
package com.viontech.fanxing.task.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.viontech.fanxing.commons.base.BaseMapper;
import com.viontech.fanxing.commons.base.BaseServiceImpl;
import com.viontech.fanxing.commons.base.LocalCache;
import com.viontech.fanxing.commons.constant.TaskStatus;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.TaskExample;
import com.viontech.fanxing.commons.vo.TaskVo;
......@@ -22,10 +19,8 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
@Service
public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskService {
......@@ -37,32 +32,6 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
private TaskDataService taskDataService;
@Resource
private VAServerService vaServerService;
private final LoadingCache<String, JSONObject> OVERVIEW_CACHE = CacheBuilder.newBuilder()
.expireAfterWrite(Duration.ofMinutes(2))
.build(new CacheLoader<String, JSONObject>() {
@Override
public JSONObject load(String key) throws Exception {
List<Task> tasks = selectByExample(new TaskExample());
int resourceCount = 0;
int usedResourceCount = 0;
long taskCount = tasks.size();
long runningTaskCount = tasks.stream().filter(x -> TaskStatus.RUNNING.valEqual(x.getStatus())).count();
RMap<String, VaServerInfo> map = vaServerService.getVaServerRedisRepository().getVaServerInfoMap();
Collection<VaServerInfo> vaServerInfos = map.readAllValues();
for (VaServerInfo info : vaServerInfos) {
resourceCount += info.getVideoResource();
usedResourceCount += (info.getVideoResource() - info.getAvailableResources());
}
JSONObject result = new JSONObject();
result.put("resourceCount", resourceCount);
result.put("usedResourceCount", usedResourceCount);
result.put("taskCount", taskCount);
result.put("runningTaskCount", runningTaskCount);
return result;
}
});
@Override
public BaseMapper<Task> getMapper() {
......@@ -129,11 +98,26 @@ public class TaskServiceImpl extends BaseServiceImpl<Task> implements TaskServic
}
@Override
@LocalCache(value = "task_overview", duration = 3)
public JSONObject overview() {
try {
return OVERVIEW_CACHE.get("");
} catch (ExecutionException e) {
throw new FanXingException(e);
List<Task> tasks = selectByExample(new TaskExample());
int resourceCount = 0;
int usedResourceCount = 0;
long taskCount = tasks.size();
long runningTaskCount = tasks.stream().filter(x -> TaskStatus.RUNNING.valEqual(x.getStatus())).count();
RMap<String, VaServerInfo> map = vaServerService.getVaServerRedisRepository().getVaServerInfoMap();
Collection<VaServerInfo> vaServerInfos = map.readAllValues();
for (VaServerInfo info : vaServerInfos) {
resourceCount += info.getVideoResource();
usedResourceCount += (info.getVideoResource() - info.getAvailableResources());
}
JSONObject result = new JSONObject();
result.put("resourceCount", resourceCount);
result.put("usedResourceCount", usedResourceCount);
result.put("taskCount", taskCount);
result.put("runningTaskCount", runningTaskCount);
return result;
}
}
\ No newline at end of file
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!