Commit 767d0475 by 毛树良

[chg]:增加空topic管理

1 parent 05d9b42e
...@@ -2,7 +2,9 @@ package com.viontech.keliu; ...@@ -2,7 +2,9 @@ package com.viontech.keliu;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@EnableScheduling
@SpringBootApplication @SpringBootApplication
public class PassengerDataStorageApplication { public class PassengerDataStorageApplication {
......
package com.viontech.keliu.cron;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import com.viontech.keliu.service.KafkaTopicService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Date;
/**
* kafka topic 清理任务
* @author: msl
* Date: 2025/5/30
*/
@Slf4j
@Component
public class TopicCleanupJob {
@Value("${vion.topicCleanup.preDay:7}")
private Integer preDay;
@Resource
private KafkaTopicService kafkaTopicService;
@Scheduled(cron = "${vion.topicCleanup.cron:-}")
public void doTopicCleanup() {
DateTime preDate = DateUtil.offsetDay(new Date(), preDay * (-1));
DateTime countDate = DateUtil.beginOfDay(preDate);
LocalDate maxCountDate = countDate.toInstant().atZone(ZoneId.of("Asia/Shanghai")).toLocalDate();
log.info("doTopicCleanup execute start, preDay: {}, countDate: {}", preDay, maxCountDate);
try {
kafkaTopicService.deleteEmptyTopicsByTopicDate("Mall_FaceCapture_.*", maxCountDate);
} catch (Exception e) {
log.error("doTopicCleanup[Mall_FaceCapture_].Exception={}", e.getMessage(), e);
}
log.info("doTopicCleanup execute[Mall_FaceCapture_] end, preDay: {}, countDate: {}", preDay, maxCountDate);
try {
kafkaTopicService.deleteEmptyTopicsByTopicDate("Mall_PersonLabel_.*", maxCountDate);
} catch (Exception e) {
log.error("doTopicCleanup[Mall_PersonLabel_].Exception={}", e.getMessage(), e);
}
log.info("doTopicCleanup execute[Mall_PersonLabel_] end, preDay: {}, countDate: {}", preDay, maxCountDate);
log.info("doTopicCleanup execute end, preDay: {}, countDate: {}", preDay, maxCountDate);
}
}
\ No newline at end of file \ No newline at end of file
package com.viontech.keliu.service;
import cn.hutool.core.collection.CollectionUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Service;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* @author: msl
* Date: 2025/5/30
*/
@Service
@Slf4j
public class KafkaTopicService {
private final AdminClient adminClient;
public KafkaTopicService(KafkaAdmin kafkaAdmin) {
this.adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());
}
/**
* 首先匹配指定正则表达式的topic,然后根据topic中日期进行过滤,最后删除匹配的topic,如果topic中没有数据,则删除该topic
* @param topicPattern topic正则表达式
* @param maxDate 最大有效日期
* @throws ExecutionException
* @throws InterruptedException
*/
public void deleteEmptyTopicsByTopicDate(String topicPattern, LocalDate maxDate) throws ExecutionException, InterruptedException {
// 1. 获取匹配的Topic列表
Pattern pattern = Pattern.compile(topicPattern);
Set<String> topics = adminClient.listTopics().names().get().stream()
.filter(topic -> pattern.matcher(topic).matches())
.collect(Collectors.toSet());
// 2.校验topic中的日期
topics = topics.stream().filter(d -> expireTopic(d, maxDate)).collect(Collectors.toSet());
if (CollectionUtil.isEmpty(topics)) {
return;
}
// 3. 获取Topic描述
Map<String, TopicDescription> topicDescriptions = adminClient.describeTopics(topics).all().get();
// 4. 检查并删除空Topic
List<String> topicsToDelete = new ArrayList<>();
for (String topic : topics) {
if (isTopicEmpty(topic, topicDescriptions.get(topic))) {
topicsToDelete.add(topic);
}
}
// 4. 执行删除操作
if (CollectionUtil.isNotEmpty(topicsToDelete)) {
DeleteTopicsResult deleteResult = adminClient.deleteTopics(topicsToDelete);
deleteResult.all().get();
log.info("已删除空Topic: {}", topicsToDelete);
}
}
/**
* 删除匹配指定正则表达式的空Topic
*/
public void deleteEmptyTopics(String topicPattern) throws ExecutionException, InterruptedException {
// 1. 获取匹配的Topic列表
Pattern pattern = Pattern.compile(topicPattern);
Set<String> topics = adminClient.listTopics().names().get().stream()
.filter(topic -> pattern.matcher(topic).matches())
.collect(Collectors.toSet());
if (CollectionUtil.isEmpty(topics)) {
return;
}
// 2. 获取Topic描述
Map<String, TopicDescription> topicDescriptions = adminClient.describeTopics(topics).all().get();
// 3. 检查并删除空Topic
List<String> topicsToDelete = new ArrayList<>();
for (String topic : topics) {
if (isTopicEmpty(topic, topicDescriptions.get(topic))) {
topicsToDelete.add(topic);
}
}
// 4. 执行删除操作
if (CollectionUtil.isNotEmpty(topicsToDelete)) {
DeleteTopicsResult deleteResult = adminClient.deleteTopics(topicsToDelete);
deleteResult.all().get();
log.info("已删除空Topic: {}", topicsToDelete);
}
}
/**
* 检查Topic是否为空
*/
private boolean isTopicEmpty(String topic, TopicDescription description)
throws ExecutionException, InterruptedException {
// 1. 创建Topic分区列表
List<TopicPartition> partitions = description.partitions().stream()
.map(p -> new TopicPartition(topic, p.partition()))
.collect(Collectors.toList());
// 2. 获取最早偏移量
Map<TopicPartition, OffsetSpec> startOffsetsMap = partitions.stream()
.collect(Collectors.toMap(p -> p, p -> OffsetSpec.earliest()));
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> startOffsets =
adminClient.listOffsets(startOffsetsMap).all().get();
// 3. 获取最新偏移量
Map<TopicPartition, OffsetSpec> endOffsetsMap = partitions.stream()
.collect(Collectors.toMap(p -> p, p -> OffsetSpec.latest()));
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
adminClient.listOffsets(endOffsetsMap).all().get();
// 4. 检查所有分区是否为空
return partitions.stream().allMatch(partition ->
startOffsets.get(partition).offset() == endOffsets.get(partition).offset()
);
}
/**
* 过期Topic
* @param topic
* @param maxDate
* @return true:过期,false:未过期
*/
private boolean expireTopic(String topic, LocalDate maxDate) {
LocalDate topicDate = com.viontech.keliu.utils.DateUtil.extractDate(topic);
if (topicDate == null) {
return false;
}
int comparison = topicDate.compareTo(maxDate);
if (comparison < 0) {
return true;
} else {
return false;
}
}
}
\ No newline at end of file \ No newline at end of file
...@@ -3,14 +3,21 @@ package com.viontech.keliu.utils; ...@@ -3,14 +3,21 @@ package com.viontech.keliu.utils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.Date; import java.util.Date;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class DateUtil { public class DateUtil {
private static final DateTimeFormatter YYYYMMDD_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("Asia/Shanghai")); private static final DateTimeFormatter YYYYMMDD_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("Asia/Shanghai"));
private static final DateTimeFormatter YYYYMMDD_FORMATTER2 = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneId.of("Asia/Shanghai"));
private static final DateTimeFormatter YYYYMMDDHHMM_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm").withZone(ZoneId.of("Asia/Shanghai")); private static final DateTimeFormatter YYYYMMDDHHMM_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm").withZone(ZoneId.of("Asia/Shanghai"));
// 正则表达式匹配8位连续数字(YYYYMMDD格式)
private static final Pattern YYYYMMDD_PATTERN = Pattern.compile("(\\d{8})");
/** /**
* *
...@@ -49,4 +56,21 @@ public class DateUtil { ...@@ -49,4 +56,21 @@ public class DateUtil {
SimpleDateFormat sdf = new SimpleDateFormat(format); SimpleDateFormat sdf = new SimpleDateFormat(format);
return sdf.format(date); return sdf.format(date);
} }
/**
* 从字符串中提取日期(YYYYMMDD)
* @param input 输入字符串
* @return 包含日期的Optional对象(为空表示未找到或格式无效)
*/
public static LocalDate extractDate(String input) {
try {
Matcher matcher = YYYYMMDD_PATTERN.matcher(input);
if (matcher.find()) {
String dateStr = matcher.group(1);
return Optional.of(LocalDate.parse(dateStr, YYYYMMDD_FORMATTER2)).get();
}
} catch (Exception e) {
}
return null;
}
} }
...@@ -4,7 +4,7 @@ spring.application.name=VVAS-DataCenter-DBWriter ...@@ -4,7 +4,7 @@ spring.application.name=VVAS-DataCenter-DBWriter
# Db # Db
spring.datasource.driver-class-name=org.postgresql.Driver spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://192.168.1.55:5432/VionCount spring.datasource.url=jdbc:postgresql://192.168.1.64:5432/VionCount
spring.datasource.username=postgres spring.datasource.username=postgres
spring.datasource.password=vion spring.datasource.password=vion
spring.datasource.hikari.connection-timeout=30000 spring.datasource.hikari.connection-timeout=30000
...@@ -14,7 +14,7 @@ spring.datasource.hikari.minimum-idle=5 ...@@ -14,7 +14,7 @@ spring.datasource.hikari.minimum-idle=5
spring.datasource.hikari.maximum-pool-size=50 spring.datasource.hikari.maximum-pool-size=50
# redis # redis
spring.redis.host=192.168.1.55 spring.redis.host=192.168.1.64
spring.redis.port=6379 spring.redis.port=6379
spring.redis.password=vionredis spring.redis.password=vionredis
spring.redis.database=0 spring.redis.database=0
...@@ -22,7 +22,7 @@ spring.redis.lettuce.pool.min-idle=1 ...@@ -22,7 +22,7 @@ spring.redis.lettuce.pool.min-idle=1
spring.redis.lettuce.pool.max-active=10 spring.redis.lettuce.pool.max-active=10
# kafka # kafka
spring.kafka.bootstrap-servers=192.168.1.55:9092 spring.kafka.bootstrap-servers=192.168.1.64:9092
spring.kafka.properties.spring.json.add.type.headers=false spring.kafka.properties.spring.json.add.type.headers=false
spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
...@@ -44,6 +44,10 @@ spring.kafka.listener.type=batch ...@@ -44,6 +44,10 @@ spring.kafka.listener.type=batch
spring.jackson.time-zone=Asia/Shanghai spring.jackson.time-zone=Asia/Shanghai
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
# cron
vion.topicCleanup.preDay=7
vion.topicCleanup.cron=0 0 1 * * ?
# topic consumer config # topic consumer config
# FaceCapture # FaceCapture
vion.consumer.mallFaceCapture.autoStartup=true vion.consumer.mallFaceCapture.autoStartup=true
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!