// AgentServiceImpl.java 12.4 KB
package vion.service.impl.monitor;

import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.yulichang.base.MPJBaseServiceImpl;
import com.github.yulichang.wrapper.MPJLambdaWrapper;
import io.github.linpeilie.Converter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.dromara.hutool.core.collection.CollUtil;
import org.dromara.hutool.core.date.TimeUtil;
import org.dromara.hutool.core.lang.Opt;
import org.dromara.hutool.core.text.StrUtil;
import org.dromara.hutool.core.text.split.SplitUtil;
import org.dromara.hutool.core.util.ByteUtil;
import org.dromara.hutool.core.util.ObjUtil;
import org.dromara.hutool.core.util.RandomUtil;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import vion.dto.AgentDTO;
import vion.dto.MqttAuthDTO;
import vion.mapper.monitor.AgentMapper;
import vion.model.monitor.*;
import vion.service.monitor.*;
import vion.utils.JsonUtil;
import vion.vo.AgentVO;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Service implementation for monitored agents.
 * <p>
 * Covers agent listing and maintenance, the agent-to-service relation, agent registration,
 * persistence of reported agent/service records, pushing service lists over MQTT, and the
 * MQTT broker callbacks for authentication, connect and disconnect.
 *
 * @author vion
 * @date 2024/10/16
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class AgentServiceImpl extends MPJBaseServiceImpl<AgentMapper, Agent> implements IAgentService {

    /** MQTT client id treated as the monitor server itself rather than as an agent. */
    private static final String MONITOR_SERVER_CLIENT_ID = "MS8011";

    /** Topic template for pushing a service list to an agent; the placeholder is filled with the agent uid. */
    private static final String SERVICE_PUSH_TOPIC = "/SA/{}/service/push";

    private final IAgentRecordService agentRecordService;
    private final IServiceInfoService serviceInfoService;
    private final IServiceRecordService serviceRecordService;
    private final IRAgentServiceService rAgentServiceService;
    private final IUpgradeService upgradeService;
    private final Converter converter;
    private final MqttClientTemplate client;

    /**
     * Pages agents matching the non-null fields of {@code dto}, each joined with its agent record,
     * and fills in the comma-separated names of services that have a failing record.
     *
     * @param dto query conditions plus {@code pageNum}/{@code pageSize}
     * @return the requested page of agents, ordered by creation time ascending
     */
    @Override
    public Page<AgentVO> list(AgentDTO dto) {
        var wrapper = new MPJLambdaWrapper<>(converter.convert(dto, Agent.class))
                .selectAll(Agent.class)
                .selectAssociation(AgentRecord.class, AgentVO::getAgentRecord)
                .leftJoin(AgentRecord.class, AgentRecord::getAgentUid, Agent::getUid)
                .orderByAsc(Agent::getCreateTime);
        var agentVOPage = this.selectJoinListPage(Page.of(dto.getPageNum(), dto.getPageSize()), AgentVO.class, wrapper);
        var records = agentVOPage.getRecords();
        // Skip the second query entirely when the page is empty.
        if (CollUtil.isNotEmpty(records)) {
            fillFailedServiceNames(records);
        }
        return agentVOPage;
    }

    /**
     * Sets {@code serviceName} on every agent to the comma-joined names of its services whose
     * selected record has {@code status == false}. Agents without such records get {@code null}.
     */
    private void fillFailedServiceNames(List<AgentVO> agents) {
        var agentUidList = agents.stream().map(AgentVO::getUid).toList();
        // DISTINCT ON is PostgreSQL syntax: combined with the descending order below it keeps,
        // per (agent_uid, service_uid) pair, the failing record with the greatest monitor time.
        var serviceRecordWrapper = new MPJLambdaWrapper<ServiceRecord>()
                .select("DISTINCT on (agent_uid, service_uid) t.*")
                .selectAs("si", ServiceInfo::getName, ServiceRecord::getName)
                .leftJoin(ServiceInfo.class, "si", ServiceInfo::getUid, "t", ServiceRecord::getServiceUid)
                .in(ServiceRecord::getAgentUid, agentUidList)
                .eq(ServiceRecord::getStatus, false)
                .orderByDesc(ServiceRecord::getAgentUid, ServiceRecord::getServiceUid, ServiceRecord::getMonitorTime);
        var serviceRecordList = serviceRecordService.selectJoinList(ServiceRecord.class, serviceRecordWrapper);
        var serviceNameMap = serviceRecordList.stream()
                .collect(Collectors.groupingBy(
                        ServiceRecord::getAgentUid,
                        Collectors.mapping(ServiceRecord::getName, Collectors.joining(","))));
        agents.forEach(a -> a.setServiceName(serviceNameMap.get(a.getUid())));
    }

    /**
     * Loads a single agent by primary key, joined with service info.
     *
     * @param id agent primary key
     * @return the agent view object, or whatever {@code selectJoinOne} yields when nothing matches
     */
    @Override
    public AgentVO getVOById(Long id) {
        // NOTE(review): the join matches ServiceInfo.uid against Agent.uid, which looks like it
        // compares a service identifier with an agent identifier - confirm this is intended.
        var wrapper = new MPJLambdaWrapper<Agent>()
                .selectAll(Agent.class)
                .selectAsClass(ServiceInfo.class, AgentVO.class)
                .leftJoin(ServiceInfo.class, ServiceInfo::getUid, Agent::getUid)
                .eq(Agent::getId, id)
                .orderByAsc(Agent::getCreateTime);
        return this.selectJoinOne(AgentVO.class, wrapper);
    }

    /**
     * Updates an agent by primary key from the given DTO.
     *
     * @param dto agent data; converted to {@link Agent} and updated by id
     * @return a success or failure message
     */
    @Override
    public String update(AgentDTO dto) {
        var agent = converter.convert(dto, Agent.class);
        return this.updateById(agent) ? "更新成功" : "更新失败";
    }

    /**
     * Lists the services bound to an agent.
     * <p>
     * The id, url, method, credentials and timestamps come from the agent-service relation row;
     * uid, name, type and description come from the service definition.
     *
     * @param uid agent unique identifier
     * @return services bound to the agent, ordered by the service creation time ascending
     */
    @Override
    public List<ServiceInfo> getServicesByUid(String uid) {
        var wrapper = new MPJLambdaWrapper<ServiceInfo>()
                .selectAs(RAgentService::getId, ServiceInfo::getId)
                .selectAs(ServiceInfo::getUid, ServiceInfo::getUid)
                .selectAs(ServiceInfo::getName, ServiceInfo::getName)
                .selectAs(ServiceInfo::getType, ServiceInfo::getType)
                .selectAs(RAgentService::getUrl, ServiceInfo::getUrl)
                .selectAs(RAgentService::getMethod, ServiceInfo::getMethod)
                .selectAs(RAgentService::getUsername, ServiceInfo::getUsername)
                .selectAs(RAgentService::getPassword, ServiceInfo::getPassword)
                .selectAs(ServiceInfo::getDescription, ServiceInfo::getDescription)
                .selectAs(RAgentService::getCreateTime, ServiceInfo::getCreateTime)
                .selectAs(RAgentService::getUpdateTime, ServiceInfo::getUpdateTime)
                .leftJoin(RAgentService.class, RAgentService::getServiceUid, ServiceInfo::getUid)
                .eq(RAgentService::getAgentUid, uid)
                .orderByAsc(ServiceInfo::getCreateTime);
        return serviceInfoService.selectJoinList(ServiceInfo.class, wrapper);
    }

    /**
     * Binds the given services to an agent by inserting one relation row per service.
     * Does nothing when the list is null or empty.
     *
     * @param uid             agent unique identifier
     * @param serviceInfoList services to bind; url, method and credentials are copied to the relation
     * @return always the success message (failures surface as exceptions and roll the transaction back)
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public String saveServicesByUid(String uid, List<ServiceInfo> serviceInfoList) {
        if (CollUtil.isNotEmpty(serviceInfoList)) {
            var rAgentServiceList = serviceInfoList.stream().map(s -> {
                var rAgentService = new RAgentService();
                rAgentService.setAgentUid(uid);
                rAgentService.setServiceUid(s.getUid());
                rAgentService.setUrl(s.getUrl());
                rAgentService.setMethod(s.getMethod());
                rAgentService.setUsername(s.getUsername());
                rAgentService.setPassword(s.getPassword());
                return rAgentService;
            }).toList();
            rAgentServiceService.saveBatch(rAgentServiceList);
        }
        return "更新成功";
    }

    /**
     * Updates one agent-service relation row by primary key.
     *
     * @param id            primary key of the relation row; when non-null it overrides the id in the entity
     * @param rAgentService new field values
     * @return a success or failure message
     */
    @Override
    public String updateAgent2ServiceById(Long id, RAgentService rAgentService) {
        // The explicit id identifies the row; without this the parameter was silently ignored
        // and the update relied on the entity happening to carry its own id.
        if (id != null) {
            rAgentService.setId(id);
        }
        return rAgentServiceService.updateById(rAgentService) ? "更新成功" : "更新失败";
    }

    /**
     * Deletes one agent-service relation row by primary key.
     *
     * @param id primary key of the relation row
     * @return a success or failure message
     */
    @Override
    public String removeAgent2ServiceById(Long id) {
        return rAgentServiceService.removeById(id) ? "删除成功" : "删除失败";
    }

    /**
     * Publishes a service list to an agent over MQTT (QoS 2) as UTF-8 encoded JSON.
     *
     * @param uid             agent unique identifier, used to build the topic
     * @param serviceInfoList services to push
     * @return a success or failure message reflecting the publish result
     */
    @Override
    public String assign(String uid, List<ServiceInfo> serviceInfoList) {
        var servicesJson = JsonUtil.toJsonString(serviceInfoList);
        var topic = StrUtil.format(SERVICE_PUSH_TOPIC, uid);
        var payload = ByteUtil.toBytes(servicesJson, StandardCharsets.UTF_8);
        return client.publish(topic, payload, MqttQoS.QOS2) ? "服务列表下发成功" : "服务列表下发失败";
    }

    /**
     * Registers an agent and returns its MQTT credentials together with its bound services.
     * <p>
     * A previously unknown uid is saved with a generated username and password. A known uid has
     * its row updated from the incoming data, and the credentials already stored are returned.
     *
     * @param agent registration data; its status is set to 1 before anything else
     * @return a JSON string with {@code clientid}, {@code username}, {@code password} and
     *         {@code serviceList}, or a plain failure message when the uid is blank
     */
    @Override
    public String reg(Agent agent) {
        agent.setStatus((short) 1);
        if (StrUtil.isBlank(agent.getUid())) {
            return "注册失败,Agent 唯一标识为空";
        }

        var one = this.lambdaQuery().eq(Agent::getUid, agent.getUid()).one();
        if (ObjUtil.isNull(one)) {
            // NOTE(review): these credentials come from a general-purpose random helper;
            // consider a SecureRandom-based generator since they authenticate MQTT clients.
            agent.setUsername(RandomUtil.randomString(agent.getUid(), 8));
            agent.setPassword(RandomUtil.randomString(12));
            this.save(agent);
        } else {
            this.lambdaUpdate().eq(Agent::getUid, agent.getUid()).update(agent);
            // Respond with the row loaded before the update: it holds the stored credentials.
            agent = one;
        }
        log.info("Agent 注册成功. clientid:{}, username:{}", agent.getUid(), agent.getUsername());

        var wrapper = new MPJLambdaWrapper<ServiceInfo>()
                .selectAs(ServiceInfo::getUid, ServiceInfo::getUid)
                .selectAs(ServiceInfo::getName, ServiceInfo::getName)
                .selectAs(ServiceInfo::getType, ServiceInfo::getType)
                .selectAs(RAgentService::getUrl, ServiceInfo::getUrl)
                .selectAs(RAgentService::getMethod, ServiceInfo::getMethod)
                .selectAs(RAgentService::getUsername, ServiceInfo::getUsername)
                .selectAs(RAgentService::getPassword, ServiceInfo::getPassword)
                .leftJoin(RAgentService.class, RAgentService::getServiceUid, ServiceInfo::getUid)
                .eq(RAgentService::getAgentUid, agent.getUid())
                .orderByAsc(ServiceInfo::getCreateTime);
        var serviceList = serviceInfoService.selectJoinList(ServiceInfo.class, wrapper);

        return JsonUtil.createObj()
                .put("clientid", agent.getUid())
                .put("username", agent.getUsername())
                .put("password", agent.getPassword())
                .putPOJO("serviceList", serviceList)
                .toString();
    }

    /**
     * Returns the upgrade entry with the greatest build time for the given type.
     *
     * @param type upgrade type to filter on
     * @return the newest matching upgrade, or {@code null} when none exists
     */
    @Override
    public Upgrade getUpgradeInfo(Short type) {
        return upgradeService.lambdaQuery()
                .eq(Upgrade::getType, type)
                .orderByDesc(Upgrade::getBuildTime)
                .last("limit 1")
                .one();
    }

    /**
     * Persists a record reported by an agent, deriving the boot time from the boot timestamp.
     *
     * @param agentRecord reported record; its {@code bootTime} is overwritten here
     * @return {@code "success"} or {@code "error"}
     */
    @Override
    public String recAgentRecord(AgentRecord agentRecord) {
        // The timestamp is multiplied by 1000 to get milliseconds; the long literal keeps the
        // multiplication in 64-bit arithmetic regardless of the getter's integer width.
        agentRecord.setBootTime(TimeUtil.of(agentRecord.getBootTimestamp() * 1000L));
        return agentRecordService.save(agentRecord) ? "success" : "error";
    }

    /**
     * Persists a batch of service records reported by an agent.
     *
     * @param recList records to save
     * @return {@code "success"} or {@code "error"}
     */
    @Override
    public String recServiceInfo(List<ServiceRecord> recList) {
        return serviceRecordService.saveBatch(recList) ? "success" : "error";
    }

    /**
     * Answers an MQTT broker authentication request.
     * <p>
     * The monitor server client id is always allowed. Any other client is allowed only when an
     * agent row matches client id, username and password. Allowed clients get an allow-all ACL.
     *
     * @param dto authentication request carrying client id, username and password
     * @return a JSON object whose {@code result} is {@code "allow"} or {@code "deny"}
     */
    @Override
    public ObjectNode auth(MqttAuthDTO dto) {
        var clientid = dto.getClientid();
        // NOTE(review): the monitor server is admitted on its client id alone, with no
        // credential check - confirm this is acceptable for the deployment.
        if (isMonitorServer(clientid)) {
            log.info("Monitor Server 认证成功. clientid: {}", clientid);
            return JsonUtil.createObj()
                    .put("result", "allow")
                    .putPOJO("acl", allowAllAcl());
        }

        // NOTE(review): the password is compared as stored, i.e. it is kept in plain text.
        var agent = this.lambdaQuery()
                .eq(Agent::getUid, clientid)
                .eq(Agent::getUsername, dto.getUsername())
                .eq(Agent::getPassword, dto.getPassword())
                .oneOpt();
        if (agent.isEmpty()) {
            log.info("Agent 认证失败: {}", clientid);
            return JsonUtil.createObj().put("result", "deny");
        }

        // TODO: the usage of expire_at is not yet clear
        // obj.put("expire_at", Instant.now().getEpochSecond() + 15);
        // NOTE(review): if the broker is EMQX, its HTTP auth response names this flag
        // "is_superuser" - confirm the key the broker actually reads.
        var obj = JsonUtil.createObj();
        obj.put("result", "allow")
                .put("superuser", false)
                .putPOJO("acl", allowAllAcl());
        log.info("Agent 认证成功. clientid: {}", clientid);
        return obj;
    }

    /**
     * Handles the broker's client-connected event: marks the agent online and stores its IP.
     * The monitor server's own connection is only logged.
     *
     * @param connectedInfo event payload; {@code clientid} and {@code ipaddress} are read
     */
    @Override
    public void connected(JsonNode connectedInfo) {
        var clientid = connectedInfo.path("clientid").asText();
        var ipaddr = connectedInfo.path("ipaddress").asText();
        // Keep only the part before the first ':'.
        var ip = SplitUtil.split(ipaddr, ":").getFirst();
        if (isMonitorServer(clientid)) {
            log.info("Monitor Server 连接 Mqtt Server 成功. clientid:{}, ip:{}", clientid, ip);
            return;
        }
        // NOTE(review): an empty entity is passed to update(), presumably so that entity
        // auto-fill applies to this statement - confirm.
        this.lambdaUpdate().set(Agent::getStatus, 1).set(Agent::getIpaddr, ip).eq(Agent::getUid, clientid).update(new Agent());
        log.info("Agent 连接 Mqtt Server 成功. clientid:{}, ip:{}", clientid, ip);
    }

    /**
     * Handles the broker's client-disconnected event: marks the agent offline.
     * The monitor server's own disconnection is only logged.
     *
     * @param disconnectInfo event payload; {@code clientid} and {@code ipaddress} are read
     */
    @Override
    public void disconnect(JsonNode disconnectInfo) {
        var clientid = disconnectInfo.path("clientid").asText();
        var ipaddr = disconnectInfo.path("ipaddress").asText();
        if (isMonitorServer(clientid)) {
            log.info("Monitor Server 断开连接 Mqtt Server 成功. clientid:{}", clientid);
            return;
        }
        this.lambdaUpdate().set(Agent::getStatus, 0).eq(Agent::getUid, clientid).update();
        log.info("Agent 断开连接 Mqtt Server. clientid:{}, ip:{}", clientid, ipaddr);
    }

    /** Tells whether the given MQTT client id is the monitor server's own id. */
    private static boolean isMonitorServer(String clientid) {
        return StrUtil.equals(clientid, MONITOR_SERVER_CLIENT_ID);
    }

    /** Builds a one-rule ACL array allowing every action on every topic ({@code #}). */
    private static JsonNode allowAllAcl() {
        return JsonUtil.createArr()
                .addPOJO(JsonUtil.createObj()
                        .put("action", "all")
                        .put("permission", "allow")
                        .put("topic", "#"));
    }

}