Commit cdf7edc0 by 姚冰

[add] 添加mqtt消息回调,框架搭建

1 parent 9a21c5a6
package vion.constant;
import lombok.Getter;
import java.util.Arrays;
@Getter
public enum MqttMessageType {
DEVICE_OFFLINE("DEVICE_OFFLINE"),
UPGRADE("UPGRADE"),
WHOLEDAYA_ANALYZE("WHOLE_ANALYZE_EXCEPTION");
private final String type;
MqttMessageType(String type) {
this.type = type;
}
public static MqttMessageType getEnumByType(String type) {
return Arrays.stream(values())
.filter(e -> e.type == type)
.findFirst()
.orElse(null);
}
}
\ No newline at end of file \ No newline at end of file
package vion.event.mqtt;
import net.dreamlu.iot.mqtt.codec.MqttPublishMessage;
import net.dreamlu.iot.mqtt.core.client.IMqttClientMessageListener;
import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe;
import org.dromara.hutool.json.JSON;
import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.tio.core.ChannelContext;
import vion.constant.MqttMessageType;
import vion.model.monitor.EventRecord;
import vion.utils.JsonUtil;
import java.nio.charset.StandardCharsets;
import static vion.constant.MqttMessageType.WHOLEDAYA_ANALYZE;
/**
* 客户端消息监听的另一种方式
*
* @author L.cm
*/
@Service
@MqttClientSubscribe("${mqtt.client.server-topic:/MS/receive}")
public class MqttClientMessageListener implements IMqttClientMessageListener {
private static final Logger logger = LoggerFactory.getLogger(MqttClientMessageListener.class);
@Override
public void onMessage(ChannelContext context, String topic, MqttPublishMessage message, byte[] payload) {
String payloadStr = new String(payload, StandardCharsets.UTF_8);
logger.info("topic:{} payload:{}", topic, payloadStr);
try {
JSONObject object = new JSONObject(payloadStr);
String type = object.getString("type");
switch (MqttMessageType.getEnumByType(type)) {
case MqttMessageType.DEVICE_OFFLINE:
//设备离线
break;
case MqttMessageType.WHOLEDAYA_ANALYZE:
//全天分析异常
break;
default:
logger.info("未定义的消息类型:{}, payload:{}", type, payloadStr);
break;
}
} catch (Exception e) {
logger.error("解析数据异常", e);
}
}
//设备离线上报
private void handleDeviceOffline(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class);
// JSONArray arrary = (JSONArray) eventRecord.getData();
// logger.info("设备离线:{}", JSON.toJSONString(eventRecord));
}
//全天分析异常上报
private void handleWholeDayAnalyze(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class);
}
}
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!