VAServerService.java 12.3 KB
package com.viontech.fanxing.task.service;

import com.alibaba.fastjson.JSONObject;
import com.viontech.fanxing.commons.base.LocalCache;
import com.viontech.fanxing.commons.config.VionConfig;
import com.viontech.fanxing.commons.constant.RedisKeys;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Task;
import com.viontech.fanxing.commons.model.main.VaServerInfo;
import com.viontech.fanxing.commons.service.RedisService;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.vaserver.VaServerOverViewModel;
import com.viontech.fanxing.task.repository.VAServerRedisRepository;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBucket;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * 与 视频分析服务 交互的 service
 *
 * @author 谢明辉
 * @date 2021/7/13
 */

@Service
@Slf4j
public class VAServerService {
    protected final static Pattern IP_PATTERN = Pattern.compile("((?:1[0-9][0-9]\\.|2[0-4][0-9]\\.|25[0-5]\\.|[1-9][0-9]\\.|[0-9]\\.){3}(?:1[0-9][0-9]|2[0-4][0-9]|25[0-5]|[1-9][0-9]|[0-9]))");


    @Resource
    private RedisService redisService;
    @Resource
    private VAServerRedisRepository vaServerRedisRepository;
    @Resource
    private TaskDataService taskDataService;
    @Resource
    private VAServerHttpService vaServerHttpService;
    @Resource
    private VionConfig vionConfig;

    /**
     * 设备注册
     */
    public void registerVAServer(VaServerInfo vaServerInfo) {
        String devId = vaServerInfo.getDevID();
        VaServerInfo vaServer = vaServerRedisRepository.getVAServerInfoById(devId);
        Float availableResources = vaServer == null ? vaServerInfo.getVideoResource() : vaServer.getAvailableResources();
        vaServerInfo.setAvailableResources(availableResources);
        vaServerRedisRepository.addOrUpdate(devId, vaServerInfo);
        keepalive(devId);
    }

    /**
     * 设备心跳
     */
    public void keepalive(String devId) {
        VaServerInfo vaserverInfo = vaServerRedisRepository.getVAServerInfoById(devId);
        if (vaserverInfo != null) {
            vaserverInfo.setStatus(1);
            vaServerRedisRepository.addOrUpdate(devId, vaserverInfo);
        } else {
            throw new FanXingException("设备未注册");
        }

        RBucket<Date> bucket = redisService.getValue(RedisKeys.getVAServerKeepAliveKey(devId));
        bucket.set(new Date());
        bucket.expire(2, TimeUnit.MINUTES);
    }

    /**
     * 下发任务,关联任务和vaServer,修改vaServer可用资源数
     */
    public boolean executeTask(TaskData taskData, VaServerInfo server) {
        Task task = taskData.getTask();
        // 执行任务下发,成功后关联任务和vaServer,减少vaServer的可用资源数量

        vaServerHttpService.addTask(taskData, server);

        RMap<String, String> map = redisService.getTaskVaServerMap();
        map.put(task.getUnid(), server.getDevID());

        modifyVAServerResource(server.getDevID(), -task.getResourceNeed());
        return true;
    }

    /**
     * 1. 解除任务和vaServer的关联
     * 2. vaServer 资源数增加
     * 3. 请求 vaServer 终止任务
     * <p>
     * 删除任务
     */
    public boolean terminateTask(String taskUnid) {
        TaskData taskData = taskDataService.getRepository().getTaskDataByUnid(taskUnid);
        if (taskData == null) {
            return true;
        }
        Task task = taskData.getTask();

        RMap<String, String> map = redisService.getTaskVaServerMap();
        String vaServerId = map.get(taskUnid);

        // 如果vaServerId不为空,需要终止任务
        if (vaServerId != null) {
            VaServerInfo vaServerInfo = vaServerRedisRepository.getVAServerInfoById(vaServerId);
            if (0 == vaServerInfo.getStatus()) {
                return false;
            }
            // 下发终止任务请求
            vaServerHttpService.rmTask(taskUnid, vaServerInfo);
            // 解除任务和 vaServer 关联, 恢复资源数量
            map.remove(taskUnid);

            modifyVAServerResource(vaServerId, task.getResourceNeed());
            return true;
        }
        return true;
    }

    public void modifyVAServerResource(String devId, float param) {
        RLock vaServerLock = redisService.getLockMust("lock:vaserver:" + devId);
        try {
            VaServerInfo vaServerInfo = vaServerRedisRepository.getVAServerInfoById(devId);
            if (vaServerInfo == null) {
                return;
            }
            float v = vaServerInfo.getAvailableResources() + param;
            if (v > vaServerInfo.getVideoResource()) {
                v = vaServerInfo.getVideoResource();
            }
            vaServerInfo.setAvailableResources(v);
            vaServerRedisRepository.addOrUpdate(devId, vaServerInfo);
        } finally {
            vaServerLock.forceUnlock();
        }
    }

    /**
     * 修改任务
     */
    public boolean updateTask(TaskData taskData) {
        VaServerInfo vaServerInfo = taskDataService.taskRunOn(taskData.getTask().getUnid());
        vaServerHttpService.updateTask(taskData, vaServerInfo);
        return true;
    }

    /**
     * 截图
     */
    public Object snapshot(String taskUnid) {
        VaServerInfo vaServerInfo = taskDataService.taskRunOn(taskUnid);
        if (vaServerInfo != null) {
            if (vaServerInfo.getStatus() == 0) {
                throw new FanXingException("设备离线");
            }
            return vaServerHttpService.snapshot(taskUnid, vaServerInfo);
        } else {
            throw new FanXingException("任务不在运行状态", taskUnid);
        }
    }

    /**
     * 获取点播地址
     */
    public Object getAnalyzeStream(String taskUnid) {
        VaServerInfo vaServerInfo = taskDataService.taskRunOn(taskUnid);
        if (vaServerInfo != null) {
            if (vaServerInfo.getStatus() == 0) {
                throw new FanXingException("设备离线");
            }
            return vaServerHttpService.getAnalyzeStream(taskUnid, vaServerInfo);
        } else {
            throw new FanXingException("任务不在运行状态", taskUnid);
        }
    }

    /**
     * 输出分析流
     */
    public String startAnalyzeStream(String taskUnid) {
        TaskData taskData = taskDataService.getRepository().getTaskDataByUnid(taskUnid);
        if (taskData == null) {
            throw new FanXingException("找不到对应的任务");
        }
        VaServerInfo vaServerInfo = taskDataService.taskRunOn(taskUnid);
        if (vaServerInfo != null) {
            if (vaServerInfo.getStatus() == 0) {
                throw new FanXingException("设备离线");
            }
            String rtmpUrl = vionConfig.getSrs().getRtmpUrl(taskUnid);
            String httpUrl = "/srs/live/" + taskUnid + ".flv";
            vaServerHttpService.startAnalyzeStream(taskUnid, vaServerInfo, rtmpUrl);
            return httpUrl;
        } else {
            throw new FanXingException("任务不在运行状态", taskUnid);
        }
    }

    /**
     * 录像上传
     */
    public Object uploadVideo() {
        return null;
    }

    /**
     * 获取 VAServer 运行状态配置参数等
     */
    public JSONObject getStatus(String devId) {
        VaServerInfo vaServerInfo = vaServerRedisRepository.getVAServerInfoById(devId);
        if (vaServerInfo != null) {
            if (vaServerInfo.getStatus() == 0) {
                throw new FanXingException("设备离线");
            }
            return vaServerHttpService.status(vaServerInfo);
        } else {
            throw new FanXingException("无法获取到对应的设备", devId);
        }
    }

    /**
     * 场景切换
     */
    public Object switchScene(String taskUnid, String sceneId) {
        VaServerInfo vaServerInfo = taskDataService.taskRunOn(taskUnid);
        if (vaServerInfo != null) {
            if (vaServerInfo.getStatus() == 0) {
                throw new FanXingException("设备离线");
            }
            return vaServerHttpService.switchScene(taskUnid, vaServerInfo, sceneId);
        } else {
            throw new FanXingException("任务不在运行状态", taskUnid);
        }
    }

    /**
     * 任务轮训状态切换
     */
    public Object updateRotationStatus(String taskUnid, Integer rotationStatus) {
        VaServerInfo vaServerInfo = taskDataService.taskRunOn(taskUnid);
        if (vaServerInfo != null) {
            return vaServerHttpService.updateRotationStatus(taskUnid, rotationStatus, vaServerInfo);
        } else {
            throw new FanXingException("任务不在运行状态", taskUnid);
        }
    }

    /**
     * 任务轮训状态查询
     */
    public Object getRotationStatus(String taskUnid) {
        VaServerInfo vaServerInfo = taskDataService.taskRunOn(taskUnid);
        if (vaServerInfo != null) {
            if (vaServerInfo.getStatus() == 0) {
                throw new FanXingException("设备离线");
            }
            return vaServerHttpService.getRotationStatus(taskUnid, vaServerInfo);
        } else {
            throw new FanXingException("任务不在运行状态", taskUnid);
        }
    }

    public JSONObject getCurrentScene(String taskUnid) {
        VaServerInfo vaServerInfo = taskDataService.taskRunOn(taskUnid);
        if (vaServerInfo != null) {
            if (vaServerInfo.getStatus() == 0) {
                throw new FanXingException("设备离线");
            }
            return vaServerHttpService.getCurrentScene(taskUnid, vaServerInfo);
        } else {
            throw new FanXingException("任务不在运行状态", taskUnid);
        }
    }

    public Object getDefaultAlgorithmConfig(String taskAlgType) {
        RMap<String, VaServerInfo> map = vaServerRedisRepository.getVaServerInfoMap();
        for (VaServerInfo item : map.readAllValues()) {
            if (item.getStatus() == 1) {
                try {
                    return vaServerHttpService.getDefaultAlgorithmConfig(item, taskAlgType);
                } catch (Exception e) {
                    log.error(item.getDevID() + "访问失败", e);
                }
            }
        }
        throw new FanXingException("无法获取到默认配置");
    }

    /**
     * 获取所有vaServer信息
     */
    @LocalCache(value = "vaServerOverview", duration = 10, timeunit = TimeUnit.SECONDS)
    public Collection<VaServerOverViewModel> vaServerOverview() {
        HashMap<String, VaServerOverViewModel> map = new HashMap<>();
        Collection<VaServerInfo> vaServerInfos = vaServerRedisRepository.getVaServerInfoMap().readAllValues();
        for (VaServerInfo vaServer : vaServerInfos) {
            String serviceBaseUrl = vaServer.getServiceBaseUrl();
            Matcher matcher = IP_PATTERN.matcher(serviceBaseUrl);
            String ip = "无法获取";
            if (matcher.find()) {
                ip = matcher.group();
            }
            JSONObject status = new JSONObject();
            try {
                if (vaServer.getStatus() == 1) {
                    status = vaServerHttpService.status(vaServer);
                    status.remove("resource");
                }
            } catch (Exception e) {
                log.error("", e);
            }

            status.put("devId", vaServer.getDevID());
            status.put("serviceName", vaServer.getDockerContainerName());
            status.put("status", vaServer.getStatus());
            status.put("platType", vaServer.getPlatType());
            status.put("softVersion", vaServer.getSoftVersion());
            status.put("algoVersion", vaServer.getAlgoVersion());
            status.put("availableResource", vaServer.getAvailableResources());
            status.put("videoResource", vaServer.getVideoResource());

            VaServerOverViewModel model = map.computeIfAbsent(ip, x -> new VaServerOverViewModel());
            model.setIp(ip).addInfo(status);
            model.addTotal(vaServer.getVideoResource()).addUsed(vaServer.getVideoResource() - vaServer.getAvailableResources());
        }

        return map.values();
    }

    public VAServerRedisRepository getVaServerRedisRepository() {
        return vaServerRedisRepository;
    }
}