MallDataStatisticsConsumer.java 15.6 KB
package com.viontech.keliu.consumer;

import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.http.HttpUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viontech.keliu.constants.KafkaConstants;
import com.viontech.keliu.entity.MallDataStatisticsInfo;
import com.viontech.keliu.utils.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

@Service
@Slf4j
public class MallDataStatisticsConsumer {

    @Autowired
    private ObjectMapper objectMapper;

    @Value("${vion.consumer.storeDataStatistics.reidUrl:}")
    private String reidUrl;

    private ExecutorService executorService = ThreadUtil.newFixedExecutor(10, 1024, "dataStatistics-", true);

    @KafkaListener(topics = KafkaConstants.STORE_DATA_STATISTICS_TOPIC
            , autoStartup = "${vion.consumer.storeDataStatistics.autoStartup:false}"
            , groupId = "StoreDataStiatistics"
            , concurrency = "${vion.consumer.storeDataStatistics.concurrency:1}")
    public void consumerMallZoneMinuteCount(List<ConsumerRecord<String, String>> recordList, Consumer<?, ?> consumer) {
        if (CollectionUtils.isEmpty(recordList)) {
            return;
        }
        try {
            Map<String, List<ConsumerRecord<String, String>>> topicPartitionDataMap = recordList.stream().collect(Collectors.groupingBy(d -> d.topic() + "-" + d.partition()));
            for (Map.Entry<String, List<ConsumerRecord<String, String>>> entry : topicPartitionDataMap.entrySet()) {
                try {
                    long startTime = System.currentTimeMillis();
                    List<ConsumerRecord<String, String>> recordValues = entry.getValue();
                    if (!CollectionUtils.isEmpty(recordValues)) {
                        for (ConsumerRecord<String, String> consumerRecord : recordValues) {
                            try {
                                MallDataStatisticsInfo mallDataStatisticsInfo = objectMapper.readValue(consumerRecord.value(), MallDataStatisticsInfo.class);
                                executorService.execute(() -> {
                                    log.info("处理storeDataStatisticsInfo,mallId:{},countDate:{}, 开始", mallDataStatisticsInfo.getMallId(),  mallDataStatisticsInfo.getCountDate());
                                    execDataStatistics(mallDataStatisticsInfo);
                                });
                                // 提交Offset
                                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                                offsets.put(
                                        new TopicPartition(consumerRecord.topic(), consumerRecord.partition()),
                                        new OffsetAndMetadata(consumerRecord.offset() + 1) // 提交下一条偏移量
                                );
                                consumer.commitSync(offsets);
                            } catch (Exception ee) {
                                log.error("处理storeDataStatisticsInfo[{}], JsonDeserializerThrowable={}", entry.getKey(), ee.getMessage(), ee);
                            }
                        }
                    }
                    log.info("处理storeDataStatisticsInfo[{}], batchHandle耗时:{} ms", entry.getKey(), System.currentTimeMillis() - startTime);
                } catch (Throwable e) {
                    log.error("处理storeDataStatisticsInfo[{}], Throwable={}", entry.getKey(), e.getMessage(), e);
                }
            }
        } catch (Throwable exx) {
            log.error("处理mallDataStatisticsInfo.Throwable={}", exx.getMessage(), exx);
        }
    }

    private void execDataStatistics(MallDataStatisticsInfo mallDataStatisticsInfo) {
        if (mallDataStatisticsInfo.getMallId() == null || mallDataStatisticsInfo.getCountDate() == null) {
            log.info("数据处理错误:mallId={}, countDate={}", mallDataStatisticsInfo.getMallId(), mallDataStatisticsInfo.getCountDate());
            return;
        }

        if (reidUrl.isEmpty()) {
            log.info("数据处理错误:reidUrl is null: {}", reidUrl);
            return;
        }
        log.info("开始执行mallId:{} ,countDate:{} 统计任务", mallDataStatisticsInfo.getMallId(), mallDataStatisticsInfo.getCountDate());
        String date = DateUtil.format(mallDataStatisticsInfo.getCountDate(), "yyyy-MM-dd");

        String url = reidUrl + "/reid/tool/residenceByMall?mallId=" + mallDataStatisticsInfo.getMallId() + "&countDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 平均滞留时间 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("平均滞留时间计算异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/faceResidence?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 滞留时间 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("滞留时间计算异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/acrossCustomer/reCall?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 穿堂客流统计 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("穿堂客流计算异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/dataStat/reCallOne?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 客流数据重汇 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("客流数据重汇异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/faceRecognition/aggregation?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 监控点进入人次统计 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("监控点进入人次统计异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/gateFaceResidence?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 区域滞留时间 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("区域滞留时间异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/cashier?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 收银区域客流 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("收银区域客流异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/customerGroup/reCall?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 顾客组数 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("顾客组数计算异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/personRecord/reCall?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 接待分析统计 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("接待分析统计异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/deepNum?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 深逛人数统计 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("深逛人数统计异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/heatmap/merge?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 每日热力数据聚合 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("每日热力数据聚合异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/dataMonitor/reCall?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 数据运维 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("数据运维异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/mallCompare/reCall?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 跨店比对 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("跨店比对异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/kafkaPersonReception/resend?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 接待数据发送kafka 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("接待数据发送kafka异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/restaurant/countData?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 餐厅排队指标统计 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("餐厅排队指标统计异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/channelProduct/countData?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 机位数据统计 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("机位数据统计异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/mallProduct/countData?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 商品数据统计 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("商品数据统计异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/mallProductGate/countData?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 商品分区数据统计 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("商品分区数据统计异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/gateFlowDirection?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 门店分区客流流向 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("门店分区客流流向异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/mallBusinessRecord?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 门店营业时间统计任务 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("门店营业时间统计任务异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/mallResidenceDistribution?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 门店停留时长分布 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("门店停留时长分布异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/effectVisitors/update?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 门店有效顾客统计 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("门店有效顾客统计异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/visitorsMovingLing?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 顾客店内分区轨迹 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("顾客店内分区轨迹异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/fittingVisitors?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 试衣顾客统计 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("试衣顾客统计异常:{}", url, e);
        }

        url = reidUrl + "/reid/tool/attentionPassersby?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
        try {
            String result = HttpUtil.get(url);
            log.info("调用mallId:{} 过店关注人次 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
        } catch (Exception e) {
            log.error("过店关注人次异常:{}", url, e);
        }
    }
}