// AgentServiceImpl.java 18 KB
package vion.service.impl.monitor;

import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.yulichang.base.MPJBaseServiceImpl;
import com.github.yulichang.wrapper.MPJLambdaWrapper;
import io.github.linpeilie.Converter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.dreamlu.iot.mqtt.codec.MqttQoS;
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.dromara.hutool.core.collection.CollUtil;
import org.dromara.hutool.core.date.TimeUtil;
import org.dromara.hutool.core.lang.Opt;
import org.dromara.hutool.core.text.StrUtil;
import org.dromara.hutool.core.text.split.SplitUtil;
import org.dromara.hutool.core.util.ObjUtil;
import org.dromara.hutool.core.util.RandomUtil;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import vion.dto.monitor.AgentDTO;
import vion.dto.monitor.MqttAuthDTO;
import vion.dto.monitor.OrgDTO;
import vion.mapper.monitor.AgentMapper;
import vion.model.monitor.*;
import vion.service.monitor.*;
import vion.utils.JsonUtil;
import vion.utils.TopicUtil;
import vion.vo.monitor.AgentVO;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

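/**
 * Agent management service: paged agent queries, agent/service bindings, MQTT command
 * publishing, agent registration and MQTT authentication, and ingestion of reported records.
 *
 * @author vion
 * @date 2024/10/16
 */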
@Service
@Slf4j
@RequiredArgsConstructor
public class AgentServiceImpl extends MPJBaseServiceImpl<AgentMapper, Agent> implements IAgentService {

    // Collaborators below are final and therefore supplied through the constructor
    // generated by @RequiredArgsConstructor on this class.

    // Stores and queries the records reported by agents.
    private final IAgentRecordService agentRecordService;
    // Service definitions (joined with agent bindings in the queries of this class).
    private final IServiceInfoService serviceInfoService;
    // Stores and queries the service records reported by agents.
    private final IServiceRecordService serviceRecordService;
    // Agent <-> service binding rows (url / method / credentials per binding).
    private final IRAgentServiceService rAgentServiceService;
    // Not referenced by any method of this class.
    private final IRAgentEventService irAgentEventService;
    // Source of upgrade package information (see getUpgradeInfo).
    private final IUpgradeService upgradeService;
    // Bean converter used to turn DTOs into entities.
    private final Converter converter;
    // MQTT client used to publish commands to agents.
    private final MqttClientTemplate client;
    // Account and mall persistence used when an agent reports organisation data.
    private final IMAccountService accountService;
    private final IMallService mallService;
    // Topic returned to agents on registration as "serverTopic";
    // falls back to "/MS/receive" when the property is not configured.
    @Value(value = "${mqtt.client.server-topic:/MS/receive}")
    private String serverTopic;

    /**
     * Returns one page of agents ordered by creation time, each enriched with its most
     * recent {@link AgentRecord} and a comma-joined list of service names.
     *
     * @param dto paging parameters; the DTO is also converted to an {@link Agent} that is
     *            handed to the query wrapper as its entity
     * @return the page of agent view objects (returned untouched when it has no records)
     */
    @Override
    public Page<AgentVO> list(AgentDTO dto) {
        var queryEntity = converter.convert(dto, Agent.class);
        var agentQuery = new MPJLambdaWrapper<>(queryEntity)
                .selectAll(Agent.class)
                .orderByAsc(Agent::getCreateTime);
        var page = this.selectJoinListPage(Page.of(dto.getPageNum(), dto.getPageSize()), AgentVO.class, agentQuery);

        var records = page.getRecords();
        if (CollUtil.isEmpty(records)) {
            return page;
        }
        var agentUids = records.stream().map(AgentVO::getUid).toList();

        // Server information: one row per agent_uid, ordered so the newest create_time comes first.
        var latestRecordQuery = Wrappers.<AgentRecord>query()
                .select("DISTINCT on (agent_uid) *")
                .in("agent_uid", agentUids)
                .orderByDesc("agent_uid", "create_time");
        var latestRecordByUid = agentRecordService.list(latestRecordQuery).stream()
                .collect(Collectors.toMap(AgentRecord::getAgentUid, Function.identity()));
        for (var agentVO : records) {
            agentVO.setAgentRecord(latestRecordByUid.get(agentVO.getUid()));
        }

        // Service names taken from the newest record of every (agent, service) pair.
        // NOTE(review): the original comment spoke of "abnormal" services, but no status
        // filter is applied in this query - confirm the intended behaviour.
        var latestServiceQuery = new MPJLambdaWrapper<ServiceRecord>()
                .select("DISTINCT on (agent_uid, service_uid) t.*")
                .selectAs("si", ServiceInfo::getName, ServiceRecord::getName)
                .leftJoin(ServiceInfo.class, "si", ServiceInfo::getUid, "t", ServiceRecord::getServiceUid)
                .in(ServiceRecord::getAgentUid, agentUids)
                .orderByDesc(ServiceRecord::getAgentUid, ServiceRecord::getServiceUid, ServiceRecord::getMonitorTime);
        var serviceNamesByUid = serviceRecordService.selectJoinList(ServiceRecord.class, latestServiceQuery).stream()
                .collect(Collectors.groupingBy(
                        ServiceRecord::getAgentUid,
                        Collectors.mapping(ServiceRecord::getName, Collectors.joining(","))));
        for (var agentVO : records) {
            agentVO.setServiceName(serviceNamesByUid.get(agentVO.getUid()));
        }
        return page;
    }

    /**
     * Loads a single agent by primary key, left-joined with {@link ServiceInfo}, and maps
     * the row onto an {@link AgentVO}.
     *
     * @param id primary key of the agent
     * @return the result of the join query for that id
     */
    @Override
    public AgentVO getVOById(Long id) {
        var query = new MPJLambdaWrapper<Agent>();
        query.selectAll(Agent.class)
                .selectAsClass(ServiceInfo.class, AgentVO.class)
                .leftJoin(ServiceInfo.class, ServiceInfo::getUid, Agent::getUid)
                .eq(Agent::getId, id)
                .orderByAsc(Agent::getCreateTime);
        return this.selectJoinOne(AgentVO.class, query);
    }

    /**
     * Updates an agent by id using the fields carried by the DTO.
     *
     * @param dto agent data; converted to an {@link Agent} before the update
     * @return a success or failure message depending on the result of {@code updateById}
     */
    @Override
    public String update(AgentDTO dto) {
        var entity = converter.convert(dto, Agent.class);
        if (this.updateById(entity)) {
            return "更新成功";
        }
        return "更新失败";
    }

    /**
     * Publishes an upgrade command for the given agent on its OTA topic.
     *
     * @param uid       unique identifier of the target agent
     * @param upgradeId id of the upgrade to apply; sent as {@code upgradeId} in the JSON payload
     * @return a success message when the command was published, otherwise a message
     *         describing why it could not be sent
     */
    @Override
    public String upgradeCommand(String uid, Long upgradeId) {
        var topic = TopicUtil.getOTATopic(uid);
        if (StrUtil.isBlank(topic)) {
            var msg = StrUtil.format("agent:{} 获取topic失败", uid);
            log.error(msg);
            return msg;
        }
        var payload = JsonUtil.createObj().put("upgradeId", upgradeId);
        // Honour the publish result (as assign() does) instead of always reporting success.
        var published = client.publish(topic, JsonUtil.toJsonByte(payload), MqttQoS.QOS2);
        if (!published) {
            var msg = StrUtil.format("agent:{} 升级指令下发失败", uid);
            log.error(msg);
            return msg;
        }
        return "升级指令下发成功,请稍后查看Agent版本号";
    }

    /**
     * Lists the services bound to an agent. Connection details (url, method, username,
     * password), id and timestamps come from the binding row; name, type and description
     * come from the service definition. Each entry also gets its newest
     * {@link ServiceRecord} for this agent, when one exists.
     *
     * @param uid unique identifier of the agent
     * @return the bound services, as returned by the join query
     */
    @Override
    public List<ServiceInfo> getServicesByUid(String uid) {
        var bindingQuery = new MPJLambdaWrapper<ServiceInfo>()
                .selectAs(RAgentService::getId, ServiceInfo::getId)
                .selectAs(ServiceInfo::getUid, ServiceInfo::getUid)
                .selectAs(ServiceInfo::getName, ServiceInfo::getName)
                .selectAs(ServiceInfo::getType, ServiceInfo::getType)
                .selectAs(RAgentService::getUrl, ServiceInfo::getUrl)
                .selectAs(RAgentService::getMethod, ServiceInfo::getMethod)
                .selectAs(RAgentService::getUsername, ServiceInfo::getUsername)
                .selectAs(RAgentService::getPassword, ServiceInfo::getPassword)
                .selectAs(ServiceInfo::getDescription, ServiceInfo::getDescription)
                .selectAs(RAgentService::getCreateTime, ServiceInfo::getCreateTime)
                .selectAs(RAgentService::getUpdateTime, ServiceInfo::getUpdateTime)
                .leftJoin(RAgentService.class, RAgentService::getServiceUid, ServiceInfo::getUid)
                .eq(RAgentService::getAgentUid, uid)
                .orderByAsc(ServiceInfo::getCreateTime);
        var services = serviceInfoService.selectJoinList(ServiceInfo.class, bindingQuery);
        if (CollUtil.isEmpty(services)) {
            return services;
        }

        // One row per service_uid for this agent, ordered so the newest monitor_time comes first.
        var serviceUids = services.stream().map(ServiceInfo::getUid).toList();
        var latestRecordQuery = Wrappers.<ServiceRecord>query()
                .select("DISTINCT on (service_uid) *")
                .in("service_uid", serviceUids)
                .eq("agent_uid", uid)
                .orderByDesc("service_uid", "monitor_time");
        var latestByServiceUid = serviceRecordService.list(latestRecordQuery).stream()
                .collect(Collectors.toMap(ServiceRecord::getServiceUid, Function.identity()));
        for (var service : services) {
            service.setServiceRecord(latestByServiceUid.get(service.getUid()));
        }
        return services;
    }

    /**
     * Creates one agent-to-service binding per supplied service, copying the connection
     * details (url, method, username, password) from each entry.
     *
     * @param uid             unique identifier of the agent
     * @param serviceInfoList services to bind; nothing is written when null or empty
     * @return always the success message
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public String saveServicesByUid(String uid, List<ServiceInfo> serviceInfoList) {
        if (CollUtil.isEmpty(serviceInfoList)) {
            return "更新成功";
        }
        List<RAgentService> bindings = new ArrayList<>(serviceInfoList.size());
        for (var info : serviceInfoList) {
            var binding = new RAgentService();
            binding.setAgentUid(uid);
            binding.setServiceUid(info.getUid());
            binding.setUrl(info.getUrl());
            binding.setMethod(info.getMethod());
            binding.setUsername(info.getUsername());
            binding.setPassword(info.getPassword());
            bindings.add(binding);
        }
        rAgentServiceService.saveBatch(bindings);
        return "更新成功";
    }

    /**
     * Updates an agent-to-service binding by the id carried in {@code rAgentService}.
     *
     * @param id            binding id.
     *                      NOTE(review): this parameter is not used; the update relies on the
     *                      id already set on {@code rAgentService} - confirm callers set it.
     * @param rAgentService binding data to persist
     * @return a success or failure message depending on the result of {@code updateById}
     */
    @Override
    public String updateAgent2ServiceById(Long id, RAgentService rAgentService) {
        if (rAgentServiceService.updateById(rAgentService)) {
            return "更新成功";
        }
        return "更新失败";
    }

    /**
     * Deletes an agent-to-service binding.
     *
     * @param id primary key of the binding row
     * @return a success or failure message depending on the result of {@code removeById}
     */
    @Override
    public String removeAgent2ServiceById(Long id) {
        // The success text previously ended with a stray " : " left over from the ternary.
        return rAgentServiceService.removeById(id) ? "删除成功" : "删除失败";
    }

    /**
     * Publishes the given service list to an agent on its service topic.
     *
     * @param uid             unique identifier of the target agent
     * @param serviceInfoList services to send; serialised to JSON as the message payload
     * @return a message telling whether the topic lookup and the publish succeeded
     */
    @Override
    public String assign(String uid, List<ServiceInfo> serviceInfoList) {
        var topic = TopicUtil.getServiceTopic(uid);
        if (StrUtil.isBlank(topic)) {
            var msg = StrUtil.format("agent:{} 获取topic失败", uid);
            log.error(msg);
            return msg;
        }
        var payload = JsonUtil.toJsonByte(serviceInfoList);
        if (client.publish(topic, payload, MqttQoS.QOS2)) {
            return "服务列表下发成功";
        }
        return "服务列表下发失败";
    }

    /**
     * Returns the records of one agent whose creation time lies between the given bounds,
     * oldest first.
     *
     * @param uid       unique identifier of the agent
     * @param startTime lower bound passed to the {@code between} condition
     * @param endTime   upper bound passed to the {@code between} condition
     * @return the matching records ordered by creation time ascending
     */
    @Override
    public List<AgentRecord> getForm(String uid, LocalDateTime startTime, LocalDateTime endTime) {
        var query = agentRecordService.lambdaQuery();
        query.eq(AgentRecord::getAgentUid, uid)
                .between(AgentRecord::getCreateTime, startTime, endTime)
                .orderByAsc(AgentRecord::getCreateTime);
        return query.list();
    }

    /**
     * Registers an agent, or refreshes an already registered one, and returns its MQTT
     * connection settings.
     *
     * <p>A new agent gets a generated username and password. An existing agent keeps the
     * credentials already stored for it; its other fields are updated from the request.
     *
     * @param agent agent data from the registration request; its status is set to 1
     * @return a failure message when the uid is blank, otherwise a JSON string with
     *         {@code clientid}, {@code username}, {@code password}, {@code serverTopic}
     *         and {@code serviceList}
     */
    @Override
    public String reg(Agent agent) {
        agent.setStatus((short) 1);
        if (StrUtil.isBlank(agent.getUid())) {
            return "注册失败,Agent 唯一标识为空";
        }

        var one = this.lambdaQuery().eq(Agent::getUid, agent.getUid()).one();
        if (ObjUtil.isNull(one)) {
            agent.setUsername(RandomUtil.randomString(agent.getUid(), 8));
            agent.setPassword(RandomUtil.randomString(12));
            this.save(agent);
        } else {
            // Credentials are issued by the server. Copy the stored ones onto the entity so
            // values supplied in the request cannot overwrite them and drift away from
            // what is returned to the agent below.
            agent.setUsername(one.getUsername());
            agent.setPassword(one.getPassword());
            this.lambdaUpdate().eq(Agent::getUid, agent.getUid()).update(agent);
            agent = one;
        }
        log.info("Agent 注册成功. clientid:{}, username:{}", agent.getUid(), agent.getUsername());

        // Services bound to this agent; connection details come from the binding row.
        var wrapper = new MPJLambdaWrapper<ServiceInfo>()
                .selectAs(ServiceInfo::getUid, ServiceInfo::getUid)
                .selectAs(ServiceInfo::getName, ServiceInfo::getName)
                .selectAs(ServiceInfo::getType, ServiceInfo::getType)
                .selectAs(RAgentService::getUrl, ServiceInfo::getUrl)
                .selectAs(RAgentService::getMethod, ServiceInfo::getMethod)
                .selectAs(RAgentService::getUsername, ServiceInfo::getUsername)
                .selectAs(RAgentService::getPassword, ServiceInfo::getPassword)
                .leftJoin(RAgentService.class, RAgentService::getServiceUid, ServiceInfo::getUid)
                .eq(RAgentService::getAgentUid, agent.getUid())
                .orderByAsc(ServiceInfo::getCreateTime);
        var serviceList = serviceInfoService.selectJoinList(ServiceInfo.class, wrapper);

        return JsonUtil.createObj()
                .put("clientid", agent.getUid())
                .put("username", agent.getUsername())
                .put("password", agent.getPassword())
                .put("serverTopic", serverTopic)
                .putPOJO("serviceList", serviceList)
                .toString();
    }

    /**
     * Looks up an upgrade entry by primary key.
     *
     * @param id primary key of the upgrade
     * @return whatever {@code upgradeService.getById} returns for that id
     */
    @Override
    public Upgrade getUpgradeInfo(Long id) {
        Upgrade upgrade = upgradeService.getById(id);
        return upgrade;
    }

    /**
     * Persists a record reported by an agent and copies its license date onto the agent.
     *
     * @param agentRecord the reported record; its boot time is derived from the boot
     *                    timestamp when one is present
     * @return always {@code "success"}
     */
    @Override
    public String recAgentRecord(AgentRecord agentRecord) {
        var bootTimestamp = agentRecord.getBootTimestamp();
        // Skip the conversion when the agent did not report a boot timestamp instead of
        // failing on unboxing. 1000L keeps the multiplication in long arithmetic.
        // Presumably the timestamp is in epoch seconds - TODO confirm.
        if (!ObjUtil.isNull(bootTimestamp)) {
            agentRecord.setBootTime(TimeUtil.of(bootTimestamp * 1000L));
        }
        agentRecordService.save(agentRecord);
        // Update the license information of the agent.
        // NOTE(review): a record without a license date writes null to the agent - confirm intended.
        var licenseDate = agentRecord.getLicenseDate();
        this.lambdaUpdate().set(Agent::getLicenseDate, licenseDate).eq(Agent::getUid, agentRecord.getAgentUid()).update();
        return "success";
    }

    /**
     * Persists a batch of service records reported by an agent.
     *
     * @param recList records to store
     * @return {@code "success"} when {@code saveBatch} reports success, otherwise {@code "error"}
     */
    @Override
    public String recServiceInfo(List<ServiceRecord> recList) {
        if (serviceRecordService.saveBatch(recList)) {
            return "success";
        }
        return "error";
    }

    /**
     * Synchronises the accounts and malls reported by an agent.
     *
     * <p>Entries whose uid already exists are updated by id; unknown entries are inserted
     * and tagged with the reporting agent's uid and type. All writes run in a single
     * transaction so a failure does not leave the data partially synchronised.
     *
     * @param orgDTO reported organisation data; the account and mall lists may be null or empty
     * @return always {@code "success"}
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public String recOrgInfo(OrgDTO orgDTO) {
        var agentUid = orgDTO.getAgentUid();
        var agentType = orgDTO.getAgentType();
        var accountList = orgDTO.getAccountList();
        var mallList = orgDTO.getMallList();

        List<MAccount> insAccList = new ArrayList<>();
        List<MAccount> updAccList = new ArrayList<>();
        // Existing accounts are only loaded when the agent actually reported some.
        if (CollUtil.isNotEmpty(accountList)) {
            var uid2AccMap = accountService.list().stream()
                    .collect(Collectors.toMap(MAccount::getUid, Function.identity()));
            accountList.forEach(a -> {
                var existing = uid2AccMap.get(a.getUid());
                if (existing != null) {
                    a.setId(existing.getId());
                    updAccList.add(a);
                } else {
                    a.setAgentUid(agentUid);
                    a.setAgentType(agentType);
                    insAccList.add(a);
                }
            });
        }

        List<Mall> insMallList = new ArrayList<>();
        List<Mall> updMallList = new ArrayList<>();
        // Same split for malls.
        if (CollUtil.isNotEmpty(mallList)) {
            var uid2MallMap = mallService.list().stream()
                    .collect(Collectors.toMap(Mall::getUid, Function.identity()));
            mallList.forEach(m -> {
                var existing = uid2MallMap.get(m.getUid());
                if (existing != null) {
                    m.setId(existing.getId());
                    updMallList.add(m);
                } else {
                    m.setAgentUid(agentUid);
                    m.setAgentType(agentType);
                    insMallList.add(m);
                }
            });
        }

        if (CollUtil.isNotEmpty(insAccList)) {
            accountService.saveBatch(insAccList);
        }
        if (CollUtil.isNotEmpty(insMallList)) {
            mallService.saveBatch(insMallList);
        }
        if (CollUtil.isNotEmpty(updAccList)) {
            accountService.updateBatchById(updAccList);
        }
        if (CollUtil.isNotEmpty(updMallList)) {
            mallService.updateBatchById(updMallList);
        }
        return "success";
    }

    /**
     * Answers an MQTT authentication request.
     *
     * <p>The client ids {@code MS8011} and {@code test} are allowed straight away. Any other
     * client is allowed only when an agent row matches its client id, username and
     * password. Allowed clients receive an ACL permitting every action on topic {@code #}.
     *
     * <p>NOTE(review): the two hard-coded client ids are accepted without any credential
     * check, and agent passwords are compared as stored - confirm this is acceptable.
     *
     * @param dto authentication request (client id, username, password)
     * @return a JSON object whose {@code result} is {@code allow} or {@code deny}
     */
    @Override
    public ObjectNode auth(MqttAuthDTO dto) {
        var clientid = dto.getClientid();
        var trustedClient = StrUtil.equals(clientid, "MS8011") || StrUtil.equals(clientid, "test");
        if (trustedClient) {
            log.info("Monitor Server 认证成功. clientid: {}", dto.getClientid());
            var serverRule = JsonUtil.createObj()
                    .put("action", "all")
                    .put("permission", "allow")
                    .put("topic", "#");
            var serverAcl = JsonUtil.createArr().addPOJO(serverRule);
            return JsonUtil.createObj()
                    .put("result", "allow")
                    .putPOJO("acl", serverAcl);
        }

        var username = dto.getUsername();
        var password = dto.getPassword();
        var matched = this.lambdaQuery()
                .eq(Agent::getUid, clientid)
                .eq(Agent::getUsername, username)
                .eq(Agent::getPassword, password)
                .oneOpt();
        if (!matched.isPresent()) {
            log.info("Agent 认证失败: {}", dto.getClientid());
            return JsonUtil.createObj().put("result", "deny");
        }

        var agentRule = JsonUtil.createObj()
                .put("action", "all")
                .put("permission", "allow")
                .put("topic", "#");
        var agentAcl = JsonUtil.createArr().addPOJO(agentRule);
        var response = JsonUtil.createObj();
        // todo: the usage of expire_at is not clear yet
        // response.put("expire_at", Instant.now().getEpochSecond() + 15);
        response.put("result", "allow")
                .put("superuser", false)
                .putPOJO("acl", agentAcl);
        log.info("Agent 认证成功. clientid: {}", dto.getClientid());
        return response;
    }

    /**
     * Handles a client-connected notification: marks the agent as online (status 1) and
     * stores the address it connected from. The monitor server's own client id
     * ({@code MS8011}) is only logged.
     *
     * @param connectedInfo notification payload; {@code clientid} and {@code ipaddress} are read
     */
    @Override
    public void connected(JsonNode connectedInfo) {
        var clientid = connectedInfo.path("clientid").asText();
        var rawAddress = connectedInfo.path("ipaddress").asText();
        // Keep only the part before the first ':'.
        var ip = SplitUtil.split(rawAddress, ":").getFirst();
        if (!StrUtil.equals(clientid, "MS8011")) {
            this.lambdaUpdate()
                    .set(Agent::getStatus, 1)
                    .set(Agent::getIpaddr, ip)
                    .eq(Agent::getUid, clientid)
                    .update(new Agent());
            log.info("Agent 连接 Mqtt Server 成功. clientid:{}, ip:{}", clientid, ip);
            return;
        }
        log.info("Monitor Server 连接 Mqtt Server 成功. clientid:{}, ip:{}", clientid, ip);
    }

    /**
     * Handles a client-disconnected notification: marks the agent as offline (status 0).
     * The monitor server's own client id ({@code MS8011}) is only logged.
     *
     * @param disconnectInfo notification payload; {@code clientid} and {@code ipaddress} are read
     */
    @Override
    public void disconnect(JsonNode disconnectInfo) {
        var clientid = disconnectInfo.path("clientid").asText();
        var ipaddr = disconnectInfo.path("ipaddress").asText();
        if (!StrUtil.equals(clientid, "MS8011")) {
            this.lambdaUpdate()
                    .set(Agent::getStatus, 0)
                    .eq(Agent::getUid, clientid)
                    .update();
            log.info("Agent 断开连接 Mqtt Server. clientid:{}, ip:{}", clientid, ipaddr);
            return;
        }
        log.info("Monitor Server 断开连接 Mqtt Server 成功. clientid:{}", clientid);
    }

    /**
     * License check hook. Currently a no-op: the body consists solely of a commented-out
     * sketch that would collect agents with a license date and save an event record of
     * type LICENSE_EXPIRED for them.
     */
    @Override
    public void checkLicense() {
        /*var agentList = this.lambdaQuery().isNotNull(Agent::getLicenseDate).list();
        List<EventRecord> recList = new ArrayList<>();
        agentList.forEach(a -> {
            var licenseDate = TimeUtil.parseDateByISO(a.getLicenseDate());
            // current date minus 30 days, less than or equal to licenseDate
            if (LocalDate.now().minusDays(30).isAfter(licenseDate) {
                var eventRecord = new EventRecord();
                eventRecord.setAgentUid(a.getUid());
                eventRecord.setEventUid("ttt");
                eventRecord.setEventType("LICENSE_EXPIRED");
                eventRecord.setStatus((short) 0);
                recList.add(eventRecord);
            }
        });
        eventRecordService.saveBatch(recList);*/
    }
}