// PersonService.java 13.5 KB
package com.viontech.match.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.viontech.keliu.model.BodyFeature;
import com.viontech.keliu.model.FaceFeature;
import com.viontech.keliu.model.Person;
import com.viontech.keliu.model.Pool;
import com.viontech.match.config.Constant;
import com.viontech.match.entity.vo.RequestVo;
import com.viontech.match.entity.vo.ResponseVo;
import com.viontech.match.util.Utils;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScriptScoreQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * .
 *
 * @author 谢明辉
 * @version 0.0.1
 */

@Service
@Slf4j
public class PersonService {
    @Resource
    private RestHighLevelClient client;
    @Resource
    private PoolService poolService;
    @Resource
    private ObjectMapper objectMapper;

    /**
     * 人员比对
     *
     * @param requestVo rid,personPoolId,unionPersonPoolId,person
     *
     * @return ResponseVo
     * @throws Exception --
     */
    public ResponseVo matchPerson(RequestVo requestVo) throws Exception {
        String rid = requestVo.getRid();
        String poolId = requestVo.getPersonPoolId();
        List<String> unionPersonPoolId = requestVo.getUnionPersonPoolId();
        List<String> poolIds = new ArrayList<>();
        List<Person> matchFaces = new ArrayList<>();
        List<Person> matchBodies = new ArrayList<>();
        List<Pool> poolStatus = new ArrayList<>();

        if (unionPersonPoolId != null && unionPersonPoolId.size() > 0) {
            poolIds.addAll(unionPersonPoolId);
        }
        if (poolId != null) {
            poolIds.add(poolId);
        }
        log.info("人员匹配操作开始,PoolIds:[{}]", poolIds.toString());
        try {
            for (String id : poolIds) {
                Pool pool = new Pool();
                pool.setPersonPoolId(id);
                if (poolService.existPool(id)) {
                    pool.setStatus(0);
                    List<Person> face = matchPerson(id, requestVo.getPerson(), 0);
                    matchFaces.addAll(face);
                    List<Person> body = matchPerson(id, requestVo.getPerson(), 1);
                    matchBodies.addAll(body);
                } else {
                    pool.setStatus(1);
                }
                poolStatus.add(pool);
            }
            ResponseVo success = ResponseVo.success(rid, "success");
            if (matchFaces.size() > 0) {
                success.setMatchPersons(matchFaces);
            }
            if (matchBodies.size() > 0) {
                success.setMatchBodies(matchBodies);
            }
            success.setMatch(1);
            success.setPersonPoolStatus(poolStatus);
            log.info("人员匹配操作完成,PoolIds:[{}},结果:[{}]", poolIds.toString(), objectMapper.writeValueAsString(success));
            return success;
        } catch (ElasticsearchStatusException e) {
            if (e.status() == RestStatus.BAD_REQUEST && e.getDetailedMessage().contains(Constant.CLASS_CAST_EXCEPTION)) {
                for (String id : poolIds) {
                    RequestVo requestVo1 = new RequestVo();
                    requestVo1.setRid("1");
                    requestVo1.setPoolId(id);
                    requestVo1.setFlushPool(1);
                    poolService.deletePool(requestVo1);
                }
            }

            log.error("matchPerson", e);
            ResponseVo error = ResponseVo.error(rid, e.getDetailedMessage());
            error.setMatch(0);
            return error;
        }
    }

    /**
     * 修改人员特征
     *
     * @param requestVo rid,person, poolId,
     *
     * @return ResponseVo
     */
    public ResponseVo updatePerson(RequestVo requestVo) {
        String rid = requestVo.getRid();
        Person person = requestVo.getPerson();
        String personId = person.getPersonId();
        String poolId = requestVo.getPoolId();
        log.info("人员修改操作开始,poolId:[{}],personId:[{}]", poolId, personId);
        // 先删除
        try {
            BulkByScrollResponse bulkByScrollResponse = deletePerson(poolId, personId);
            BulkResponse bulkItemResponses = addPerson(poolId, Collections.singletonList(person));
        } catch (IOException e) {
            log.error("人员修改操作异常", e);
            return ResponseVo.error(rid, "update failed");
        }
        log.info("人员修改操作完成,poolId:[{}],personId:[{}]", poolId, personId);
        return ResponseVo.success(rid);
    }

    /**
     * 添加人员
     *
     * @param poolId     特征池Id
     * @param personPool 要添加的人员列表
     *
     * @return 批量添加的结果
     * @throws IOException elasticsearch 所产生的异常
     */
    public BulkResponse addPerson(String poolId, List<Person> personPool) throws IOException {
        BulkRequest bulkRequest = new BulkRequest(poolId);
        for (Person person : personPool) {
            if (person == null) {
                continue;
            }
            Integer age = person.getAge();
            String gender = person.getGender();
            String personId = person.getPersonId();
            List<FaceFeature> faceFeatures = person.getFaceFeatures();
            if (faceFeatures != null && faceFeatures.size() > 0) {
                for (FaceFeature faceFeature : faceFeatures) {
                    Double[] feature = faceFeature.getFeature();
                    if (feature != null && feature.length == Constant.FACE_FEATURE_DIMS) {
                        String fid = faceFeature.getFid();

                        IndexRequest indexRequest = new IndexRequest(poolId)
                                .source(XContentType.JSON, "personId", personId, "data", feature, "fid", fid, "age", age, "gender", gender);
                        bulkRequest.add(indexRequest);
                    }
                }
            }
            List<BodyFeature> bodyFeatures = person.getBodyFeatures();
            if (bodyFeatures != null && bodyFeatures.size() > 0) {
                for (BodyFeature bodyFeature : bodyFeatures) {
                    Double[] feature = bodyFeature.getFeature();
                    if (feature == null || feature.length < Constant.BODY_FEATURE_DIMS_2048) {
                        continue;
                    } else if (feature.length == Constant.BODY_FEATURE_DIMS_2110) {
                        feature = Utils.transferBodyFeature(feature);
                    }

                    if (feature.length > Constant.BODY_FEATURE_DIMS_2048) {
                        feature = Arrays.copyOfRange(feature, 3, Constant.BODY_FEATURE_DIMS_2048 + 3);
                    }

                    String fid = bodyFeature.getBid();

                    IndexRequest indexRequest = new IndexRequest(poolId)
                            .source(XContentType.JSON, "personId", personId, "body", feature, "fid", fid, "age", age, "gender", gender);
                    bulkRequest.add(indexRequest);
                }
            }
        }
        if (bulkRequest.requests().size() == 0) {
            return null;
        }
        BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        poolService.refreshPool(poolId);
        return bulk;
    }

    /**
     * 删除人员
     * @param poolId 特征池Id
     * @param personId 人员Id,对应数据库personUnid
     * @return 删除结果
     * @throws IOException elasticsearch 所产生的异常
     */
    public BulkByScrollResponse deletePerson(String poolId, String personId) throws IOException {
        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(poolId)
                .setQuery(new TermQueryBuilder("personId", personId))
                .setRefresh(true);
        return client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
    }


    public List<Person> matchPerson(String poolId, Person person, int type) throws Exception {
        List<Person> matchResult = new ArrayList<>();
        int matchResultSize = 0;
        if (type == 0) {
            List<FaceFeature> faceFeatures = person.getFaceFeatures();
            matchFace(faceFeatures, poolId, matchResult);
            matchResultSize = Constant.FACE_MATCH_RESULT_SIZE;
        } else {
            List<BodyFeature> bodyFeatures = person.getBodyFeatures();
            matchBody(bodyFeatures, poolId, matchResult);
            matchResultSize = Constant.BODY_MATCH_RESULT_SIZE;
        }

        Stream<Person> stream = matchResult.stream().sorted(Comparator.comparingInt(Person::getScore).reversed());
        if (matchResult.size() > matchResultSize) {
            stream = stream.limit(matchResultSize);
        }
        matchResult = stream.collect(Collectors.toList());
        return matchResult;
    }

    private void matchFace(List<FaceFeature> faceFeatures, String poolId, List<Person> matchResult) throws IOException {

        if (faceFeatures != null && faceFeatures.size() > 0) {

            for (FaceFeature faceFeature : faceFeatures) {
                Double[] feature = faceFeature.getFeature();
                if (feature == null || feature.length != Constant.FACE_FEATURE_DIMS) {
                    log.info("人脸特征维数小于512,跳过比对");
                    continue;
                }

                Script script = new Script(
                        ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
                        "(cosineSimilarity(params.face, 'data') + 1) / 2 * 100", Collections.singletonMap("face", feature));
                ScriptScoreQueryBuilder queryBuilder = QueryBuilders.scriptScoreQuery(QueryBuilders.existsQuery("data"), script);

                matchResult.addAll(match0(poolId, queryBuilder));
            }
        } else {
            log.info("no face feature");
        }
    }

    private void matchBody(List<BodyFeature> bodyFeatures, String poolId, List<Person> matchResult) throws IOException {
        if (bodyFeatures != null && bodyFeatures.size() > 0) {
            for (BodyFeature faceFeature : bodyFeatures) {
                Double[] feature = faceFeature.getFeature();
                if (feature == null || feature.length < Constant.BODY_FEATURE_DIMS_2048) {
                    log.info("人体特征维数小于2048,跳过比对");
                    continue;
                } else if (feature.length == Constant.BODY_FEATURE_DIMS_2110) {
                    feature = Utils.transferBodyFeature(feature);
                }

                if (feature.length > Constant.BODY_FEATURE_DIMS_2048) {
                    feature = Arrays.copyOfRange(feature, 3, Constant.BODY_FEATURE_DIMS_2048 + 3);
                }

                Script script = new Script(
                        ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
                        "(cosineSimilarity(params.body, 'body') + 1) / 2 * 100", Collections.singletonMap("body", feature));
                ScriptScoreQueryBuilder queryBuilder = QueryBuilders.scriptScoreQuery(QueryBuilders.existsQuery("body"), script);

                matchResult.addAll(match0(poolId, queryBuilder, Constant.BODY_MATCH_RESULT_SIZE));
            }
        }
    }

    private List<Person> match0(String poolId, ScriptScoreQueryBuilder scriptScoreQueryBuilder) throws IOException {
        return match0(poolId, scriptScoreQueryBuilder, Constant.FACE_MATCH_RESULT_SIZE);
    }

    private List<Person> match0(String poolId, ScriptScoreQueryBuilder scriptScoreQueryBuilder, Integer matchResultSize) throws IOException {
        List<Person> persons = new ArrayList<>();

        SearchSourceBuilder builder = new SearchSourceBuilder()
                .size(matchResultSize)
                .query(scriptScoreQueryBuilder)
                .fetchSource(new String[]{"personId", "age", "gender", "fid"}, null);
        SearchRequest searchRequest = new SearchRequest(poolId)
                .source(builder);
        SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT);

        SearchHits hits = search.getHits();
        SearchHit[] hits1 = hits.getHits();
        for (SearchHit item : hits1) {
            Map<String, Object> source = item.getSourceAsMap();
            Person p = new Person();
            p.setPersonId((String) source.get("personId"));
            p.setAge((Integer) source.get("age"));
            p.setGender((String) source.get("gender"));
            p.setScore((int) item.getScore());
            p.setPersonPoolId(item.getIndex());
            persons.add(p);
        }
        return persons;
    }


}