Commit 23abf342 by HlQ

[add]

1.添加获取 Topic 工具类
2.添加关注 mall 相关接口
3.mall 解绑事件逻辑修改
4.设备离线事件记录保存逻辑修改
[chg] 优化代码
1 parent 0dc30233
......@@ -34,7 +34,7 @@ public class LogAspect {
private final ObjectMapper objectMapper;
@Pointcut("execution(* vion.controller.*.*(..))")
@Pointcut("execution(* vion.controller..*.*(..))")
public void logPointcut() {
}
......
package vion.constant;
import lombok.Getter;
@Getter
public enum AgentTypeEnum {
STORE((short) 1, "store"),
MALL((short) 2, "mall"),
;
MALL((short) 2, "mall");
private final Short code;
private final String desc;
......@@ -13,7 +15,4 @@ public enum AgentTypeEnum {
this.desc = desc;
}
public Short getCode() {
return code;
}
}
package vion.constant;
import lombok.Getter;
@Getter
public enum EventStatusEnum {
UNKNOWN((short) -1, "未知"),
SUCCESS((short) 1, "成功"),
FAIL((short) 0, "失败"),
;
FAIL((short) 0, "失败");
private final Short code;
private final String desc;
......@@ -14,11 +16,4 @@ public enum EventStatusEnum {
this.desc = desc;
}
public Short getCode() {
return code;
}
public String getDesc() {
return desc;
}
}
......@@ -2,15 +2,15 @@ package vion.constant;
public class MqttTopic {
//store agent 事件监听
public final static CharSequence STORE_EVENT_TOPIC = "/SA/{}/event/push";
public final static String STORE_EVENT_TOPIC = "/SA/{}/event/push";
//store agent 升级监听
public final static CharSequence STORE_OTA_TOPIC = "/SA/{}/ota";
public final static String STORE_OTA_TOPIC = "/SA/{}/ota";
//store agent 服务列表监听
public final static CharSequence STORE_SERVICE_TOPIC = "/SA/{}/service/push";
public final static String STORE_SERVICE_TOPIC = "/SA/{}/service/push";
//mall agent 事件监听
public final static CharSequence MALL_EVENT_TOPIC = "/MA/{}/event/push";
public final static String MALL_EVENT_TOPIC = "/MA/{}/event/push";
//mall agent 升级监听
public final static CharSequence MALL_OTA_TOPIC = "/MA/{}/ota";
public final static String MALL_OTA_TOPIC = "/MA/{}/ota";
//mall agent 服务列表监听
public final static CharSequence MALL_SERVICE_TOPIC = "/MA/{}/service/push";
public final static String MALL_SERVICE_TOPIC = "/MA/{}/service/push";
}
......@@ -56,7 +56,12 @@ public enum RedisKeyEnum {
* 合同同步时间
*/
CONTRACT_SYNC_TIME("contract:sync:time"),
CONTRACT_SYNC_NEW_TIME("contract:sync:new:time");
CONTRACT_SYNC_NEW_TIME("contract:sync:new:time"),
/**
* agent
*/
AGENT("agent:");
private final String val;
......
......@@ -73,7 +73,7 @@ public class EventController {
@PostMapping("/unbind/{agentUid}/{mallUid}")
@SaCheckPermission(value = "event:unbind", orRole = "admin")
public String unbind(List<Long> idList) {
return agentEventService.unbind(idList);
public String unbind(@PathVariable String agentUid, @PathVariable String mallUid, List<Long> idList) {
return agentEventService.unbind(agentUid, mallUid, idList);
}
}
......@@ -3,14 +3,14 @@ package vion.controller.monitor;
import cn.dev33.satoken.annotation.SaCheckPermission;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import vion.dto.monitor.MallDTO;
import vion.model.monitor.MAccount;
import vion.service.monitor.IMallService;
import vion.vo.monitor.MallVO;
import java.util.List;
/**
* mall 信息
*/
......@@ -21,6 +21,12 @@ public class MallController {
private final IMallService mallService;
@GetMapping("/account")
@SaCheckPermission(value = "mall:account:list", orRole = "admin")
public Page<MAccount> listAccount(MAccount dto) {
return mallService.listAccount(dto);
}
@GetMapping
@SaCheckPermission(value = "mall:list", orRole = "admin")
public Page<MallVO> list(MallDTO dto) {
......@@ -32,4 +38,28 @@ public class MallController {
public MallVO get(@PathVariable Long id) {
return mallService.get(id);
}
@GetMapping("/attention")
@SaCheckPermission(value = "mall:attention:list", orRole = "admin")
public Page<MallVO> listAttention(MallDTO dto) {
return mallService.listAttention(dto);
}
@PostMapping("/attention")
@SaCheckPermission(value = "mall:attention:edit", orRole = "admin")
public String attention(@RequestBody List<MallDTO> dtoList) {
return mallService.attention(dtoList);
}
@PostMapping("/attention/account/{accountUid}")
@SaCheckPermission(value = "mall:attention:account", orRole = "admin")
public String attentionByAccount(@PathVariable String accountUid) {
return mallService.attentionByAccount(accountUid);
}
@PostMapping("/attention/{id}")
@SaCheckPermission(value = "mall:attention:cancel", orRole = "admin")
public String cancelAttention(@PathVariable Long id) {
return mallService.cancelAttention(id);
}
}
......@@ -4,7 +4,10 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import vion.dto.monitor.MqttAuthDTO;
import vion.service.monitor.IAgentService;
......@@ -34,8 +37,4 @@ public class MqttController {
agentService.disconnect(disconnectInfo);
}
@GetMapping("/stopEvent")
public void stopEvent() {
agentService.stopEvent();
}
}
......@@ -41,7 +41,12 @@ public class MallDTO extends BaseDTO {
/**
* mall营业状态
*/
private Boolean status;
private Short status;
/**
* 是否关注 0:不关注 1:关注
*/
private Short attention;
/**
* 时区
......
package vion.event.mqtt;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe;
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.dromara.hutool.core.bean.BeanUtil;
import org.dromara.hutool.core.collection.CollUtil;
import org.dromara.hutool.core.text.StrUtil;
import org.springframework.stereotype.Service;
import vion.constant.EventStatusEnum;
import vion.constant.MqttMessageType;
import vion.model.monitor.EventRecord;
import vion.model.monitor.OfflineDevice;
import vion.model.monitor.RAgentEvent;
import vion.service.monitor.IAgentService;
import vion.service.monitor.IEventRecordService;
import vion.service.monitor.IRAgentEventService;
import vion.utils.JsonUtil;
import vion.utils.TopicUtil;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
/**
......@@ -36,39 +30,36 @@ import java.util.List;
public class MqttClientMessageListener {
private final MqttClientTemplate client;
private final IAgentService agentService;
private final IEventRecordService recordService;
private final IRAgentEventService agentEventService;
@MqttClientSubscribe("${mqtt.client.server-topic:/MS/receive}")
public void onMessage(String topic, byte[] payload) {
String payloadStr = new String(payload, StandardCharsets.UTF_8);
log.info("topic:{} payload:{}", topic, payloadStr);
log.info("topic:{} payload:{}", topic, StrUtil.utf8Str(payload));
try {
JsonNode object = JsonUtil.parseTree(payloadStr);
String type = object.path("eventType").asText();
JsonNode jsonObj = JsonUtil.parseTree(payload);
String type = jsonObj.path("eventType").asText();
switch (MqttMessageType.getEnumByType(type)) {
case MqttMessageType.REGISTER:
log.info("设备注册:{}", payloadStr);
String agentUid = object.path("agentUid").asText();
log.info("设备注册:{}", jsonObj);
String agentUid = jsonObj.path("agentUid").asText();
updateTaskByAgent(agentUid);
break;
case MqttMessageType.DEVICE_OFFLINE:
//设备离线
handleDeviceOffline(payloadStr);
handleDeviceOffline(jsonObj.toString());
break;
case MqttMessageType.WHOLEDAY_ANALYZE:
//全天分析异常
handleWholeDayAnalyze(payloadStr);
handleWholeDayAnalyze(jsonObj.toString());
break;
case MqttMessageType.PASSENGER_FLOW_INTERRUPT:
//客流数据中断
handlePassengerFlowInterrupt(payloadStr);
handlePassengerFlowInterrupt(jsonObj.toString());
break;
default:
log.info("未定义的消息类型:{}, payload:{}", type, payloadStr);
log.info("未定义的消息类型:{}, payload:{}", type, jsonObj);
break;
}
} catch (Exception e) {
......@@ -78,55 +69,29 @@ public class MqttClientMessageListener {
//下发事件
private void updateTaskByAgent(String agentUid) {
List<RAgentEvent> list = agentEventService.lambdaQuery().eq(RAgentEvent::getAgentUid, agentUid)
List<RAgentEvent> list = agentEventService.lambdaQuery()
.eq(RAgentEvent::getAgentUid, agentUid)
.eq(RAgentEvent::getControlSwitch, 1).list();
if (CollUtil.isEmpty(list)) {
log.info("agent:{} 未配置事件监测", agentUid);
log.error("agent:{} 未配置事件监测", agentUid);
return;
}
String topic = agentService.getAgentEventTopic(agentUid);
var topic = TopicUtil.getEventTopic(agentUid);
if (StrUtil.isBlank(topic)) {
log.info("agent:{} 获取topic失败", agentUid);
log.error("agent:{} 获取topic失败", agentUid);
return;
}
log.info("agent:{} 开始下发事件监测", agentUid);
log.info("agent:{} topic:{} 开始下发事件监测", agentUid, topic);
for (RAgentEvent agentEvent : list) {
client.publish(StrUtil.format("/SA/{}/event/push", agentUid), JsonUtil.toJsonByte(agentEvent), MqttQoS.QOS2);
client.publish(topic, JsonUtil.toJsonByte(agentEvent), MqttQoS.QOS2);
}
log.info("agent:{} 结束下发事件监测", agentUid);
log.info("agent:{} topic:{} 结束下发事件监测", agentUid, topic);
}
//设备离线上报
private void handleDeviceOffline(String payloadStr) {
EventRecord eventRecord = JsonUtil.parseObject(payloadStr, EventRecord.class);
String list = (String) eventRecord.getData();
List<OfflineDevice> deviceList = JsonUtil.parseArray(list, OfflineDevice.class);
List<EventRecord> records = new ArrayList<>();
if (CollUtil.isEmpty(deviceList)) {
EventRecord eventRecord1 = new EventRecord();
BeanUtil.copyProperties(eventRecord, eventRecord1);
eventRecord1.setEventUid(eventRecord.getEventUid());
records.add(eventRecord1);
} else {
for (int i = 0; i < deviceList.size(); i++) {
OfflineDevice device = deviceList.get(i);
EventRecord eventRecord1 = new EventRecord();
BeanUtil.copyProperties(eventRecord, eventRecord1);
eventRecord1.setEventUid(eventRecord.getEventUid());
eventRecord1.setData(JsonUtil.toJsonString(device));
eventRecord1.setMallUid(deviceList.get(i).getMallUid());
if (StringUtils.isEmpty(device.getDevices())) {
eventRecord1.setStatus(EventStatusEnum.SUCCESS.getCode());
} else {
eventRecord1.setStatus(EventStatusEnum.FAIL.getCode());
}
records.add(eventRecord1);
}
}
if (CollUtil.isNotEmpty(records)) {
recordService.saveBatch(records);
}
recordService.save(eventRecord);
}
//全天分析异常上报
......
......@@ -3,6 +3,7 @@ package vion.model.monitor;
import com.baomidou.mybatisplus.annotation.*;
import lombok.Getter;
import lombok.Setter;
import vion.dto.BaseDTO;
import java.time.LocalDateTime;
......@@ -12,7 +13,7 @@ import java.time.LocalDateTime;
@Getter
@Setter
@TableName(value = "m_account")
public class MAccount {
public class MAccount extends BaseDTO {
@TableId(value = "id", type = IdType.AUTO)
private Long id;
......
......@@ -58,7 +58,13 @@ public class Mall {
* mall营业状态
*/
@TableField(value = "\"status\"")
private Boolean status;
private Short status;
/**
* 是否关注 0:不关注 1:关注
*/
@TableField(value = "attention")
private Short attention;
/**
* 时区
......
......@@ -3,9 +3,14 @@ package vion.model.monitor;
import lombok.Getter;
import lombok.Setter;
import java.time.LocalDateTime;
@Getter
@Setter
public class OfflineDevice {
private String mallUid;
private String devices;
private String serialNo;
private String ip;
private String gateName;
private LocalDateTime offlineTime;
}
......@@ -21,9 +21,6 @@ import org.dromara.hutool.core.util.RandomUtil;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import vion.constant.AgentTypeEnum;
import vion.constant.EventStatusEnum;
import vion.constant.MqttTopic;
import vion.dto.monitor.AgentDTO;
import vion.dto.monitor.MqttAuthDTO;
import vion.dto.monitor.OrgDTO;
......@@ -31,6 +28,7 @@ import vion.mapper.monitor.AgentMapper;
import vion.model.monitor.*;
import vion.service.monitor.*;
import vion.utils.JsonUtil;
import vion.utils.TopicUtil;
import vion.vo.monitor.AgentVO;
import java.time.LocalDateTime;
......@@ -54,7 +52,6 @@ public class AgentServiceImpl extends MPJBaseServiceImpl<AgentMapper, Agent> imp
private final IRAgentServiceService rAgentServiceService;
private final IRAgentEventService irAgentEventService;
private final IUpgradeService upgradeService;
private final IEventRecordService eventRecordService;
private final Converter converter;
private final MqttClientTemplate client;
private final IMAccountService accountService;
......@@ -113,7 +110,12 @@ public class AgentServiceImpl extends MPJBaseServiceImpl<AgentMapper, Agent> imp
@Override
public String upgradeCommand(String uid, Long upgradeId) {
var topic = StrUtil.format("/SA/{}/ota", uid);
var topic = TopicUtil.getOTATopic(uid);
if (StrUtil.isBlank(topic)) {
var msg = StrUtil.format("agent:{} 获取topic失败", uid);
log.error(msg);
return msg;
}
var node = JsonUtil.createObj().put("upgradeId", upgradeId);
client.publish(topic, JsonUtil.toJsonByte(node), MqttQoS.QOS2);
return "升级指令下发成功,请稍后查看Agent版本号";
......@@ -183,8 +185,14 @@ public class AgentServiceImpl extends MPJBaseServiceImpl<AgentMapper, Agent> imp
@Override
public String assign(String uid, List<ServiceInfo> serviceInfoList) {
String SERVICE_PUSH_TOPIC = "/SA/{}/service/push";
return client.publish(StrUtil.format(SERVICE_PUSH_TOPIC, uid), JsonUtil.toJsonByte(serviceInfoList), MqttQoS.QOS2) ? "服务列表下发成功" : "服务列表下发失败";
var topic = TopicUtil.getServiceTopic(uid);
if (StrUtil.isBlank(topic)) {
var msg = StrUtil.format("agent:{} 获取topic失败", uid);
log.error(msg);
return msg;
}
return client.publish(topic, JsonUtil.toJsonByte(serviceInfoList), MqttQoS.QOS2) ? "服务列表下发成功" :
"服务列表下发失败";
}
@Override
......@@ -389,39 +397,4 @@ public class AgentServiceImpl extends MPJBaseServiceImpl<AgentMapper, Agent> imp
});
eventRecordService.saveBatch(recList);*/
}
@Override
public String getAgentEventTopic(String agentUid) {
Agent agent = this.lambdaQuery().eq(Agent::getUid, agentUid).one();
if (ObjUtil.isNull(agent)) {
log.info("agent {} 不存在", agentUid);
return null;
}
if (ObjUtil.equals(agent.getType(), AgentTypeEnum.STORE.getCode())) {
return StrUtil.format(MqttTopic.STORE_EVENT_TOPIC, agentUid);
} else if (ObjUtil.equals(agent.getType(), AgentTypeEnum.MALL.getCode())) {
return StrUtil.format(MqttTopic.MALL_EVENT_TOPIC, agentUid);
} else {
log.info("未知的agent类型");
}
return null;
}
@Override
public void stopEvent() {
RAgentEvent event = new RAgentEvent();
event.setAgentUid("c02b9adc08f767f87439");
event.setAccountUid("bc81d07e-d779-11e8-b3d5-7cd30ac4c9a6");
event.setMallUid("e15b9a8e-d782-11e8-b19d-7cd30ac4c9a6");
event.setEventType("abcdefg");
event.setControlSwitch(EventStatusEnum.FAIL.getCode());
irAgentEventService.lambdaUpdate()
.eq(RAgentEvent::getAgentUid, "c02b9adc08f767f87439")
.eq(RAgentEvent::getAccountUid, "bc81d07e-d779-11e8-b3d5-7cd30ac4c9a6")
.eq(RAgentEvent::getMallUid, "e15b9a8e-d782-11e8-b19d-7cd30ac4c9a6")
.eq(RAgentEvent::getEventType, "DEVICE_OFFLINE")
.set(RAgentEvent::getControlSwitch, EventStatusEnum.FAIL.getCode()).update();
String topic = this.getAgentEventTopic(event.getAgentUid());
client.publish(topic, JsonUtil.toJsonByte(event), MqttQoS.QOS2);
}
}
......@@ -6,18 +6,29 @@ import com.github.yulichang.base.MPJBaseServiceImpl;
import com.github.yulichang.wrapper.MPJLambdaWrapper;
import io.github.linpeilie.Converter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.dromara.hutool.core.lang.Assert;
import org.dromara.hutool.core.lang.Opt;
import org.dromara.hutool.core.text.StrUtil;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import vion.dto.monitor.MallDTO;
import vion.mapper.monitor.MallMapper;
import vion.model.monitor.EventRecord;
import vion.model.monitor.MAccount;
import vion.model.monitor.Mall;
import vion.model.monitor.RAgentEvent;
import vion.service.monitor.IEventRecordService;
import vion.service.monitor.IMAccountService;
import vion.service.monitor.IMallService;
import vion.service.monitor.IRAgentEventService;
import vion.utils.JsonUtil;
import vion.utils.TopicUtil;
import vion.vo.monitor.MallVO;
import java.util.List;
import java.util.stream.Collectors;
/**
......@@ -26,12 +37,21 @@ import java.util.stream.Collectors;
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class MallServiceImpl extends MPJBaseServiceImpl<MallMapper, Mall> implements IMallService {
private final IMAccountService accountService;
private final IEventRecordService eventRecordService;
private final IRAgentEventService agentEventService;
private final MqttClientTemplate client;
private final Converter converter;
@Override
public Page<MAccount> listAccount(MAccount dto) {
return accountService.lambdaQuery(dto).page(Page.of(dto.getPageNum(), dto.getPageSize()));
}
@Override
public Page<MallVO> list(MallDTO dto) {
Assert.notNull(dto.getAgentType(), "agentType 不能为空");
var wrapper = new MPJLambdaWrapper<>(converter.convert(dto, Mall.class))
......@@ -39,6 +59,28 @@ public class MallServiceImpl extends MPJBaseServiceImpl<MallMapper, Mall> implem
.selectAs(MAccount::getName, MallVO::getAccountName)
.leftJoin(MAccount.class, MAccount::getUid, Mall::getAccountUid)
.orderByDesc(Mall::getUpdateTime);
return this.selectJoinListPage(Page.of(dto.getPageNum(), dto.getPageSize()), MallVO.class, wrapper);
}
@Override
public MallVO get(Long id) {
var wrapper = new MPJLambdaWrapper<Mall>()
.selectAll(Mall.class)
.selectAs(MAccount::getName, MallVO::getAccountName)
.leftJoin(MAccount.class, MAccount::getUid, Mall::getAccountUid)
.eq(Mall::getId, id);
return this.selectJoinOne(MallVO.class, wrapper);
}
@Override
public Page<MallVO> listAttention(MallDTO dto) {
Assert.notNull(dto.getAgentType(), "agentType 不能为空");
var wrapper = new MPJLambdaWrapper<>(converter.convert(dto, Mall.class))
.selectAll(Mall.class)
.selectAs(MAccount::getName, MallVO::getAccountName)
.leftJoin(MAccount.class, MAccount::getUid, Mall::getAccountUid)
.eq(Mall::getAttention, 1)
.orderByDesc(Mall::getUpdateTime);
var mallVOPage = this.selectJoinListPage(Page.of(dto.getPageNum(), dto.getPageSize()), MallVO.class, wrapper);
Opt.ofEmptyAble(mallVOPage.getRecords())
.ifPresent(r -> {
......@@ -57,12 +99,33 @@ public class MallServiceImpl extends MPJBaseServiceImpl<MallMapper, Mall> implem
}
@Override
public MallVO get(Long id) {
var wrapper = new MPJLambdaWrapper<Mall>()
.selectAll(Mall.class)
.selectAs(MAccount::getName, MallVO::getAccountName)
.leftJoin(MAccount.class, MAccount::getUid, Mall::getAccountUid)
.eq(Mall::getId, id);
return this.selectJoinOne(MallVO.class, wrapper);
public String attention(List<MallDTO> dtoList) {
this.updateBatchById(converter.convert(dtoList, Mall.class));
return "关注成功";
}
@Override
public String attentionByAccount(String accountUid) {
var mallList = this.lambdaQuery().eq(Mall::getAccountUid, accountUid).list();
mallList.forEach(mall -> mall.setAttention((short) 1));
this.updateBatchById(mallList);
return "批量关注成功";
}
@Override
@Transactional(rollbackFor = Exception.class)
public String cancelAttention(Long id) {
var attentionMall = this.getById(id);
this.lambdaUpdate().set(Mall::getAttention, 0).eq(Mall::getId, id).update(new Mall());
var agentEventList = agentEventService.lambdaQuery().eq(RAgentEvent::getMallUid, attentionMall.getUid()).list();
var topic = TopicUtil.getEventTopic(attentionMall.getAgentUid());
if (StrUtil.isBlank(topic)) {
var msg = StrUtil.format("agent:{} 获取topic失败", attentionMall.getAgentUid());
log.error(msg);
return msg;
}
agentEventList.forEach(agentEvent -> client.publish(topic, JsonUtil.toJsonByte(agentEvent), MqttQoS.QOS2));
agentEventService.lambdaUpdate().eq(RAgentEvent::getMallUid, attentionMall.getUid()).remove();
return "取消成功";
}
}
......@@ -2,6 +2,7 @@ package vion.service.impl.monitor;
import com.github.yulichang.base.MPJBaseServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.dromara.hutool.core.collection.CollUtil;
......@@ -11,6 +12,7 @@ import vion.mapper.monitor.RAgentEventMapper;
import vion.model.monitor.RAgentEvent;
import vion.service.monitor.IRAgentEventService;
import vion.utils.JsonUtil;
import vion.utils.TopicUtil;
import java.util.ArrayList;
import java.util.List;
......@@ -23,6 +25,7 @@ import java.util.stream.Collectors;
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class RAgentEventServiceImpl extends MPJBaseServiceImpl<RAgentEventMapper, RAgentEvent> implements IRAgentEventService {
private final MqttClientTemplate client;
......@@ -50,14 +53,28 @@ public class RAgentEventServiceImpl extends MPJBaseServiceImpl<RAgentEventMapper
if (CollUtil.isNotEmpty(insList)) {
this.saveBatch(insList);
}
dtoList.forEach(dto -> client.publish(StrUtil.format("/SA/{}/event/push", agentUid), JsonUtil.toJsonByte(dto), MqttQoS.QOS2));
var topic = TopicUtil.getEventTopic(agentUid);
if (StrUtil.isBlank(topic)) {
var msg = StrUtil.format("agent:{} 获取topic失败", agentUid);
log.error(msg);
return msg;
}
dtoList.forEach(dto -> client.publish(topic, JsonUtil.toJsonByte(dto), MqttQoS.QOS2));
return "绑定成功";
}
@Override
public String unbind(List<Long> idList) {
// todo
// client.publish(StrUtil.format("/SA/{}/event/push", agentUid), JsonUtil.toJsonByte(agentEvent), MqttQoS.QOS2);
public String unbind(String agentUid, String mallUid, List<Long> idList) {
var agentEvents = this.listByIds(idList);
agentEvents.forEach(r -> r.setControlSwitch((short) 0));
var topic = TopicUtil.getEventTopic(agentUid);
if (StrUtil.isBlank(topic)) {
var msg = StrUtil.format("agent:{} 获取topic失败", agentUid);
log.error(msg);
return msg;
}
agentEvents.forEach(agentEvent -> client.publish(topic, JsonUtil.toJsonByte(agentEvent),
MqttQoS.QOS2));
return this.removeBatchByIds(idList) ? "解绑成功" : "解绑失败";
}
}
......@@ -65,9 +65,4 @@ public interface IAgentService extends MPJBaseService<Agent> {
// region 系统级别事件监控
void checkLicense();
// endregion
String getAgentEventTopic(String agentUid);
void stopEvent();
}
......@@ -3,17 +3,30 @@ package vion.service.monitor;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.github.yulichang.base.MPJBaseService;
import vion.dto.monitor.MallDTO;
import vion.model.monitor.MAccount;
import vion.model.monitor.Mall;
import vion.vo.monitor.MallVO;
import java.util.List;
/**
* @author vion
* @date 2024/10/31
*/
public interface IMallService extends MPJBaseService<Mall> {
Page<MAccount> listAccount(MAccount dto);
Page<MallVO> list(MallDTO dto);
MallVO get(Long id);
Page<MallVO> listAttention(MallDTO dto);
String attention(List<MallDTO> dtoList);
String attentionByAccount(String accountUid);
String cancelAttention(Long id);
}
......@@ -13,6 +13,6 @@ public interface IRAgentEventService extends MPJBaseService<RAgentEvent> {
String bind(String agentUid, String mallUid, List<RAgentEvent> dtoList);
String unbind(List<Long> idList);
String unbind(String agentUid, String mallUid, List<Long> idList);
}
package vion.utils;
import lombok.extern.slf4j.Slf4j;
import org.dromara.hutool.core.math.NumberUtil;
import org.dromara.hutool.core.text.StrUtil;
import org.dromara.hutool.core.util.ObjUtil;
import org.dromara.hutool.extra.spring.SpringUtil;
import org.redisson.api.RedissonClient;
import vion.constant.AgentTypeEnum;
import vion.constant.MqttTopic;
import vion.constant.RedisKeyEnum;
import vion.model.monitor.Agent;
import vion.service.monitor.IAgentService;
/**
* @author vion
* @date 2024/11/4
*/
@Slf4j
public class TopicUtil {
public static String getEventTopic(String agentUid) {
return getTopic(agentUid, MqttTopic.STORE_EVENT_TOPIC, MqttTopic.MALL_EVENT_TOPIC);
}
public static String getServiceTopic(String agentUid) {
return getTopic(agentUid, MqttTopic.STORE_SERVICE_TOPIC, MqttTopic.MALL_SERVICE_TOPIC);
}
public static String getOTATopic(String agentUid) {
return getTopic(agentUid, MqttTopic.STORE_OTA_TOPIC, MqttTopic.MALL_OTA_TOPIC);
}
private static String getTopic(String agentUid, String storeTopic, String mallTopic) {
var agent = getAgent(agentUid);
if (agent == null) {
return null;
}
if (NumberUtil.equals(agent.getType(), AgentTypeEnum.STORE.getCode())) {
return StrUtil.format(storeTopic, agentUid);
} else if (ObjUtil.equals(agent.getType(), AgentTypeEnum.MALL.getCode())) {
return StrUtil.format(mallTopic, agentUid);
} else {
log.info("未知的agent类型");
return null;
}
}
private static Agent getAgent(String agentUid) {
var redissonClient = SpringUtil.getBean(RedissonClient.class);
var agent = (Agent) redissonClient.getBucket(RedisKeyEnum.AGENT.getVal() + agentUid).get();
if (ObjUtil.isNull(agent)) {
var agentService = SpringUtil.getBean(IAgentService.class);
var agentList = agentService.list();
agentList.forEach(a -> redissonClient.getBucket(RedisKeyEnum.AGENT.getVal() + a.getUid()).set(a));
agent = agentList.stream().filter(a -> StrUtil.equals(a.getUid(), agentUid)).findFirst().orElse(null);
}
if (ObjUtil.isNull(agent)) {
log.info("未找到agent信息");
return null;
}
return agent;
}
}
......@@ -44,7 +44,12 @@ public class MallVO {
/**
* mall营业状态
*/
private Boolean status;
private Short status;
/**
* 是否关注 0:不关注 1:关注
*/
private Short attention;
/**
* 时区
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!