MallFaceCaptureConsumer.java 7.51 KB
package com.viontech.keliu.consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.viontech.keliu.constants.KafkaConstants;
import com.viontech.keliu.constants.RedisConstants;
import com.viontech.keliu.dao.DFaceRecognitionDao;
import com.viontech.keliu.entity.FaceDataContent;
import com.viontech.keliu.service.KafkaProducerService;
import com.viontech.keliu.service.SpeedStatService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

@Component
@Slf4j
public class MallFaceCaptureConsumer {
    @Value("${vion.consumer.mallFaceCapture.batchEnable:0}")
    private Integer batchEnable;
    @Value("${vion.consumer.mallFaceCapture.batchSize:0}")
    private Integer batchSize;
    @Value("${vion.consumer.mallFaceCapture.batchThreadNum:0}")
    private Integer batchThreadNum;
    @Autowired
    private ObjectMapper objectMapper;
    @Resource
    private DFaceRecognitionDao dFaceRecognitionDao;
    @Resource
    private KafkaProducerService kafkaProducerService;
    @Resource
    private SpeedStatService speedStatService;

    @KafkaListener(topicPattern = "Mall_FaceCapture_.*"
            , autoStartup = "${vion.consumer.mallFaceCapture.autoStartup:false}"
            , groupId = "MallFaceCaptureToDb"
            , concurrency = "${vion.consumer.mallFaceCapture.concurrency:1}")
    public void consumerMallFaceCapture(List<ConsumerRecord<String, String>> recordList, Consumer<?, ?> consumer) {
        if (CollectionUtils.isEmpty(recordList)) {
            return;
        }
        try {
            Map<String, List<ConsumerRecord<String, String>>> topicPartitionDataMap = recordList.stream().collect(Collectors.groupingBy(d -> d.topic() + "-" + d.partition()));
            for (Map.Entry<String, List<ConsumerRecord<String, String>>> entry : topicPartitionDataMap.entrySet()) {
                try {
                    long startTime = System.currentTimeMillis();
                    List<ConsumerRecord<String, String>> recordValues = entry.getValue();
                    if (!CollectionUtils.isEmpty(recordValues)) {
                        ConsumerRecord<String, String> lastRecord = recordValues.get(recordValues.size() - 1);
                        List<FaceDataContent> faceDataList = new ArrayList<>();
                        for (ConsumerRecord<String, String> consumerRecord : recordValues) {
                            try {
                                FaceDataContent faceDataContent = objectMapper.readValue(consumerRecord.value(), FaceDataContent.class);
                                if (faceDataContent != null) {
                                    faceDataList.add(faceDataContent);
                                }
                            } catch (Exception ee) {
                                log.error("处理Mall_FaceCapture[{}], JsonDeserializerThrowable={}", entry.getKey(), ee.getMessage(), ee);
                            }
                        }
                        // 插入数据
                        if (!CollectionUtils.isEmpty(faceDataList)) {
                            if (batchEnable == 1) {
                                // 分批处理
                                batchHandle(faceDataList);
                            } else {
                                try {
                                    dFaceRecognitionDao.batchInsert(faceDataList);
                                    speedStatService.stat(RedisConstants.PDS_MALLFACECAPTURE_WRITE, faceDataList.size());
                                } catch (Exception ex) {
                                    log.error("处理Mall_FaceCapture[{}], batchSize={}, batchInsert.Exception={}", entry.getKey(), faceDataList.size(), ex.getMessage(), ex);
                                    // 批量插入重试队列
                                    long startSendTime = System.currentTimeMillis();
                                    kafkaProducerService.sendMessages(KafkaConstants.TOPIC_MALL_RETRY_FACECAPTURE, faceDataList);
                                    log.info("处理Mall_FaceCapture[{}], batchSendFinish, {}条,耗时:{} ms", entry.getKey(), faceDataList.size(), System.currentTimeMillis() - startSendTime);
                                }
                            }
                        }
                        // 提交Offset
                        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                        offsets.put(
                                new TopicPartition(lastRecord.topic(), lastRecord.partition()),
                                new OffsetAndMetadata(lastRecord.offset() + 1) // 提交下一条偏移量
                        );
                        consumer.commitSync(offsets);
                    }
                    log.info("处理Mall_FaceCapture[{}], batchHandle, {}条,耗时:{} ms", entry.getKey(), recordValues.size(), System.currentTimeMillis() - startTime);
                } catch (Throwable e) {
                    log.error("处理Mall_FaceCapture[{}], Throwable={}", entry.getKey(), e.getMessage(), e);
                }
            }
        } catch (Throwable exx) {
            log.error("处理Mall_FaceCapture.Throwable={}", exx.getMessage(), exx);
        }
    }

    /**
     * 分批处理
     * @param recordList
     */
    private void batchHandle(List<FaceDataContent> recordList) {
        // 总记录数
        int total = recordList.size();
        ExecutorService threadPool = Executors.newFixedThreadPool(batchThreadNum);

        List<Future> futureList = new ArrayList<>();
        for (int i = 0; i < total; i += batchSize) {
            List<FaceDataContent> faceDataList = recordList.subList(i, Math.min(i + batchSize, total));
            Future<?> future = threadPool.submit(() -> {
                try {
                    dFaceRecognitionDao.batchInsert(faceDataList);
                    speedStatService.stat(RedisConstants.PDS_MALLFACECAPTURE_WRITE, faceDataList.size());
                } catch (Exception ex) {
                    log.error("处理Mall_FaceCapture分批处理, batchSize={}, batchInsert.Exception={}", faceDataList.size(), ex.getMessage(), ex);
                    // 批量插入重试队列
                    long startSendTime = System.currentTimeMillis();
                    kafkaProducerService.sendMessages(KafkaConstants.TOPIC_MALL_RETRY_FACECAPTURE, faceDataList);
                    log.info("处理Mall_FaceCapture分批处理, batchSendFinish, {}条,耗时:{} ms", faceDataList.size(), System.currentTimeMillis() - startSendTime);
                }
            });
            futureList.add(future);
        }

        threadPool.shutdown();
        for (Future future : futureList) {
            try {
                future.get();
            } catch (Exception e) {
                log.error("batchHandle.getFuture.Exception={}", e.getMessage(), e);
            }
        }
    }
}