Commit dd9c30c2 by 毛树良

[chg]:全天分析后统计数据优化

1 parent e255fecb
......@@ -33,7 +33,7 @@ public class MallDataStatisticsConsumer {
@Value("${vion.consumer.storeDataStatistics.reidUrl:}")
private String reidUrl;
private ExecutorService executorService = ThreadUtil.newFixedExecutor(10, 1024, "dataStatistics-", true);
private ExecutorService executorService = ThreadUtil.newFixedExecutor(10, 2000, "dataStatistics-", true);
@KafkaListener(topics = KafkaConstants.STORE_DATA_STATISTICS_TOPIC
, autoStartup = "${vion.consumer.storeDataStatistics.autoStartup:false}"
......@@ -57,6 +57,9 @@ public class MallDataStatisticsConsumer {
log.info("处理storeDataStatisticsInfo,mallId:{},countDate:{}, 开始", mallDataStatisticsInfo.getMallId(), mallDataStatisticsInfo.getCountDate());
execDataStatistics(mallDataStatisticsInfo);
});
} catch (Exception ee) {
log.error("处理storeDataStatisticsInfo[{}], JsonDeserializerThrowable={}", entry.getKey(), ee.getMessage(), ee);
}
// 提交Offset
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(
......@@ -64,9 +67,6 @@ public class MallDataStatisticsConsumer {
new OffsetAndMetadata(consumerRecord.offset() + 1) // 提交下一条偏移量
);
consumer.commitSync(offsets);
} catch (Exception ee) {
log.error("处理storeDataStatisticsInfo[{}], JsonDeserializerThrowable={}", entry.getKey(), ee.getMessage(), ee);
}
}
}
log.info("处理storeDataStatisticsInfo[{}], batchHandle耗时:{} ms", entry.getKey(), System.currentTimeMillis() - startTime);
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!