PoolService.java 12.2 KB
package com.viontech.match.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.viontech.keliu.model.Person;
import com.viontech.match.config.Constant;
import com.viontech.match.entity.PoolInfo;
import com.viontech.match.entity.vo.RequestVo;
import com.viontech.match.entity.vo.ResponseVo;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * .
 *
 * @author 谢明辉
 * @version 0.0.1
 */
@Service
@Slf4j
public class PoolService {
    @Resource
    private PersonService personService;
    @Resource
    private RestHighLevelClient client;
    @Resource
    private ObjectMapper objectMapper;
    @Value("${vion.index.number_of_shards:1}")
    private Integer shards;
    @Value("${vion.index.number_of_replicas:0}")
    private Integer replicas;

    /**
     * 添加特征池
     *
     * @param requestVo rid,poolId,personPool
     *
     * @return ResponseVo
     * @throws Exception --
     */
    public ResponseVo createPool(RequestVo requestVo) throws Exception {
        String rid = requestVo.getRid();
        String poolId = requestVo.getPoolId();
        log.info("特征池创建操作开始:[{}}", poolId);
        try {
            CreateIndexRequest createIndexRequest = new CreateIndexRequest(poolId);
            XContentBuilder builder = getCreateIndexContentBuilder();
            createIndexRequest.mapping(builder);
            Settings.Builder setting = Settings.builder();
            setting.put("index.number_of_shards", shards);
            setting.put("index.number_of_replicas", replicas);
            createIndexRequest.settings(setting);
            CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);

            List<Person> personPool = requestVo.getPersonPool();
            if (personPool != null && personPool.size() > 0) {
                BulkResponse bulkItemResponses = personService.addPerson(poolId, personPool);
                if (bulkItemResponses != null) {
                    log.info(bulkItemResponses.buildFailureMessage());
                }
            }

            log.info("特征池创建操作完成:[{}]", poolId);
            return ResponseVo.success(rid);
        } catch (ElasticsearchStatusException e) {
            if (e.status() == RestStatus.BAD_REQUEST && e.getMessage().contains(Constant.STR_POOL_ID_ALREADY_EXISTS)) {
                return ResponseVo.error(rid, 20, "already exit poolId");
            } else {
                throw e;
            }
        }
    }


    /**
     * 删除特征池
     *
     * @param requestVo rid,flushPool,poolId,personIds,
     *
     * @return ResponseVo
     * @throws Exception --
     */
    public ResponseVo deletePool(RequestVo requestVo) throws Exception {
        String rid = requestVo.getRid();
        Integer flushPool = requestVo.getFlushPool();
        String poolId = requestVo.getPoolId();
        log.info("特征池删除操作开始:[{}],FlushPool:[{}]", poolId, flushPool);
        try {
            if (0 == flushPool) {
                List<HashMap<String, String>> personIds = requestVo.getPersonIds();
                for (HashMap<String, String> item : personIds) {
                    String personId = item.get("personId");
                    if (personId != null) {
                        personService.deletePerson(poolId, personId);
                    }
                }
            } else if (flushPool == 1) {
                AcknowledgedResponse delete = client.indices().delete(new DeleteIndexRequest(poolId), RequestOptions.DEFAULT);
            }
            log.info("特征池删除操作完成:[{}],FlushPool:[{}]", poolId, flushPool);
            return ResponseVo.success(rid);
        } catch (ElasticsearchStatusException e) {
            if (e.status() == RestStatus.NOT_FOUND) {
                return ResponseVo.poolIdNotExists(rid);
            } else {
                return ResponseVo.error(rid, e.getDetailedMessage());
            }
        }
    }

    /**
     * 修改特征池(添加人员)
     *
     * @param requestVo rid,poolId,personPool,updateType
     *
     * @return ResponseVo
     * @throws Exception --
     */
    public ResponseVo modifyPool(RequestVo requestVo) throws Exception {
        String rid = requestVo.getRid();
        String poolId = requestVo.getPoolId();
        List<Person> personPool = requestVo.getPersonPool();
        Integer updateType = requestVo.getUpdateType();
        log.info("特征池修改操作开始:[{}],updateType:[{}]", poolId, updateType);
        if (!client.indices().exists(new GetIndexRequest(poolId), RequestOptions.DEFAULT)) {
            return ResponseVo.poolIdNotExists(rid);
        }
        try {
            BulkResponse bulkItemResponses = personService.addPerson(poolId, personPool);
            if (bulkItemResponses != null && bulkItemResponses.hasFailures()) {
                log.info(bulkItemResponses.buildFailureMessage());
                return ResponseVo.error(rid, bulkItemResponses.buildFailureMessage());
            }

            log.info("特征池修改操作完成:[{}],updateType:[{}]", poolId, updateType);
            return ResponseVo.success(rid, "success");
        } catch (ElasticsearchStatusException e) {
            log.error("modifyPool", e);
            return ResponseVo.error(rid, e.getDetailedMessage());
        }
    }

    /**
     * 查询特征池信息
     *
     * @param requestVo rid,listAll,poolId
     *
     * @return ResponseVo
     * @throws Exception --
     */
    public ResponseVo queryPool(RequestVo requestVo) throws Exception {
        String rid = requestVo.getRid();
        Integer listAll = requestVo.getListAll();
        String poolId = requestVo.getPoolId();
        try {
            List<PoolInfo> poolInfos;
            log.info("查询特征池操作开始:[{}],rid:[{}]", poolId, rid);

            if (listAll != 0) {
                poolInfos = queryPoolInfo(null);
            } else {
                if (!existPool(poolId)) {
                    return ResponseVo.poolIdNotExists(rid);
                }
                poolInfos = queryPoolInfo(poolId);
            }
            ResponseVo success = ResponseVo.success(rid, "success");
            success.setPoolIds(poolInfos);
            log.info("查询特征池操作完成:[{}],rid:[{}]", poolId, rid);

            return success;
        } catch (ElasticsearchStatusException e) {
            log.error("queryPool", e);
            return ResponseVo.error(rid, e.getDetailedMessage());
        }
    }

    public void refreshPool(String... poolIds) throws IOException {
        RefreshRequest refreshRequest = new RefreshRequest(poolIds);
        RefreshResponse refresh = client.indices().refresh(refreshRequest, RequestOptions.DEFAULT);
        log.info("刷新索引:{},成功:{},失败:{}", poolIds, refresh.getSuccessfulShards(), refresh.getFailedShards());
    }


    public XContentBuilder getCreateIndexContentBuilder() throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        {
            builder.startObject("properties");
            {
                // 人脸特征
                builder.startObject("data");
                {
                    builder.field("type", "dense_vector");
                    builder.field("dims", Constant.FACE_FEATURE_DIMS);
                }
                builder.endObject();

                // 人体特征
                builder.startObject("body");
                {
                    builder.field("type", "dense_vector");
                    builder.field("dims", Constant.BODY_FEATURE_DIMS_2048);
                }
                builder.endObject();

                // 人员id
                builder.startObject("personId");
                {
                    builder.field("type", "keyword");
                }
                builder.endObject();

                // 抓拍id
                builder.startObject("unid");
                {
                    builder.field("type", "keyword");
                }
                builder.endObject();

                // 特征id
                builder.startObject("fid");
                {
                    builder.field("type", "text");
                }
                builder.endObject();

                // 年龄
                builder.startObject("age");
                {
                    builder.field("type", "integer");
                    builder.field("doc_values", false);
                    builder.field("index", false);
                }
                builder.endObject();

                // 性别
                builder.startObject("gender");
                {
                    builder.field("type", "keyword");
                    builder.field("doc_values", false);
                    builder.field("index", false);
                }
                builder.endObject();

                // 时间
                builder.startObject("counttime");
                {
                    builder.field("type", "date");
                    builder.field("format", "yyyy-MM-dd HH:mm:ss");
                }
                builder.endObject();

                // 通道序列号
                builder.startObject("channelSerialNum");
                {
                    builder.field("type", "keyword");
                }
                builder.endObject();

                // 身体特征类型类型
                builder.startObject("body_type");
                {
                    builder.field("type", "integer");
                    builder.field("doc_values", false);
                    builder.field("index", false);
                }
                builder.endObject();


            }
            builder.endObject();
        }
        builder.endObject();
        return builder;
    }

    public boolean existPool(String poolId) throws IOException {
        return client.indices().exists(new GetIndexRequest(poolId), RequestOptions.DEFAULT);
    }

    public List<PoolInfo> queryPoolInfo(String poolId) throws IOException {
        String endPoint = poolId == null ? "/_stats" : "/" + poolId + "/_stats";
        LinkedList<PoolInfo> poolInfos = new LinkedList<>();
        RestClient lowLevelClient = client.getLowLevelClient();
        Request request = new Request("GET", endPoint);
        Response response = lowLevelClient.performRequest(request);
        HttpEntity entity = response.getEntity();
        InputStream content = entity.getContent();
        HashMap responseMap = objectMapper.readValue(content, HashMap.class);
        HashMap<String, HashMap> indices1 = (HashMap<String, HashMap>) responseMap.get("indices");
        for (Map.Entry<String, HashMap> entry : indices1.entrySet()) {
            String id = entry.getKey();
            HashMap value = entry.getValue();
            HashMap<String, HashMap> primaries = (HashMap<String, HashMap>) value.get("primaries");
            HashMap docs = primaries.get("docs");
            Integer count = (Integer) docs.get("count");
            PoolInfo poolInfo = new PoolInfo();
            poolInfo.setPersonCount(count.longValue());
            poolInfo.setPoolId(id);
            poolInfos.add(poolInfo);
        }
        return poolInfos;
    }

}