Commit 113515d4 by 姚冰

[chg] 对接es8.15,使用elasticsearch-java

1 parent a19efee4
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId> <artifactId>spring-boot-starter</artifactId>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
...@@ -32,18 +33,45 @@ ...@@ -32,18 +33,45 @@
<artifactId>android-json</artifactId> <artifactId>android-json</artifactId>
<groupId>com.vaadin.external.google</groupId> <groupId>com.vaadin.external.google</groupId>
</exclusion> </exclusion>
<exclusion>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
</exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId> <artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <!-- <dependency>-->
<groupId>org.springframework.boot</groupId> <!-- <groupId>org.elasticsearch.client</groupId>-->
<artifactId>spring-boot-starter-data-elasticsearch</artifactId> <!-- <artifactId>elasticsearch-rest-high-level-client</artifactId>-->
</dependency> <!-- <version>8.15.0</version>-->
<!-- </dependency>-->
<!-- &lt;!&ndash; Elasticsearch Rest Hight Level Client 的依赖 &ndash;&gt;-->
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-data-elasticsearch</artifactId>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-starter-data-elasticsearch</artifactId>-->
<!-- <version></version>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>org.elasticsearch.client</groupId>-->
<!-- <artifactId>elasticsearch-rest-client</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- </dependency>-->
<dependency> <dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
...@@ -60,6 +88,43 @@ ...@@ -60,6 +88,43 @@
<version>4.4</version> <version>4.4</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/co.elastic.clients/elasticsearch-java -->
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.15.0</version>
<exclusions>
<exclusion>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
</exclusion>
<exclusion>
<artifactId>elasticsearch-rest-client</artifactId>
<groupId>org.elasticsearch.client</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.17.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.3</version>
</dependency>
<dependency>
<artifactId>elasticsearch-rest-client</artifactId>
<groupId>org.elasticsearch.client</groupId>
<version>8.15.0</version>
</dependency>
<dependency>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
<version>2.0.1</version>
</dependency>
<dependency> <dependency>
<groupId>com.viontech.keliu</groupId> <groupId>com.viontech.keliu</groupId>
<artifactId>AlgApiClient</artifactId> <artifactId>AlgApiClient</artifactId>
......
package com.viontech.match.config; package com.viontech.match.config;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost; import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope; import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider; import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager; import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor; import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig; import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.reactor.IOReactorException; import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.ssl.SSLContextBuilder;
import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
@Component @Component
public class ElasticsearchConfiguration { public class ElasticsearchConfiguration {
...@@ -27,8 +41,7 @@ public class ElasticsearchConfiguration { ...@@ -27,8 +41,7 @@ public class ElasticsearchConfiguration {
} }
@Bean @Bean
public RestHighLevelClient createClient(CustomElasticsearchProperties properties) { public ElasticsearchClient createElasticsearchClient(CustomElasticsearchProperties properties) {
HttpHost[] esHosts = properties.getUris().stream().filter(StringUtils::hasLength).map(HttpHost::create).toArray(HttpHost[]::new); HttpHost[] esHosts = properties.getUris().stream().filter(StringUtils::hasLength).map(HttpHost::create).toArray(HttpHost[]::new);
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
if (properties.getUsername() != null) { if (properties.getUsername() != null) {
...@@ -48,17 +61,66 @@ public class ElasticsearchConfiguration { ...@@ -48,17 +61,66 @@ public class ElasticsearchConfiguration {
} }
PoolingNHttpClientConnectionManager finalConnectionManager = connectionManager; PoolingNHttpClientConnectionManager finalConnectionManager = connectionManager;
RestClientBuilder builder = RestClient.builder(esHosts).setRequestConfigCallback(requestConfigBuilder -> { RestClient restClient = RestClient.builder(esHosts)
requestConfigBuilder.setConnectTimeout(properties.getConnectionTimeout()); .setRequestConfigCallback(requestConfigBuilder -> {
requestConfigBuilder.setSocketTimeout(properties.getSocketTimeout()); requestConfigBuilder.setConnectTimeout(properties.getConnectionTimeout());
requestConfigBuilder.setConnectionRequestTimeout(-1); requestConfigBuilder.setSocketTimeout(properties.getSocketTimeout());
return requestConfigBuilder; requestConfigBuilder.setConnectionRequestTimeout(-1);
}).setHttpClientConfigCallback(httpClientBuilder -> { return requestConfigBuilder;
httpClientBuilder.disableAuthCaching(); })
httpClientBuilder.setMaxConnTotal(properties.getMaxConnTotal()); .setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setMaxConnPerRoute(properties.getMaxConnPerRoute()); httpClientBuilder.setMaxConnTotal(properties.getMaxConnTotal());
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setConnectionManager(finalConnectionManager); httpClientBuilder.setMaxConnPerRoute(properties.getMaxConnPerRoute());
}); httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return new RestHighLevelClient(builder); httpClientBuilder.setConnectionManager(finalConnectionManager);
// try {
// // 创建一个信任所有证书的 TrustStrategy 策略
// TrustStrategy acceptTrustStrategy = new TrustStrategy() {
// @Override
// public boolean isTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
// return true;
// }
// };
// // 使用 SSLContextBuilder 创建 SSLContext
// SSLContext sslContext = SSLContextBuilder.create().loadTrustMaterial(null, acceptTrustStrategy).build();
// httpClientBuilder.setSSLContext(sslContext);
// } catch (NoSuchAlgorithmException | KeyStoreException | KeyManagementException e) {
// e.printStackTrace();
// }
try {
SSLContext sslContext = SSLContextBuilder.create().loadTrustMaterial(null, (TrustStrategy) (chain, authType) -> true).build();
// sslContext.init(null, new TrustManager[] {
// new X509TrustManager() {
// @Override
// public void checkClientTrusted(X509Certificate[] x509Certificates, String s)
// throws CertificateException {
// // 忽略证书错误 信任任何客户端证书
// }
//
// @Override
// public void checkServerTrusted(X509Certificate[] x509Certificates, String s)
// throws CertificateException {
// // 忽略证书错误 信任任何客户端证书
// }
//
// @Override
// public X509Certificate[] getAcceptedIssuers() {
// return new X509Certificate[0];
// }
// }
// },
// null);
httpClientBuilder.setSSLContext(sslContext);
} catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
System.out.println("忽略证书错误");
}
httpClientBuilder.setSSLHostnameVerifier(new NoopHostnameVerifier());
return httpClientBuilder;
})
.build();
ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
return new ElasticsearchClient(transport);
} }
} }
package com.viontech.match.entity;
import lombok.Data;
import java.util.Date;
@Data
public class PersonInfo {
private String unid;
private String personId;
private String _score;
private Double[] body;
private Double[] data;
private Integer age;
private String gender;
private Integer bodyType;
private Date countTime;
private String fid;
private String channelSerialNum;
private Long gateId;
private String direction;
public PersonInfo(String unid, String personId, Double[] body,Double[] data, Integer age, String gender, Integer bodyType, Date countTime, String fid, String channelSerialNum, Long gateId, String direction) {
this.unid = unid;
this.personId = personId;
this.body = body;
this.data = data;
this.age = age;
this.gender = gender;
this.bodyType = bodyType;
this.countTime = countTime;
this.fid = fid;
this.channelSerialNum = channelSerialNum;
this.gateId = gateId;
this.direction = direction;
this._score = "1";
}
}
package com.viontech.match.entity;
import lombok.Data;
import java.util.Date;
@Data
public class SearchResultHit {
private String index;
private String id;
private Double score;
// private PersonInfo source;
private String personId;
private String unid;
private String _score;
private Integer age;
private String gender;
private Integer bodyType;
private Date countTime;
private String fid;
private String channelSerialNum;
private Long gateId;
private String direction;
}
package com.viontech.match.runner; package com.viontech.match.runner;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch.ilm.*;
import co.elastic.clients.elasticsearch.ilm.get_lifecycle.Lifecycle;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.indexlifecycle.DeleteAction;
import org.elasticsearch.client.indexlifecycle.LifecycleAction;
import org.elasticsearch.client.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.client.indexlifecycle.Phase;
import org.elasticsearch.client.indexlifecycle.PutLifecyclePolicyRequest;
import org.elasticsearch.client.indexlifecycle.SetPriorityAction;
import org.elasticsearch.common.unit.TimeValue;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.context.LifecycleAutoConfiguration;
import org.springframework.boot.autoconfigure.context.LifecycleProperties;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
...@@ -39,25 +34,25 @@ public class ILM implements CommandLineRunner { ...@@ -39,25 +34,25 @@ public class ILM implements CommandLineRunner {
private Integer deleteAfterDays; private Integer deleteAfterDays;
@Autowired @Autowired
private RestHighLevelClient restHighLevelClient; private ElasticsearchClient client;
@Override @Override
public void run(String... args) throws Exception { public void run(String... args) throws Exception {
try { try {
Map<String, Phase> phases = new HashMap<>(); Actions hotAction = new Actions.Builder()
Map<String, LifecycleAction> setPriorityAction = new HashMap<>(); .setPriority(p -> p.priority(100))
setPriorityAction.put(SetPriorityAction.NAME, new SetPriorityAction(100)); .build();
phases.put("hot", new Phase("hot", TimeValue.ZERO, setPriorityAction)); Actions deleteAction = new Actions.Builder().delete(s -> s.deleteSearchableSnapshot(true)).build();
IlmPolicy ilmPolicy = new IlmPolicy.Builder()
Map<String, LifecycleAction> deleteActions = .phases(new Phases.Builder()
Collections.singletonMap(DeleteAction.NAME, new DeleteAction()); .hot(new Phase.Builder().actions(hotAction).minAge(Time.of(t -> t.time("0ms"))).build())
phases.put("delete", new Phase("delete", .delete(new Phase.Builder().actions(deleteAction).minAge(Time.of(t -> t.time(deleteAfterDays + "d"))).build())
new TimeValue(deleteAfterDays, TimeUnit.DAYS), deleteActions)); .build())
.build();
LifecyclePolicy policy = new LifecyclePolicy(LIFECYCLE_NAME, phases); PutLifecycleRequest request = new PutLifecycleRequest.Builder().name(LIFECYCLE_NAME).policy(ilmPolicy).build();
PutLifecyclePolicyRequest lifecyclePolicyRequest = new PutLifecyclePolicyRequest(policy); PutLifecycleResponse response = client.ilm().putLifecycle(request);
AcknowledgedResponse acknowledgedResponse = restHighLevelClient.indexLifecycle().putLifecyclePolicy(lifecyclePolicyRequest, RequestOptions.DEFAULT); GetLifecycleResponse getResponse = client.ilm().getLifecycle(new GetLifecycleRequest.Builder().name(LIFECYCLE_NAME).build());
if (acknowledgedResponse.isAcknowledged()) { if (getResponse.get(LIFECYCLE_NAME) != null) {
log.info(LIFECYCLE_NAME + "生命周期创建完成"); log.info(LIFECYCLE_NAME + "生命周期创建完成");
} }
} catch (Exception e) { } catch (Exception e) {
......
package com.viontech.match.service; package com.viontech.match.service;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.*;
import co.elastic.clients.elasticsearch._types.aggregations.*;
import co.elastic.clients.elasticsearch._types.query_dsl.*;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
import co.elastic.clients.elasticsearch.core.search.SourceConfig;
import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
import co.elastic.clients.elasticsearch.ingest.simulate.Document;
import co.elastic.clients.json.JsonData;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.fasterxml.jackson.annotation.JsonValue;
import com.viontech.keliu.model.BodyFeature; import com.viontech.keliu.model.BodyFeature;
import com.viontech.keliu.model.FaceFeature; import com.viontech.keliu.model.FaceFeature;
import com.viontech.keliu.model.Person; import com.viontech.keliu.model.Person;
import com.viontech.keliu.model.Pool; import com.viontech.keliu.model.Pool;
import com.viontech.match.config.Constant; import com.viontech.match.config.Constant;
import com.viontech.match.entity.PersonInfo;
import com.viontech.match.entity.SearchResultHit;
import com.viontech.match.entity.vo.RequestVo; import com.viontech.match.entity.vo.RequestVo;
import com.viontech.match.entity.vo.ResponseVo; import com.viontech.match.entity.vo.ResponseVo;
import com.viontech.match.util.Utils; import com.viontech.match.util.Utils;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScriptScoreQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.Max;
import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ParsedValueCount;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.IOException; import java.io.IOException;
import java.text.ParseException; import java.text.ParseException;
import java.util.ArrayList; import java.util.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
...@@ -73,7 +50,9 @@ import java.util.stream.Stream; ...@@ -73,7 +50,9 @@ import java.util.stream.Stream;
public class PersonService { public class PersonService {
private static final String[] FETCH_SOURCE = new String[]{"personId", "age", "gender", "fid", "counttime", "channelSerialNum", "body_type"}; private static final String[] FETCH_SOURCE = new String[]{"personId", "age", "gender", "fid", "counttime", "channelSerialNum", "body_type"};
@Resource @Resource
private RestHighLevelClient client; private ElasticsearchClient client;
@Resource @Resource
private PoolService poolService; private PoolService poolService;
...@@ -100,13 +79,7 @@ public class PersonService { ...@@ -100,13 +79,7 @@ public class PersonService {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
try { try {
String[] indices = poolIds.toArray(new String[poolIds.size()]); String[] indices = poolIds.toArray(new String[poolIds.size()]);
SearchRequest request = new SearchRequest(indices);
IndicesOptions defaultIndicesOptions = request.indicesOptions();
EnumSet<IndicesOptions.Option> options = defaultIndicesOptions.getOptions();
options.add(IndicesOptions.Option.IGNORE_UNAVAILABLE);
EnumSet<IndicesOptions.WildcardStates> expandWildcards = defaultIndicesOptions.getExpandWildcards();
IndicesOptions newIndicesOptions = new IndicesOptions(options, expandWildcards);
request.indicesOptions(newIndicesOptions);
long count = 0; long count = 0;
List<BodyFeature> bodyFeatures = person.getBodyFeatures(); List<BodyFeature> bodyFeatures = person.getBodyFeatures();
...@@ -133,23 +106,24 @@ public class PersonService { ...@@ -133,23 +106,24 @@ public class PersonService {
error.setCount(0); error.setCount(0);
return error; return error;
} }
SearchSourceBuilder builder = getSearchSourceBuilder(feature, person, 1); Aggregation countAggregation = AggregationBuilders.valueCount(s -> s.field("unid"));
builder.size(0);
ValueCountAggregationBuilder countAggregationBuilder = AggregationBuilders.count("countId").field("_id");
builder.aggregation(countAggregationBuilder);
request.source(builder);
log.debug("rid:{} CountMatchPerson条件{}:{}", rid, indices, builder.toString());
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
ParsedValueCount countId = response.getAggregations().get("countId"); // BoolQuery.Builder builder = getSearchSourceBuilder(feature, person, 1);
count = countId.getValue(); ScriptScoreQuery scriptScoreQuery = getScriptScoreQuery(feature, person, 1);
SearchRequest request = new SearchRequest.Builder().index(Arrays.asList(indices)).ignoreUnavailable(true).aggregations("countId", countAggregation).size(0).query(scriptScoreQuery._toQuery()).build();
log.debug("rid:{} CountMatchPerson条件{}:{}", rid, indices, scriptScoreQuery.toString());
SearchResponse<SearchResultHit> response = client.search(request, SearchResultHit.class);
// ParsedValueCount countId = response.aggregations().get("countId").valueCount().value();
count = (long) response.aggregations().get("countId").valueCount().value();
ResponseVo result = ResponseVo.success(rid, "success"); ResponseVo result = ResponseVo.success(rid, "success");
result.setCount(count); result.setCount(count);
log.info("rid:{} CountMatchPerson完成,PoolIds:[{}],count:{},耗时:{}", rid, poolIds, count, System.currentTimeMillis() - startTime); log.info("rid:{} CountMatchPerson完成,PoolIds:[{}],count:{},耗时:{}", rid, poolIds, count, System.currentTimeMillis() - startTime);
return result; return result;
} catch (ElasticsearchStatusException e) { } catch (ElasticsearchException e) {
ResponseVo error = ResponseVo.error(rid, e.getDetailedMessage()); ResponseVo error = ResponseVo.error(rid, e.getMessage());
error.setCount(0); error.setCount(0);
return error; return error;
} }
...@@ -208,8 +182,8 @@ public class PersonService { ...@@ -208,8 +182,8 @@ public class PersonService {
success.setPersonPoolStatus(poolStatus); success.setPersonPoolStatus(poolStatus);
log.info("人员匹配操作完成,PoolIds:{}, poolStatus:{},rid:{},耗时:{}", poolIds, JSON.toJSONString(poolStatus), rid, System.currentTimeMillis() - startTime); log.info("人员匹配操作完成,PoolIds:{}, poolStatus:{},rid:{},耗时:{}", poolIds, JSON.toJSONString(poolStatus), rid, System.currentTimeMillis() - startTime);
return success; return success;
} catch (ElasticsearchStatusException e) { } catch (ElasticsearchException e) {
if (e.status() == RestStatus.BAD_REQUEST && e.getDetailedMessage().contains(Constant.CLASS_CAST_EXCEPTION)) { if (e.status() == 400 && e.getMessage().contains(Constant.CLASS_CAST_EXCEPTION)) {
for (String id : poolIds) { for (String id : poolIds) {
RequestVo requestVo1 = new RequestVo(); RequestVo requestVo1 = new RequestVo();
requestVo1.setRid("1"); requestVo1.setRid("1");
...@@ -220,7 +194,7 @@ public class PersonService { ...@@ -220,7 +194,7 @@ public class PersonService {
} }
log.error("matchPerson", e); log.error("matchPerson", e);
ResponseVo error = ResponseVo.error(rid, e.getDetailedMessage()); ResponseVo error = ResponseVo.error(rid, e.getMessage());
error.setMatch(0); error.setMatch(0);
return error; return error;
} }
...@@ -243,7 +217,7 @@ public class PersonService { ...@@ -243,7 +217,7 @@ public class PersonService {
if (!poolService.existPool(poolId)) { if (!poolService.existPool(poolId)) {
poolService.createPool(requestVo, false); poolService.createPool(requestVo, false);
} }
BulkByScrollResponse bulkByScrollResponse = deletePerson(poolId, personId); DeleteByQueryResponse bulkByScrollResponse = deletePerson(poolId, personId);
BulkResponse bulkItemResponses = addPerson(poolId, Collections.singletonList(person)); BulkResponse bulkItemResponses = addPerson(poolId, Collections.singletonList(person));
} catch (Exception e) { } catch (Exception e) {
log.error("人员修改操作异常", e); log.error("人员修改操作异常", e);
...@@ -263,7 +237,8 @@ public class PersonService { ...@@ -263,7 +237,8 @@ public class PersonService {
* @throws IOException elasticsearch 所产生的异常 * @throws IOException elasticsearch 所产生的异常
*/ */
public BulkResponse addPerson(String poolId, List<Person> personPool) throws IOException { public BulkResponse addPerson(String poolId, List<Person> personPool) throws IOException {
BulkRequest bulkRequest = new BulkRequest(poolId); // BulkRequest bulkRequest = new BulkRequest.Builder().index(poolId).build();
List<BulkOperation> operationList = new ArrayList<>();
for (Person person : personPool) { for (Person person : personPool) {
if (person == null) { if (person == null) {
continue; continue;
...@@ -284,12 +259,14 @@ public class PersonService { ...@@ -284,12 +259,14 @@ public class PersonService {
String fid = faceFeature.getFid(); String fid = faceFeature.getFid();
String unid = faceFeature.getUnid(); String unid = faceFeature.getUnid();
Long gateId = faceFeature.getGateId(); Long gateId = faceFeature.getGateId();
IndexRequest indexRequest = new IndexRequest(poolId) // IndexRequest<PersonInfo> indexRequest = new IndexRequest.Builder<PersonInfo>().index(poolId)
.source(XContentType.JSON, "personId", personId, "unid", unid, // .document(new PersonInfo(0L, unid, personId, feature, age, gender, bodyType, personCountTime, fid, personChannelSerialNum, gateId, direction)).build();
"data", feature, "fid", fid, "age", age, "gender", gender, "body_type", bodyType, // .source(XContentType.JSON, "personId", personId, "unid", unid,
"counttime", personCountTime == null ? null : Constant.DATE_FORMAT.get().format(personCountTime), // "data", feature, "fid", fid, "age", age, "gender", gender, "body_type", bodyType,
"channelSerialNum", personChannelSerialNum, "gateId", gateId, "direction", direction); // "counttime", personCountTime == null ? null : Constant.DATE_FORMAT.get().format(personCountTime),
bulkRequest.add(indexRequest); // "channelSerialNum", personChannelSerialNum, "gateId", gateId, "direction", direction);
// bulkRequest.add(indexRequest);
operationList.add(new BulkOperation.Builder().index(new IndexOperation.Builder<PersonInfo>().index(poolId).document(new PersonInfo(unid, personId, feature, null, age, gender, bodyType, personCountTime, fid, personChannelSerialNum, gateId, direction)).build()).build());
} }
} }
} }
...@@ -312,19 +289,22 @@ public class PersonService { ...@@ -312,19 +289,22 @@ public class PersonService {
Long gateId = bodyFeature.getGateId(); Long gateId = bodyFeature.getGateId();
Date counttime = bodyFeature.getCounttime() == null ? personCountTime : bodyFeature.getCounttime(); Date counttime = bodyFeature.getCounttime() == null ? personCountTime : bodyFeature.getCounttime();
String channelSerialNum = bodyFeature.getChannelSerialNum() == null ? personChannelSerialNum : person.getChannelSerialNum(); String channelSerialNum = bodyFeature.getChannelSerialNum() == null ? personChannelSerialNum : person.getChannelSerialNum();
IndexRequest indexRequest = new IndexRequest(poolId) // IndexRequest<PersonInfo> indexRequest = new IndexRequest.Builder<PersonInfo>().index(poolId)
.source(XContentType.JSON, "personId", personId, "unid", unid, // .document(new PersonInfo(0L, unid, personId, feature, age, gender, bodyType, counttime, fid, channelSerialNum, gateId, direction)).build();
"body", feature, "fid", fid, "age", age, "gender", gender, "body_type", bodyType, // .source(XContentType.JSON, "personId", personId, "unid", unid,
"counttime", counttime == null ? null : Constant.DATE_FORMAT.get().format(counttime) // "body", feature, "fid", fid, "age", age, "gender", gender, "body_type", bodyType,
, "channelSerialNum", channelSerialNum, "gateId", gateId, "direction", direction); // "counttime", counttime == null ? null : Constant.DATE_FORMAT.get().format(counttime)
bulkRequest.add(indexRequest); // , "channelSerialNum", channelSerialNum, "gateId", gateId, "direction", direction);
// bulkRequest.add(indexRequest);
operationList.add(new BulkOperation.Builder().index(new IndexOperation.Builder<PersonInfo>().index(poolId).document(new PersonInfo(unid, personId, feature, null, age, gender, bodyType, personCountTime, fid, personChannelSerialNum, gateId, direction)).build()).build());
} }
} }
} }
if (bulkRequest.requests().size() == 0) {
if (operationList.size() == 0) {
return null; return null;
} }
BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT); BulkResponse bulk = client.bulk(new BulkRequest.Builder().index(poolId).operations(operationList).build());
poolService.refreshPool(poolId); poolService.refreshPool(poolId);
return bulk; return bulk;
} }
...@@ -338,11 +318,11 @@ public class PersonService { ...@@ -338,11 +318,11 @@ public class PersonService {
* @return 删除结果 * @return 删除结果
* @throws IOException elasticsearch 所产生的异常 * @throws IOException elasticsearch 所产生的异常
*/ */
public BulkByScrollResponse deletePerson(String poolId, String personId) throws IOException { public DeleteByQueryResponse deletePerson(String poolId, String personId) throws IOException {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(poolId) DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest.Builder().index(poolId)
.setQuery(new TermQueryBuilder("personId", personId)) .query(q -> q.match(m -> m.field("personId").query(personId)))
.setRefresh(true); .refresh(true).build();
return client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT); return client.deleteByQuery(deleteByQueryRequest);
} }
/** /**
...@@ -421,6 +401,36 @@ public class PersonService { ...@@ -421,6 +401,36 @@ public class PersonService {
} }
} }
private List<Float> getFeature(Double[] feature) {
List<Double> list = Arrays.asList(feature);
return list.stream().map(x -> x.floatValue()).collect(Collectors.toList());
}
private KnnSearch getKnnSearch(Double[] feature, Person person, int type) {
return new KnnSearch.Builder()
.field("body")
.queryVector(getFeature(feature))
.numCandidates(type == 0 ? Constant.FACE_MATCH_RESULT_SIZE : Constant.BODY_MATCH_RESULT_SIZE)
.build();
}
private ScriptScoreQuery getScriptScoreQuery(Double[] feature, Person person, int type) {
// return new Script.Builder()
// .lang("painless")
// .source("(cosineSimilarity(params.face, 'data') + 1) / 2 * 100")
// .params(Collections.singletonMap("body", feature))
// .build();
return new ScriptScoreQuery.Builder()
.query(getSearchSourceBuilder(feature, person, type)._toQuery())
.script(new Script.Builder()
.lang("painless")
.source("(_score + 1)/2 * 100")
.params("body", JsonData.of(0.0f))
.build())
.boost(1.0f)
.build();
}
/** /**
* *
* @param feature * @param feature
...@@ -428,129 +438,178 @@ public class PersonService { ...@@ -428,129 +438,178 @@ public class PersonService {
* @param type 0,人脸,1人体 * @param type 0,人脸,1人体
* @return * @return
*/ */
private SearchSourceBuilder getSearchSourceBuilder(Double[] feature, Person person, int type) { private KnnQuery getSearchSourceBuilder(Double[] feature, Person person, int type) {
Script script = null; // Script script = null;
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); // QueryBuilders boolQuery = QueryBuilders.bool();
List<Query> queries = new ArrayList<>();
KnnQuery.Builder knnQuery = new KnnQuery.Builder();
if (null != feature && type == 0) { if (null != feature && type == 0) {
script = new Script( // Query query = QueryBuilders.knn()
ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, // .field("data")
"(cosineSimilarity(params.face, 'data') + 1) / 2 * 100", Collections.singletonMap("face", feature)); // .queryVector(getFeature(feature))
boolQuery.filter(QueryBuilders.existsQuery("data")); // .numCandidates(Constant.FACE_MATCH_RESULT_SIZE)
// .similarity(person.getFaceMinScore())
// .build()._toQuery();
// queries.add(query);
knnQuery.field("data").queryVector(getFeature(feature)).numCandidates(Constant.FACE_MATCH_RESULT_SIZE);
// script = new Script(
// ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
// "(cosineSimilarity(params.face, 'data') + 1) / 2 * 100", Collections.singletonMap("face", feature));
// boolQuery.filter(QueryBuilders.existsQuery("data"));
} else if (null != feature && type == 1){ } else if (null != feature && type == 1){
script = new Script( // Query query = QueryBuilders.knn()
ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, // .field("body")
"(cosineSimilarity(params.body, 'body') + 1) / 2 * 100", Collections.singletonMap("body", feature)); // .queryVector(getFeature(feature))
boolQuery.filter(QueryBuilders.existsQuery("body")); // .numCandidates(Constant.FACE_MATCH_RESULT_SIZE)
// .similarity(person.getBodyMinScore())
//// .similarity("l2_norm")
// .build()._toQuery();
// queries.add(query);
knnQuery.field("body").queryVector(getFeature(feature)).numCandidates(Constant.FACE_MATCH_RESULT_SIZE);
// knnQuery.field("body").numCandidates(Constant.FACE_MATCH_RESULT_SIZE);
// script = new Script(
// ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG,
// "(cosineSimilarity(params.body, 'body') + 1) / 2 * 100", Collections.singletonMap("body", feature));
// boolQuery.filter(QueryBuilders.existsQuery("body"));
queries.add(new Query.Builder().exists(s -> s.field("body").boost(1.0f)).build());
} }
// 根据通道号过滤 // 根据通道号过滤
List<String> channelSerialNums = person.getChannelSerialNums(); List<String> channelSerialNums = person.getChannelSerialNums();
if (CollectionUtils.isNotEmpty(channelSerialNums)) { if (CollectionUtils.isNotEmpty(channelSerialNums)) {
boolQuery.filter(QueryBuilders.termsQuery("channelSerialNum", channelSerialNums)); queries.add(QueryBuilders.terms().field("channelSerialNum").terms((TermsQueryField) channelSerialNums).build()._toQuery());
} }
// 根据监控点id进行过滤 // 根据监控点id进行过滤
List<Long> gateIdIn = person.getGateIdIn(); List<Long> gateIdIn = person.getGateIdIn();
if (CollectionUtils.isNotEmpty(gateIdIn)) { if (CollectionUtils.isNotEmpty(gateIdIn)) {
boolQuery.filter(QueryBuilders.termsQuery("gateId", gateIdIn)); // boolQuery.filter(QueryBuilders.termsQuery("gateId", gateIdIn));
queries.add(QueryBuilders.terms().field("gateId").terms((TermsQueryField) gateIdIn).build()._toQuery());
} }
// 根据人的ID进行过滤 // 根据人的ID进行过滤
if (StringUtils.isNotEmpty(person.getPersonUnid())) { if (StringUtils.isNotEmpty(person.getPersonUnid())) {
boolQuery.filter(QueryBuilders.termQuery("personId", person.getPersonUnid())); // boolQuery.filter(QueryBuilders.termQuery("personId", person.getPersonUnid()));
queries.add(QueryBuilders.term().field("personId").value(person.getPersonUnid()).build()._toQuery());
} }
if (StringUtils.isNotBlank(person.getCompareDirection())) { if (StringUtils.isNotBlank(person.getCompareDirection())) {
boolQuery.filter(QueryBuilders.termQuery("direction", person.getCompareDirection())); // boolQuery.filter(QueryBuilders.term().field("direction")., person.getCompareDirection()));
// QueryBuilders.terms().field("gateId").terms(Arrays.asList(person.getCompareDirection().split(",")))
queries.add(QueryBuilders.term().field("direction").value(person.getCompareDirection()).build()._toQuery());
} }
// RangeQuery.Builder builder = QueryBuilders.range().term(s -> s.field("counttime"));
// 根据时间过滤 // 根据时间过滤
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("counttime"); // RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("counttime");
Date counttimeGTE = person.getCounttimeGTE(); Date counttimeGTE = person.getCounttimeGTE();
Date counttimeLTE = person.getCounttimeLTE(); Date counttimeLTE = person.getCounttimeLTE();
if (counttimeGTE != null) { // if (counttimeGTE != null) {
rangeQueryBuilder.gte(Constant.DATE_FORMAT.get().format(counttimeGTE)); //// rangeQueryBuilder.gte(Constant.DATE_FORMAT.get().format(counttimeGTE));
} // builder.term(s -> s.gte(Constant.DATE_FORMAT.get().format(counttimeGTE)));
if (counttimeLTE != null) { // }
rangeQueryBuilder.lte(Constant.DATE_FORMAT.get().format(counttimeLTE)); // if (counttimeLTE != null) {
} //// rangeQueryBuilder.lte(Constant.DATE_FORMAT.get().format(counttimeLTE));
// builder.term(s -> s.lte(Constant.DATE_FORMAT.get().format(counttimeLTE)));
// }
if (counttimeGTE != null || counttimeLTE != null) { if (counttimeGTE != null || counttimeLTE != null) {
boolQuery.filter(rangeQueryBuilder); // boolQuery.filter(rangeQueryBuilder);
queries.add(QueryBuilders.range().term(s -> s.field("counttime").gte(Constant.DATE_FORMAT.get().format(counttimeGTE)).lte(Constant.DATE_FORMAT.get().format(counttimeLTE))).build()._toQuery());
} }
SearchSourceBuilder builder;
if (null == script) { // queryBuilder.filter(queries);
builder = new SearchSourceBuilder().query(boolQuery); // SearchSourceBuilder builder;
} else { // BoolQuery.Builder boolQuery = new BoolQuery.Builder().filter(queries);
ScriptScoreQueryBuilder queryBuilder = QueryBuilders.scriptScoreQuery(boolQuery, script); // if (null == script) {
builder = new SearchSourceBuilder().query(queryBuilder); // builder = new SearchSourceBuilder().query(boolQuery);
// query = new BoolQuery.Builder().filter(queries).build();
// } else
{
// ScriptScoreQueryBuilder queryBuilder = QueryBuilders.scriptScoreQuery(boolQuery, script);
// builder = new SearchSourceBuilder().query(queryBuilder);
//控制最小分数,只有特征比对时才有分数 //控制最小分数,只有特征比对时才有分数
if (0 == type && null != person.getFaceMinScore()) { // if (0 == type && null != person.getFaceMinScore()) {
builder.minScore(person.getFaceMinScore()); //// builder.minScore(person.getFaceMinScore());
} else if (1 == type && null != person.getBodyMinScore()) { // queries.add(QueryBuilders.scriptScore().minScore(person.getFaceMinScore()).build().query());
builder.minScore(person.getBodyMinScore()); // } else if (1 == type && null != person.getBodyMinScore()) {
} //// builder.minScore(person.getBodyMinScore());
// queries.add(QueryBuilders.scriptScore().minScore(person.getBodyMinScore()).build().query());
// }
} }
return builder;
return knnQuery.filter(queries).build();
// new SearchRequest.Builder().index(poolId).query(q -> q.knn(k -> k.field("body").queryVector(getFeature(feature)).numCandidates(Constant.FACE_MATCH_RESULT_SIZE))).scriptFields("", new ScriptField.Builder().build()).size(matchResultSize).minScore(person.getBodyMinScore());
} }
private SearchRequest getSearchRequest(String rid, String poolId, Integer matchResultSize, Double[] feature, Person person, int type, Boolean agg) { private SearchRequest getSearchRequest(String rid, String poolId, Integer matchResultSize, Double[] feature, Person person, int type, Boolean agg) {
SearchSourceBuilder builder = getSearchSourceBuilder(feature, person, type); // BoolQuery.Builder builder = getSearchSourceBuilder(feature, person, type);
ScriptScoreQuery scriptScoreQuery = getScriptScoreQuery(feature, person, 1);
if (agg) { if (agg) {
MaxAggregationBuilder maxScoreAgg = AggregationBuilders.max("max_score").script(new Script("_score")); // AggregationBuilders.max("max_score", s -> s.field("_score").script(new Script.Builder().source("_score").build()));
TermsAggregationBuilder personIdAgg = AggregationBuilders.terms("by_personId").field("personId"); // Aggregation aggregation = AggregationBuilders.max(s -> s.field("max_score").script(new Script.Builder().source("_score").build()));
personIdAgg.subAggregation(maxScoreAgg); // Aggregation termsAgg = AggregationBuilders.terms("by_personId", TermsAggregationDefinition.builder("by_personId").field("personId").order("max_score", false));
personIdAgg.order(BucketOrder.aggregation("max_score", false)); // Aggregation termsAgg2 = AggregationBuilders.terms(s -> s.field("personId").size(matchResultSize).size(matchResultSize));
personIdAgg.size(matchResultSize); // MaxAggregation.Builder maxBucketAggregation = new MaxAggregation.Builder().field("max_score").script(new Script.Builder().source("_score").build());
builder.aggregation(personIdAgg); // TermsAggregation termsAgg = new TermsAggregation.Builder().field("personId").format("by_personId").size(matchResultSize).child(maxBucketAggregation);
// MetricAg
// termsAgg2.subAggregation(aggregation);
// termsAgg2.order(BucketOrder.aggregation("max_score", false));
// termsAgg.size(matchResultSize);
// MaxAggregationBuilder maxScoreAgg = AggregationBuilders.max("max_score").script(new Script("_score"));
// TermsAggregationBuilder personIdAgg = AggregationBuilders.terms("by_personId").field("personId");
// personIdAgg.subAggregation(maxScoreAgg);
// personIdAgg.order(BucketOrder.aggregation("max_score", false));
// personIdAgg.size(matchResultSize);
// builder.aggregation(personIdAgg);
} else { } else {
builder.fetchSource(FETCH_SOURCE, null) // builder.size(matchResultSize);
.size(matchResultSize);
} }
SourceConfig sourceConfig = new SourceConfig.Builder().filter(s -> s.includes(Arrays.asList(FETCH_SOURCE))).build();
log.debug("rid:{} poolId:{} 匹配时参数:{}", rid, poolId, builder.toString()); log.debug("rid:{} poolId:{} 匹配时参数:{}", rid, poolId, scriptScoreQuery.toString());
// KnnSearch knnQuery = getKnnSearch(feature, person, type);
return new SearchRequest(poolId).source(builder); // ScriptScoreQuery scriptScoreQuery = getScriptScoreQuery(feature, person, type);
return new SearchRequest.Builder().index(poolId).query(scriptScoreQuery._toQuery()).source(sourceConfig).size(matchResultSize).minScore(person.getBodyMinScore().doubleValue()).build();
} }
private List<Person> match0(SearchRequest searchRequest, Boolean agg) throws Exception { private List<Person> match0(SearchRequest searchRequest, Boolean agg) throws Exception {
String poolId = searchRequest.indices()[0]; String poolId = searchRequest.index().get(0);
List<Person> persons = new ArrayList<>(); List<Person> persons = new ArrayList<>();
SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT); // log.info("request:{}", searchRequest.toString());
SearchResponse<SearchResultHit> search = client.search(searchRequest, SearchResultHit.class);
if (agg) { if (agg) {
Aggregations aggregations = search.getAggregations(); Map aggregations = search.aggregations();
Terms byPersonId = aggregations.get("by_personId"); TermsAggregation byPersonId = (TermsAggregation) aggregations.get("by_personId");
List<? extends Terms.Bucket> buckets = byPersonId.getBuckets(); log.info("aggregation:{}", byPersonId.toString());
for (Terms.Bucket bucket : buckets) { // List<Bucket> buckets = byPersonId.;
String personId = (String) bucket.getKey(); // for (Terms.Bucket bucket : buckets) {
Max maxScore = bucket.getAggregations().get("max_score"); // String personId = (String) bucket.getKey();
double value = maxScore.getValue(); // Max maxScore = bucket.getAggregations().get("max_score");
Person person = new Person().setPersonId(personId).setScore((float) value).setPersonPoolId(poolId); // double value = maxScore.getValue();
persons.add(person); // Person person = new Person().setPersonId(personId).setScore((float) value).setPersonPoolId(poolId);
} // persons.add(person);
// }
} else { } else {
SearchHits hits = search.getHits(); HitsMetadata hits = search.hits();
SearchHit[] hits1 = hits.getHits(); // log.info("match0:{}", hits.toString());
for (SearchHit item : hits1) { List<Hit<SearchResultHit>> hits1 = hits.hits();
Map<String, Object> source = item.getSourceAsMap(); for (Hit item : hits1) {
SearchResultHit hit = (SearchResultHit) item.source();
Person p = new Person(); Person p = new Person();
p.setPersonId((String) source.get("personId")); p.setPersonId((String) hit.getPersonId());
p.setAge((Integer) source.get("age")); p.setAge(hit.getAge());
p.setGender((String) source.get("gender")); p.setGender((String) hit.getGender());
p.setChannelSerialNum((String) source.get("channelSerialNum")); p.setChannelSerialNum(hit.getChannelSerialNum());
p.setBodyType((Integer) source.get("body_type")); p.setBodyType(hit.getBodyType());
p.setCounttime(Optional.ofNullable((String) source.get("counttime")) p.setCounttime(Optional.ofNullable(hit.getCountTime())
.map(x -> { .map(x -> {
try { return x;
return Constant.DATE_FORMAT.get().parse(x);
} catch (ParseException ignore) {
}
return null;
}) })
.orElse(null)); .orElse(null));
p.setScore(item.getScore()); p.setScore(item.score().floatValue());
p.setPersonPoolId(item.getIndex()); p.setPersonPoolId(item.index());
persons.add(p); persons.add(p);
} }
} }
......
package com.viontech.match.service; package com.viontech.match.service;
import com.fasterxml.jackson.databind.ObjectMapper; import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch._types.mapping.AllField;
import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.CountRequest;
import co.elastic.clients.elasticsearch.core.CountResponse;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import com.alibaba.fastjson.JSONObject;
//import com.fasterxml.jackson.databind.ObjectMapper;
import com.viontech.keliu.model.Person; import com.viontech.keliu.model.Person;
import com.viontech.match.config.Constant; import com.viontech.match.config.Constant;
import com.viontech.match.entity.PoolInfo; import com.viontech.match.entity.PoolInfo;
...@@ -9,35 +20,20 @@ import com.viontech.match.entity.vo.ResponseVo; ...@@ -9,35 +20,20 @@ import com.viontech.match.entity.vo.ResponseVo;
import com.viontech.match.runner.ILM; import com.viontech.match.runner.ILM;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity; import org.apache.http.HttpEntity;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Request; import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response; import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.rest.RestStatus;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.HashMap; import java.util.*;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/** /**
* . * .
...@@ -51,13 +47,22 @@ public class PoolService { ...@@ -51,13 +47,22 @@ public class PoolService {
@Resource @Resource
private PersonService personService; private PersonService personService;
@Resource @Resource
private RestHighLevelClient client; private ElasticsearchClient client;
@Resource // @Resource
private ObjectMapper objectMapper; // private ObjectMapper objectMapper;
@Value("${vion.index.number_of_shards:1}") @Value("${vion.index.number_of_shards:1}")
private Integer shards; private Integer shards;
@Value("${vion.index.number_of_replicas:0}") @Value("${vion.index.number_of_replicas:0}")
private Integer replicas; private Integer replicas;
@Value("${vion.index.translog.durability:async}")
private String translogDurability;
// @Value("${vion.index.refresh_interval}")
// private String refreshInterval;
@Value("${vion.index.translog.sync_interval}")
private String translogSyncInterval;
@Value("${vion.index.merge.scheduler.max_thread_count}")
private Integer mergeThreadCount;
/** /**
* 添加特征池 * 添加特征池
...@@ -76,32 +81,46 @@ public class PoolService { ...@@ -76,32 +81,46 @@ public class PoolService {
String poolId = requestVo.getPoolId(); String poolId = requestVo.getPoolId();
log.info("特征池创建操作开始:[{}},IML:{}", poolId, requestVo.isUseILMPolicy()); log.info("特征池创建操作开始:[{}},IML:{}", poolId, requestVo.isUseILMPolicy());
try { try {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(poolId);
XContentBuilder builder = getCreateIndexContentBuilder(); IndexSettings.Builder settings = new IndexSettings.Builder()
createIndexRequest.mapping(builder); .numberOfShards(String.valueOf(shards))
Settings.Builder setting = Settings.builder(); .numberOfReplicas(String.valueOf(replicas));
setting.put("index.number_of_shards", shards); if (StringUtils.isNotEmpty(translogDurability)) {
setting.put("index.number_of_replicas", replicas); settings.translog(t -> t.durability(TranslogDurability.Async));
// setting.put("index.translog.durability", translogDurability);
if (StringUtils.isNotEmpty(translogSyncInterval)) {
settings.translog(t -> t.syncInterval(i -> i.time(translogSyncInterval)));
// setting.put("index.translog.sync_interval", translogSyncInterval);
}
}
if (mergeThreadCount != null) {
// setting.put("index.merge.scheduler.max_thread_count", mergeThreadCount);
settings.merge(m -> m.scheduler(s -> s.maxThreadCount(mergeThreadCount)));
}
if (requestVo.isUseILMPolicy()) { if (requestVo.isUseILMPolicy()) {
setting.put("index.lifecycle.name", ILM.LIFECYCLE_NAME); // setting.put("index.lifecycle.name", ILM.LIFECYCLE_NAME);
settings.lifecycle(l -> l.name(ILM.LIFECYCLE_NAME));
} }
createIndexRequest.settings(setting);
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); TypeMapping.Builder builder = getCreateIndexContentBuilder();
CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder().index(poolId).mappings(builder.build()).settings(settings.build()).build();
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest);
if (addPerson) { if (addPerson) {
List<Person> personPool = requestVo.getPersonPool(); List<Person> personPool = requestVo.getPersonPool();
if (CollectionUtils.isNotEmpty(personPool)) { if (CollectionUtils.isNotEmpty(personPool)) {
BulkResponse bulkItemResponses = personService.addPerson(poolId, personPool); BulkResponse bulkItemResponses = personService.addPerson(poolId, personPool);
if (bulkItemResponses != null) { if (bulkItemResponses != null) {
log.info(bulkItemResponses.buildFailureMessage()); log.info(bulkItemResponses.toString());
} }
} }
} }
log.info("特征池创建操作完成:[{}],ILM:{}", poolId, requestVo.isUseILMPolicy()); log.info("特征池创建操作完成:[{}],ILM:{}", poolId, requestVo.isUseILMPolicy());
return ResponseVo.success(rid); return ResponseVo.success(rid);
} catch (ElasticsearchStatusException e) { } catch (ElasticsearchException e) {
if (e.status() == RestStatus.BAD_REQUEST && e.getMessage().contains(Constant.STR_POOL_ID_ALREADY_EXISTS)) { if (e.status() == 400 && e.getMessage().contains(Constant.STR_POOL_ID_ALREADY_EXISTS)) {
return ResponseVo.error(rid, 20, "already exit poolId"); return ResponseVo.error(rid, 20, "already exit poolId");
} else { } else {
throw e; throw e;
...@@ -133,15 +152,16 @@ public class PoolService { ...@@ -133,15 +152,16 @@ public class PoolService {
} }
} }
} else if (flushPool == 1) { } else if (flushPool == 1) {
AcknowledgedResponse delete = client.indices().delete(new DeleteIndexRequest(poolId), RequestOptions.DEFAULT); //删除索引
DeleteIndexResponse delete = client.indices().delete(new DeleteIndexRequest.Builder().index(poolId).build());
} }
log.info("特征池删除操作完成:[{}],FlushPool:[{}]", poolId, flushPool); log.info("特征池删除操作完成:[{}],FlushPool:[{}]", poolId, flushPool);
return ResponseVo.success(rid); return ResponseVo.success(rid);
} catch (ElasticsearchStatusException e) { } catch (ElasticsearchException e) {
if (e.status() == RestStatus.NOT_FOUND) { if (e.status() == 404) {
return ResponseVo.poolIdNotExists(rid); return ResponseVo.poolIdNotExists(rid);
} else { } else {
return ResponseVo.error(rid, e.getDetailedMessage()); return ResponseVo.error(rid, e.getMessage());
} }
} }
} }
...@@ -160,21 +180,23 @@ public class PoolService { ...@@ -160,21 +180,23 @@ public class PoolService {
List<Person> personPool = requestVo.getPersonPool(); List<Person> personPool = requestVo.getPersonPool();
Integer updateType = requestVo.getUpdateType(); Integer updateType = requestVo.getUpdateType();
log.info("特征池修改操作开始:[{}],updateType:[{}]", poolId, updateType); log.info("特征池修改操作开始:[{}],updateType:[{}]", poolId, updateType);
if (!client.indices().exists(new GetIndexRequest(poolId), RequestOptions.DEFAULT)) { co.elastic.clients.elasticsearch.indices.ExistsRequest poolIdExists = new ExistsRequest.Builder().index(poolId).build();
BooleanResponse poolIdExistsResponse = client.indices().exists(poolIdExists);
if (!poolIdExistsResponse.value()) {
return ResponseVo.poolIdNotExists(rid); return ResponseVo.poolIdNotExists(rid);
} }
try { try {
BulkResponse bulkItemResponses = personService.addPerson(poolId, personPool); BulkResponse bulkItemResponses = personService.addPerson(poolId, personPool);
if (bulkItemResponses != null && bulkItemResponses.hasFailures()) { if (bulkItemResponses != null && bulkItemResponses.errors()) {
log.info(bulkItemResponses.buildFailureMessage()); log.info(bulkItemResponses.items().toString());
return ResponseVo.error(rid, bulkItemResponses.buildFailureMessage()); return ResponseVo.error(rid, bulkItemResponses.items().toString());
} }
log.info("特征池修改操作完成:[{}],updateType:[{}]", poolId, updateType); log.info("特征池修改操作完成:[{}],updateType:[{}]", poolId, updateType);
return ResponseVo.success(rid, "success"); return ResponseVo.success(rid, "success");
} catch (ElasticsearchStatusException e) { } catch (ElasticsearchException e) {
log.error("modifyPool", e); log.error("modifyPool", e);
return ResponseVo.error(rid, e.getDetailedMessage()); return ResponseVo.error(rid, e.getMessage());
} }
} }
...@@ -195,7 +217,7 @@ public class PoolService { ...@@ -195,7 +217,7 @@ public class PoolService {
log.info("查询特征池操作开始:[{}],rid:[{}]", poolId, rid); log.info("查询特征池操作开始:[{}],rid:[{}]", poolId, rid);
if (listAll != 0) { if (listAll != 0) {
poolInfos = queryPoolInfo(null); poolInfos = queryPoolInfo(poolId);
} else { } else {
if (!existPool(poolId)) { if (!existPool(poolId)) {
return ResponseVo.poolIdNotExists(rid); return ResponseVo.poolIdNotExists(rid);
...@@ -207,150 +229,103 @@ public class PoolService { ...@@ -207,150 +229,103 @@ public class PoolService {
log.info("查询特征池操作完成:[{}],rid:[{}]", poolId, rid); log.info("查询特征池操作完成:[{}],rid:[{}]", poolId, rid);
return success; return success;
} catch (ElasticsearchStatusException e) { } catch (ElasticsearchException e) {
log.error("queryPool", e); log.error("queryPool", e);
return ResponseVo.error(rid, e.getDetailedMessage()); return ResponseVo.error(rid, e.getMessage());
} }
} }
public void refreshPool(String... poolIds) throws IOException { public void refreshPool(String... poolIds) throws IOException {
RefreshRequest refreshRequest = new RefreshRequest(poolIds); RefreshRequest refreshRequest = new RefreshRequest.Builder().index(Arrays.asList(poolIds)).build();
RefreshResponse refresh = client.indices().refresh(refreshRequest, RequestOptions.DEFAULT); RefreshResponse refresh = client.indices().refresh(refreshRequest);
log.info("刷新索引:{},成功:{},失败:{}", poolIds, refresh.getSuccessfulShards(), refresh.getFailedShards()); log.info("刷新索引:{},成功:{},失败:{}", poolIds, refresh.shards().successful(), refresh.shards().failed());
} }
public XContentBuilder getCreateIndexContentBuilder() throws IOException { public TypeMapping.Builder getCreateIndexContentBuilder() throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder(); // JSONObject content = new JSONObject();
builder.startObject(); // JSONObject properties = new JSONObject();
{ // JSONObject data = new JSONObject();
builder.startObject("properties"); // data.put("type", "dense_vector");
{ // data.put("dims", Constant.FACE_FEATURE_DIMS);
// 人脸特征 // JSONObject body = new JSONObject();
builder.startObject("data"); // body.put("type", "dense_vector");
{ // body.put("dims", Constant.BODY_FEATURE_DIMS_2048);
builder.field("type", "dense_vector"); // JSONObject personId = new JSONObject();
builder.field("dims", Constant.FACE_FEATURE_DIMS); // personId.put("type", "keyword");
} //
builder.endObject(); // JSONObject unid = new JSONObject();
// unid.put("type", "keyword");
// 人体特征 // JSONObject fid = new JSONObject();
builder.startObject("body"); // fid.put("type", "text");
{ // JSONObject age = new JSONObject();
builder.field("type", "dense_vector"); // age.put("type", "integer");
builder.field("dims", Constant.BODY_FEATURE_DIMS_2048); // age.put("doc_values", false);
} // age.put("index", false);
builder.endObject(); // JSONObject gender = new JSONObject();
// gender.put("type", "keyword");
// 人员id // gender.put("doc_values", false);
builder.startObject("personId"); // gender.put("index", false);
{ // JSONObject counttime = new JSONObject();
builder.field("type", "keyword"); // counttime.put("type", "date");
} // counttime.put("format", "yyyy-MM-dd HH:mm:ss");
builder.endObject(); // JSONObject channelSerialNum = new JSONObject();
// channelSerialNum.put("type", "keyword");
// 抓拍id // JSONObject body_type = new JSONObject();
builder.startObject("unid"); // body_type.put("type", "integer");
{ // body_type.put("doc_values", false);
builder.field("type", "keyword"); // body_type.put("index", false);
} // JSONObject direction = new JSONObject();
builder.endObject(); // direction.put("type", "keyword");
// JSONObject gateId = new JSONObject();
// 特征id // gateId.put("type", "long");
builder.startObject("fid"); // properties.put("data", data);
{ // properties.put("body", body);
builder.field("type", "text"); // properties.put("personId", personId);
} // properties.put("unid", unid);
builder.endObject(); // properties.put("fid", fid);
// properties.put("age", age);
// 年龄 // properties.put("gender", gender);
builder.startObject("age"); // properties.put("counttime", counttime);
{ // properties.put("channelSerialNum", channelSerialNum);
builder.field("type", "integer"); // properties.put("body_type", body_type);
builder.field("doc_values", false); // properties.put("gateId", gateId);
builder.field("index", false); // properties.put("direction", direction);
} //
builder.endObject(); // InputStream is = new ByteArrayInputStream(properties.toJSONString().getBytes());
// 性别 Map<String, Property> properties1 = new HashMap<>();
builder.startObject("gender"); properties1.put("direction", new Property.Builder().keyword(k -> k.index(true)).build());
{ properties1.put("fid", new Property.Builder().text(t -> t.index(true)).build());
builder.field("type", "keyword"); properties1.put("gateId", new Property.Builder().long_(t -> t.index(true)).build());
builder.field("doc_values", false); properties1.put("personId", new Property.Builder().keyword(k -> k.index(true)).build());
builder.field("index", false); properties1.put("body", new Property.Builder().denseVector(d -> d.dims(Constant.BODY_FEATURE_DIMS_2048).similarity("cosine")).build());
} properties1.put("data", new Property.Builder().denseVector(d -> d.dims(Constant.FACE_FEATURE_DIMS).similarity("cosine")).build());
builder.endObject(); properties1.put("body_type", new Property.Builder().integer(i -> i.index(false).docValues(false)).build());
properties1.put("channelSerialNum", new Property.Builder().keyword(k -> k.index(true)).build());
// 时间 properties1.put("age", new Property.Builder().integer(i -> i.index(false).docValues(false)).build());
builder.startObject("counttime"); properties1.put("gender", new Property.Builder().keyword(k -> k.index(false).docValues(false)).build());
{ properties1.put("counttime", new Property.Builder().date(d -> d.format("yyyy-MM-dd HH:mm:ss")).build());
builder.field("type", "date"); properties1.put("unid", new Property.Builder().keyword(k -> k.index(true)).build());
builder.field("format", "yyyy-MM-dd HH:mm:ss");
} // builder.properties(properties1);
builder.endObject(); TypeMapping.Builder builder = new TypeMapping.Builder().properties(properties1);
// 通道序列号
builder.startObject("channelSerialNum");
{
builder.field("type", "keyword");
}
builder.endObject();
// 身体特征类型类型
builder.startObject("body_type");
{
builder.field("type", "integer");
builder.field("doc_values", false);
builder.field("index", false);
}
builder.endObject();
// 监控点 ID
builder.startObject("gateId");
{
builder.field("type", "long");
}
builder.endObject();
// 方向 direction
builder.startObject("direction");
{
builder.field("type", "keyword");
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
return builder; return builder;
} }
public boolean existPool(String poolId) throws IOException { public boolean existPool(String poolId) throws IOException {
return client.indices().exists(new GetIndexRequest(poolId), RequestOptions.DEFAULT); BooleanResponse exists = client.indices().exists(new ExistsRequest.Builder().index(poolId).build());
return exists.value();
} }
public List<PoolInfo> queryPoolInfo(String poolId) throws IOException { public List<PoolInfo> queryPoolInfo(String poolId) throws IOException {
String endPoint = poolId == null ? "/_stats" : "/" + poolId + "/_stats"; CountResponse response = client.count(new CountRequest.Builder().index(poolId).build());
LinkedList<PoolInfo> poolInfos = new LinkedList<>(); LinkedList<PoolInfo> poolInfos = new LinkedList<>();
RestClient lowLevelClient = client.getLowLevelClient(); PoolInfo poolInfo = new PoolInfo();
Request request = new Request("GET", endPoint); poolInfo.setPersonCount( response.count());
Response response = lowLevelClient.performRequest(request); poolInfo.setPoolId(poolId);
HttpEntity entity = response.getEntity(); poolInfos.add(poolInfo);
InputStream content = entity.getContent();
HashMap responseMap = objectMapper.readValue(content, HashMap.class);
HashMap<String, HashMap> indices1 = (HashMap<String, HashMap>) responseMap.get("indices");
for (Map.Entry<String, HashMap> entry : indices1.entrySet()) {
String id = entry.getKey();
HashMap value = entry.getValue();
HashMap<String, HashMap> primaries = (HashMap<String, HashMap>) value.get("primaries");
HashMap docs = primaries.get("docs");
Integer count = (Integer) docs.get("count");
PoolInfo poolInfo = new PoolInfo();
poolInfo.setPersonCount(count.longValue());
poolInfo.setPoolId(id);
poolInfos.add(poolInfo);
}
return poolInfos; return poolInfos;
} }
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!