// RAgentEventServiceImpl.java 2.93 KB
package vion.service.impl.monitor;

import com.github.yulichang.base.MPJBaseServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.dromara.hutool.core.collection.CollUtil;
import org.dromara.hutool.core.text.StrUtil;
import org.springframework.stereotype.Service;
import vion.mapper.monitor.RAgentEventMapper;
import vion.model.monitor.RAgentEvent;
import vion.service.monitor.IRAgentEventService;
import vion.utils.JsonUtil;
import vion.utils.TopicUtil;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Service that persists agent-event bindings ({@link RAgentEvent}) and publishes each
 * affected binding to the agent's event topic over MQTT.
 *
 * @author vion
 * @date 2024/10/31
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class RAgentEventServiceImpl extends MPJBaseServiceImpl<RAgentEventMapper, RAgentEvent> implements IRAgentEventService {

    /** MQTT client used to publish binding changes. */
    private final MqttClientTemplate client;

    /**
     * Binds the given events to an agent: rows whose {@code eventUid} is already stored for
     * the mall are updated in place, the others are inserted, and every entry is then
     * published to the agent's event topic with QoS 2.
     *
     * <p>The topic is resolved before anything is written, so a failed topic lookup no
     * longer leaves persisted rows behind a "failure" response.
     *
     * @param agentUid uid of the agent; written onto every element of {@code dtoList}
     * @param mallUid  uid of the mall whose stored bindings are matched by {@code eventUid}
     * @param dtoList  bindings to store; elements are mutated (agent uid and, for existing
     *                 rows, the id); {@code null} or empty means there is nothing to bind
     * @return a success message, or an error message when no topic can be resolved
     */
    @Override
    public String bind(String agentUid, String mallUid, List<RAgentEvent> dtoList) {
        var topic = TopicUtil.getEventTopic(agentUid);
        if (StrUtil.isBlank(topic)) {
            return topicLookupFailed(agentUid);
        }
        if (dtoList == null || dtoList.isEmpty()) {
            // Nothing to persist or publish; skip the lookup query entirely.
            return "绑定成功";
        }

        // NOTE(review): rows are matched by mallUid only, so a row currently owned by a
        // different agent of the same mall is re-assigned to agentUid - confirm this is intended.
        // The merge function keeps the first row when stored data contains duplicate eventUids,
        // instead of failing the whole call with an IllegalStateException.
        var existingByEventUid = this.lambdaQuery()
                .eq(RAgentEvent::getMallUid, mallUid)
                .list()
                .stream()
                .collect(Collectors.toMap(RAgentEvent::getEventUid, Function.identity(), (first, duplicate) -> first));

        List<RAgentEvent> updList = new ArrayList<>();
        List<RAgentEvent> insList = new ArrayList<>();
        for (var dto : dtoList) {
            dto.setAgentUid(agentUid);
            var existing = existingByEventUid.get(dto.getEventUid());
            if (existing != null) {
                // Reuse the stored primary key so the batch update hits the existing row.
                dto.setId(existing.getId());
                updList.add(dto);
            } else {
                insList.add(dto);
            }
        }
        if (CollUtil.isNotEmpty(updList)) {
            this.updateBatchById(updList);
        }
        if (CollUtil.isNotEmpty(insList)) {
            this.saveBatch(insList);
        }

        dtoList.forEach(dto -> client.publish(topic, JsonUtil.toJsonByte(dto), MqttQoS.QOS2));
        return "绑定成功";
    }

    /**
     * Unbinds events from an agent: each stored binding is published to the agent's event
     * topic with its control switch set to {@code 0}, then the rows are removed by id.
     *
     * @param agentUid uid of the agent whose event topic receives the messages
     * @param mallUid  not used by this implementation
     * @param idList   primary keys of the bindings to remove; {@code null} or empty is
     *                 reported as a failed unbind without touching the database
     * @return a success or failure message for the removal, or an error message when no
     *         topic can be resolved
     */
    @Override
    public String unbind(String agentUid, String mallUid, List<Long> idList) {
        if (idList == null || idList.isEmpty()) {
            // An id lookup with no ids would produce an empty IN clause; report failure,
            // presumably matching what the batch removal yields for an empty list - TODO confirm.
            return "解绑失败";
        }
        var topic = TopicUtil.getEventTopic(agentUid);
        if (StrUtil.isBlank(topic)) {
            return topicLookupFailed(agentUid);
        }

        // NOTE(review): the ids are not checked against agentUid / mallUid, so bindings that
        // belong to another agent would be published to this agent and deleted - verify callers.
        var agentEvents = this.listByIds(idList);
        for (var agentEvent : agentEvents) {
            agentEvent.setControlSwitch((short) 0);
            client.publish(topic, JsonUtil.toJsonByte(agentEvent), MqttQoS.QOS2);
        }
        return this.removeBatchByIds(idList) ? "解绑成功" : "解绑失败";
    }

    /** Logs and returns the error message used when no event topic is found for the agent. */
    private String topicLookupFailed(String agentUid) {
        var msg = StrUtil.format("agent:{} 获取topic失败", agentUid);
        log.error(msg);
        return msg;
    }
}