// VAServerHttpService.java 10.6 KB
package com.viontech.fanxing.task.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ecwid.consul.v1.ConsulClient;
import com.viontech.fanxing.commons.constant.ChannelType;
import com.viontech.fanxing.commons.exception.FanXingException;
import com.viontech.fanxing.commons.model.Channel;
import com.viontech.fanxing.task.model.TaskData;
import com.viontech.fanxing.task.model.vaserver.VATask;
import com.viontech.fanxing.task.model.vaserver.VaServerInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

import javax.annotation.Resource;
import java.time.Duration;

/**
 * Blocking HTTP client for the VAServer REST endpoints under {@code /api/vaserver/v1}.
 *
 * <p>Every public method creates a {@link WebClient} for the base URL reported by the given
 * {@link VaServerInfo}, sends one request, blocks for the response body and returns it parsed
 * as a {@link JSONObject}. Any exception raised while waiting for the response (including a
 * timeout) is logged and rethrown as a {@link FanXingException}.
 *
 * @author 谢明辉
 * @date 2021/7/20
 */

@Service
@Slf4j
public class VAServerHttpService {

    /** Maximum time to block for a response on every request except task add/update. */
    private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(20);
    /** Maximum time to block for a response when adding or updating a task. */
    private static final Duration TASK_TIMEOUT = Duration.ofSeconds(60);

    @Resource
    private OpsClientService opsClientService;
    @Resource
    private ConsulClient consulClient;

    /**
     * Dispatches (adds) a task to the VAServer.
     *
     * @param taskData     task definition that is converted to a {@link VATask} request body
     * @param vaServerInfo target server; supplies the base URL
     * @return the parsed response body
     * @throws FanXingException if the request fails or times out, or if the channel of a
     *                          video-cloud stream cannot be resolved
     */
    public JSONObject addTask(TaskData taskData, VaServerInfo vaServerInfo) {
        VATask vaTask = buildVaTask(taskData);
        String response = sendBody(client(vaServerInfo).post(), "/api/vaserver/v1/task", vaTask, TASK_TIMEOUT);
        log.info("下发任务结果:{}", response);
        return JSON.parseObject(response);
    }

    /**
     * Updates (edits) an existing task on the VAServer.
     *
     * @param taskData     task definition that is converted to a {@link VATask} request body
     * @param vaServerInfo target server; supplies the base URL
     * @return the parsed response body
     * @throws FanXingException if the request fails or times out, or if the channel of a
     *                          video-cloud stream cannot be resolved
     */
    public JSONObject updateTask(TaskData taskData, VaServerInfo vaServerInfo) {
        VATask vaTask = buildVaTask(taskData);
        String response = sendBody(client(vaServerInfo).put(), "/api/vaserver/v1/task", vaTask, TASK_TIMEOUT);
        log.info("更新任务结果:{}", response);
        return JSON.parseObject(response);
    }

    /**
     * Deletes a task from the VAServer.
     *
     * @param taskUnid     identifier sent as {@code task_unid}
     * @param vaServerInfo target server; supplies the base URL
     * @return the parsed response body
     * @throws FanXingException if the request fails or times out
     */
    public JSONObject rmTask(String taskUnid, VaServerInfo vaServerInfo) {
        String response = postJsonString(vaServerInfo, "/api/vaserver/v1/deleteTask", taskPayload(taskUnid));
        log.info("删除任务结果:{}", response);
        return JSON.parseObject(response);
    }

    /**
     * Requests a snapshot for a task.
     *
     * <p>Unlike the other calls, the response is not logged.
     * NOTE(review): presumably because the body carries image data — confirm.
     *
     * @param taskUnid     identifier sent as {@code task_unid}
     * @param vaServerInfo target server; supplies the base URL
     * @return the parsed response body
     * @throws FanXingException if the request fails or times out
     */
    public JSONObject snapshot(String taskUnid, VaServerInfo vaServerInfo) {
        String response = postJsonString(vaServerInfo, "/api/vaserver/v1/snapshot", taskPayload(taskUnid));
        return JSON.parseObject(response);
    }

    /**
     * Fetches the analysis-stream (playback) address of a task.
     *
     * @param taskUnid     identifier sent as {@code task_unid}
     * @param vaServerInfo target server; supplies the base URL
     * @return the parsed response body
     * @throws FanXingException if the request fails or times out
     */
    public JSONObject getAnalyzeStream(String taskUnid, VaServerInfo vaServerInfo) {
        String response = postJsonString(vaServerInfo, "/api/vaserver/v1/get_analyze_stream", taskPayload(taskUnid));
        log.info("获取分析流地址结果 : {}", response);
        return JSON.parseObject(response);
    }

    /**
     * Asks the VAServer to start outputting the analysis stream of a task.
     *
     * @param taskUnid     identifier sent as {@code task_unid}
     * @param vaServerInfo target server; supplies the base URL
     * @param url          value sent as {@code mediaServerPushUrl}
     * @return the parsed response body
     * @throws FanXingException if the request fails or times out
     */
    public JSONObject startAnalyzeStream(String taskUnid, VaServerInfo vaServerInfo, String url) {
        JSONObject payload = taskPayload(taskUnid);
        // Always sent as 1 by this client.
        payload.put("isDrawRect", 1);
        payload.put("mediaServerPushUrl", url);

        String response = postJsonString(vaServerInfo, "/api/vaserver/v1/start_analyze_stream", payload);
        log.info("输出分析流结果:{}", response);
        return JSON.parseObject(response);
    }

    /**
     * Switches the scene (preset position) of a task.
     *
     * @param taskUnid     identifier sent as {@code task_unid}
     * @param vaServerInfo target server; supplies the base URL
     * @param sceneId      value sent as {@code sceneID}
     * @return the parsed response body
     * @throws FanXingException if the request fails or times out
     */
    public JSONObject switchScene(String taskUnid, VaServerInfo vaServerInfo, String sceneId) {
        JSONObject payload = taskPayload(taskUnid);
        payload.put("sceneID", sceneId);

        String response = postJsonString(vaServerInfo, "/api/vaserver/v1/switch_scene", payload);
        log.info("场景切换结果:{}", response);
        return JSON.parseObject(response);
    }

    /**
     * Changes the rotation (alternate) status of a task.
     *
     * @param taskUnid       identifier sent as {@code task_unid}
     * @param rotationStatus status sent, in string form, as {@code alternateStatus}; must not be null
     * @param vaServerInfo   target server; supplies the base URL
     * @return the parsed response body
     * @throws FanXingException if the request fails or times out
     */
    public JSONObject updateRotationStatus(String taskUnid, Integer rotationStatus, VaServerInfo vaServerInfo) {
        JSONObject payload = taskPayload(taskUnid);
        payload.put("alternateStatus", rotationStatus.toString());

        String response = postJsonString(vaServerInfo, "/api/vaserver/v1/alternate", payload);
        log.info("轮训状态控制结果:{}", response);
        return JSON.parseObject(response);
    }

    /**
     * Queries the rotation (alternate) status of a task.
     *
     * @param taskUnid     identifier sent as {@code task_unid}
     * @param vaServerInfo target server; supplies the base URL
     * @return the parsed response body
     * @throws FanXingException if the request fails or times out
     */
    public JSONObject getRotationStatus(String taskUnid, VaServerInfo vaServerInfo) {
        String response = postJsonString(vaServerInfo, "/api/vaserver/v1/getAlternate", taskPayload(taskUnid));
        log.info("获取轮训状态:{}", response);
        return JSON.parseObject(response);
    }

    /**
     * Queries the running status of the VAServer.
     *
     * @param vaServerInfo target server; supplies the base URL
     * @return the parsed response body
     * @throws FanXingException if the request fails or times out
     */
    public JSONObject status(VaServerInfo vaServerInfo) {
        String path = "/api/vaserver/v1/status";

        Mono<String> mono = client(vaServerInfo)
                .get()
                .uri(uriBuilder -> uriBuilder.path(path).build())
                .retrieve()
                .bodyToMono(String.class);

        String response = getResponse(mono);
        log.info("运行状态查询:{}", response);
        return JSON.parseObject(response);
    }

    /**
     * Fetches the default algorithm configuration for a task algorithm type.
     *
     * @param vaServerInfo target server; supplies the base URL
     * @param taskAlgType  value sent as {@code task_algo_type}
     * @return the parsed response body
     * @throws FanXingException if the request fails or times out
     */
    public JSONObject getDefaultAlgorithmConfig(VaServerInfo vaServerInfo, String taskAlgType) {
        JSONObject payload = new JSONObject();
        payload.put("task_algo_type", taskAlgType);

        String response = postJsonObject(vaServerInfo, "/api/vaserver/v1/runtime_config", payload);
        log.info("默认配置获取:{}", response);
        return JSON.parseObject(response);
    }

    /**
     * Fetches the current scene (preset position) of a task.
     *
     * @param taskUnid     identifier sent as {@code task_unid}
     * @param vaServerInfo target server; supplies the base URL
     * @return the parsed response body
     * @throws FanXingException if the request fails or times out
     */
    public JSONObject getCurrentScene(String taskUnid, VaServerInfo vaServerInfo) {
        String response = postJsonObject(vaServerInfo, "/api/vaserver/v1/get_current_scene", taskPayload(taskUnid));
        log.info("获取当前预置位:{}", response);
        return JSON.parseObject(response);
    }

    /**
     * Converts task data to the VAServer request model; for video-cloud streams the device
     * identifier is looked up through the ops service and set on the task.
     */
    private VATask buildVaTask(TaskData taskData) {
        VATask vaTask = new VATask(taskData);
        if (vaTask.getStream_type().equals(ChannelType.STREAM_VIDEO_CLOUD.value)) {
            Channel channel = opsClientService.getChannelByChannelUnid(vaTask.getChannel_unid());
            if (channel == null) {
                // Fail with a domain error instead of a bare NullPointerException.
                throw new FanXingException("未找到通道信息:" + vaTask.getChannel_unid());
            }
            vaTask.setDevice_unid(channel.getDeviceUnid());
        }
        return vaTask;
    }

    /** Creates a request payload that carries only {@code task_unid}. */
    private JSONObject taskPayload(String taskUnid) {
        JSONObject payload = new JSONObject();
        payload.put("task_unid", taskUnid);
        return payload;
    }

    /** Creates a client bound to the base URL of the given server. */
    private WebClient client(VaServerInfo vaServerInfo) {
        return WebClient.create(vaServerInfo.getServiceBaseUrl());
    }

    /**
     * POSTs the payload serialized to a JSON string and waits up to the default timeout.
     *
     * <p>NOTE(review): the body is handed to WebClient as a plain {@code String} and no
     * Content-Type is set here, so WebClient likely sends it as text/plain rather than
     * application/json — confirm the VAServer accepts this before changing it.
     */
    private String postJsonString(VaServerInfo vaServerInfo, String path, JSONObject payload) {
        return sendBody(client(vaServerInfo).post(), path, payload.toString(), DEFAULT_TIMEOUT);
    }

    /**
     * POSTs the payload as an object body with an {@code Accept: application/json} header and
     * waits up to the default timeout.
     */
    private String postJsonObject(VaServerInfo vaServerInfo, String path, JSONObject payload) {
        Mono<String> mono = client(vaServerInfo)
                .post()
                .uri(path)
                .header(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE)
                .bodyValue(payload)
                .retrieve()
                .bodyToMono(String.class);
        return getResponse(mono, DEFAULT_TIMEOUT);
    }

    /** Completes a body-carrying request (POST/PUT) against {@code path} and blocks for the response text. */
    private String sendBody(WebClient.RequestBodyUriSpec request, String path, Object body, Duration timeout) {
        Mono<String> mono = request
                .uri(uriBuilder -> uriBuilder.path(path).build())
                .bodyValue(body)
                .retrieve()
                .bodyToMono(String.class);
        return getResponse(mono, timeout);
    }

    /**
     * Blocks for the result of {@code mono} for at most {@code duration}.
     *
     * @throws FanXingException if waiting fails for any reason; the underlying exception is logged
     */
    private <T> T getResponse(Mono<T> mono, Duration duration) {
        try {
            return mono.block(duration);
        } catch (Exception e) {
            // The thrown exception carries only a message, so log the cause here to keep
            // timeouts, connection errors and HTTP error statuses diagnosable.
            log.error("访问设备失败", e);
            throw new FanXingException("访问设备失败");
        }
    }

    /** Blocks for the result of {@code mono} using the default timeout. */
    private <T> T getResponse(Mono<T> mono) {
        return getResponse(mono, DEFAULT_TIMEOUT);
    }


}