PoolService.java 13.1 KB
package com.viontech.match.service;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch._types.mapping.AllField;
import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.CountRequest;
import co.elastic.clients.elasticsearch.core.CountResponse;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import com.alibaba.fastjson.JSONObject;
//import com.fasterxml.jackson.databind.ObjectMapper;
import com.viontech.keliu.model.Person;
import com.viontech.match.config.Constant;
import com.viontech.match.entity.PoolInfo;
import com.viontech.match.entity.vo.RequestVo;
import com.viontech.match.entity.vo.ResponseVo;
import com.viontech.match.runner.ILM;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;

/**
 * .
 *
 * @author 谢明辉
 * @version 0.0.1
 */
@Service
@Slf4j
public class PoolService {
    @Resource
    private PersonService personService;
    @Resource
    private ElasticsearchClient client;
//    @Resource
//    private ObjectMapper objectMapper;
    @Value("${vion.index.number_of_shards:1}")
    private Integer shards;
    @Value("${vion.index.number_of_replicas:0}")
    private Integer replicas;
    @Value("${vion.index.translog.durability:async}")
    private String translogDurability;

//    @Value("${vion.index.refresh_interval}")
//    private String refreshInterval;
    @Value("${vion.index.translog.sync_interval}")
    private String translogSyncInterval;
    @Value("${vion.index.merge.scheduler.max_thread_count}")
    private Integer mergeThreadCount;

    /**
     * 添加特征池
     *
     * @param requestVo rid, poolId, personPool
     *
     * @return ResponseVo
     * @throws Exception --
     */
    public ResponseVo createPool(RequestVo requestVo) throws Exception {
        return createPool(requestVo, true);
    }

    public ResponseVo createPool(RequestVo requestVo, boolean addPerson) throws Exception {
        String rid = requestVo.getRid();
        String poolId = requestVo.getPoolId();
//        log.info("特征池创建操作开始:[{}},IML:{}", poolId, requestVo.isUseILMPolicy());
        try {

            IndexSettings.Builder settings = new IndexSettings.Builder()
                    .numberOfShards(String.valueOf(shards))
                    .numberOfReplicas(String.valueOf(replicas));
            if (StringUtils.isNotEmpty(translogDurability)) {
                settings.translog(t -> t.durability(TranslogDurability.Async));
                if (StringUtils.isNotEmpty(translogSyncInterval)) {
                    settings.translog(t -> t.syncInterval(i -> i.time(translogSyncInterval)));
                }
            }
            if (mergeThreadCount != null) {
                settings.merge(m -> m.scheduler(s -> s.maxThreadCount(mergeThreadCount)));
            }

            if (requestVo.isUseILMPolicy()) {
                settings.lifecycle(l -> l.name(ILM.LIFECYCLE_NAME));
            }

            TypeMapping.Builder builder = getCreateIndexContentBuilder();
            CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder().index(poolId).mappings(builder.build()).settings(settings.build()).build();
            CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest);

            if (addPerson) {
                List<Person> personPool = requestVo.getPersonPool();
                if (CollectionUtils.isNotEmpty(personPool)) {
                    BulkResponse bulkItemResponses = personService.addPerson(poolId, personPool);
                    if (bulkItemResponses != null) {
                        log.info(bulkItemResponses.toString());
                    }
                }
            }

            log.info("特征池创建完成:[{}],ILM:{}", poolId, requestVo.isUseILMPolicy());
            return ResponseVo.success(rid);
        } catch (ElasticsearchException e) {
            if (e.status() == 400 && e.getMessage().contains(Constant.STR_POOL_ID_ALREADY_EXISTS)) {
                return ResponseVo.error(rid, 20, "already exit poolId");
            } else {
                throw e;
            }
        }
    }


    /**
     * 删除特征池
     *
     * @param requestVo rid,flushPool,poolId,personIds,
     *
     * @return ResponseVo
     * @throws Exception --
     */
    public ResponseVo deletePool(RequestVo requestVo) throws Exception {
        String rid = requestVo.getRid();
        Integer flushPool = requestVo.getFlushPool();
        String poolId = requestVo.getPoolId();
//        log.info("特征池删除操作开始:[{}],FlushPool:[{}]", poolId, flushPool);
        try {
            if (0 == flushPool) {
                List<HashMap<String, String>> personIds = requestVo.getPersonIds();
                for (HashMap<String, String> item : personIds) {
                    String personId = item.get("personId");
                    if (personId != null) {
                        personService.deletePerson(poolId, personId);
                    }
                }
            } else if (flushPool == 1) {
                //删除索引
                DeleteIndexResponse delete = client.indices().delete(new DeleteIndexRequest.Builder().index(poolId).build());
            }
            log.info("特征池删除完成:[{}],FlushPool:[{}]", poolId, flushPool);
            return ResponseVo.success(rid);
        } catch (ElasticsearchException e) {
            if (e.status() == 404) {
                return ResponseVo.poolIdNotExists(rid);
            } else {
                return ResponseVo.error(rid, e.getMessage());
            }
        }
    }

    public ResponseVo deletePoolData(RequestVo requestVo) throws Exception {
        String rid = requestVo.getRid();
        Integer flushPool = requestVo.getFlushPool();
        String poolId = requestVo.getPoolId();
        Long mallId = requestVo.getMallId();
        log.info("特征池删除mallId:{}数据操作开始:[{}]", mallId,poolId);
        try {
            personService.deletePersonByMallId(poolId, mallId);
            log.info("特征池删除mallId:{}操作完成:[{}]", mallId, poolId);
            return ResponseVo.success(rid);
        } catch (ElasticsearchException e) {
            if (e.status() == 404) {
                return ResponseVo.poolIdNotExists(rid);
            } else {
                return ResponseVo.error(rid, e.getMessage());
            }
        }
    }


    public ResponseVo deleteStaffPoolData(RequestVo requestVo) throws Exception {
        String rid = requestVo.getRid();
        Integer flushPool = requestVo.getFlushPool();
        String poolId = requestVo.getPoolId();
        List<String> fidList = requestVo.getFidList();
        log.info("店员特征池删除数据操作开始:[{}]",poolId);
        try {
            personService.deletePersonByFid(poolId, fidList);
            log.info("店员特征池删除操作完成:[{}]",poolId);
            return ResponseVo.success(rid);
        } catch (ElasticsearchException e) {
            if (e.status() == 404) {
                return ResponseVo.poolIdNotExists(rid);
            } else {
                return ResponseVo.error(rid, e.getMessage());
            }
        }
    }


    /**
     * 修改特征池(添加人员)
     *
     * @param requestVo rid,poolId,personPool,updateType
     *
     * @return ResponseVo
     * @throws Exception --
     */
    public ResponseVo modifyPool(RequestVo requestVo) throws Exception {
        String rid = requestVo.getRid();
        String poolId = requestVo.getPoolId();
        List<Person> personPool = requestVo.getPersonPool();
        Integer updateType = requestVo.getUpdateType();
//        log.info("特征池修改操作开始:[{}],updateType:[{}]", poolId, updateType);
        co.elastic.clients.elasticsearch.indices.ExistsRequest poolIdExists = new ExistsRequest.Builder().index(poolId).build();
        BooleanResponse poolIdExistsResponse = client.indices().exists(poolIdExists);
        if (!poolIdExistsResponse.value()) {
            return ResponseVo.poolIdNotExists(rid);
        }
        try {
            BulkResponse bulkItemResponses = personService.addPerson(poolId, personPool);
            if (bulkItemResponses != null && bulkItemResponses.errors()) {
                log.info(bulkItemResponses.items().toString());
                return ResponseVo.error(rid, bulkItemResponses.items().toString());
            }

            log.info("特征池修改:[{}],updateType:[{}]", poolId, updateType);
            return ResponseVo.success(rid, "success");
        } catch (ElasticsearchException e) {
            log.error("modifyPool", e);
            return ResponseVo.error(rid, e.getMessage());
        }
    }

    /**
     * 查询特征池信息
     *
     * @param requestVo rid,listAll,poolId
     *
     * @return ResponseVo
     * @throws Exception --
     */
    public ResponseVo queryPool(RequestVo requestVo) throws Exception {
        String rid = requestVo.getRid();
        Integer listAll = requestVo.getListAll();
        String poolId = requestVo.getPoolId();
        try {
            List<PoolInfo> poolInfos;
//            log.info("查询特征池操作开始:[{}],rid:[{}]", poolId, rid);

            if (listAll != 0) {
                poolInfos = queryPoolInfo(poolId);
            } else {
                if (!existPool(poolId)) {
                    return ResponseVo.poolIdNotExists(rid);
                }
                poolInfos = queryPoolInfo(poolId);
            }
            ResponseVo success = ResponseVo.success(rid, "success");
            success.setPoolIds(poolInfos);
            log.info("查询特征池完成:[{}],rid:[{}]", poolId, rid);

            return success;
        } catch (ElasticsearchException e) {
            log.error("queryPool", e);
            return ResponseVo.error(rid, e.getMessage());
        }
    }

    public void refreshPool(String... poolIds) throws IOException {
        RefreshRequest refreshRequest = new RefreshRequest.Builder().index(Arrays.asList(poolIds)).build();
        RefreshResponse refresh = client.indices().refresh(refreshRequest);
//        log.info("刷新索引:{},成功:{},失败:{}", poolIds, refresh.shards().successful(), refresh.shards().failed());
    }


    public TypeMapping.Builder getCreateIndexContentBuilder() throws IOException {
        Map<String, Property> properties = new HashMap<>();
        properties.put("direction", new Property.Builder().keyword(k -> k.index(true)).build());
        properties.put("fid", new Property.Builder().text(t -> t.index(true)).build());
        properties.put("gateId", new Property.Builder().long_(t -> t.index(true)).build());
        properties.put("personId", new Property.Builder().keyword(k -> k.index(true)).build());
        properties.put("body", new Property.Builder().denseVector(d -> d.dims(Constant.BODY_FEATURE_DIMS_2048).similarity("cosine")).build());
        properties.put("data", new Property.Builder().denseVector(d -> d.dims(Constant.FACE_FEATURE_DIMS).similarity("cosine")).build());
        properties.put("body_type", new Property.Builder().integer(i -> i.index(false).docValues(false)).build());
        properties.put("channelSerialNum", new Property.Builder().keyword(k -> k.index(true)).build());
        properties.put("age", new Property.Builder().integer(i -> i.index(false).docValues(false)).build());
        properties.put("gender", new Property.Builder().keyword(k -> k.index(false).docValues(false)).build());
        properties.put("counttime", new Property.Builder().date(d -> d.format("yyyy-MM-dd HH:mm:ss")).build());
        properties.put("unid", new Property.Builder().keyword(k -> k.index(true)).build());
        properties.put("mallId", new Property.Builder().keyword(k -> k.index(true)).build());

        TypeMapping.Builder builder = new TypeMapping.Builder().properties(properties);
        return builder;
    }

    public boolean existPool(String poolId) throws IOException {
        BooleanResponse exists = client.indices().exists(new ExistsRequest.Builder().index(poolId).build());
        return exists.value();
    }

    public List<PoolInfo> queryPoolInfo(String poolId) throws IOException {
        CountResponse response = client.count(new CountRequest.Builder().index(poolId).build());
        LinkedList<PoolInfo> poolInfos = new LinkedList<>();
        PoolInfo poolInfo = new PoolInfo();
        poolInfo.setPersonCount( response.count());
        poolInfo.setPoolId(poolId);
        poolInfos.add(poolInfo);

        return poolInfos;
    }

}