RAgentEventServiceImpl.java
5.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package vion.service.impl.monitor;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.github.yulichang.base.MPJBaseServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.dromara.hutool.core.collection.CollUtil;
import org.dromara.hutool.core.text.StrUtil;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import vion.mapper.monitor.MallMapper;
import vion.mapper.monitor.RAgentEventMapper;
import vion.model.monitor.Mall;
import vion.model.monitor.RAgentEvent;
import vion.service.monitor.IRAgentEventService;
import vion.utils.JsonUtil;
import vion.utils.TopicUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * Service that manages agent/event bindings ({@link RAgentEvent}) and publishes every
 * binding change to the agent's event topic over MQTT (QoS 2).
 *
 * <p>NOTE(review): messages are published while the surrounding transaction is still open,
 * so a later rollback cannot recall a message that was already sent — confirm this is
 * acceptable for the agents consuming the topic.
 *
 * @author vion
 * @date 2024/10/31
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class RAgentEventServiceImpl extends MPJBaseServiceImpl<RAgentEventMapper, RAgentEvent> implements IRAgentEventService {

    private final MallMapper mallMapper;
    private final MqttClientTemplate client;

    /**
     * Binds the given events to an agent for one mall.
     *
     * <p>Every DTO is stamped with {@code agentUid}. A DTO whose {@code eventUid} already has a
     * row for {@code mallUid} is updated in place (it inherits that row's id); every other DTO
     * is inserted. All DTOs are then published to the agent's event topic.
     *
     * @param agentUid uid of the agent the events are bound to
     * @param mallUid  uid of the mall whose existing bindings are looked up
     * @param dtoList  bindings to create or update; must not be {@code null}
     * @return {@code "绑定成功"} on success, or an error message when no topic can be resolved
     *         for the agent (in which case nothing is written)
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public String bind(String agentUid, String mallUid, List<RAgentEvent> dtoList) {
        // Resolve the topic before touching the database: previously a missing topic was only
        // detected after the rows had been written, so the caller saw a failure message for a
        // binding that had in fact been persisted.
        var topic = TopicUtil.getEventTopic(agentUid);
        if (StrUtil.isBlank(topic)) {
            return topicFailure(agentUid);
        }

        dtoList.forEach(ae -> ae.setAgentUid(agentUid));

        var agentEventList = this.lambdaQuery().eq(RAgentEvent::getMallUid, mallUid).list();
        // The merge function keeps the first row when a mall holds several rows for the same
        // event; the two-argument toMap would throw IllegalStateException on such duplicates.
        var eventUid2AEMap = agentEventList.stream()
                .collect(Collectors.toMap(RAgentEvent::getEventUid, Function.identity(), (first, second) -> first));

        List<RAgentEvent> updList = new ArrayList<>();
        List<RAgentEvent> insList = new ArrayList<>();
        for (var dto : dtoList) {
            var existing = eventUid2AEMap.get(dto.getEventUid());
            if (existing != null) {
                dto.setId(existing.getId());
                updList.add(dto);
            } else {
                insList.add(dto);
            }
        }

        if (CollUtil.isNotEmpty(updList)) {
            this.updateBatchById(updList);
        }
        if (CollUtil.isNotEmpty(insList)) {
            this.saveBatch(insList);
        }

        publishAll(topic, dtoList);
        return "绑定成功";
    }

    /**
     * Unbinds events from an agent.
     *
     * <p>The rows are loaded, their {@code controlSwitch} is set to {@code 0} on the in-memory
     * copies, those copies are published to the agent's event topic, and the rows are then
     * removed.
     *
     * @param agentUid uid of the agent to notify
     * @param mallUid  not used by this implementation
     * @param idList   ids of the {@link RAgentEvent} rows to remove
     * @return {@code "解绑成功"} when the removal succeeds, {@code "解绑失败"} when it does not or
     *         when {@code idList} is empty, or an error message when no topic can be resolved
     */
    @Override
    public String unbind(String agentUid, String mallUid, List<Long> idList) {
        if (CollUtil.isEmpty(idList)) {
            // Nothing to remove; also avoids issuing a query with an empty IN (...) clause.
            log.warn("agent:{} unbind called with an empty id list", agentUid);
            return "解绑失败";
        }

        var topic = TopicUtil.getEventTopic(agentUid);
        if (StrUtil.isBlank(topic)) {
            return topicFailure(agentUid);
        }

        var agentEvents = this.listByIds(idList);
        agentEvents.forEach(r -> r.setControlSwitch((short) 0));
        publishAll(topic, agentEvents);

        return this.removeBatchByIds(idList) ? "解绑成功" : "解绑失败";
    }

    /**
     * Binds one event configuration to many malls at once.
     *
     * <p>Malls are selected by {@code dto.getAccountUid()} (all malls of that account) and/or by
     * {@code dto.getMallUidList()}. Both selections are processed independently when both are
     * supplied. Each selected mall gets {@code attention = 1} and a new {@link RAgentEvent} row
     * copied from {@code dto} with {@code controlSwitch = 1}; each new row is published to the
     * agent's event topic.
     *
     * <p>NOTE(review): existing bindings are not checked, so a mall matched by both selections
     * (or bound by an earlier call) receives more than one row for the same event.
     *
     * @param agentUid uid of the agent the events are bound to
     * @param dto      template carrying the event fields and the mall selection
     * @return {@code "批量绑定成功"} on success, or an error message when no topic can be resolved
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public String bindBatch(String agentUid, RAgentEvent dto) {
        var topic = TopicUtil.getEventTopic(agentUid);
        if (StrUtil.isBlank(topic)) {
            return topicFailure(agentUid);
        }

        if (StrUtil.isNotBlank(dto.getAccountUid())) {
            var accountUid = dto.getAccountUid();
            var mallList = mallMapper.selectList(Wrappers.<Mall>lambdaQuery().eq(Mall::getAccountUid, accountUid));
            bindMalls(agentUid, topic, dto, mallList, m -> accountUid);
        }

        if (CollUtil.isNotEmpty(dto.getMallUidList())) {
            var mallUidList = dto.getMallUidList();
            var mallList = mallMapper.selectList(Wrappers.<Mall>lambdaQuery().in(Mall::getUid, mallUidList));
            bindMalls(agentUid, topic, dto, mallList, Mall::getAccountUid);
        }

        return "批量绑定成功";
    }

    /**
     * Marks the malls as attended, inserts one binding per mall and publishes the new bindings.
     *
     * @param accountUidOf supplies the account uid stored on the binding created for a mall
     */
    private void bindMalls(String agentUid, String topic, RAgentEvent dto, List<Mall> mallList,
                           Function<Mall, String> accountUidOf) {
        if (CollUtil.isEmpty(mallList)) {
            // No mall matched the selection: skip the batch update/insert entirely.
            return;
        }

        mallList.forEach(m -> m.setAttention((short) 1));
        mallMapper.updateById(mallList);

        var agentEventList = mallList.stream().map(m -> {
            var rAgentEvent = new RAgentEvent();
            rAgentEvent.setAgentUid(agentUid);
            rAgentEvent.setAccountUid(accountUidOf.apply(m));
            rAgentEvent.setMallUid(m.getUid());
            rAgentEvent.setEventUid(dto.getEventUid());
            rAgentEvent.setEventType(dto.getEventType());
            rAgentEvent.setCron(dto.getCron());
            rAgentEvent.setThreshold(dto.getThreshold());
            rAgentEvent.setControlSwitch((short) 1);
            return rAgentEvent;
        }).toList();

        this.saveBatch(agentEventList);
        publishAll(topic, agentEventList);
    }

    /** Publishes each event as JSON to {@code topic} with QoS 2. */
    private void publishAll(String topic, List<RAgentEvent> events) {
        events.forEach(event -> client.publish(topic, JsonUtil.toJsonByte(event), MqttQoS.QOS2));
    }

    /** Logs and returns the error message used when no event topic is available for the agent. */
    private String topicFailure(String agentUid) {
        var msg = StrUtil.format("agent:{} 获取topic失败", agentUid);
        log.error(msg);
        return msg;
    }
}