PersonService.java 11.6 KB
package com.viontech.match.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.viontech.keliu.model.BodyFeature;
import com.viontech.keliu.model.FaceFeature;
import com.viontech.keliu.model.Person;
import com.viontech.keliu.model.Pool;
import com.viontech.match.config.Constant;
import com.viontech.match.entity.vo.RequestVo;
import com.viontech.match.entity.vo.ResponseVo;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScriptScoreQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;

/**
 * .
 *
 * @author 谢明辉
 * @date 2020/11/27
 */

@Service
@Slf4j
public class PersonService {
    @Resource
    private RestHighLevelClient client;
    @Resource
    private PoolService poolService;
    @Resource
    private ObjectMapper objectMapper;

    /**
     * 人员比对
     */
    public ResponseVo matchPerson(RequestVo requestVo) throws Exception {
        String rid = requestVo.getRid();
        String poolId = requestVo.getPersonPoolId();
        List<String> unionPersonPoolId = requestVo.getUnionPersonPoolId();
        List<String> poolIds = new ArrayList<>();
        List<Person> result = new ArrayList<>();
        List<Pool> poolStatus = new ArrayList<>();

        if (unionPersonPoolId != null && unionPersonPoolId.size() > 0) {
            poolIds.addAll(unionPersonPoolId);
        }
        if (poolId != null) {
            poolIds.add(poolId);
        }
        log.info("人员匹配操作开始,PoolIds:[{}]", poolIds.toString());
        try {
            for (String id : poolIds) {
                Pool pool = new Pool();
                pool.setPersonPoolId(id);
                if (poolService.existPool(id)) {
                    pool.setStatus(0);
                    List<Person> people = matchPerson(id, requestVo.getPerson());
                    result.addAll(people);
                } else {
                    pool.setStatus(1);
                }
                poolStatus.add(pool);
            }
            ResponseVo success = ResponseVo.success(rid, "success");
            if (result.size() > 0) {
                success.setMatchPersons(result);
            }
            success.setMatch(1);
            success.setPersonPoolStatus(poolStatus);
            log.info("人员匹配操作完成,PoolIds:[{}},结果:[{}]", poolIds.toString(), objectMapper.writeValueAsString(success));
            return success;
        } catch (ElasticsearchStatusException e) {
            if (e.status() == RestStatus.BAD_REQUEST && e.getDetailedMessage().contains(Constant.CLASS_CAST_EXCEPTION)) {
                for (String id : poolIds) {
                    RequestVo requestVo1 = new RequestVo();
                    requestVo1.setRid("1");
                    requestVo1.setPoolId(id);
                    requestVo1.setFlushPool(1);
                    poolService.deletePool(requestVo1);
                }
            }

            log.error("matchPerson", e);
            ResponseVo error = ResponseVo.error(rid, e.getDetailedMessage());
            error.setMatch(0);
            return error;
        }
    }

    /**
     * 修改人员特征
     */
    public ResponseVo updatePerson(RequestVo requestVo) {
        String rid = requestVo.getRid();
        Person person = requestVo.getPerson();
        String personId = person.getPersonId();
        String poolId = requestVo.getPoolId();
        log.info("人员修改操作开始,poolId:[{}],personId:[{}]", poolId, personId);
        // 先删除
        try {
            BulkByScrollResponse bulkByScrollResponse = deletePerson(poolId, personId);
            BulkResponse bulkItemResponses = addPerson(poolId, Collections.singletonList(person));
        } catch (IOException e) {
            log.error("人员修改操作异常", e);
            return ResponseVo.error(rid, "update failed");
        }
        log.info("人员修改操作完成,poolId:[{}],personId:[{}]", poolId, personId);
        return ResponseVo.success(rid);
    }

    /**
     * 添加人员
     */
    public BulkResponse addPerson(String poolId, List<Person> personPool) throws IOException {
        BulkRequest bulkRequest = new BulkRequest(poolId);
        for (Person person : personPool) {
            Integer age = person.getAge();
            String gender = person.getGender();
            String personId = person.getPersonId();
            List<FaceFeature> faceFeatures = person.getFaceFeatures();
            if (faceFeatures != null && faceFeatures.size() > 0) {
                for (FaceFeature faceFeature : faceFeatures) {
                    Double[] feature = faceFeature.getFeature();
                    if (feature != null && feature.length == Constant.FACE_FEATURE_DIMS) {
                        String fid = faceFeature.getFid();

                        IndexRequest indexRequest = new IndexRequest(poolId)
                                .source(XContentType.JSON, "personId", personId, "data", feature, "fid", fid, "age", age, "gender", gender);
                        bulkRequest.add(indexRequest);
                    }
                }
            }
            List<BodyFeature> bodyFeatures = person.getBodyFeatures();
            if (bodyFeatures != null && bodyFeatures.size() > 0) {
                for (BodyFeature bodyFeature : bodyFeatures) {
                    Double[] feature = bodyFeature.getFeature();
                    if (feature == null || feature.length < Constant.BODY_FEATURE_DIMS) {
                        continue;
                    }
                    if (feature.length > Constant.BODY_FEATURE_DIMS) {
                        feature = Arrays.copyOfRange(feature, 3, Constant.BODY_FEATURE_DIMS + 3);
                    }

                    String fid = bodyFeature.getBid();

                    IndexRequest indexRequest = new IndexRequest(poolId)
                            .source(XContentType.JSON, "personId", personId, "body", feature, "fid", fid, "age", age, "gender", gender);
                    bulkRequest.add(indexRequest);
                }
            }
        }
        if (bulkRequest.requests().size() == 0) {
            return null;
        }
        BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        poolService.refreshPool(poolId);
        return bulk;
    }

    /**
     * 删除人员
     */
    public BulkByScrollResponse deletePerson(String poolId, String personId) throws IOException {
        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(poolId)
                .setQuery(new TermQueryBuilder("personId", personId))
                .setRefresh(true);
        return client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
    }


    public List<Person> matchPerson(String poolId, Person person) throws Exception {
        List<Person> matchResult = new ArrayList<>();

        List<FaceFeature> faceFeatures = person.getFaceFeatures();
        matchFace(faceFeatures, poolId, matchResult);

        List<BodyFeature> bodyFeatures = person.getBodyFeatures();
        matchBody(bodyFeatures, poolId, matchResult);

        if (matchResult.size() > Constant.MATCH_RESULT_SIZE) {
            matchResult = matchResult.stream().sorted(Comparator.comparingInt(Person::getScore)).limit(5).collect(Collectors.toList());
        }
        return matchResult;
    }

    private void matchFace(List<FaceFeature> faceFeatures, String poolId, List<Person> matchResult) throws IOException {

        if (faceFeatures != null && faceFeatures.size() > 0) {

            for (FaceFeature faceFeature : faceFeatures) {
                Double[] feature = faceFeature.getFeature();
                if (feature == null || feature.length != Constant.FACE_FEATURE_DIMS) {
                    log.info("人脸特征维数小于512,跳过比对");
                    continue;
                }

                Script script = new Script(
                        ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
                        "(cosineSimilarity(params.face, 'data') + 1) / 2 * 100", Collections.singletonMap("face", feature));
                ScriptScoreQueryBuilder queryBuilder = QueryBuilders.scriptScoreQuery(QueryBuilders.existsQuery("data"), script);

                matchResult.addAll(match0(poolId, queryBuilder));
            }
        } else {
            log.info("no face feature");
        }
    }

    private void matchBody(List<BodyFeature> bodyFeatures, String poolId, List<Person> matchResult) throws IOException {
        if (bodyFeatures != null && bodyFeatures.size() > 0) {
            for (BodyFeature faceFeature : bodyFeatures) {
                Double[] feature = faceFeature.getFeature();
                if (feature == null || feature.length < Constant.BODY_FEATURE_DIMS) {
                    log.info("人体特征维数小于2048,跳过比对");
                    continue;
                }
                if (feature.length > Constant.BODY_FEATURE_DIMS) {
                    feature = Arrays.copyOfRange(feature, 3, Constant.BODY_FEATURE_DIMS + 3);
                }

                Script script = new Script(
                        ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
                        "(cosineSimilarity(params.body, 'body') + 1) / 2 * 100", Collections.singletonMap("body", feature));
                ScriptScoreQueryBuilder queryBuilder = QueryBuilders.scriptScoreQuery(QueryBuilders.existsQuery("body"), script);

                matchResult.addAll(match0(poolId, queryBuilder));
            }
        }
    }

    private List<Person> match0(String poolId, ScriptScoreQueryBuilder scriptScoreQueryBuilder) throws IOException {
        List<Person> persons = new ArrayList<>();

        SearchSourceBuilder builder = new SearchSourceBuilder()
                .size(Constant.MATCH_RESULT_SIZE)
                .query(scriptScoreQueryBuilder)
                .fetchSource(new String[]{"personId", "age", "gender", "fid"}, null);
        SearchRequest searchRequest = new SearchRequest(poolId)
                .source(builder);
        SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT);

        SearchHits hits = search.getHits();
        SearchHit[] hits1 = hits.getHits();
        for (SearchHit item : hits1) {
            Map<String, Object> source = item.getSourceAsMap();
            Person p = new Person();
            p.setPersonId((String) source.get("personId"));
            p.setAge((Integer) source.get("age"));
            p.setGender((String) source.get("gender"));
            p.setScore((int) item.getScore());
            p.setPersonPoolId(item.getIndex());
            persons.add(p);
        }
        return persons;
    }


}