// TopicCleanupJob.java 1.94 KB
package com.viontech.keliu.cron;

import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import com.viontech.keliu.service.KafkaTopicService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.Date;

/**
 * kafka topic 清理任务
 * @author: msl
 * Date: 2025/5/30
 */
@Slf4j
@Component
public class TopicCleanupJob {
    @Value("${vion.topicCleanup.preDay:7}")
    private Integer preDay;
    @Resource
    private KafkaTopicService kafkaTopicService;

    @Scheduled(cron = "${vion.topicCleanup.cron:-}")
    public void doTopicCleanup() {
        DateTime preDate = DateUtil.offsetDay(new Date(), preDay * (-1));
        DateTime countDate = DateUtil.beginOfDay(preDate);
        LocalDate maxCountDate = countDate.toInstant().atZone(ZoneId.of("Asia/Shanghai")).toLocalDate();

        log.info("doTopicCleanup execute start, preDay: {}, countDate: {}", preDay, maxCountDate);
        try {
            kafkaTopicService.deleteEmptyTopicsByTopicDate("Mall_FaceCapture_.*", maxCountDate);
        } catch (Exception e) {
            log.error("doTopicCleanup[Mall_FaceCapture_].Exception={}", e.getMessage(), e);
        }
        log.info("doTopicCleanup execute[Mall_FaceCapture_] end, preDay: {}, countDate: {}", preDay, maxCountDate);
        try {
            kafkaTopicService.deleteEmptyTopicsByTopicDate("Mall_PersonLabel_.*", maxCountDate);
        } catch (Exception e) {
            log.error("doTopicCleanup[Mall_PersonLabel_].Exception={}", e.getMessage(), e);
        }
        log.info("doTopicCleanup execute[Mall_PersonLabel_] end, preDay: {}, countDate: {}", preDay, maxCountDate);
        log.info("doTopicCleanup execute end, preDay: {}, countDate: {}", preDay, maxCountDate);
    }
}