Commit 00bded54 by 姚冰

[chg] reid统计添加单线程异步顺序处理,防止处理时间过长重复消费

1 parent f23c0832
package com.viontech.keliu.consumer; package com.viontech.keliu.consumer;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.http.HttpUtil; import cn.hutool.http.HttpUtil;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.viontech.keliu.constants.KafkaConstants; import com.viontech.keliu.constants.KafkaConstants;
...@@ -18,6 +19,7 @@ import org.springframework.util.CollectionUtils; ...@@ -18,6 +19,7 @@ import org.springframework.util.CollectionUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Service @Service
...@@ -33,6 +35,8 @@ public class MallDataStatisticsConsumer { ...@@ -33,6 +35,8 @@ public class MallDataStatisticsConsumer {
@Value("${vion.consumer.mallDataStatistics.zoneNum:true}") @Value("${vion.consumer.mallDataStatistics.zoneNum:true}")
private boolean zoneNumEnable; private boolean zoneNumEnable;
private ExecutorService executorService = ThreadUtil.newFixedExecutor(1, 1024, "dataStatistics-", true);
@KafkaListener(topics = KafkaConstants.MALL_DATA_STATISTICS_TOPIC @KafkaListener(topics = KafkaConstants.MALL_DATA_STATISTICS_TOPIC
, autoStartup = "${vion.consumer.mallDataStatistics.autoStartup:false}" , autoStartup = "${vion.consumer.mallDataStatistics.autoStartup:false}"
, groupId = "MallDataStiatistics" , groupId = "MallDataStiatistics"
...@@ -53,7 +57,11 @@ public class MallDataStatisticsConsumer { ...@@ -53,7 +57,11 @@ public class MallDataStatisticsConsumer {
for (ConsumerRecord<String, String> consumerRecord : recordValues) { for (ConsumerRecord<String, String> consumerRecord : recordValues) {
try { try {
MallDataStatisticsInfo mallDataStatisticsInfo = objectMapper.readValue(consumerRecord.value(), MallDataStatisticsInfo.class); MallDataStatisticsInfo mallDataStatisticsInfo = objectMapper.readValue(consumerRecord.value(), MallDataStatisticsInfo.class);
execDataStatistics(mallDataStatisticsInfo); executorService.execute(() -> {
log.info("处理mallDataStatisticsInfo,mallId:{},countDate:{}, 开始", mallDataStatisticsInfo.getMallId(), mallDataStatisticsInfo.getCountDate());
execDataStatistics(mallDataStatisticsInfo);
});
// execDataStatistics(mallDataStatisticsInfo);
} catch (Exception ee) { } catch (Exception ee) {
log.error("处理mallDataStatisticsInfo[{}], JsonDeserializerThrowable={}", entry.getKey(), ee.getMessage(), ee); log.error("处理mallDataStatisticsInfo[{}], JsonDeserializerThrowable={}", entry.getKey(), ee.getMessage(), ee);
} }
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!