Commit 19e66876 by HlQ

[add] 接收 Agent 发送的消息逻辑修改

1 parent dbd8519e
...@@ -7,12 +7,13 @@ import net.dreamlu.iot.mqtt.codec.MqttQoS; ...@@ -7,12 +7,13 @@ import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe; import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe;
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate; import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.dromara.hutool.core.collection.CollUtil; import org.dromara.hutool.core.collection.CollUtil;
import org.dromara.hutool.core.collection.ListUtil;
import org.dromara.hutool.core.text.StrUtil; import org.dromara.hutool.core.text.StrUtil;
import org.dromara.hutool.core.thread.ThreadUtil; import org.dromara.hutool.core.thread.ThreadUtil;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import vion.constant.MqttMessageType; import vion.constant.MqttMessageType;
import vion.model.monitor.EventRecord;
import vion.model.monitor.RAgentEvent; import vion.model.monitor.RAgentEvent;
import vion.model.monitor.SendData;
import vion.service.monitor.IEventRecordService; import vion.service.monitor.IEventRecordService;
import vion.service.monitor.IRAgentEventService; import vion.service.monitor.IRAgentEventService;
import vion.utils.JsonUtil; import vion.utils.JsonUtil;
...@@ -34,7 +35,7 @@ public class MqttClientMessageListener { ...@@ -34,7 +35,7 @@ public class MqttClientMessageListener {
@MqttClientSubscribe("${mqtt.client.server-topic:/MS/receive}") @MqttClientSubscribe("${mqtt.client.server-topic:/MS/receive}")
public void onMessage(String topic, byte[] payload) { public void onMessage(String topic, byte[] payload) {
log.info("topic:{} payload:{}", topic, StrUtil.utf8Str(payload)); log.info("topic:[{}] 收到消息", topic);
try { try {
JsonNode jsonObj = JsonUtil.parseTree(payload); JsonNode jsonObj = JsonUtil.parseTree(payload);
...@@ -94,9 +95,9 @@ public class MqttClientMessageListener { ...@@ -94,9 +95,9 @@ public class MqttClientMessageListener {
log.error("agent:{} 获取topic失败", agentUid); log.error("agent:{} 获取topic失败", agentUid);
return; return;
} }
log.info("agent:{} topic:{} 开始下发事件监测", agentUid, topic); log.info("收到注册消息,agent:{} topic:{} 开始下发事件监测", agentUid, topic);
// agent绑定事件过多,分批下发 // agent绑定事件过多,分批下发
var partition = CollUtil.partition(list, 5000); var partition = ListUtil.partition(list, 5000);
for (List<RAgentEvent> rAgentEvents : partition) { for (List<RAgentEvent> rAgentEvents : partition) {
for (RAgentEvent agentEvent : rAgentEvents) { for (RAgentEvent agentEvent : rAgentEvents) {
client.publish(topic, JsonUtil.toJsonByte(agentEvent), MqttQoS.QOS2); client.publish(topic, JsonUtil.toJsonByte(agentEvent), MqttQoS.QOS2);
...@@ -112,8 +113,7 @@ public class MqttClientMessageListener { ...@@ -112,8 +113,7 @@ public class MqttClientMessageListener {
* @param payloadStr 事件记录 * @param payloadStr 事件记录
*/ */
private void handleDeviceOffline(String payloadStr) { private void handleDeviceOffline(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class); saveRecInDB(payloadStr);
recordService.save(eventRecord);
} }
/** /**
...@@ -122,8 +122,7 @@ public class MqttClientMessageListener { ...@@ -122,8 +122,7 @@ public class MqttClientMessageListener {
* @param payloadStr 事件记录 * @param payloadStr 事件记录
*/ */
private void handleReid(String payloadStr) { private void handleReid(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class); saveRecInDB(payloadStr);
recordService.save(eventRecord);
} }
/** /**
...@@ -132,8 +131,7 @@ public class MqttClientMessageListener { ...@@ -132,8 +131,7 @@ public class MqttClientMessageListener {
* @param payloadStr 事件记录 * @param payloadStr 事件记录
*/ */
private void handleDeviceRegistration(String payloadStr) { private void handleDeviceRegistration(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class); saveRecInDB(payloadStr);
recordService.save(eventRecord);
} }
/** /**
...@@ -142,8 +140,7 @@ public class MqttClientMessageListener { ...@@ -142,8 +140,7 @@ public class MqttClientMessageListener {
* @param payloadStr 事件记录 * @param payloadStr 事件记录
*/ */
private void handleHeadcountRatio(String payloadStr) { private void handleHeadcountRatio(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class); saveRecInDB(payloadStr);
recordService.save(eventRecord);
} }
/** /**
...@@ -152,8 +149,7 @@ public class MqttClientMessageListener { ...@@ -152,8 +149,7 @@ public class MqttClientMessageListener {
* @param payloadStr 事件记录 * @param payloadStr 事件记录
*/ */
private void handleStaffRecognize(String payloadStr) { private void handleStaffRecognize(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class); saveRecInDB(payloadStr);
recordService.save(eventRecord);
} }
/** /**
...@@ -162,8 +158,7 @@ public class MqttClientMessageListener { ...@@ -162,8 +158,7 @@ public class MqttClientMessageListener {
* @param payloadStr 事件记录 * @param payloadStr 事件记录
*/ */
private void handlePassengerFlowInterrupt(String payloadStr) { private void handlePassengerFlowInterrupt(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class); saveRecInDB(payloadStr);
recordService.save(eventRecord);
} }
// region store 事件 // region store 事件
...@@ -174,8 +169,7 @@ public class MqttClientMessageListener { ...@@ -174,8 +169,7 @@ public class MqttClientMessageListener {
* @param payloadStr 事件记录 * @param payloadStr 事件记录
*/ */
private void handleStoreCustomerUndulate(String payloadStr) { private void handleStoreCustomerUndulate(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class); saveRecInDB(payloadStr);
recordService.save(eventRecord);
} }
/** /**
...@@ -184,8 +178,7 @@ public class MqttClientMessageListener { ...@@ -184,8 +178,7 @@ public class MqttClientMessageListener {
* @param payloadStr 事件记录 * @param payloadStr 事件记录
*/ */
private void handleStoreGateDateUndulate(String payloadStr) { private void handleStoreGateDateUndulate(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class); saveRecInDB(payloadStr);
recordService.save(eventRecord);
} }
/** /**
...@@ -194,8 +187,7 @@ public class MqttClientMessageListener { ...@@ -194,8 +187,7 @@ public class MqttClientMessageListener {
* @param payloadStr 事件记录 * @param payloadStr 事件记录
*/ */
private void handleStoreInoutMatchRatio(String payloadStr) { private void handleStoreInoutMatchRatio(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class); saveRecInDB(payloadStr);
recordService.save(eventRecord);
} }
/** /**
...@@ -204,8 +196,7 @@ public class MqttClientMessageListener { ...@@ -204,8 +196,7 @@ public class MqttClientMessageListener {
* @param payloadStr 事件记录 * @param payloadStr 事件记录
*/ */
private void handleStoreSingleCluster(String payloadStr) { private void handleStoreSingleCluster(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class); saveRecInDB(payloadStr);
recordService.save(eventRecord);
} }
...@@ -215,8 +206,7 @@ public class MqttClientMessageListener { ...@@ -215,8 +206,7 @@ public class MqttClientMessageListener {
* @param payloadStr 事件记录 * @param payloadStr 事件记录
*/ */
private void handleStoreEnterRatio(String payloadStr) { private void handleStoreEnterRatio(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class); saveRecInDB(payloadStr);
recordService.save(eventRecord);
} }
// endregion // endregion
...@@ -228,8 +218,7 @@ public class MqttClientMessageListener { ...@@ -228,8 +218,7 @@ public class MqttClientMessageListener {
* @param payloadStr 事件记录 * @param payloadStr 事件记录
*/ */
private void handleMallInoutDiff(String payloadStr) { private void handleMallInoutDiff(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class); saveRecInDB(payloadStr);
recordService.save(eventRecord);
} }
/** /**
...@@ -238,8 +227,7 @@ public class MqttClientMessageListener { ...@@ -238,8 +227,7 @@ public class MqttClientMessageListener {
* @param payloadStr 事件记录 * @param payloadStr 事件记录
*/ */
private void handleMallShopInoutDiff(String payloadStr) { private void handleMallShopInoutDiff(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class); saveRecInDB(payloadStr);
recordService.save(eventRecord);
} }
/** /**
...@@ -248,8 +236,7 @@ public class MqttClientMessageListener { ...@@ -248,8 +236,7 @@ public class MqttClientMessageListener {
* @param payloadStr 事件记录 * @param payloadStr 事件记录
*/ */
private void handleMallDataUndulate(String payloadStr) { private void handleMallDataUndulate(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class); saveRecInDB(payloadStr);
recordService.save(eventRecord);
} }
/** /**
...@@ -258,8 +245,7 @@ public class MqttClientMessageListener { ...@@ -258,8 +245,7 @@ public class MqttClientMessageListener {
* @param payloadStr 事件记录 * @param payloadStr 事件记录
*/ */
private void handleMallGateDataUndulate(String payloadStr) { private void handleMallGateDataUndulate(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class); saveRecInDB(payloadStr);
recordService.save(eventRecord);
} }
/** /**
...@@ -268,8 +254,7 @@ public class MqttClientMessageListener { ...@@ -268,8 +254,7 @@ public class MqttClientMessageListener {
* @param payloadStr 事件记录 * @param payloadStr 事件记录
*/ */
private void handleMallShopDataUndulate(String payloadStr) { private void handleMallShopDataUndulate(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class); saveRecInDB(payloadStr);
recordService.save(eventRecord);
} }
/** /**
...@@ -278,9 +263,27 @@ public class MqttClientMessageListener { ...@@ -278,9 +263,27 @@ public class MqttClientMessageListener {
* @param payloadStr 事件记录 * @param payloadStr 事件记录
*/ */
private void handleMallDilatationUndulate(String payloadStr) { private void handleMallDilatationUndulate(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class); saveRecInDB(payloadStr);
recordService.save(eventRecord);
} }
// endregion // endregion
/**
* agent 发送过来的记录入库
*
* @param payloadStr 事件记录
*/
private void saveRecInDB(String payloadStr) {
SendData sendData = JsonUtil.parseObject(payloadStr, SendData.class);
var recList = sendData.getRecList();
if (CollUtil.isNotEmpty(recList)) {
var eventRecordList = recList.stream().peek(rec -> {
rec.setAgentUid(sendData.getAgentUid());
rec.setAgentType(sendData.getAgentType());
rec.setEventUid(sendData.getEventUid());
rec.setEventType(sendData.getEventType());
}).toList();
recordService.saveBatch(eventRecordList);
}
}
} }
package vion.model.monitor;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
@Getter
@Setter
public class SendData {
/**
* agent uid
*/
private String agentUid;
/**
* agent类型 1:store,2:mall
*/
private Short agentType;
/**
* 事件唯一ID
*/
private String eventUid;
/**
* 事件类型
*/
private String eventType;
/**
* 阈值
*/
private String threshold;
/**
* 记录
*/
private List<EventRecord> recList;
}
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!