MallDataStatisticsConsumer.java 10.6 KB
package com.viontech.keliu.consumer;

import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.http.HttpUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viontech.keliu.constants.KafkaConstants;
import com.viontech.keliu.entity.FaceDataContent;
import com.viontech.keliu.entity.MallDataStatisticsInfo;
import com.viontech.keliu.utils.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

@Service
@Slf4j
public class MallDataStatisticsConsumer {

    @Autowired
    private ObjectMapper objectMapper;

    @Value("${vion.consumer.mallDataStatistics.reidUrl:}")
    private String reidUrl;

    @Value("${vion.consumer.mallDataStatistics.zoneNum:true}")
    private boolean zoneNumEnable;

    private ExecutorService executorService = ThreadUtil.newFixedExecutor(1, 1024, "dataStatistics-", true);

    @KafkaListener(topics = KafkaConstants.MALL_DATA_STATISTICS_TOPIC
            , autoStartup = "${vion.consumer.mallDataStatistics.autoStartup:false}"
            , groupId = "MallDataStiatistics"
            , concurrency = "${vion.consumer.mallDataStatistics.concurrency:1}")
    public void consumerMallZoneMinuteCount(List<ConsumerRecord<String, String>> recordList, Acknowledgment ack) {
        if (CollectionUtils.isEmpty(recordList)) {
            return;
        }
        try {
            Map<String, List<ConsumerRecord<String, String>>> topicPartitionDataMap = recordList.stream().collect(Collectors.groupingBy(d -> d.topic() + "-" + d.partition()));
            for (Map.Entry<String, List<ConsumerRecord<String, String>>> entry : topicPartitionDataMap.entrySet()) {
                try {
                    long startTime = System.currentTimeMillis();
                    List<ConsumerRecord<String, String>> recordValues = entry.getValue();
                    if (!CollectionUtils.isEmpty(recordValues)) {
                        ConsumerRecord<String, String> lastRecord = recordValues.get(recordValues.size() - 1);
                        List<FaceDataContent> faceDataList = new ArrayList<>();
                        for (ConsumerRecord<String, String> consumerRecord : recordValues) {
                            try {
                                MallDataStatisticsInfo mallDataStatisticsInfo = objectMapper.readValue(consumerRecord.value(), MallDataStatisticsInfo.class);
                                executorService.execute(() -> {
                                    log.info("处理mallDataStatisticsInfo,mallId:{},countDate:{}, 开始", mallDataStatisticsInfo.getMallId(),  mallDataStatisticsInfo.getCountDate());
                                    execDataStatistics(mallDataStatisticsInfo);
                                });
//                                execDataStatistics(mallDataStatisticsInfo);
                            } catch (Exception ee) {
                                log.error("处理mallDataStatisticsInfo[{}], JsonDeserializerThrowable={}", entry.getKey(), ee.getMessage(), ee);
                            }
                        }
                    }
                    log.info("处理mallDataStatisticsInfo[{}], batchHandle耗时:{} ms", entry.getKey(), System.currentTimeMillis() - startTime);
                } catch (Throwable e) {
                    log.error("处理mallDataStatisticsInfo[{}], Throwable={}", entry.getKey(), e.getMessage(), e);
                }
            }
        } catch (Throwable exx) {
            log.error("处理mallDataStatisticsInfo.Throwable={}", exx.getMessage(), exx);
        }
        ack.acknowledge();
    }

    private void execDataStatistics(MallDataStatisticsInfo mallDataStatisticsInfo) {
        if (mallDataStatisticsInfo.getMallId() == null || mallDataStatisticsInfo.getCountDate() == null) {
            log.info("数据处理错误:mallId={}, countDate={}", mallDataStatisticsInfo.getMallId(), mallDataStatisticsInfo.getCountDate());
            return;
        }

        if (reidUrl.isEmpty()) {
            log.info("数据处理错误:reidUrl is null: {}", reidUrl);
            return;
        }
        log.info("开始执行mallId:{} ,countDate:{} 统计任务", mallDataStatisticsInfo.getMallId(), mallDataStatisticsInfo.getCountDate());
        String date = DateUtil.format(mallDataStatisticsInfo.getCountDate(), "yyyy-MM-dd");
        String url = reidUrl + "/reid/tool/zoneAssociation?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 店铺关联 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("店铺关联计算异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/residenceByMall?mallId=" + mallDataStatisticsInfo.getMallId() + "&countDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 平均滞留时间 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("平均滞留时间计算异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/faceResidence?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 滞留时间 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("滞留时间计算异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/mallResidence?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 店铺平均滞留时间 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("店铺平均滞留时间计算异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/zoneFaceResidence?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 店铺滞留时间 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("店铺滞留时间计算异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/zoneAcrossCustomer?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 穿堂客流 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("穿堂客流计算异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/reStatistics?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 客流数据重绘 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("客流数据重绘计算异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/zoneCustomerOverlap?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 交叉客群 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("交叉客群计算异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/passengerSource?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 客流来源 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("客流来源计算异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/zoneDeepShopping?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 深逛数据 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("深逛数据计算异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/customerGroup/reCall?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 顾客组数 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("顾客组数计算异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/archive/get?mallId=" + mallDataStatisticsInfo.getMallId() + "&countDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 顾客档案 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("顾客档案计算异常:{}", url, e);
        }

        if (zoneNumEnable) {
            url = reidUrl + "/reid/tool/mallGateFromTo?mallId=" + mallDataStatisticsInfo.getMallId() + "&countDate=" + date;
            try {
                String result = HttpUtil.get(url);
                log.info("调用mallId:{} 顾客导流 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
            } catch (Exception e) {
                log.error("顾客导流计算异常:{}", url, e);
            }
        }

        url = reidUrl + "/reid/tool/payEvent?mallId=" + mallDataStatisticsInfo.getMallId() + "&countDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 收银事件 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("收银事件计算异常:{}", url, e);
        }
    }
}