Commit a79cc7e9 by 姚冰

[add] 添加统计任务处理逻辑,配置

1 parent a682c58c
......@@ -57,6 +57,11 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.20</version>
</dependency>
</dependencies>
<build>
......
......@@ -29,4 +29,6 @@ public class KafkaConstants {
* d_zone_day_count_data统计数据的topic
*/
public static final String MALL_ZONE_DAY_COUNT_DATA_TOPIC = "Mall_zone_day_count_data";
public static final String MALL_DATA_STATISTICS_TOPIC = "Mall_dataStatistics";
}
package com.viontech.keliu.consumer;
import cn.hutool.http.HttpUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viontech.keliu.constants.KafkaConstants;
import com.viontech.keliu.entity.FaceDataContent;
import com.viontech.keliu.entity.MallDataStatisticsInfo;
import com.viontech.keliu.utils.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Service
@Slf4j
public class MallDataStatisticsConsumer {
@Autowired
private ObjectMapper objectMapper;
@Value("${vion.consumer.mallDataStatistics.reidUrl:}")
private String reidUrl;
@Value("${vion.consumer.mallDataStatistics.zoneNum:true}")
private boolean zoneNumEnable;
@KafkaListener(topics = KafkaConstants.MALL_DATA_STATISTICS_TOPIC
, autoStartup = "${vion.consumer.mallDataStatistics.autoStartup:false}"
, groupId = "MallDataStiatistics"
, concurrency = "${vion.consumer.mallDataStatistics.concurrency:1}")
public void consumerMallZoneMinuteCount(List<ConsumerRecord<String, String>> recordList, Acknowledgment ack) {
if (CollectionUtils.isEmpty(recordList)) {
return;
}
try {
Map<String, List<ConsumerRecord<String, String>>> topicPartitionDataMap = recordList.stream().collect(Collectors.groupingBy(d -> d.topic() + "-" + d.partition()));
for (Map.Entry<String, List<ConsumerRecord<String, String>>> entry : topicPartitionDataMap.entrySet()) {
try {
long startTime = System.currentTimeMillis();
List<ConsumerRecord<String, String>> recordValues = entry.getValue();
if (!CollectionUtils.isEmpty(recordValues)) {
ConsumerRecord<String, String> lastRecord = recordValues.get(recordValues.size() - 1);
List<FaceDataContent> faceDataList = new ArrayList<>();
for (ConsumerRecord<String, String> consumerRecord : recordValues) {
try {
MallDataStatisticsInfo mallDataStatisticsInfo = objectMapper.readValue(consumerRecord.value(), MallDataStatisticsInfo.class);
execDataStatistics(mallDataStatisticsInfo);
} catch (Exception ee) {
log.error("处理mallDataStatisticsInfo[{}], JsonDeserializerThrowable={}", entry.getKey(), ee.getMessage(), ee);
}
}
}
log.info("处理mallDataStatisticsInfo[{}], batchHandle耗时:{} ms", entry.getKey(), System.currentTimeMillis() - startTime);
} catch (Throwable e) {
log.error("处理mallDataStatisticsInfo[{}], Throwable={}", entry.getKey(), e.getMessage(), e);
}
}
} catch (Throwable exx) {
log.error("处理mallDataStatisticsInfo.Throwable={}", exx.getMessage(), exx);
}
ack.acknowledge();
}
private void execDataStatistics(MallDataStatisticsInfo mallDataStatisticsInfo) {
if (mallDataStatisticsInfo.getMallId() == null || mallDataStatisticsInfo.getCountDate() == null) {
log.info("数据处理错误:mallId={}, countDate={}", mallDataStatisticsInfo.getMallId(), mallDataStatisticsInfo.getCountDate());
return;
}
if (reidUrl.isEmpty()) {
log.info("数据处理错误:reidUrl is null: {}", reidUrl);
return;
}
log.info("开始执行mallId:{} ,countDate:{} 统计任务", mallDataStatisticsInfo.getMallId(), mallDataStatisticsInfo.getCountDate());
String date = DateUtil.format(mallDataStatisticsInfo.getCountDate(), "yyyy-MM-dd");
String url = reidUrl + "/reid/tool/zoneAssociation?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
try {
String result = HttpUtil.get(url);
log.info("调用mallId:{} 店铺关联 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
} catch (Exception e) {
log.error("店铺关联计算异常:{}", url, e);
}
url = reidUrl + "/reid/tool/residenceByMall?mallId=" + mallDataStatisticsInfo.getMallId() + "&countDate=" + date;
try {
String result = HttpUtil.get(url);
log.info("调用mallId:{} 平均滞留时间 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
} catch (Exception e) {
log.error("平均滞留时间计算异常:{}", url, e);
}
url = reidUrl + "/reid/tool/faceResidence?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
try {
String result = HttpUtil.get(url);
log.info("调用mallId:{} 滞留时间 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
} catch (Exception e) {
log.error("滞留时间计算异常:{}", url, e);
}
url = reidUrl + "/reid/tool/mallResidence?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
try {
String result = HttpUtil.get(url);
log.info("调用mallId:{} 店铺平均滞留时间 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
} catch (Exception e) {
log.error("店铺平均滞留时间计算异常:{}", url, e);
}
url = reidUrl + "/reid/tool/zoneFaceResidence?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
try {
String result = HttpUtil.get(url);
log.info("调用mallId:{} 店铺滞留时间 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
} catch (Exception e) {
log.error("店铺滞留时间计算异常:{}", url, e);
}
url = reidUrl + "/reid/tool/zoneAcrossCustomer?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
try {
String result = HttpUtil.get(url);
log.info("调用mallId:{} 穿堂客流 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
} catch (Exception e) {
log.error("穿堂客流计算异常:{}", url, e);
}
url = reidUrl + "/reid/tool/reStatistics?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
try {
String result = HttpUtil.get(url);
log.info("调用mallId:{} 客流数据重绘 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
} catch (Exception e) {
log.error("客流数据重绘计算异常:{}", url, e);
}
url = reidUrl + "/reid/tool/passengerSource?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
try {
String result = HttpUtil.get(url);
log.info("调用mallId:{} 客流来源 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
} catch (Exception e) {
log.error("客流来源计算异常:{}", url, e);
}
url = reidUrl + "/reid/tool/zoneDeepShopping?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
try {
String result = HttpUtil.get(url);
log.info("调用mallId:{} 深逛数据 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
} catch (Exception e) {
log.error("深逛数据计算异常:{}", url, e);
}
url = reidUrl + "/reid/tool/customerGroup/reCall?mallIds=" + mallDataStatisticsInfo.getMallId() + "&startDate=" + date + "&endDate=" + date;
try {
String result = HttpUtil.get(url);
log.info("调用mallId:{} 顾客组数 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
} catch (Exception e) {
log.error("顾客组数计算异常:{}", url, e);
}
url = reidUrl + "/reid/tool/archive/get?mallId=" + mallDataStatisticsInfo.getMallId() + "&countDate=" + date;
try {
String result = HttpUtil.get(url);
log.info("调用mallId:{} 顾客档案 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
} catch (Exception e) {
log.error("顾客档案计算异常:{}", url, e);
}
if (zoneNumEnable) {
url = reidUrl + "/reid/tool/mallGateFromTo?mallId=" + mallDataStatisticsInfo.getMallId() + "&countDate=" + date;
try {
String result = HttpUtil.get(url);
log.info("调用mallId:{} 顾客导流 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
} catch (Exception e) {
log.error("顾客导流计算异常:{}", url, e);
}
}
url = reidUrl + "/reid/tool/payEvent?mallId=" + mallDataStatisticsInfo.getMallId() + "&countDate=" + date;
try {
String result = HttpUtil.get(url);
log.info("调用mallId:{} 收银事件 执行结果:{}", mallDataStatisticsInfo.getMallId(), result);
} catch (Exception e) {
log.error("收银事件计算异常:{}", url, e);
}
}
}
package com.viontech.keliu.entity;
import lombok.Data;
import java.util.Date;
@Data
public class MallDataStatisticsInfo {
private Long mallId;
private String mallName;
private Date countDate;
}
......@@ -70,4 +70,10 @@ vion.consumer.mallGateMinuteCount.batchThreadNum=5
vion.consumer.mallZoneMinuteCount.autoStartup=true
vion.consumer.mallZoneMinuteCount.concurrency=1
vion.consumer.mallZoneMinuteCount.batchSize=20
vion.consumer.mallZoneMinuteCount.batchThreadNum=5
\ No newline at end of file
vion.consumer.mallZoneMinuteCount.batchThreadNum=5
# mall dataStiatistics
vion.consumer.mallDataStatistics.autoStartup=true
vion.consumer.mallDataStatistics.concurrency=1
vion.consumer.mallDataStatistics.reidUrl=http://10.0.14.11:23456
vion.consumer.mallDataStatistics.zoneNum=false
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!