Commit 05d9b42e by 毛树良

[chg]:增加日志

1 parent 00bded54
......@@ -75,6 +75,7 @@ public class MallFaceCaptureConsumer {
}
// 插入数据
if (!CollectionUtils.isEmpty(faceDataList)) {
long dbStartTime = System.currentTimeMillis();
if (batchEnable == 1) {
// 分批处理
batchHandle(faceDataList);
......@@ -83,13 +84,14 @@ public class MallFaceCaptureConsumer {
dFaceRecognitionDao.batchInsert(faceDataList);
speedStatService.stat(RedisConstants.PDS_MALLFACECAPTURE_WRITE, faceDataList.size());
} catch (Exception ex) {
log.error("处理Mall_FaceCapture[{}], batchSize={}, batchInsert.Exception={}", entry.getKey(), faceDataList.size(), ex.getMessage(), ex);
log.error("处理Mall_FaceCapture[{}], batchSize={}, batchInsert.Exception={}", entry.getKey(), faceDataList.size(), ex.getMessage());
// 批量插入重试队列
long startSendTime = System.currentTimeMillis();
kafkaProducerService.sendMessages(KafkaConstants.TOPIC_MALL_RETRY_FACECAPTURE, faceDataList);
log.info("处理Mall_FaceCapture[{}], batchSendFinish, {}条,耗时:{} ms", entry.getKey(), faceDataList.size(), System.currentTimeMillis() - startSendTime);
}
}
log.info("处理Mall_FaceCapture[{}], dbHandleData, {}条,耗时:{} ms", entry.getKey(), faceDataList.size(), System.currentTimeMillis() - dbStartTime);
}
// 提交Offset
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
......@@ -104,6 +106,7 @@ public class MallFaceCaptureConsumer {
log.error("处理Mall_FaceCapture[{}], Throwable={}", entry.getKey(), e.getMessage(), e);
}
}
log.info("处理Mall_FaceCapture, batchHandleFinish");
} catch (Throwable exx) {
log.error("处理Mall_FaceCapture.Throwable={}", exx.getMessage(), exx);
}
......@@ -126,7 +129,7 @@ public class MallFaceCaptureConsumer {
dFaceRecognitionDao.batchInsert(faceDataList);
speedStatService.stat(RedisConstants.PDS_MALLFACECAPTURE_WRITE, faceDataList.size());
} catch (Exception ex) {
log.error("处理Mall_FaceCapture分批处理, batchSize={}, batchInsert.Exception={}", faceDataList.size(), ex.getMessage(), ex);
log.error("处理Mall_FaceCapture分批处理, batchSize={}, batchInsert.Exception={}", faceDataList.size(), ex.getMessage());
// 批量插入重试队列
long startSendTime = System.currentTimeMillis();
kafkaProducerService.sendMessages(KafkaConstants.TOPIC_MALL_RETRY_FACECAPTURE, faceDataList);
......
......@@ -74,6 +74,7 @@ public class MallPersonLabelConsumer {
}
}
if (!CollectionUtils.isEmpty(labelList)) {
long dbStartTime = System.currentTimeMillis();
if (batchEnable == 1) {
// 分批处理
batchHandle(labelList);
......@@ -82,13 +83,14 @@ public class MallPersonLabelConsumer {
dPersonLabelDao.batchInsert(labelList);
speedStatService.stat(RedisConstants.PDS_MALLPERSONLABEL_WRITE, labelList.size());
} catch (Exception ex) {
log.error("处理Mall_PersonLabel[{}], batchSize={}, batchInsert.Exception={}", entry.getKey(), labelList.size(), ex.getMessage(), ex);
log.error("处理Mall_PersonLabel[{}], batchSize={}, batchInsert.Exception={}", entry.getKey(), labelList.size(), ex.getMessage());
// 批量插入重试队列
long startSendTime = System.currentTimeMillis();
kafkaProducerService.sendMessages(KafkaConstants.TOPIC_MALL_RETRY_PERSONLABEL, labelList);
log.info("处理Mall_PersonLabel[{}], batchSendFinish耗时:{} ms", entry.getKey(), System.currentTimeMillis() - startSendTime);
}
}
log.info("处理Mall_PersonLabel[{}], dbHandleData, {}条,耗时:{} ms", entry.getKey(), labelList.size(), System.currentTimeMillis() - dbStartTime);
}
// 提交Offset
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
......@@ -103,6 +105,7 @@ public class MallPersonLabelConsumer {
log.error("处理Mall_PersonLabel[{}], Throwable={}", entry.getKey(), e.getMessage(), e);
}
}
log.info("处理Mall_PersonLabel, batchHandleFinish");
} catch (Throwable exx) {
log.error("处理Mall_PersonLabel.Throwable={}", exx.getMessage(), exx);
}
......@@ -125,7 +128,7 @@ public class MallPersonLabelConsumer {
dPersonLabelDao.batchInsert(labelList);
speedStatService.stat(RedisConstants.PDS_MALLPERSONLABEL_WRITE, labelList.size());
} catch (Exception ex) {
log.error("处理Mall_PersonLabel分批处理, batchSize={}, batchInsert.Exception={}", labelList.size(), ex.getMessage(), ex);
log.error("处理Mall_PersonLabel分批处理, batchSize={}, batchInsert.Exception={}", labelList.size(), ex.getMessage());
// 批量插入重试队列
long startSendTime = System.currentTimeMillis();
kafkaProducerService.sendMessages(KafkaConstants.TOPIC_MALL_RETRY_PERSONLABEL, labelList);
......
......@@ -34,7 +34,7 @@ spring.kafka.producer.acks=all
#spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.fetch-max-wait=100
spring.kafka.consumer.fetch-max-wait=50
spring.kafka.consumer.max-poll-records=100
#spring.kafka.consumer.properties.spring.json.trusted.packages=com.viontech.keliu.entity
spring.kafka.listener.ack-mode=manual_immediate
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!