Commit 6b20b0ad by 文帅营

将kafka的topic提取前缀:可以复用kafka集群

1 parent 0ec20bfe
package com.viontech.keliu.constants;
public class KafkaConstants {
// 商业人脸抓拍重试topic
public static final String TOPIC_STORE_RETRY_FACECAPTURE = "Store_Retry_FaceCapture";
// 商业人员标签重试topic
public static final String TOPIC_STORE_RETRY_PERSONLABEL = "Store_Retry_PersonLabel";
/**
* d_gate_minute_count_data统计数据的topic
*/
public static final String STORE_GATE_MINUTE_COUNT_DATA_TOPIC = "Store_gate_minute_count_data";
/**
* d_gate_hour_count_data统计数据的topic
*/
public static final String STORE_GATE_HOUR_COUNT_DATA_TOPIC = "Store_gate_hour_count_data";
/**
* d_gate_day_count_data统计数据的topic
*/
public static final String STORE_GATE_DAY_COUNT_DATA_TOPIC = "Store_gate_day_count_data";
/**
* d_zone_minute_count_data统计数据的topic
*/
public static final String STORE_ZONE_MINUTE_COUNT_DATA_TOPIC = "Store_zone_minute_count_data";
/**
* d_zone_hour_count_data统计数据的topic
*/
public static final String STORE_ZONE_HOUR_COUNT_DATA_TOPIC = "Store_zone_hour_count_data";
/**
* d_zone_day_count_data统计数据的topic
*/
public static final String STORE_ZONE_DAY_COUNT_DATA_TOPIC = "Store_zone_day_count_data";
public static final String STORE_DATA_STATISTICS_TOPIC = "Store_dataStatistics";
}
......@@ -35,9 +35,9 @@ public class MallDataStatisticsConsumer {
private ExecutorService executorService = ThreadUtil.newFixedExecutor(25, 5000, "dataStatistics-", true);
@KafkaListener(topics = KafkaConstants.STORE_DATA_STATISTICS_TOPIC
@KafkaListener(topics = "${vion.kafka.prefix}_dataStatistics"
, autoStartup = "${vion.consumer.storeDataStatistics.autoStartup:false}"
, groupId = "StoreDataStiatistics"
, groupId = "${vion.kafka.prefix}DataStiatistics"
, concurrency = "${vion.consumer.storeDataStatistics.concurrency:1}")
public void consumerMallZoneMinuteCount(List<ConsumerRecord<String, String>> recordList, Consumer<?, ?> consumer) {
if (CollectionUtils.isEmpty(recordList)) {
......
......@@ -46,14 +46,18 @@ public class MallFaceCaptureConsumer {
@Resource
private SpeedStatService speedStatService;
@KafkaListener(topicPattern = "Store_FaceCapture"
@Value("${vion.kafka.prefix}")
private String kafkaPrefix;
@KafkaListener(topicPattern = "${vion.kafka.prefix}_FaceCapture"
, autoStartup = "${vion.consumer.storeFaceCapture.autoStartup:false}"
, groupId = "StoreFaceCaptureToDb"
, groupId = "${vion.kafka.prefix}FaceCaptureToDb"
, concurrency = "${vion.consumer.storeFaceCapture.concurrency:1}")
public void consumerMallFaceCapture(List<ConsumerRecord<String, String>> recordList, Consumer<?, ?> consumer) {
if (CollectionUtils.isEmpty(recordList)) {
return;
}
String retryTopic = kafkaPrefix + "_Retry_FaceCapture";
try {
Map<String, List<ConsumerRecord<String, String>>> topicPartitionDataMap = recordList.stream().collect(Collectors.groupingBy(d -> d.topic() + "-" + d.partition()));
for (Map.Entry<String, List<ConsumerRecord<String, String>>> entry : topicPartitionDataMap.entrySet()) {
......@@ -87,7 +91,7 @@ public class MallFaceCaptureConsumer {
log.error("处理Mall_FaceCapture[{}], batchSize={}, batchInsert.Exception={}", entry.getKey(), faceDataList.size(), ex.getMessage());
// 批量插入重试队列
long startSendTime = System.currentTimeMillis();
kafkaProducerService.sendMessages(KafkaConstants.TOPIC_STORE_RETRY_FACECAPTURE, faceDataList);
kafkaProducerService.sendMessages(retryTopic, faceDataList);
log.info("处理Mall_FaceCapture[{}], batchSendFinish, {}条,耗时:{} ms", entry.getKey(), faceDataList.size(), System.currentTimeMillis() - startSendTime);
}
}
......@@ -117,6 +121,7 @@ public class MallFaceCaptureConsumer {
* @param recordList
*/
private void batchHandle(List<FaceDataContent> recordList) {
String retryTopic = kafkaPrefix + "_Retry_FaceCapture";
// 总记录数
int total = recordList.size();
ExecutorService threadPool = Executors.newFixedThreadPool(batchThreadNum);
......@@ -132,7 +137,7 @@ public class MallFaceCaptureConsumer {
log.error("处理Mall_FaceCapture分批处理, batchSize={}, batchInsert.Exception={}", faceDataList.size(), ex.getMessage());
// 批量插入重试队列
long startSendTime = System.currentTimeMillis();
kafkaProducerService.sendMessages(KafkaConstants.TOPIC_STORE_RETRY_FACECAPTURE, faceDataList);
kafkaProducerService.sendMessages(retryTopic, faceDataList);
log.info("处理Mall_FaceCapture分批处理, batchSendFinish, {}条,耗时:{} ms", faceDataList.size(), System.currentTimeMillis() - startSendTime);
}
});
......
......@@ -37,9 +37,9 @@ public class MallGateMinuteCountDataConsumer {
@Resource
private SpeedStatService speedStatService;
@KafkaListener(topics = KafkaConstants.STORE_GATE_MINUTE_COUNT_DATA_TOPIC
@KafkaListener(topics = "${vion.kafka.prefix}_gate_minute_count_data"
, autoStartup = "${vion.consumer.storeGateMinuteCount.autoStartup:false}"
, groupId = "StoreGateMinuteCountToDb"
, groupId = "${vion.kafka.prefix}GateMinuteCountToDb"
, concurrency = "${vion.consumer.storeGateMinuteCount.concurrency:1}")
public void consumerMallGateMinuteCount(List<ConsumerRecord<String, String>> recordList, Acknowledgment ack) {
if (CollectionUtils.isEmpty(recordList)) {
......
......@@ -45,15 +45,18 @@ public class MallPersonLabelConsumer {
private KafkaProducerService kafkaProducerService;
@Resource
private SpeedStatService speedStatService;
@Value("${vion.kafka.prefix}")
private String kafkaPrefix;
@KafkaListener(topicPattern = "Store_PersonLabel"
@KafkaListener(topicPattern = "${vion.kafka.prefix}_PersonLabel"
, autoStartup = "${vion.consumer.storePersonLabel.autoStartup:false}"
, groupId = "StorePersonLabelToDb"
, groupId = "${vion.kafka.prefix}PersonLabelToDb"
, concurrency = "${vion.consumer.storePersonLabel.concurrency:1}")
public void consumerMallPersonLabel(List<ConsumerRecord<String, String>> recordList, Consumer<?, ?> consumer) {
if (CollectionUtils.isEmpty(recordList)) {
return;
}
String retryTopic = kafkaPrefix + "_Retry_PersonLabel";
try {
Map<String, List<ConsumerRecord<String, String>>> topicPartitionDataMap = recordList.stream().collect(Collectors.groupingBy(d -> d.topic() + "-" + d.partition()));
for (Map.Entry<String, List<ConsumerRecord<String, String>>> entry : topicPartitionDataMap.entrySet()) {
......@@ -86,7 +89,7 @@ public class MallPersonLabelConsumer {
log.error("处理Mall_PersonLabel[{}], batchSize={}, batchInsert.Exception={}", entry.getKey(), labelList.size(), ex.getMessage());
// 批量插入重试队列
long startSendTime = System.currentTimeMillis();
kafkaProducerService.sendMessages(KafkaConstants.TOPIC_STORE_RETRY_PERSONLABEL, labelList);
kafkaProducerService.sendMessages(retryTopic, labelList);
log.info("处理Mall_PersonLabel[{}], batchSendFinish耗时:{} ms", entry.getKey(), System.currentTimeMillis() - startSendTime);
}
}
......@@ -120,6 +123,7 @@ public class MallPersonLabelConsumer {
int total = recordList.size();
ExecutorService threadPool = Executors.newFixedThreadPool(batchThreadNum);
String retryTopic = kafkaPrefix + "_Retry_PersonLabel";
List<Future> futureList = new ArrayList<>();
for (int i = 0; i < total; i += batchSize) {
List<PersonLabelContent> labelList = recordList.subList(i, Math.min(i + batchSize, total));
......@@ -131,7 +135,7 @@ public class MallPersonLabelConsumer {
log.error("处理Mall_PersonLabel分批处理, batchSize={}, batchInsert.Exception={}", labelList.size(), ex.getMessage());
// 批量插入重试队列
long startSendTime = System.currentTimeMillis();
kafkaProducerService.sendMessages(KafkaConstants.TOPIC_STORE_RETRY_PERSONLABEL, labelList);
kafkaProducerService.sendMessages(retryTopic, labelList);
log.info("处理Mall_PersonLabel分批处理, batchSendFinish, {}条,耗时:{} ms", labelList.size(), System.currentTimeMillis() - startSendTime);
}
});
......
......@@ -33,9 +33,9 @@ public class MallRetryFaceCaptureConsumer {
@Resource
private SpeedStatService speedStatService;
@KafkaListener(topics = KafkaConstants.TOPIC_STORE_RETRY_FACECAPTURE
@KafkaListener(topics = "${vion.kafka.prefix}_Retry_FaceCapture"
, autoStartup = "${vion.consumer.storeRetryFaceCapture.autoStartup:false}"
, groupId = "StoreRetryFaceCaptureToDb"
, groupId = "${vion.kafka.prefix}RetryFaceCaptureToDb"
, concurrency = "${vion.consumer.storeRetryFaceCapture.concurrency:1}")
public void consumerMallRetryFaceCapture(List<ConsumerRecord<String, String>> recordList, Consumer<?, ?> consumer) {
if (CollectionUtils.isEmpty(recordList)) {
......
......@@ -33,9 +33,9 @@ public class MallRetryPersonLabelConsumer {
@Resource
private SpeedStatService speedStatService;
@KafkaListener(topics = KafkaConstants.TOPIC_STORE_RETRY_PERSONLABEL
@KafkaListener(topics = "${vion.kafka.prefix}_Retry_PersonLabel"
, autoStartup = "${vion.consumer.storeRetryPersonLabel.autoStartup:false}"
, groupId = "StoreRetryPersonLabelToDb"
, groupId = "${vion.kafka.prefix}RetryPersonLabelToDb"
, concurrency = "${vion.consumer.storeRetryPersonLabel.concurrency:1}")
public void consumerMallRetryPersonLabel(List<ConsumerRecord<String, String>> recordList, Consumer<?, ?> consumer) {
if (CollectionUtils.isEmpty(recordList)) {
......
......@@ -37,9 +37,9 @@ public class MallZoneMinuteCountDataConsumer {
@Resource
private SpeedStatService speedStatService;
@KafkaListener(topics = KafkaConstants.STORE_ZONE_MINUTE_COUNT_DATA_TOPIC
@KafkaListener(topics = "${vion.kafka.prefix}_zone_minute_count_data"
, autoStartup = "${vion.consumer.storeZoneMinuteCount.autoStartup:false}"
, groupId = "StoreZoneMinuteCountToDb"
, groupId = "${vion.kafka.prefix}ZoneMinuteCountToDb"
, concurrency = "${vion.consumer.storeZoneMinuteCount.concurrency:1}")
public void consumerMallZoneMinuteCount(List<ConsumerRecord<String, String>> recordList, Acknowledgment ack) {
if (CollectionUtils.isEmpty(recordList)) {
......
......@@ -49,6 +49,7 @@ vion.topicCleanup.preDay=7
vion.topicCleanup.cron=0 0 1 * * ?
# topic consumer config
vion.kafka.prefix=Store
# FaceCapture
vion.consumer.storeFaceCapture.autoStartup=true
vion.consumer.storeFaceCapture.concurrency=1
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!