RAgentEventServiceImpl.java 6.59 KB
package vion.service.impl.monitor;

import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.github.yulichang.base.MPJBaseServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.dromara.hutool.core.collection.CollUtil;
import org.dromara.hutool.core.text.StrUtil;
import org.dromara.hutool.core.util.ObjUtil;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import vion.mapper.monitor.MallMapper;
import vion.mapper.monitor.RAgentEventMapper;
import vion.model.monitor.Mall;
import vion.model.monitor.RAgentEvent;
import vion.service.monitor.IRAgentEventService;
import vion.utils.JsonUtil;
import vion.utils.TopicUtil;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * @author vion
 * @date 2024/10/31
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class RAgentEventServiceImpl extends MPJBaseServiceImpl<RAgentEventMapper, RAgentEvent> implements IRAgentEventService {

    private final MallMapper mallMapper;
    private final MqttClientTemplate client;

    @Override
    public String bind(String agentUid, String mallUid, List<RAgentEvent> dtoList) {
        dtoList.forEach(ae -> ae.setAgentUid(agentUid));
        var agentEventList = this.lambdaQuery().eq(RAgentEvent::getMallUid, mallUid).list();
        var eventUid2AEMap = agentEventList.stream().collect(Collectors.toMap(RAgentEvent::getEventUid, Function.identity()));

        List<RAgentEvent> updList = new ArrayList<>();
        List<RAgentEvent> insList = new ArrayList<>();
        dtoList.forEach(dto -> {
            if (eventUid2AEMap.containsKey(dto.getEventUid())) {
                var ae = eventUid2AEMap.get(dto.getEventUid());
                dto.setId(ae.getId());
                updList.add(dto);
            } else {
                insList.add(dto);
            }
        });
        if (CollUtil.isNotEmpty(updList)) {
            this.updateBatchById(updList);
        }
        if (CollUtil.isNotEmpty(insList)) {
            this.saveBatch(insList);
        }
        var topic = TopicUtil.getEventTopic(agentUid);
        if (StrUtil.isBlank(topic)) {
            var msg = StrUtil.format("agent:{} 获取topic失败", agentUid);
            log.error(msg);
            return msg;
        }
        dtoList.forEach(dto -> client.publish(topic, JsonUtil.toJsonByte(dto), MqttQoS.QOS2));
        return "绑定成功";
    }

    @Override
    public String unbind(String agentUid, String mallUid, List<Long> idList) {
        var agentEvents = this.listByIds(idList);
        agentEvents.forEach(r -> r.setControlSwitch((short) 0));
        var topic = TopicUtil.getEventTopic(agentUid);
        if (StrUtil.isBlank(topic)) {
            var msg = StrUtil.format("agent:{} 获取topic失败", agentUid);
            log.error(msg);
            return msg;
        }
        agentEvents.forEach(agentEvent -> client.publish(topic, JsonUtil.toJsonByte(agentEvent), MqttQoS.QOS2));
        return this.removeBatchByIds(idList) ? "解绑成功" : "解绑失败";
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public String bindBatch(String agentUid, RAgentEvent dto) {
        var topic = TopicUtil.getEventTopic(agentUid);
        if (StrUtil.isBlank(topic)) {
            var msg = StrUtil.format("agent:{} 获取topic失败", agentUid);
            log.error(msg);
            return msg;
        }
        if (StrUtil.isNotBlank(dto.getAccountUid())) {
            var accountUid = dto.getAccountUid();
            var mallList = mallMapper.selectList(Wrappers.<Mall>lambdaQuery().eq(Mall::getAccountUid, accountUid));
            mallList.forEach(m -> m.setAttention((short) 1));
            mallMapper.updateById(mallList);
            var existMall2EventList = this.lambdaQuery()
                    .eq(RAgentEvent::getAccountUid, accountUid)
                    .eq(RAgentEvent::getEventUid, dto.getEventUid())
                    .list().stream().map(RAgentEvent::getMallUid).toList();

            var agentEventList = mallList.stream().map(m -> {
                if (existMall2EventList.contains(m.getUid())) {
                    return null;
                }
                var rAgentEvent = new RAgentEvent();
                rAgentEvent.setAgentUid(agentUid);
                rAgentEvent.setAccountUid(accountUid);
                rAgentEvent.setMallUid(m.getUid());
                rAgentEvent.setEventUid(dto.getEventUid());
                rAgentEvent.setEventType(dto.getEventType());
                rAgentEvent.setCron(dto.getCron());
                rAgentEvent.setThreshold(dto.getThreshold());
                rAgentEvent.setControlSwitch((short) 1);
                return rAgentEvent;
            }).filter(ObjUtil::isNotNull).toList();
            this.saveBatch(agentEventList);
            agentEventList.forEach(agentEvent -> client.publish(topic, JsonUtil.toJsonByte(agentEvent), MqttQoS.QOS2));
        }
        if (CollUtil.isNotEmpty(dto.getMallUidList())) {
            var mallUidList = dto.getMallUidList();
            var mallList = mallMapper.selectList(Wrappers.<Mall>lambdaQuery().in(Mall::getUid, mallUidList));
            mallList.forEach(m -> m.setAttention((short) 1));
            mallMapper.updateById(mallList);

            var existMall2EventList = this.lambdaQuery()
                    .in(RAgentEvent::getMallUid, mallUidList)
                    .eq(RAgentEvent::getEventUid, dto.getEventUid())
                    .list().stream().map(RAgentEvent::getMallUid).toList();
            var agentEventList = mallList.stream().map(m -> {
                if (existMall2EventList.contains(m.getUid())) {
                    return null;
                }
                var rAgentEvent = new RAgentEvent();
                rAgentEvent.setAgentUid(agentUid);
                rAgentEvent.setAccountUid(m.getAccountUid());
                rAgentEvent.setMallUid(m.getUid());
                rAgentEvent.setEventUid(dto.getEventUid());
                rAgentEvent.setEventType(dto.getEventType());
                rAgentEvent.setCron(dto.getCron());
                rAgentEvent.setThreshold(dto.getThreshold());
                rAgentEvent.setControlSwitch((short) 1);
                return rAgentEvent;
            }).filter(ObjUtil::isNotNull).toList();
            this.saveBatch(agentEventList);
            agentEventList.forEach(agentEvent -> client.publish(topic, JsonUtil.toJsonByte(agentEvent), MqttQoS.QOS2));
        }
        return "批量绑定成功";
    }
}