package com.viontech.keliu.consumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.viontech.keliu.constants.KafkaConstants;
import com.viontech.keliu.constants.ProcessConstants;
import com.viontech.keliu.constants.RedisConstants;
import com.viontech.keliu.dao.DGateMinuteCountDataDao;
import com.viontech.keliu.entity.GateCountData;
import com.viontech.keliu.service.SpeedStatService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
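
/**
 * Consumes per-minute mall gate count data from the
 * {@code MALL_GATE_MINUTE_COUNT_DATA_TOPIC} Kafka topic and persists it to the
 * database, splitting each polled batch into INSERT and UPDATE operations that
 * are written in parallel.
 * <p>
 * Configuration sketch (property names are taken from the annotations below;
 * the values shown are illustrative, not required):
 * <pre>
 * vion:
 *   consumer:
 *     mallGateMinuteCount:
 *       autoStartup: true
 *       concurrency: 2
 *       batchSize: 20
 *       batchThreadNum: 5
 * </pre>
 */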
@Slf4j
@Component
public class MallGateMinuteCountDataConsumer {

    @Value("${vion.consumer.mallGateMinuteCount.batchSize:20}")
    private Integer batchSize;

    @Value("${vion.consumer.mallGateMinuteCount.batchThreadNum:5}")
    private Integer batchThreadNum;

    @Autowired
    private ObjectMapper objectMapper;

    @Resource
    private DGateMinuteCountDataDao dGateMinuteCountDataDao;

    @Resource
    private SpeedStatService speedStatService;
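
    // NOTE: this listener consumes whole batches (List<ConsumerRecord>) and acknowledges
    // manually, so the container factory backing it must have batch listening enabled and
    // a MANUAL ack mode. A minimal sketch of such a factory, assuming a standard Spring
    // Kafka setup (the bean below is illustrative and not part of this class):
    //
    //   @Bean
    //   public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
    //           ConsumerFactory<String, String> consumerFactory) {
    //       ConcurrentKafkaListenerContainerFactory<String, String> factory =
    //               new ConcurrentKafkaListenerContainerFactory<>();
    //       factory.setConsumerFactory(consumerFactory);
    //       factory.setBatchListener(true);
    //       factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    //       return factory;
    //   }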
    @KafkaListener(topics = KafkaConstants.MALL_GATE_MINUTE_COUNT_DATA_TOPIC
            , autoStartup = "${vion.consumer.mallGateMinuteCount.autoStartup:false}"
            , groupId = "MallGateMinuteCountToDb"
            , concurrency = "${vion.consumer.mallGateMinuteCount.concurrency:1}")
    public void consumerMallGateMinuteCount(List<ConsumerRecord<String, String>> recordList, Acknowledgment ack) {
        if (CollectionUtils.isEmpty(recordList)) {
            return;
        }
        try {
            long startTime = System.currentTimeMillis();
            List<GateCountData> dataList = new ArrayList<>();
            for (ConsumerRecord<String, String> consumerRecord : recordList) {
                try {
                    GateCountData dataContent = objectMapper.readValue(consumerRecord.value(), GateCountData.class);
                    if (dataContent != null) {
                        dataList.add(dataContent);
                    }
                } catch (Exception ee) {
                    log.error("Mall_GateMinuteCount: offset={}, JSON deserialization failed: {}", consumerRecord.offset(), ee.getMessage(), ee);
                }
            }
            if (!CollectionUtils.isEmpty(dataList)) {
                // Insert first, then update
                List<GateCountData> insertList = dataList.stream().filter(d -> ProcessConstants.DbOperationType.INSERT.equals(d.getOperationType())).collect(Collectors.toList());
                batchHandle(insertList, ProcessConstants.DbOperationType.INSERT);
                List<GateCountData> updateList = dataList.stream().filter(d -> ProcessConstants.DbOperationType.UPDATE.equals(d.getOperationType())).collect(Collectors.toList());
                batchHandle(updateList, ProcessConstants.DbOperationType.UPDATE);
            }
            log.info("Mall_GateMinuteCount: batchHandle processed {} records in {} ms", dataList.size(), System.currentTimeMillis() - startTime);
        } catch (Throwable exx) {
            log.error("Mall_GateMinuteCount: Throwable={}", exx.getMessage(), exx);
        }
        // Acknowledge unconditionally: offsets advance even when processing failed,
        // so failed records are logged but not redelivered.
        ack.acknowledge();
    }

    /**
     * Processes records in batches of {@code batchSize}, dispatching each batch
     * to a fixed-size thread pool and waiting for all batches to complete.
     * @param recordList records to persist
     * @param operationType one of {@link ProcessConstants.DbOperationType} INSERT or UPDATE
     */
    private void batchHandle(List<GateCountData> recordList, String operationType) {
        if (CollectionUtils.isEmpty(recordList)) {
            return;
        }
        // Total number of records
        int total = recordList.size();
        ExecutorService threadPool = Executors.newFixedThreadPool(batchThreadNum);
        List<Future<?>> futureList = new ArrayList<>();
        for (int i = 0; i < total; i += batchSize) {
            List<GateCountData> subList = recordList.subList(i, Math.min(i + batchSize, total));
            Future<?> future = threadPool.submit(() -> {
                try {
                    if (ProcessConstants.DbOperationType.INSERT.equals(operationType)) {
                        insertHandle(subList);
                    } else if (ProcessConstants.DbOperationType.UPDATE.equals(operationType)) {
                        updateHandle(subList);
                    }
                } catch (Exception ex) {
                    log.error("Mall_GateMinuteCount batch failed, batchSize={}, batchException={}", subList.size(), ex.getMessage(), ex);
                }
            });
            futureList.add(future);
        }
        threadPool.shutdown();
        for (Future<?> future : futureList) {
            try {
                future.get();
            } catch (Exception e) {
                log.error("batchHandle.getFuture.Exception={}", e.getMessage(), e);
            }
        }
    }

    /**
     * Handles inserts: tries a batch insert first and, if that fails,
     * falls back to inserting row by row.
     * @param insertList records to insert
     */
    private void insertHandle(List<GateCountData> insertList) {
        if (CollectionUtils.isEmpty(insertList)) {
            return;
        }
        // Number of successfully inserted records
        int insertCount = 0;
        try {
            dGateMinuteCountDataDao.batchInsert(insertList);
            insertCount = insertList.size();
        } catch (Exception ex) {
            log.error("Mall_GateMinuteCount.insertHandle={}, batchInsert.Exception={}", insertList.size(), ex.getMessage(), ex);
            log.info("Falling back to row-by-row inserts, Mall_GateMinuteCount.insertHandle={}, batchInsert.Exception={}", insertList.size(), ex.getMessage());
            for (GateCountData gateCountData : insertList) {
                try {
                    dGateMinuteCountDataDao.insert(gateCountData);
                    insertCount = insertCount + 1;
                } catch (Exception e) {
                    try {
                        log.info("Row-by-row insert failed for record: {}", objectMapper.writeValueAsString(gateCountData));
                        log.error("Row-by-row insert failed: {}", e.getMessage(), e);
                    } catch (Exception ee) {
                        log.error("Failed to log row-by-row insert error: {}", ee.getMessage(), ee);
                    }
                }
            }
        }
        // Report the number of rows written to the throughput counter
        speedStatService.stat(RedisConstants.PDS_MALLGATEMINUTECOUNT_WRITE, insertCount);
    }

    /**
     * Handles updates: tries a batch update first and, if that fails,
     * falls back to updating row by row.
     * @param updateList records to update
     */
    private void updateHandle(List<GateCountData> updateList) {
        if (CollectionUtils.isEmpty(updateList)) {
            return;
        }
        // Number of successfully updated records
        int updateCount = 0;
        try {
            dGateMinuteCountDataDao.batchUpdate(updateList);
            updateCount = updateList.size();
        } catch (Exception ex) {
            log.error("Mall_GateMinuteCount.updateHandle={}, batchUpdate.Exception={}", updateList.size(), ex.getMessage(), ex);
            log.info("Falling back to row-by-row updates, Mall_GateMinuteCount.updateHandle={}, batchUpdate.Exception={}", updateList.size(), ex.getMessage());
            for (GateCountData gateCountData : updateList) {
                try {
                    dGateMinuteCountDataDao.update(gateCountData);
                    updateCount = updateCount + 1;
                } catch (Exception e) {
                    try {
                        log.info("Row-by-row update failed for record: {}", objectMapper.writeValueAsString(gateCountData));
                        log.error("Row-by-row update failed: {}", e.getMessage(), e);
                    } catch (Exception ee) {
                        log.error("Failed to log row-by-row update error: {}", ee.getMessage(), ee);
                    }
                }
            }
        }
        // Report the number of rows written to the throughput counter
        speedStatService.stat(RedisConstants.PDS_MALLGATEMINUTECOUNT_WRITE, updateCount);
    }
}