// KafkaTopicService.java 5.69 KB
package com.viontech.keliu.service;

import cn.hutool.core.collection.CollectionUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Service;

import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
 * Housekeeping service that looks up Kafka topics by name pattern and deletes the ones
 * that contain no records.
 *
 * <p>One {@link AdminClient} is created per service instance from the {@link KafkaAdmin}
 * configuration properties and is released by {@link #close()}.
 *
 * @author: msl
 * Date: 2025/5/30
 */
@Service
@Slf4j
public class KafkaTopicService implements AutoCloseable {

    private final AdminClient adminClient;

    /**
     * Creates the service with its own admin client.
     *
     * @param kafkaAdmin Spring Kafka admin whose configuration properties are used to build the client
     */
    public KafkaTopicService(KafkaAdmin kafkaAdmin) {
        this.adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }

    /**
     * Closes the underlying admin client.
     *
     * <p>NOTE(review): Spring is expected to call this on context shutdown through its
     * destroy-method inference for {@link AutoCloseable} beans - confirm for the Spring version in use.
     */
    @Override
    public void close() {
        adminClient.close();
    }

    /**
     * Selects the topics whose full name matches the regular expression, keeps those whose
     * embedded date is strictly before {@code maxDate}, and deletes every remaining topic
     * that holds no data.
     *
     * @param topicPattern regular expression the whole topic name must match
     * @param maxDate      cut-off date; topics dated on or after it are kept
     * @throws NullPointerException if {@code maxDate} is {@code null}
     * @throws ExecutionException   if any admin request fails
     * @throws InterruptedException if the thread is interrupted while waiting for a result
     */
    public void deleteEmptyTopicsByTopicDate(String topicPattern, LocalDate maxDate) throws ExecutionException, InterruptedException {
        // Fail fast instead of hitting an NPE inside the stream only when a dated topic happens to exist.
        Objects.requireNonNull(maxDate, "maxDate must not be null");

        Set<String> expiredTopics = findTopics(topicPattern).stream()
                .filter(topic -> expireTopic(topic, maxDate))
                .collect(Collectors.toSet());

        deleteTopicsWithoutData(expiredTopics);
    }

    /**
     * Deletes every topic whose full name matches the regular expression and that holds no data.
     *
     * @param topicPattern regular expression the whole topic name must match
     * @throws ExecutionException   if any admin request fails
     * @throws InterruptedException if the thread is interrupted while waiting for a result
     */
    public void deleteEmptyTopics(String topicPattern) throws ExecutionException, InterruptedException {
        deleteTopicsWithoutData(findTopics(topicPattern));
    }

    /**
     * Lists the topic names reported by the admin client that fully match the given pattern.
     */
    private Set<String> findTopics(String topicPattern) throws ExecutionException, InterruptedException {
        Pattern pattern = Pattern.compile(topicPattern);
        return adminClient.listTopics().names().get().stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toSet());
    }

    /**
     * Describes the candidate topics, keeps the empty ones, and deletes them in a single request.
     */
    private void deleteTopicsWithoutData(Set<String> candidates) throws ExecutionException, InterruptedException {
        if (CollectionUtil.isEmpty(candidates)) {
            return;
        }

        // 1. Fetch the descriptions (partition layout) of all candidates.
        Map<String, TopicDescription> topicDescriptions = adminClient.describeTopics(candidates).all().get();

        // 2. Keep only the topics whose partitions are all empty.
        List<String> topicsToDelete = new ArrayList<>();
        for (String topic : candidates) {
            if (isTopicEmpty(topic, topicDescriptions.get(topic))) {
                topicsToDelete.add(topic);
            }
        }

        // 3. Delete them and wait for the broker to confirm before logging.
        if (CollectionUtil.isNotEmpty(topicsToDelete)) {
            DeleteTopicsResult deleteResult = adminClient.deleteTopics(topicsToDelete);
            deleteResult.all().get();
            log.info("已删除空Topic: {}", topicsToDelete);
        }
    }

    /**
     * Checks whether a topic is empty, i.e. the earliest and latest offsets are equal
     * on every one of its partitions.
     *
     * @param topic       topic name
     * @param description description of that topic, used for its partition list
     * @return {@code true} if every partition has identical earliest and latest offsets
     */
    private boolean isTopicEmpty(String topic, TopicDescription description)
            throws ExecutionException, InterruptedException {
        // 1. Build the partition list of the topic.
        List<TopicPartition> partitions = description.partitions().stream()
                .map(p -> new TopicPartition(topic, p.partition()))
                .collect(Collectors.toList());

        // 2. Query the earliest offset of each partition.
        Map<TopicPartition, OffsetSpec> earliestSpecs = partitions.stream()
                .collect(Collectors.toMap(p -> p, p -> OffsetSpec.earliest()));
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> startOffsets =
                adminClient.listOffsets(earliestSpecs).all().get();

        // 3. Query the latest offset of each partition.
        Map<TopicPartition, OffsetSpec> latestSpecs = partitions.stream()
                .collect(Collectors.toMap(p -> p, p -> OffsetSpec.latest()));
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
                adminClient.listOffsets(latestSpecs).all().get();

        // 4. The topic is empty only if no partition has offsets between earliest and latest.
        return partitions.stream().allMatch(partition ->
                startOffsets.get(partition).offset() == endOffsets.get(partition).offset()
        );
    }

    /**
     * Tells whether a topic is expired based on the date extracted from its name.
     *
     * @param topic   topic name
     * @param maxDate cut-off date
     * @return {@code true} if the extracted date is strictly before {@code maxDate};
     *         {@code false} otherwise, including when no date could be extracted
     */
    private boolean expireTopic(String topic, LocalDate maxDate) {
        // A null result (presumably no date in the name - confirm against DateUtil) means "not expired".
        LocalDate topicDate = com.viontech.keliu.utils.DateUtil.extractDate(topic);
        return topicDate != null && topicDate.isBefore(maxDate);
    }
}