Commit cf909009 by wenshuaiying

kafka的topic去掉后缀,避免消费大量的topic

1 parent 6bde3333
STORE_FACECAPTURE_TOPIC_PREFIX
\ No newline at end of file
package com.viontech.keliu.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.viontech.keliu.constants.KafkaConstants;
import com.viontech.keliu.constants.RedisConstants;
import com.viontech.keliu.dao.DFaceRecognitionDao;
import com.viontech.keliu.entity.FaceDataContent;
import com.viontech.keliu.service.KafkaProducerService;
import com.viontech.keliu.service.SpeedStatService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@Component
@Slf4j
public class MallFaceCaptureConsumer {
@Value("${vion.consumer.storeFaceCapture.batchEnable:0}")
private Integer batchEnable;
@Value("${vion.consumer.storeFaceCapture.batchSize:0}")
private Integer batchSize;
@Value("${vion.consumer.storeFaceCapture.batchThreadNum:0}")
private Integer batchThreadNum;
@Autowired
private ObjectMapper objectMapper;
@Resource
private DFaceRecognitionDao dFaceRecognitionDao;
@Resource
private KafkaProducerService kafkaProducerService;
@Resource
private SpeedStatService speedStatService;
@KafkaListener(topicPattern = "Store_FaceCapture"
, autoStartup = "${vion.consumer.storeFaceCapture.autoStartup:false}"
, groupId = "StoreFaceCaptureToDb"
, concurrency = "${vion.consumer.storeFaceCapture.concurrency:1}")
public void consumerMallFaceCapture(List<ConsumerRecord<String, String>> recordList, Consumer<?, ?> consumer) {
if (CollectionUtils.isEmpty(recordList)) {
return;
}
try {
Map<String, List<ConsumerRecord<String, String>>> topicPartitionDataMap = recordList.stream().collect(Collectors.groupingBy(d -> d.topic() + "-" + d.partition()));
for (Map.Entry<String, List<ConsumerRecord<String, String>>> entry : topicPartitionDataMap.entrySet()) {
try {
long startTime = System.currentTimeMillis();
List<ConsumerRecord<String, String>> recordValues = entry.getValue();
if (!CollectionUtils.isEmpty(recordValues)) {
ConsumerRecord<String, String> lastRecord = recordValues.get(recordValues.size() - 1);
List<FaceDataContent> faceDataList = new ArrayList<>();
for (ConsumerRecord<String, String> consumerRecord : recordValues) {
try {
FaceDataContent faceDataContent = objectMapper.readValue(consumerRecord.value(), FaceDataContent.class);
if (faceDataContent != null) {
faceDataList.add(faceDataContent);
}
} catch (Exception ee) {
log.error("处理Mall_FaceCapture[{}], JsonDeserializerThrowable={}", entry.getKey(), ee.getMessage(), ee);
}
}
// 插入数据
if (!CollectionUtils.isEmpty(faceDataList)) {
long dbStartTime = System.currentTimeMillis();
if (batchEnable == 1) {
// 分批处理
batchHandle(faceDataList);
} else {
try {
dFaceRecognitionDao.batchInsert(faceDataList);
speedStatService.stat(RedisConstants.PDS_MALLFACECAPTURE_WRITE, faceDataList.size());
} catch (Exception ex) {
log.error("处理Mall_FaceCapture[{}], batchSize={}, batchInsert.Exception={}", entry.getKey(), faceDataList.size(), ex.getMessage());
// 批量插入重试队列
long startSendTime = System.currentTimeMillis();
kafkaProducerService.sendMessages(KafkaConstants.TOPIC_STORE_RETRY_FACECAPTURE, faceDataList);
log.info("处理Mall_FaceCapture[{}], batchSendFinish, {}条,耗时:{} ms", entry.getKey(), faceDataList.size(), System.currentTimeMillis() - startSendTime);
}
}
log.info("处理Mall_FaceCapture[{}], dbHandleData, {}条,耗时:{} ms", entry.getKey(), faceDataList.size(), System.currentTimeMillis() - dbStartTime);
}
// 提交Offset
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(
new TopicPartition(lastRecord.topic(), lastRecord.partition()),
new OffsetAndMetadata(lastRecord.offset() + 1) // 提交下一条偏移量
);
consumer.commitSync(offsets);
}
log.info("处理Mall_FaceCapture[{}], batchHandle, {}条,耗时:{} ms", entry.getKey(), recordValues.size(), System.currentTimeMillis() - startTime);
} catch (Throwable e) {
log.error("处理Mall_FaceCapture[{}], Throwable={}", entry.getKey(), e.getMessage(), e);
}
}
log.info("处理Mall_FaceCapture, batchHandleFinish");
} catch (Throwable exx) {
log.error("处理Mall_FaceCapture.Throwable={}", exx.getMessage(), exx);
}
}
/**
* 分批处理
* @param recordList
*/
private void batchHandle(List<FaceDataContent> recordList) {
// 总记录数
int total = recordList.size();
ExecutorService threadPool = Executors.newFixedThreadPool(batchThreadNum);
List<Future> futureList = new ArrayList<>();
for (int i = 0; i < total; i += batchSize) {
List<FaceDataContent> faceDataList = recordList.subList(i, Math.min(i + batchSize, total));
Future<?> future = threadPool.submit(() -> {
try {
dFaceRecognitionDao.batchInsert(faceDataList);
speedStatService.stat(RedisConstants.PDS_MALLFACECAPTURE_WRITE, faceDataList.size());
} catch (Exception ex) {
log.error("处理Mall_FaceCapture分批处理, batchSize={}, batchInsert.Exception={}", faceDataList.size(), ex.getMessage());
// 批量插入重试队列
long startSendTime = System.currentTimeMillis();
kafkaProducerService.sendMessages(KafkaConstants.TOPIC_STORE_RETRY_FACECAPTURE, faceDataList);
log.info("处理Mall_FaceCapture分批处理, batchSendFinish, {}条,耗时:{} ms", faceDataList.size(), System.currentTimeMillis() - startSendTime);
}
});
futureList.add(future);
}
threadPool.shutdown();
for (Future future : futureList) {
try {
future.get();
} catch (Exception e) {
log.error("batchHandle.getFuture.Exception={}", e.getMessage(), e);
}
}
}
}
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!