// PersonService.java 16.1 KB
package com.viontech.match.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.viontech.keliu.model.BodyFeature;
import com.viontech.keliu.model.FaceFeature;
import com.viontech.keliu.model.Person;
import com.viontech.keliu.model.Pool;
import com.viontech.match.config.Constant;
import com.viontech.match.entity.vo.RequestVo;
import com.viontech.match.entity.vo.ResponseVo;
import com.viontech.match.util.Utils;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScriptScoreQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.IOException;
import java.text.ParseException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Person feature service backed by Elasticsearch.
 *
 * <p>Stores face and body feature vectors of persons in feature pools (the pool id is used as the
 * Elasticsearch index name), deletes them again, and matches a probe person against one or more
 * pools with a cosine-similarity script score.
 *
 * @author 谢明辉
 * @version 0.0.1
 */

@Service
@Slf4j
public class PersonService {
    /** Document fields fetched for every search hit; the feature vectors themselves are not fetched. */
    private static final String[] FETCH_SOURCE = new String[]{"personId", "age", "gender", "fid", "counttime", "channelSerialNum", "body_type"};
    @Resource
    private RestHighLevelClient client;
    @Resource
    private PoolService poolService;
    @Resource
    private ObjectMapper objectMapper;

    /**
     * Matches the person carried by the request against every requested pool.
     *
     * <p>The pools searched are {@code unionPersonPoolId} followed by {@code personPoolId}. For each
     * pool a status entry is recorded: {@code 0} when the pool exists (and was searched for both
     * faces and bodies), {@code 1} when it does not exist.
     *
     * @param requestVo rid, personPoolId, unionPersonPoolId, person
     *
     * @return a success response with {@code match = 1}, the pool statuses and any face/body
     *         matches; or an error response with {@code match = 0} when Elasticsearch answers with
     *         an {@link ElasticsearchStatusException}
     * @throws Exception any other failure raised while querying Elasticsearch or deleting a pool
     */
    public ResponseVo matchPerson(RequestVo requestVo) throws Exception {
        String rid = requestVo.getRid();
        String poolId = requestVo.getPersonPoolId();
        List<String> unionPersonPoolId = requestVo.getUnionPersonPoolId();
        List<String> poolIds = new ArrayList<>();
        List<Person> matchFaces = new ArrayList<>();
        List<Person> matchBodies = new ArrayList<>();
        List<Pool> poolStatus = new ArrayList<>();

        if (unionPersonPoolId != null) {
            poolIds.addAll(unionPersonPoolId);
        }
        if (poolId != null) {
            poolIds.add(poolId);
        }
        log.info("人员匹配操作开始,PoolIds:[{}]", poolIds);
        try {
            for (String id : poolIds) {
                Pool pool = new Pool();
                pool.setPersonPoolId(id);
                if (poolService.existPool(id)) {
                    pool.setStatus(0);
                    matchFaces.addAll(matchPerson(id, requestVo.getPerson(), 0));
                    matchBodies.addAll(matchPerson(id, requestVo.getPerson(), 1));
                } else {
                    pool.setStatus(1);
                }
                poolStatus.add(pool);
            }
            ResponseVo success = ResponseVo.success(rid, "success");
            // Match lists are only attached when non-empty.
            if (!matchFaces.isEmpty()) {
                success.setMatchPersons(matchFaces);
            }
            if (!matchBodies.isEmpty()) {
                success.setMatchBodies(matchBodies);
            }
            success.setMatch(1);
            success.setPersonPoolStatus(poolStatus);
            log.info("人员匹配操作完成,PoolIds:[{}]", poolIds);
            return success;
        } catch (ElasticsearchStatusException e) {
            // A 400 whose detail mentions Constant.CLASS_CAST_EXCEPTION triggers a delete request
            // (flushPool = 1) for every requested pool before the error is reported.
            if (e.status() == RestStatus.BAD_REQUEST && e.getDetailedMessage().contains(Constant.CLASS_CAST_EXCEPTION)) {
                for (String id : poolIds) {
                    RequestVo flushRequest = new RequestVo();
                    flushRequest.setRid("1");
                    flushRequest.setPoolId(id);
                    flushRequest.setFlushPool(1);
                    poolService.deletePool(flushRequest);
                }
            }

            log.error("matchPerson", e);
            ResponseVo error = ResponseVo.error(rid, e.getDetailedMessage());
            error.setMatch(0);
            return error;
        }
    }

    /**
     * Replaces a person's features in a pool: deletes every document of the person, then indexes
     * the features carried by the request.
     *
     * @param requestVo rid, person, poolId
     *
     * @return a success response; or an error response when the request carries no person, when an
     *         {@link IOException} occurs, or when the bulk indexing reports item failures
     */
    public ResponseVo updatePerson(RequestVo requestVo) {
        String rid = requestVo.getRid();
        Person person = requestVo.getPerson();
        if (person == null) {
            log.error("updatePerson called without a person, rid:[{}]", rid);
            return ResponseVo.error(rid, "person is null");
        }
        String personId = person.getPersonId();
        String poolId = requestVo.getPoolId();
        log.info("人员修改操作开始,poolId:[{}],personId:[{}]", poolId, personId);
        try {
            deletePerson(poolId, personId);
            // addPerson returns null when there was nothing valid to index.
            BulkResponse bulkResponse = addPerson(poolId, Collections.singletonList(person));
            if (bulkResponse != null && bulkResponse.hasFailures()) {
                // The old documents are already gone, so a partial failure must not be reported as success.
                log.error("bulk index failed, poolId:[{}],personId:[{}],failure:[{}]",
                        poolId, personId, bulkResponse.buildFailureMessage());
                return ResponseVo.error(rid, "update failed");
            }
        } catch (IOException e) {
            log.error("人员修改操作异常", e);
            return ResponseVo.error(rid, "update failed");
        }
        log.info("人员修改操作完成,poolId:[{}],personId:[{}]", poolId, personId);
        return ResponseVo.success(rid);
    }

    /**
     * Indexes the features of the given persons into a pool with a single bulk request and then
     * refreshes the pool.
     *
     * <p>One document is written per valid feature: face features must have exactly
     * {@code Constant.FACE_FEATURE_DIMS} dimensions (stored in field {@code data}); body features
     * are normalized by {@link #normalizeBodyFeature(Double[])} (stored in field {@code body}).
     * {@code null} persons and invalid features are skipped.
     *
     * @param poolId     feature pool id
     * @param personPool persons to add
     *
     * @return the bulk result, or {@code null} when no valid feature was found and nothing was sent
     * @throws IOException raised by the Elasticsearch client
     */
    public BulkResponse addPerson(String poolId, List<Person> personPool) throws IOException {
        BulkRequest bulkRequest = new BulkRequest(poolId);
        for (Person person : personPool) {
            if (person == null) {
                continue;
            }
            List<FaceFeature> faceFeatures = person.getFaceFeatures();
            if (faceFeatures != null) {
                for (FaceFeature faceFeature : faceFeatures) {
                    Double[] feature = faceFeature.getFeature();
                    if (feature != null && feature.length == Constant.FACE_FEATURE_DIMS) {
                        bulkRequest.add(buildIndexRequest(poolId, person, "data", feature, faceFeature.getFid()));
                    }
                }
            }
            List<BodyFeature> bodyFeatures = person.getBodyFeatures();
            if (bodyFeatures != null) {
                for (BodyFeature bodyFeature : bodyFeatures) {
                    Double[] feature = normalizeBodyFeature(bodyFeature.getFeature());
                    if (feature != null) {
                        bulkRequest.add(buildIndexRequest(poolId, person, "body", feature, bodyFeature.getBid()));
                    }
                }
            }
        }
        if (bulkRequest.requests().isEmpty()) {
            return null;
        }
        BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        poolService.refreshPool(poolId);
        return bulk;
    }

    /**
     * Deletes every document of a person from a pool (delete-by-query on {@code personId}, with
     * refresh enabled).
     *
     * @param poolId   feature pool id
     * @param personId person id
     *
     * @return the delete-by-query result
     * @throws IOException raised by the Elasticsearch client
     */
    public BulkByScrollResponse deletePerson(String poolId, String personId) throws IOException {
        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(poolId)
                .setQuery(new TermQueryBuilder("personId", personId))
                .setRefresh(true);
        return client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
    }

    /**
     * Matches a person against a single pool.
     *
     * <p>Every valid feature of the person is searched separately; the combined hits are sorted by
     * score, highest first, and truncated to {@code Constant.FACE_MATCH_RESULT_SIZE} or
     * {@code Constant.BODY_MATCH_RESULT_SIZE}.
     *
     * @param poolId pool to search
     * @param person probe person
     * @param type   match type: 0 = face, anything else = body
     *
     * @return the best matches, highest score first
     * @throws Exception raised while querying Elasticsearch
     */
    public List<Person> matchPerson(String poolId, Person person, int type) throws Exception {
        List<Person> matchResult = new ArrayList<>();
        int matchResultSize;
        if (type == 0) {
            matchFace(poolId, matchResult, person);
            matchResultSize = Constant.FACE_MATCH_RESULT_SIZE;
        } else {
            matchBody(poolId, matchResult, person);
            matchResultSize = Constant.BODY_MATCH_RESULT_SIZE;
        }

        return matchResult.stream()
                .sorted(Comparator.comparingDouble(Person::getScore).reversed())
                .limit(matchResultSize)
                .collect(Collectors.toList());
    }

    /**
     * Searches the pool once per valid face feature of the person and appends the hits to
     * {@code matchResult}. Features that are {@code null} or whose length differs from
     * {@code Constant.FACE_FEATURE_DIMS} are skipped.
     */
    private void matchFace(String poolId, List<Person> matchResult, Person person) throws Exception {
        List<FaceFeature> faceFeatures = person.getFaceFeatures();
        if (faceFeatures == null || faceFeatures.isEmpty()) {
            log.info("no face feature");
            return;
        }
        for (FaceFeature faceFeature : faceFeatures) {
            Double[] feature = faceFeature.getFeature();
            if (feature == null || feature.length != Constant.FACE_FEATURE_DIMS) {
                log.info("人脸特征维数小于512,跳过比对");
                continue;
            }

            SearchRequest searchRequest = getSearchRequest(poolId, Constant.FACE_MATCH_RESULT_SIZE, feature, person, 0);
            matchResult.addAll(match0(searchRequest));
        }
    }

    /**
     * Searches the pool once per valid body feature of the person and appends the hits to
     * {@code matchResult}. Features rejected by {@link #normalizeBodyFeature(Double[])} are skipped.
     */
    private void matchBody(String poolId, List<Person> matchResult, Person person) throws Exception {
        List<BodyFeature> bodyFeatures = person.getBodyFeatures();
        if (bodyFeatures == null) {
            return;
        }
        for (BodyFeature bodyFeature : bodyFeatures) {
            Double[] feature = normalizeBodyFeature(bodyFeature.getFeature());
            if (feature == null) {
                log.info("人体特征维数小于2048,跳过比对");
                continue;
            }

            SearchRequest searchRequest = getSearchRequest(poolId, Constant.BODY_MATCH_RESULT_SIZE, feature, person, 1);
            matchResult.addAll(match0(searchRequest));
        }
    }

    /**
     * Builds the script-score search request for one feature vector.
     *
     * <p>The score is {@code (cosineSimilarity + 1) / 2 * 100}. Hits are restricted to documents
     * that have the relevant vector field, and optionally filtered by the person's
     * {@code channelSerialNums}, {@code personUnid} (matched against {@code personId}) and the
     * {@code counttimeGTE}/{@code counttimeLTE} range.
     *
     * @param poolId          pool (index) to search
     * @param matchResultSize maximum number of hits
     * @param feature         probe vector
     * @param person          probe person supplying the optional filters
     * @param type            0 = face (field {@code data}), anything else = body (field {@code body})
     */
    private SearchRequest getSearchRequest(String poolId, Integer matchResultSize, Double[] feature, Person person, int type) {
        Script script;
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        if (type == 0) {
            script = new Script(
                    ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
                    "(cosineSimilarity(params.face, 'data') + 1) / 2 * 100", Collections.singletonMap("face", feature));
            boolQuery.filter(QueryBuilders.existsQuery("data"));
        } else {
            script = new Script(
                    ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
                    "(cosineSimilarity(params.body, 'body') + 1) / 2 * 100", Collections.singletonMap("body", feature));
            boolQuery.filter(QueryBuilders.existsQuery("body"));
        }

        // Filter by channel serial numbers.
        List<String> channelSerialNums = person.getChannelSerialNums();
        if (channelSerialNums != null && !channelSerialNums.isEmpty()) {
            boolQuery.filter(QueryBuilders.termsQuery("channelSerialNum", channelSerialNums));
        }
        // Restrict to a single person when a personUnid is supplied.
        String personUnid = person.getPersonUnid();
        if (personUnid != null && !personUnid.isEmpty()) {
            boolQuery.filter(QueryBuilders.termQuery("personId", personUnid));
        }

        // Filter by time range; either bound may be absent.
        Date counttimeGTE = person.getCounttimeGTE();
        Date counttimeLTE = person.getCounttimeLTE();
        if (counttimeGTE != null || counttimeLTE != null) {
            RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("counttime");
            if (counttimeGTE != null) {
                rangeQueryBuilder.gte(Constant.DATE_FORMAT.get().format(counttimeGTE));
            }
            if (counttimeLTE != null) {
                rangeQueryBuilder.lte(Constant.DATE_FORMAT.get().format(counttimeLTE));
            }
            boolQuery.filter(rangeQueryBuilder);
        }

        ScriptScoreQueryBuilder queryBuilder = QueryBuilders.scriptScoreQuery(boolQuery, script);
        SearchSourceBuilder builder = new SearchSourceBuilder()
                .size(matchResultSize)
                .query(queryBuilder)
                .fetchSource(FETCH_SOURCE, null);

        return new SearchRequest(poolId).source(builder);
    }

    /**
     * Executes a search and converts every hit into a {@link Person} carrying the stored
     * attributes, the hit score and the index name as pool id.
     */
    private List<Person> match0(SearchRequest searchRequest) throws Exception {
        List<Person> persons = new ArrayList<>();
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);

        for (SearchHit hit : response.getHits().getHits()) {
            Map<String, Object> source = hit.getSourceAsMap();
            Person p = new Person();
            p.setPersonId((String) source.get("personId"));
            p.setAge((Integer) source.get("age"));
            p.setGender((String) source.get("gender"));
            p.setChannelSerialNum((String) source.get("channelSerialNum"));
            p.setBodyType((Integer) source.get("body_type"));
            p.setCounttime(parseCounttime((String) source.get("counttime")));
            p.setScore(hit.getScore());
            p.setPersonPoolId(hit.getIndex());
            persons.add(p);
        }
        return persons;
    }

    /**
     * Builds the index request for one feature document.
     *
     * @param poolId       target pool (index)
     * @param person       person supplying the descriptive attributes
     * @param featureField name of the vector field: {@code data} for faces, {@code body} for bodies
     * @param feature      feature vector to store
     * @param fid          feature id stored in field {@code fid}
     */
    private IndexRequest buildIndexRequest(String poolId, Person person, String featureField, Double[] feature, String fid) {
        Date counttime = person.getCounttime();
        String formattedCounttime = counttime == null ? null : Constant.DATE_FORMAT.get().format(counttime);
        return new IndexRequest(poolId)
                .source(XContentType.JSON, "personId", person.getPersonId(),
                        featureField, feature, "fid", fid, "age", person.getAge(), "gender", person.getGender(),
                        "body_type", person.getBodyType(),
                        "counttime", formattedCounttime, "channelSerialNum", person.getChannelSerialNum());
    }

    /**
     * Brings a raw body feature into the stored form, or rejects it.
     *
     * <p>Vectors that are {@code null} or shorter than {@code Constant.BODY_FEATURE_DIMS_2048} are
     * rejected. Vectors of length {@code Constant.BODY_FEATURE_DIMS_2110} are first passed through
     * {@code Utils.transferBodyFeature}. Any vector still longer than 2048 afterwards is cut down to
     * the 2048 elements starting at index 3.
     *
     * <p>NOTE(review): for lengths 2049 and 2050 {@link Arrays#copyOfRange} pads the tail with
     * {@code null}; confirm such lengths cannot occur.
     *
     * @param feature raw feature vector, may be {@code null}
     *
     * @return the normalized vector, or {@code null} when the feature must be skipped
     */
    private Double[] normalizeBodyFeature(Double[] feature) {
        if (feature == null || feature.length < Constant.BODY_FEATURE_DIMS_2048) {
            return null;
        }
        Double[] normalized = feature;
        if (normalized.length == Constant.BODY_FEATURE_DIMS_2110) {
            normalized = Utils.transferBodyFeature(normalized);
        }
        if (normalized.length > Constant.BODY_FEATURE_DIMS_2048) {
            normalized = Arrays.copyOfRange(normalized, 3, Constant.BODY_FEATURE_DIMS_2048 + 3);
        }
        return normalized;
    }

    /**
     * Parses a stored {@code counttime} string with {@code Constant.DATE_FORMAT}.
     *
     * @param value stored value, may be {@code null}
     *
     * @return the parsed date, or {@code null} when the value is absent or cannot be parsed
     */
    private Date parseCounttime(String value) {
        if (value == null) {
            return null;
        }
        try {
            return Constant.DATE_FORMAT.get().parse(value);
        } catch (ParseException e) {
            // Best effort: a malformed timestamp must not discard the whole hit.
            log.warn("unparseable counttime [{}]", value, e);
            return null;
        }
    }

}