PersonPoolService.java 6.54 KB
package com.viontech.match.service;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
import co.elastic.clients.elasticsearch._types.query_dsl.TermsQueryField;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.DeleteByQueryRequest;
import co.elastic.clients.elasticsearch.core.DeleteByQueryResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import com.viontech.keliu.model.BodyFeature;
import com.viontech.keliu.model.FaceFeature;
import com.viontech.keliu.model.Person;
import com.viontech.match.config.Constant;
import com.viontech.match.entity.PersonInfo;
import com.viontech.match.util.Utils;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;

@Service
@Slf4j
public class PersonPoolService {
    @Resource
    private ElasticsearchClient client;

    /**
     * 添加人员
     *
     * @param poolId     特征池Id
     * @param personPool 要添加的人员列表
     *
     * @return 批量添加的结果
     * @throws IOException elasticsearch 所产生的异常
     */
    public BulkResponse addPerson(String poolId, List<Person> personPool) throws IOException {
//        BulkRequest bulkRequest = new BulkRequest.Builder().index(poolId).build();
        List<BulkOperation> operationList = new ArrayList<>();
        for (Person person : personPool) {
            if (person == null) {
                continue;
            }
            Integer age = person.getAge();
            String gender = person.getGender();
            String personId = person.getPersonId();
            Date personCountTime = person.getCounttime();
            String direction = person.getDirection();
            Long mallId = person.getMallId();
            //Long gateId = person.getGateId();
            Integer bodyType = person.getBodyType();
            String personChannelSerialNum = person.getChannelSerialNum();
            List<FaceFeature> faceFeatures = person.getFaceFeatures();
            if (CollectionUtils.isNotEmpty(faceFeatures)) {
                for (FaceFeature faceFeature : faceFeatures) {
                    Double[] feature = faceFeature.getFeature();
                    if (feature != null && feature.length == Constant.FACE_FEATURE_DIMS) {
                        String fid = faceFeature.getFid();
                        String unid = faceFeature.getUnid();
                        Long gateId = faceFeature.getGateId();
                        operationList.add(new BulkOperation.Builder().index(new IndexOperation.Builder<PersonInfo>().index(poolId).document(new PersonInfo(unid, personId, null, feature, age, gender, bodyType, personCountTime, fid, personChannelSerialNum, gateId, direction, mallId)).build()).build());
                    }
                }
            }
            List<BodyFeature> bodyFeatures = person.getBodyFeatures();
            if (CollectionUtils.isNotEmpty(bodyFeatures)) {
                for (BodyFeature bodyFeature : bodyFeatures) {
                    Double[] feature = bodyFeature.getFeature();
                    if (feature == null || feature.length < Constant.BODY_FEATURE_DIMS_2048) {
                        continue;
                    } else if (feature.length == Constant.BODY_FEATURE_DIMS_2110) {
                        feature = Utils.transferBodyFeature(feature);
                    }

                    if (feature.length > Constant.BODY_FEATURE_DIMS_2048) {
                        feature = Arrays.copyOfRange(feature, 3, Constant.BODY_FEATURE_DIMS_2048 + 3);
                    }

                    String fid = bodyFeature.getBid();
                    String unid = bodyFeature.getUnid();
                    Long gateId = bodyFeature.getGateId();
                    Date counttime = bodyFeature.getCounttime() == null ? personCountTime : bodyFeature.getCounttime();
                    String channelSerialNum = bodyFeature.getChannelSerialNum() == null ? personChannelSerialNum : person.getChannelSerialNum();
                    operationList.add(new BulkOperation.Builder().index(new IndexOperation.Builder<PersonInfo>().index(poolId).document(new PersonInfo(unid, personId, feature, null, age, gender, bodyType, counttime, fid, personChannelSerialNum, gateId, direction, mallId)).build()).build());
                }
            }
        }

        if (operationList.size() == 0) {
            return null;
        }
        BulkResponse bulk = client.bulk(new BulkRequest.Builder().index(poolId).operations(operationList).build());
//        poolService.refreshPool(poolId);
        return bulk;
    }

    /**
     * 删除人员
     *
     * @param poolId   特征池Id
     * @param personId 人员Id,对应数据库personUnid
     *
     * @return 删除结果
     * @throws IOException elasticsearch 所产生的异常
     */
    public DeleteByQueryResponse deletePerson(String poolId, String personId) throws IOException {
        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest.Builder().index(poolId)
                .query(q -> q.match(m -> m.field("personId").query(personId)))
                .refresh(true).build();
        return client.deleteByQuery(deleteByQueryRequest);
    }

    public DeleteByQueryResponse deletePersonByMallId(String poolId, Long mallId) throws IOException {
        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest.Builder().index(poolId)
                .query(q -> q.match(m -> m.field("mallId").query(mallId)))
                .refresh(true)
                .build();
        return client.deleteByQuery(deleteByQueryRequest);
    }

    public DeleteByQueryResponse deletePersonByFid(String poolId, List<String> fidList) throws IOException {
        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest.Builder().index(poolId)
                .query(QueryBuilders.terms().field("fid").terms(new TermsQueryField.Builder().value(fidList.stream().map(FieldValue::of).collect(Collectors.toList())).build()).build()._toQuery())
                .refresh(true)
                .build();
        return client.deleteByQuery(deleteByQueryRequest);
    }
}