Commit 4f7cf1fb by wenshuaiying

替换topic的名字及配置key

1 parent c8f1cb7b
...@@ -29,21 +29,21 @@ public class MallDataStatisticsConsumer { ...@@ -29,21 +29,21 @@ public class MallDataStatisticsConsumer {
@Autowired @Autowired
private ObjectMapper objectMapper; private ObjectMapper objectMapper;
@Value("${vion.consumer.mallDataStatistics.reidUrl:}") @Value("${vion.consumer.storeDataStatistics.reidUrl:}")
private String reidUrl; private String reidUrl;
@Value("${vion.consumer.mallDataStatistics.zoneNum:true}") @Value("${vion.consumer.storeDataStatistics.zoneNum:true}")
private boolean zoneNumEnable; private boolean zoneNumEnable;
@Value("${vion.consumer.mallDataStatistics.payEvent:false}") @Value("${vion.consumer.storeDataStatistics.payEvent:false}")
private boolean payEventEnable; private boolean payEventEnable;
private ExecutorService executorService = ThreadUtil.newFixedExecutor(1, 1024, "dataStatistics-", true); private ExecutorService executorService = ThreadUtil.newFixedExecutor(1, 1024, "dataStatistics-", true);
@KafkaListener(topics = KafkaConstants.STORE_DATA_STATISTICS_TOPIC @KafkaListener(topics = KafkaConstants.STORE_DATA_STATISTICS_TOPIC
, autoStartup = "${vion.consumer.mallDataStatistics.autoStartup:false}" , autoStartup = "${vion.consumer.storeDataStatistics.autoStartup:false}"
, groupId = "MallDataStiatistics" , groupId = "StoreDataStiatistics"
, concurrency = "${vion.consumer.mallDataStatistics.concurrency:1}") , concurrency = "${vion.consumer.storeDataStatistics.concurrency:1}")
public void consumerMallZoneMinuteCount(List<ConsumerRecord<String, String>> recordList, Acknowledgment ack) { public void consumerMallZoneMinuteCount(List<ConsumerRecord<String, String>> recordList, Acknowledgment ack) {
if (CollectionUtils.isEmpty(recordList)) { if (CollectionUtils.isEmpty(recordList)) {
return; return;
...@@ -61,18 +61,18 @@ public class MallDataStatisticsConsumer { ...@@ -61,18 +61,18 @@ public class MallDataStatisticsConsumer {
try { try {
MallDataStatisticsInfo mallDataStatisticsInfo = objectMapper.readValue(consumerRecord.value(), MallDataStatisticsInfo.class); MallDataStatisticsInfo mallDataStatisticsInfo = objectMapper.readValue(consumerRecord.value(), MallDataStatisticsInfo.class);
executorService.execute(() -> { executorService.execute(() -> {
log.info("处理mallDataStatisticsInfo,mallId:{},countDate:{}, 开始", mallDataStatisticsInfo.getMallId(), mallDataStatisticsInfo.getCountDate()); log.info("处理storeDataStatisticsInfo,mallId:{},countDate:{}, 开始", mallDataStatisticsInfo.getMallId(), mallDataStatisticsInfo.getCountDate());
execDataStatistics(mallDataStatisticsInfo); execDataStatistics(mallDataStatisticsInfo);
}); });
// execDataStatistics(mallDataStatisticsInfo); // execDataStatistics(mallDataStatisticsInfo);
} catch (Exception ee) { } catch (Exception ee) {
log.error("处理mallDataStatisticsInfo[{}], JsonDeserializerThrowable={}", entry.getKey(), ee.getMessage(), ee); log.error("处理storeDataStatisticsInfo[{}], JsonDeserializerThrowable={}", entry.getKey(), ee.getMessage(), ee);
} }
} }
} }
log.info("处理mallDataStatisticsInfo[{}], batchHandle耗时:{} ms", entry.getKey(), System.currentTimeMillis() - startTime); log.info("处理storeDataStatisticsInfo[{}], batchHandle耗时:{} ms", entry.getKey(), System.currentTimeMillis() - startTime);
} catch (Throwable e) { } catch (Throwable e) {
log.error("处理mallDataStatisticsInfo[{}], Throwable={}", entry.getKey(), e.getMessage(), e); log.error("处理storeDataStatisticsInfo[{}], Throwable={}", entry.getKey(), e.getMessage(), e);
} }
} }
} catch (Throwable exx) { } catch (Throwable exx) {
......
...@@ -31,11 +31,11 @@ import java.util.stream.Collectors; ...@@ -31,11 +31,11 @@ import java.util.stream.Collectors;
@Component @Component
@Slf4j @Slf4j
public class MallFaceCaptureConsumer { public class MallFaceCaptureConsumer {
@Value("${vion.consumer.mallFaceCapture.batchEnable:0}") @Value("${vion.consumer.storeFaceCapture.batchEnable:0}")
private Integer batchEnable; private Integer batchEnable;
@Value("${vion.consumer.mallFaceCapture.batchSize:0}") @Value("${vion.consumer.storeFaceCapture.batchSize:0}")
private Integer batchSize; private Integer batchSize;
@Value("${vion.consumer.mallFaceCapture.batchThreadNum:0}") @Value("${vion.consumer.storeFaceCapture.batchThreadNum:0}")
private Integer batchThreadNum; private Integer batchThreadNum;
@Autowired @Autowired
private ObjectMapper objectMapper; private ObjectMapper objectMapper;
...@@ -46,10 +46,10 @@ public class MallFaceCaptureConsumer { ...@@ -46,10 +46,10 @@ public class MallFaceCaptureConsumer {
@Resource @Resource
private SpeedStatService speedStatService; private SpeedStatService speedStatService;
@KafkaListener(topicPattern = "Mall_FaceCapture_.*" @KafkaListener(topicPattern = "Store_FaceCapture_.*"
, autoStartup = "${vion.consumer.mallFaceCapture.autoStartup:false}" , autoStartup = "${vion.consumer.StoreFaceCapture.autoStartup:false}"
, groupId = "MallFaceCaptureToDb" , groupId = "StoreFaceCaptureToDb"
, concurrency = "${vion.consumer.mallFaceCapture.concurrency:1}") , concurrency = "${vion.consumer.storeFaceCapture.concurrency:1}")
public void consumerMallFaceCapture(List<ConsumerRecord<String, String>> recordList, Consumer<?, ?> consumer) { public void consumerMallFaceCapture(List<ConsumerRecord<String, String>> recordList, Consumer<?, ?> consumer) {
if (CollectionUtils.isEmpty(recordList)) { if (CollectionUtils.isEmpty(recordList)) {
return; return;
......
...@@ -26,9 +26,9 @@ import java.util.stream.Collectors; ...@@ -26,9 +26,9 @@ import java.util.stream.Collectors;
@Slf4j @Slf4j
@Component @Component
public class MallGateMinuteCountDataConsumer { public class MallGateMinuteCountDataConsumer {
@Value("${vion.consumer.mallGateMinuteCount.batchSize:20}") @Value("${vion.consumer.storeGateMinuteCount.batchSize:20}")
private Integer batchSize; private Integer batchSize;
@Value("${vion.consumer.mallGateMinuteCount.batchThreadNum:5}") @Value("${vion.consumer.storeGateMinuteCount.batchThreadNum:5}")
private Integer batchThreadNum; private Integer batchThreadNum;
@Autowired @Autowired
private ObjectMapper objectMapper; private ObjectMapper objectMapper;
...@@ -38,9 +38,9 @@ public class MallGateMinuteCountDataConsumer { ...@@ -38,9 +38,9 @@ public class MallGateMinuteCountDataConsumer {
private SpeedStatService speedStatService; private SpeedStatService speedStatService;
@KafkaListener(topics = KafkaConstants.STORE_GATE_MINUTE_COUNT_DATA_TOPIC @KafkaListener(topics = KafkaConstants.STORE_GATE_MINUTE_COUNT_DATA_TOPIC
, autoStartup = "${vion.consumer.mallGateMinuteCount.autoStartup:false}" , autoStartup = "${vion.consumer.storeGateMinuteCount.autoStartup:false}"
, groupId = "MallGateMinuteCountToDb" , groupId = "MallGateMinuteCountToDb"
, concurrency = "${vion.consumer.mallGateMinuteCount.concurrency:1}") , concurrency = "${vion.consumer.storeGateMinuteCount.concurrency:1}")
public void consumerMallGateMinuteCount(List<ConsumerRecord<String, String>> recordList, Acknowledgment ack) { public void consumerMallGateMinuteCount(List<ConsumerRecord<String, String>> recordList, Acknowledgment ack) {
if (CollectionUtils.isEmpty(recordList)) { if (CollectionUtils.isEmpty(recordList)) {
return; return;
......
...@@ -31,11 +31,11 @@ import java.util.stream.Collectors; ...@@ -31,11 +31,11 @@ import java.util.stream.Collectors;
@Component @Component
@Slf4j @Slf4j
public class MallPersonLabelConsumer { public class MallPersonLabelConsumer {
@Value("${vion.consumer.mallPersonLabel.batchEnable:0}") @Value("${vion.consumer.storePersonLabel.batchEnable:0}")
private Integer batchEnable; private Integer batchEnable;
@Value("${vion.consumer.mallPersonLabel.batchSize:0}") @Value("${vion.consumer.storePersonLabel.batchSize:0}")
private Integer batchSize; private Integer batchSize;
@Value("${vion.consumer.mallPersonLabel.batchThreadNum:0}") @Value("${vion.consumer.storePersonLabel.batchThreadNum:0}")
private Integer batchThreadNum; private Integer batchThreadNum;
@Autowired @Autowired
private ObjectMapper objectMapper; private ObjectMapper objectMapper;
...@@ -46,10 +46,10 @@ public class MallPersonLabelConsumer { ...@@ -46,10 +46,10 @@ public class MallPersonLabelConsumer {
@Resource @Resource
private SpeedStatService speedStatService; private SpeedStatService speedStatService;
@KafkaListener(topicPattern = "Mall_PersonLabel_.*" @KafkaListener(topicPattern = "Store_PersonLabel_.*"
, autoStartup = "${vion.consumer.mallPersonLabel.autoStartup:false}" , autoStartup = "${vion.consumer.storePersonLabel.autoStartup:false}"
, groupId = "MallPersonLabelToDb" , groupId = "StorePersonLabelToDb"
, concurrency = "${vion.consumer.mallPersonLabel.concurrency:1}") , concurrency = "${vion.consumer.storePersonLabel.concurrency:1}")
public void consumerMallPersonLabel(List<ConsumerRecord<String, String>> recordList, Consumer<?, ?> consumer) { public void consumerMallPersonLabel(List<ConsumerRecord<String, String>> recordList, Consumer<?, ?> consumer) {
if (CollectionUtils.isEmpty(recordList)) { if (CollectionUtils.isEmpty(recordList)) {
return; return;
......
...@@ -34,9 +34,9 @@ public class MallRetryFaceCaptureConsumer { ...@@ -34,9 +34,9 @@ public class MallRetryFaceCaptureConsumer {
private SpeedStatService speedStatService; private SpeedStatService speedStatService;
@KafkaListener(topics = KafkaConstants.TOPIC_STORE_RETRY_FACECAPTURE @KafkaListener(topics = KafkaConstants.TOPIC_STORE_RETRY_FACECAPTURE
, autoStartup = "${vion.consumer.mallRetryFaceCapture.autoStartup:false}" , autoStartup = "${vion.consumer.storeRetryFaceCapture.autoStartup:false}"
, groupId = "MallRetryFaceCaptureToDb" , groupId = "StoreRetryFaceCaptureToDb"
, concurrency = "${vion.consumer.mallRetryFaceCapture.concurrency:1}") , concurrency = "${vion.consumer.storeRetryFaceCapture.concurrency:1}")
public void consumerMallRetryFaceCapture(List<ConsumerRecord<String, String>> recordList, Consumer<?, ?> consumer) { public void consumerMallRetryFaceCapture(List<ConsumerRecord<String, String>> recordList, Consumer<?, ?> consumer) {
if (CollectionUtils.isEmpty(recordList)) { if (CollectionUtils.isEmpty(recordList)) {
return; return;
......
...@@ -34,9 +34,9 @@ public class MallRetryPersonLabelConsumer { ...@@ -34,9 +34,9 @@ public class MallRetryPersonLabelConsumer {
private SpeedStatService speedStatService; private SpeedStatService speedStatService;
@KafkaListener(topics = KafkaConstants.TOPIC_STORE_RETRY_PERSONLABEL @KafkaListener(topics = KafkaConstants.TOPIC_STORE_RETRY_PERSONLABEL
, autoStartup = "${vion.consumer.mallRetryPersonLabel.autoStartup:false}" , autoStartup = "${vion.consumer.storeRetryPersonLabel.autoStartup:false}"
, groupId = "MallRetryPersonLabelToDb" , groupId = "StoreRetryPersonLabelToDb"
, concurrency = "${vion.consumer.mallRetryPersonLabel.concurrency:1}") , concurrency = "${vion.consumer.storeRetryPersonLabel.concurrency:1}")
public void consumerMallRetryPersonLabel(List<ConsumerRecord<String, String>> recordList, Consumer<?, ?> consumer) { public void consumerMallRetryPersonLabel(List<ConsumerRecord<String, String>> recordList, Consumer<?, ?> consumer) {
if (CollectionUtils.isEmpty(recordList)) { if (CollectionUtils.isEmpty(recordList)) {
return; return;
......
...@@ -26,9 +26,9 @@ import java.util.stream.Collectors; ...@@ -26,9 +26,9 @@ import java.util.stream.Collectors;
@Slf4j @Slf4j
@Component @Component
public class MallZoneMinuteCountDataConsumer { public class MallZoneMinuteCountDataConsumer {
@Value("${vion.consumer.mallZoneMinuteCount.batchSize:20}") @Value("${vion.consumer.storeZoneMinuteCount.batchSize:20}")
private Integer batchSize; private Integer batchSize;
@Value("${vion.consumer.mallZoneMinuteCount.batchThreadNum:5}") @Value("${vion.consumer.storeZoneMinuteCount.batchThreadNum:5}")
private Integer batchThreadNum; private Integer batchThreadNum;
@Autowired @Autowired
private ObjectMapper objectMapper; private ObjectMapper objectMapper;
...@@ -38,9 +38,9 @@ public class MallZoneMinuteCountDataConsumer { ...@@ -38,9 +38,9 @@ public class MallZoneMinuteCountDataConsumer {
private SpeedStatService speedStatService; private SpeedStatService speedStatService;
@KafkaListener(topics = KafkaConstants.STORE_ZONE_MINUTE_COUNT_DATA_TOPIC @KafkaListener(topics = KafkaConstants.STORE_ZONE_MINUTE_COUNT_DATA_TOPIC
, autoStartup = "${vion.consumer.mallZoneMinuteCount.autoStartup:false}" , autoStartup = "${vion.consumer.storeZoneMinuteCount.autoStartup:false}"
, groupId = "MallZoneMinuteCountToDb" , groupId = "StoreZoneMinuteCountToDb"
, concurrency = "${vion.consumer.mallZoneMinuteCount.concurrency:1}") , concurrency = "${vion.consumer.storeZoneMinuteCount.concurrency:1}")
public void consumerMallZoneMinuteCount(List<ConsumerRecord<String, String>> recordList, Acknowledgment ack) { public void consumerMallZoneMinuteCount(List<ConsumerRecord<String, String>> recordList, Acknowledgment ack) {
if (CollectionUtils.isEmpty(recordList)) { if (CollectionUtils.isEmpty(recordList)) {
return; return;
......
...@@ -50,35 +50,35 @@ vion.topicCleanup.cron=0 0 1 * * ? ...@@ -50,35 +50,35 @@ vion.topicCleanup.cron=0 0 1 * * ?
# topic consumer config # topic consumer config
# FaceCapture # FaceCapture
vion.consumer.mallFaceCapture.autoStartup=true vion.consumer.storeFaceCapture.autoStartup=true
vion.consumer.mallFaceCapture.concurrency=1 vion.consumer.storeFaceCapture.concurrency=1
vion.consumer.mallFaceCapture.batchEnable=1 vion.consumer.storeFaceCapture.batchEnable=1
vion.consumer.mallFaceCapture.batchSize=100 vion.consumer.storeFaceCapture.batchSize=100
vion.consumer.mallFaceCapture.batchThreadNum=10 vion.consumer.storeFaceCapture.batchThreadNum=10
vion.consumer.mallRetryFaceCapture.autoStartup=false vion.consumer.storeRetryFaceCapture.autoStartup=false
vion.consumer.mallRetryFaceCapture.concurrency=1 vion.consumer.storeRetryFaceCapture.concurrency=1
# PersonLabel # PersonLabel
vion.consumer.mallPersonLabel.autoStartup=false vion.consumer.storePersonLabel.autoStartup=false
vion.consumer.mallPersonLabel.concurrency=1 vion.consumer.storePersonLabel.concurrency=1
vion.consumer.mallPersonLabel.batchEnable=1 vion.consumer.storePersonLabel.batchEnable=1
vion.consumer.mallPersonLabel.batchSize=100 vion.consumer.storePersonLabel.batchSize=100
vion.consumer.mallPersonLabel.batchThreadNum=10 vion.consumer.storePersonLabel.batchThreadNum=10
vion.consumer.mallRetryPersonLabel.autoStartup=false vion.consumer.storeRetryPersonLabel.autoStartup=false
vion.consumer.mallRetryPersonLabel.concurrency=1 vion.consumer.storeRetryPersonLabel.concurrency=1
# d_gate_minute_count_data # d_gate_minute_count_data
vion.consumer.mallGateMinuteCount.autoStartup=true vion.consumer.storeGateMinuteCount.autoStartup=true
vion.consumer.mallGateMinuteCount.concurrency=1 vion.consumer.storeGateMinuteCount.concurrency=1
vion.consumer.mallGateMinuteCount.batchSize=20 vion.consumer.storeGateMinuteCount.batchSize=20
vion.consumer.mallGateMinuteCount.batchThreadNum=5 vion.consumer.storeGateMinuteCount.batchThreadNum=5
# d_zone_minute_count_data # d_zone_minute_count_data
vion.consumer.mallZoneMinuteCount.autoStartup=true vion.consumer.storeZoneMinuteCount.autoStartup=true
vion.consumer.mallZoneMinuteCount.concurrency=1 vion.consumer.storeZoneMinuteCount.concurrency=1
vion.consumer.mallZoneMinuteCount.batchSize=20 vion.consumer.storeZoneMinuteCount.batchSize=20
vion.consumer.mallZoneMinuteCount.batchThreadNum=5 vion.consumer.storeZoneMinuteCount.batchThreadNum=5
# mall dataStiatistics # store dataStiatistics
vion.consumer.mallDataStatistics.autoStartup=true vion.consumer.storeDataStatistics.autoStartup=true
vion.consumer.mallDataStatistics.concurrency=1 vion.consumer.storeDataStatistics.concurrency=1
vion.consumer.mallDataStatistics.reidUrl=http://10.0.14.11:23456 vion.consumer.storeDataStatistics.reidUrl=http://10.0.14.11:23456
vion.consumer.mallDataStatistics.zoneNum=false vion.consumer.storeDataStatistics.zoneNum=false
vion.consumer.mallDataStatistics.payEvent=false vion.consumer.storeDataStatistics.payEvent=false
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!