package com.viontech.keliu.consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.viontech.keliu.constants.KafkaConstants;
import com.viontech.keliu.constants.ProcessConstants;
import com.viontech.keliu.constants.RedisConstants;
import com.viontech.keliu.dao.DZoneMinuteCountDataDao;
import com.viontech.keliu.entity.ZoneCountData;
import com.viontech.keliu.service.SpeedStatService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

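/**
 * Kafka consumer for mall zone minute-level count data.
 * Deserializes each record into {@link ZoneCountData}, splits the batch into insert and update
 * operations, persists them in parallel sub-batches through {@link DZoneMinuteCountDataDao},
 * and reports the number of written records via {@link SpeedStatService}.
 */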
@Slf4j
@Component
public class MallZoneMinuteCountDataConsumer {
    @Value("${vion.consumer.mallZoneMinuteCount.batchSize:20}")
    private Integer batchSize;
    @Value("${vion.consumer.mallZoneMinuteCount.batchThreadNum:5}")
    private Integer batchThreadNum;
    @Autowired
    private ObjectMapper objectMapper;
    @Resource
    private DZoneMinuteCountDataDao dZoneMinuteCountDataDao;
    @Resource
    private SpeedStatService speedStatService;

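    /*
     * Assumption: batch consumption and manual acknowledgment are configured on the listener
     * container factory elsewhere; this class only declares the listener itself.
     *
     * Illustrative application.yml overrides (property names come from the @Value fields and the
     * annotation below; the values shown are examples, not required settings):
     *
     * vion:
     *   consumer:
     *     mallZoneMinuteCount:
     *       autoStartup: true
     *       concurrency: 2
     *       batchSize: 20
     *       batchThreadNum: 5
     */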
    @KafkaListener(topics = KafkaConstants.MALL_ZONE_MINUTE_COUNT_DATA_TOPIC
            , autoStartup = "${vion.consumer.mallZoneMinuteCount.autoStartup:false}"
            , groupId = "MallZoneMinuteCountToDb"
            , concurrency = "${vion.consumer.mallZoneMinuteCount.concurrency:1}")
    public void consumerMallZoneMinuteCount(List<ConsumerRecord<String, String>> recordList, Acknowledgment ack) {
        if (CollectionUtils.isEmpty(recordList)) {
            return;
        }
        try {
            long startTime = System.currentTimeMillis();
            List<ZoneCountData> dataList = new ArrayList<>();
            for (ConsumerRecord<String, String> consumerRecord : recordList) {
                try {
                    ZoneCountData dataContent = objectMapper.readValue(consumerRecord.value(), ZoneCountData.class);
                    if (dataContent != null) {
                        dataList.add(dataContent);
                    }
                } catch (Exception ee) {
                    log.error("处理Mall_ZoneMinuteCount.offset={}, JsonDeserializerThrowable={}", consumerRecord.offset(), ee.getMessage(), ee);
                }
            }
            if (!CollectionUtils.isEmpty(dataList)) {
                // Handle inserts first, then updates
                List<ZoneCountData> insertList = dataList.stream().filter(d -> ProcessConstants.DbOperationType.INSERT.equals(d.getOperationType())).collect(Collectors.toList());
                batchHandle(insertList, ProcessConstants.DbOperationType.INSERT);
                List<ZoneCountData> updateList = dataList.stream().filter(d -> ProcessConstants.DbOperationType.UPDATE.equals(d.getOperationType())).collect(Collectors.toList());
                batchHandle(updateList, ProcessConstants.DbOperationType.UPDATE);
            }
            log.info("处理Mall_ZoneMinuteCount, batchHandle, {}条,耗时:{} ms", dataList.size(), System.currentTimeMillis() - startTime);
        } catch (Throwable exx) {
            log.error("处理Mall_ZoneMinuteCount.Throwable={}", exx.getMessage(), exx);
        }
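        // The offset is acknowledged even when processing failed above: errors are only logged,
        // so failed records are not redelivered.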
        ack.acknowledge();
    }

    /**
     * Process records in batches, dispatching each sub-batch to a worker thread.
     * @param recordList    records to persist
     * @param operationType database operation type (INSERT or UPDATE)
     */
    private void batchHandle(List<ZoneCountData> recordList, String operationType) {
        if (CollectionUtils.isEmpty(recordList)) {
            return;
        }
        // Total number of records
        int total = recordList.size();
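        // A fresh fixed-size pool (batchThreadNum threads) is created per invocation and is shut
        // down below once all sub-batches have been submitted.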
        ExecutorService threadPool = Executors.newFixedThreadPool(batchThreadNum);

        List<Future<?>> futureList = new ArrayList<>();
        for (int i = 0; i < total; i += batchSize) {
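            // subList returns a view backed by recordList; this is safe here because recordList
            // is not modified while the sub-batches are in flight.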
            List<ZoneCountData> subList = recordList.subList(i, Math.min(i + batchSize, total));
            Future<?> future = threadPool.submit(() -> {
                try {
                    if (ProcessConstants.DbOperationType.INSERT.equals(operationType)) {
                        insertHandle(subList);
                    } else if (ProcessConstants.DbOperationType.UPDATE.equals(operationType)) {
                        updateHandle(subList);
                    }
                } catch (Exception ex) {
                    log.error("处理Mall_ZoneMinuteCount分批处理, batchSize={}, batchException={}", subList.size(), ex.getMessage(), ex);
                }
            });
            futureList.add(future);
        }

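        // shutdown() stops accepting new tasks but lets submitted ones finish; the future.get()
        // calls below block until every sub-batch has completed.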
        threadPool.shutdown();
        for (Future<?> future : futureList) {
            try {
                future.get();
            } catch (Exception e) {
                log.error("batchHandle.getFuture.Exception={}", e.getMessage(), e);
            }
        }
    }

    /**
     * Handle insert operations.
     * Try a batch insert first; if it fails, fall back to inserting the records one by one.
     * @param insertList records to insert
     */
    private void insertHandle(List<ZoneCountData> insertList) {
        if (CollectionUtils.isEmpty(insertList)) {
            return;
        }
        // Number of successfully inserted records
        Integer insertCount = 0;
        try {
            dZoneMinuteCountDataDao.batchInsert(insertList);
            insertCount = insertList.size();
        } catch (Exception ex) {
            log.error("处理Mall_ZoneMinuteCount.insertHandle={}, batchInsert.Exception={}", insertList.size(), ex.getMessage(), ex);
            log.info("准备二次单条插入,处理Mall_ZoneMinuteCount.insertHandle={}, batchInsert.Exception={}", insertList.size(), ex.getMessage());
            for (ZoneCountData countData : insertList) {
                try {
                    dZoneMinuteCountDataDao.insert(countData);
                    insertCount = insertCount + 1;
                } catch (Exception e) {
                    try {
                        log.info("数据二次写入错误:{}", objectMapper.writeValueAsString(countData));
                        log.error("数据二次写入错误:{}", e.getMessage(), e);
                    } catch (Exception ee) {
                        log.error("数据二次写入错误日志:{}", ee.getMessage(), ee);
                    }
                }
            }
        }
        speedStatService.stat(RedisConstants.PDS_MALLZONEMINUTECOUNT_WRITE, insertCount);
    }

    /**
     * Handle update operations.
     * Try a batch update first; if it fails, fall back to updating the records one by one.
     * @param updateList records to update
     */
    private void updateHandle(List<ZoneCountData> updateList) {
        if (CollectionUtils.isEmpty(updateList)) {
            return;
        }
        // Number of successfully updated records
        Integer updateCount = 0;
        try {
            dZoneMinuteCountDataDao.batchUpdate(updateList);
            updateCount = updateList.size();
        } catch (Exception ex) {
            log.error("处理Mall_ZoneMinuteCount.updateHandle={}, batchUpdate.Exception={}", updateList.size(), ex.getMessage(), ex);
            log.info("准备二次单条更新,处理Mall_ZoneMinuteCount.updateHandle={}, batchUpdate.Exception={}", updateList.size(), ex.getMessage());
            for (ZoneCountData countData : updateList) {
                try {
                    dZoneMinuteCountDataDao.update(countData);
                    updateCount = updateCount + 1;
                } catch (Exception e) {
                    try {
                        log.info("数据二次更新错误:{}", objectMapper.writeValueAsString(countData));
                        log.error("数据二次更新错误:{}", e.getMessage(), e);
                    } catch (Exception ee) {
                        log.error("数据二次更新错误日志:{}", ee.getMessage(), ee);
                    }
                }
            }
        }
        speedStatService.stat(RedisConstants.PDS_MALLZONEMINUTECOUNT_WRITE, updateCount);
    }
}