KafkaTopicService.java
5.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package com.viontech.keliu.service;
import cn.hutool.core.collection.CollectionUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Service;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* @author: msl
* Date: 2025/5/30
*/
@Service
@Slf4j
public class KafkaTopicService {
private final AdminClient adminClient;
public KafkaTopicService(KafkaAdmin kafkaAdmin) {
this.adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());
}
/**
* 首先匹配指定正则表达式的topic,然后根据topic中日期进行过滤,最后删除匹配的topic,如果topic中没有数据,则删除该topic
* @param topicPattern topic正则表达式
* @param maxDate 最大有效日期
* @throws ExecutionException
* @throws InterruptedException
*/
public void deleteEmptyTopicsByTopicDate(String topicPattern, LocalDate maxDate) throws ExecutionException, InterruptedException {
// 1. 获取匹配的Topic列表
Pattern pattern = Pattern.compile(topicPattern);
Set<String> topics = adminClient.listTopics().names().get().stream()
.filter(topic -> pattern.matcher(topic).matches())
.collect(Collectors.toSet());
// 2.校验topic中的日期
topics = topics.stream().filter(d -> expireTopic(d, maxDate)).collect(Collectors.toSet());
if (CollectionUtil.isEmpty(topics)) {
return;
}
// 3. 获取Topic描述
Map<String, TopicDescription> topicDescriptions = adminClient.describeTopics(topics).all().get();
// 4. 检查并删除空Topic
List<String> topicsToDelete = new ArrayList<>();
for (String topic : topics) {
if (isTopicEmpty(topic, topicDescriptions.get(topic))) {
topicsToDelete.add(topic);
}
}
// 4. 执行删除操作
if (CollectionUtil.isNotEmpty(topicsToDelete)) {
DeleteTopicsResult deleteResult = adminClient.deleteTopics(topicsToDelete);
deleteResult.all().get();
log.info("已删除空Topic: {}", topicsToDelete);
}
}
/**
* 删除匹配指定正则表达式的空Topic
*/
public void deleteEmptyTopics(String topicPattern) throws ExecutionException, InterruptedException {
// 1. 获取匹配的Topic列表
Pattern pattern = Pattern.compile(topicPattern);
Set<String> topics = adminClient.listTopics().names().get().stream()
.filter(topic -> pattern.matcher(topic).matches())
.collect(Collectors.toSet());
if (CollectionUtil.isEmpty(topics)) {
return;
}
// 2. 获取Topic描述
Map<String, TopicDescription> topicDescriptions = adminClient.describeTopics(topics).all().get();
// 3. 检查并删除空Topic
List<String> topicsToDelete = new ArrayList<>();
for (String topic : topics) {
if (isTopicEmpty(topic, topicDescriptions.get(topic))) {
topicsToDelete.add(topic);
}
}
// 4. 执行删除操作
if (CollectionUtil.isNotEmpty(topicsToDelete)) {
DeleteTopicsResult deleteResult = adminClient.deleteTopics(topicsToDelete);
deleteResult.all().get();
log.info("已删除空Topic: {}", topicsToDelete);
}
}
/**
* 检查Topic是否为空
*/
private boolean isTopicEmpty(String topic, TopicDescription description)
throws ExecutionException, InterruptedException {
// 1. 创建Topic分区列表
List<TopicPartition> partitions = description.partitions().stream()
.map(p -> new TopicPartition(topic, p.partition()))
.collect(Collectors.toList());
// 2. 获取最早偏移量
Map<TopicPartition, OffsetSpec> startOffsetsMap = partitions.stream()
.collect(Collectors.toMap(p -> p, p -> OffsetSpec.earliest()));
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> startOffsets =
adminClient.listOffsets(startOffsetsMap).all().get();
// 3. 获取最新偏移量
Map<TopicPartition, OffsetSpec> endOffsetsMap = partitions.stream()
.collect(Collectors.toMap(p -> p, p -> OffsetSpec.latest()));
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
adminClient.listOffsets(endOffsetsMap).all().get();
// 4. 检查所有分区是否为空
return partitions.stream().allMatch(partition ->
startOffsets.get(partition).offset() == endOffsets.get(partition).offset()
);
}
/**
* 过期Topic
* @param topic
* @param maxDate
* @return true:过期,false:未过期
*/
private boolean expireTopic(String topic, LocalDate maxDate) {
LocalDate topicDate = com.viontech.keliu.utils.DateUtil.extractDate(topic);
if (topicDate == null) {
return false;
}
int comparison = topicDate.compareTo(maxDate);
if (comparison < 0) {
return true;
} else {
return false;
}
}
}