Commit a9f661be by 姚冰

[add] agent注册添加事件下发

1 parent 66424764
package vion.constant;
public enum AgentTypeEnum {
STORE((short) 1, "store"),
MALL((short) 2, "mall"),
;
private final Short code;
private final String desc;
AgentTypeEnum(Short code, String desc) {
this.code = code;
this.desc = desc;
}
public Short getCode() {
return code;
}
}
......@@ -7,6 +7,7 @@ import java.util.Arrays;
@Getter
public enum MqttMessageType {
REGISTER("REGISTER"),
DEVICE_OFFLINE("DEVICE_OFFLINE"),
UPGRADE("UPGRADE"),
WHOLEDAYA_ANALYZE("WHOLE_ANALYZE_EXCEPTION");
......
package vion.constant;
public class MqttTopic {
//store agent 事件监听
public final static CharSequence STORE_EVENT_TOPIC = "/SA/{}/event/push";
//store agent 升级监听
public final static CharSequence STORE_OTA_TOPIC = "/SA/{}/ota";
//store agent 服务列表监听
public final static CharSequence STORE_SERVICE_TOPIC = "/SA/{}/service/push";
//mall agent 事件监听
public final static CharSequence MALL_EVENT_TOPIC = "/MA/{}/event/push";
//mall agent 升级监听
public final static CharSequence MALL_OTA_TOPIC = "/MA/{}/ota";
//mall agent 服务列表监听
public final static CharSequence MALL_SERVICE_TOPIC = "/MA/{}/service/push";
}
......@@ -4,10 +4,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import vion.dto.MqttAuthDTO;
import vion.service.monitor.IAgentService;
......@@ -36,4 +33,7 @@ public class MqttController {
public void disconnect(@RequestBody JsonNode disconnectInfo) {
agentService.disconnect(disconnectInfo);
}
@GetMapping("/stopEvent")
public void stopEvent() {agentService.stopEvent();}
}
package vion.event.mqtt;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.RequiredArgsConstructor;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.core.client.IMqttClientMessageListener;
import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe;
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.dromara.hutool.core.bean.BeanUtil;
import org.dromara.hutool.core.text.StrUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -14,8 +19,13 @@ import org.tio.core.ChannelContext;
import vion.constant.EventStatusEnum;
import vion.constant.MqttMessageType;
import vion.dto.OfflineDevice;
import vion.mapper.monitor.AgentMapper;
import vion.mapper.monitor.RAgentEventMapper;
import vion.model.monitor.EventRecord;
import vion.model.monitor.RAgentEvent;
import vion.service.monitor.IAgentService;
import vion.service.monitor.IEventRecordService;
import vion.service.monitor.IRAgentEventService;
import vion.utils.JsonUtil;
import java.nio.charset.StandardCharsets;
......@@ -30,11 +40,19 @@ import static vion.constant.MqttMessageType.WHOLEDAYA_ANALYZE;
*/
@Service
@MqttClientSubscribe("${mqtt.client.server-topic:/MS/receive}")
@RequiredArgsConstructor
public class MqttClientMessageListener implements IMqttClientMessageListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientMessageListener.class);
@Autowired
private IEventRecordService recordService;
private final MqttClientTemplate client;
private final IAgentService agentService;
private final IEventRecordService recordService;
private final AgentMapper agentMapper;
private final RAgentEventMapper agentEventMapper;
@Override
public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
......@@ -46,6 +64,11 @@ public class MqttClientMessageListener implements IMqttClientMessageListener {
String type = object.get("eventType").asText();
switch (MqttMessageType.getEnumByType(type)) {
case MqttMessageType.REGISTER:
logger.info("设备注册:{}", payloadStr);
String agentUid = object.get("agentUid").asText();
updateTaskByAgent(agentUid);
break;
case MqttMessageType.DEVICE_OFFLINE:
//设备离线
handleDeviceOffline(payloadStr);
......@@ -63,6 +86,27 @@ public class MqttClientMessageListener implements IMqttClientMessageListener {
}
}
//下发事件
private void updateTaskByAgent(String agentUid) {
// 发送消息
// MqttClientKit.publish(topic, payload);
QueryWrapper<RAgentEvent> wrapper = new QueryWrapper<RAgentEvent>().eq("agent_uid", agentUid).eq("control_switch", 1);
List<RAgentEvent> list = agentEventMapper.selectList(wrapper);
if (list == null || list.isEmpty()) {
logger.info("agent:{} 未配置事件监测", agentUid);
return;
}
String topic = agentService.getAgentEventTopic(agentUid);
if (topic == null) {
logger.info("agent:{} 获取topic失败", agentUid);
return;
}
logger.info("agent:{} 开始下发事件监测", agentUid);
for (RAgentEvent agentEvent : list) {
client.publish(StrUtil.format("/SA/{}/event/push", agentUid), JsonUtil.toJsonByte(agentEvent), MqttQoS.QOS2);
}
}
//设备离线上报
private void handleDeviceOffline(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class);
......
package vion.service.impl.monitor;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.segments.MergeSegments;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
......@@ -18,8 +24,12 @@ import org.dromara.hutool.core.text.split.SplitUtil;
import org.dromara.hutool.core.util.ByteUtil;
import org.dromara.hutool.core.util.ObjUtil;
import org.dromara.hutool.core.util.RandomUtil;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import vion.constant.AgentTypeEnum;
import vion.constant.EventStatusEnum;
import vion.constant.MqttTopic;
import vion.dto.AgentDTO;
import vion.dto.MqttAuthDTO;
import vion.mapper.monitor.AgentMapper;
......@@ -45,9 +55,12 @@ public class AgentServiceImpl extends MPJBaseServiceImpl<AgentMapper, Agent> imp
private final IServiceInfoService serviceInfoService;
private final IServiceRecordService serviceRecordService;
private final IRAgentServiceService rAgentServiceService;
private final IRAgentEventService irAgentEventService;
private final IUpgradeService upgradeService;
private final Converter converter;
private final MqttClientTemplate client;
@Value(value = "${mqtt.client.server-topic:/MS/receive}")
private String serverTopic;
@Override
public Page<AgentVO> list(AgentDTO dto) {
......@@ -183,6 +196,7 @@ public class AgentServiceImpl extends MPJBaseServiceImpl<AgentMapper, Agent> imp
.put("clientid", agent.getUid())
.put("username", agent.getUsername())
.put("password", agent.getPassword())
.put("serverTopic", serverTopic)
.putPOJO("serviceList", serviceList)
.toString();
}
......@@ -210,7 +224,7 @@ public class AgentServiceImpl extends MPJBaseServiceImpl<AgentMapper, Agent> imp
@Override
public ObjectNode auth(MqttAuthDTO dto) {
var clientid = dto.getClientid();
if (StrUtil.equals(clientid, "MS8011")) {
if (StrUtil.equals(clientid, "MS8011") || StrUtil.equals(clientid, "test")) {
log.info("Monitor Server 认证成功. clientid: {}", dto.getClientid());
return JsonUtil.createObj().put("result", "allow")
.putPOJO("acl", JsonUtil.createArr()
......@@ -268,4 +282,39 @@ public class AgentServiceImpl extends MPJBaseServiceImpl<AgentMapper, Agent> imp
log.info("Agent 断开连接 Mqtt Server. clientid:{}, ip:{}", clientid, ipaddr);
}
@Override
public String getAgentEventTopic(String agentUid) {
Agent agent = this.baseMapper.selectOne(new QueryWrapper<Agent>().eq("uid", agentUid));
if (agent == null) {
log.info("agent {} 不存在", agentUid);
return null;
}
if (agent.getType() == AgentTypeEnum.STORE.getCode()) {
return StrUtil.format(MqttTopic.STORE_EVENT_TOPIC, agentUid);
} else if (agent.getType() == AgentTypeEnum.MALL.getCode()) {
return StrUtil.format(MqttTopic.MALL_EVENT_TOPIC, agentUid);
} else {
log.info("未知的agent类型");
}
return null;
}
@Override
public void stopEvent() {
RAgentEvent event = new RAgentEvent();
event.setAgentUid("c02b9adc08f767f87439");
event.setAccountUid("bc81d07e-d779-11e8-b3d5-7cd30ac4c9a6");
event.setMallUid("e15b9a8e-d782-11e8-b19d-7cd30ac4c9a6");
event.setEventType("abcdefg");
event.setControlSwitch(EventStatusEnum.FAIL.getCode());
irAgentEventService.lambdaUpdate()
.eq(RAgentEvent::getAgentUid, "c02b9adc08f767f87439")
.eq(RAgentEvent::getAccountUid, "bc81d07e-d779-11e8-b3d5-7cd30ac4c9a6")
.eq(RAgentEvent::getMallUid, "e15b9a8e-d782-11e8-b19d-7cd30ac4c9a6")
.eq(RAgentEvent::getEventType, "DEVICE_OFFLINE")
.set(RAgentEvent::getControlSwitch, EventStatusEnum.FAIL.getCode()).update();
String topic = this.getAgentEventTopic(event.getAgentUid());
client.publish(topic, JsonUtil.toJsonByte(event), MqttQoS.QOS2);
}
}
......@@ -51,4 +51,9 @@ public interface IAgentService extends MPJBaseService<Agent> {
void disconnect(JsonNode disconnectInfo);
// endregion
String getAgentEventTopic(String agentUid);
void stopEvent();
}
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!