Commit 8c5661a5 by 姚冰

[chg] 索引是否存在逻辑优化

1 parent 97494f0c
...@@ -8,6 +8,7 @@ import org.apache.http.HttpHost; ...@@ -8,6 +8,7 @@ import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope; import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider; import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.TrustStrategy; import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider;
...@@ -17,6 +18,7 @@ import org.apache.http.impl.nio.reactor.IOReactorConfig; ...@@ -17,6 +18,7 @@ import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.reactor.IOReactorException; import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.ssl.SSLContextBuilder;
import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -54,12 +56,18 @@ public class ElasticsearchConfiguration { ...@@ -54,12 +56,18 @@ public class ElasticsearchConfiguration {
connectionManager = new PoolingNHttpClientConnectionManager(new connectionManager = new PoolingNHttpClientConnectionManager(new
DefaultConnectingIOReactor(ioReactorConfig)); DefaultConnectingIOReactor(ioReactorConfig));
// 设置最大连接数 // 设置最大连接数
connectionManager.setMaxTotal(100); // 整个连接池的最大连接数 connectionManager.setMaxTotal(200); // 整个连接池的最大连接数
connectionManager.setDefaultMaxPerRoute(20); connectionManager.setDefaultMaxPerRoute(40);
} catch (IOReactorException e) { } catch (IOReactorException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(5000) // 连接超时:50毫秒
.setSocketTimeout(5000) // 套接字超时:50毫秒
.setConnectionRequestTimeout(50) // 请求超时:50毫秒
.build();
PoolingNHttpClientConnectionManager finalConnectionManager = connectionManager; PoolingNHttpClientConnectionManager finalConnectionManager = connectionManager;
RestClient restClient = RestClient.builder(esHosts) RestClient restClient = RestClient.builder(esHosts)
.setRequestConfigCallback(requestConfigBuilder -> { .setRequestConfigCallback(requestConfigBuilder -> {
...@@ -73,6 +81,8 @@ public class ElasticsearchConfiguration { ...@@ -73,6 +81,8 @@ public class ElasticsearchConfiguration {
httpClientBuilder.setMaxConnPerRoute(properties.getMaxConnPerRoute()); httpClientBuilder.setMaxConnPerRoute(properties.getMaxConnPerRoute());
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
httpClientBuilder.setConnectionManager(finalConnectionManager); httpClientBuilder.setConnectionManager(finalConnectionManager);
// httpClientBuilder.setDefaultRequestConfig(requestConfig);
// try { // try {
// // 创建一个信任所有证书的 TrustStrategy 策略 // // 创建一个信任所有证书的 TrustStrategy 策略
// TrustStrategy acceptTrustStrategy = new TrustStrategy() { // TrustStrategy acceptTrustStrategy = new TrustStrategy() {
......
...@@ -576,49 +576,31 @@ public class PersonService { ...@@ -576,49 +576,31 @@ public class PersonService {
private SearchRequest getSearchRequest(String rid, String poolId, Integer matchResultSize, Double[] feature, Person person, int type, Boolean agg) { private SearchRequest getSearchRequest(String rid, String poolId, Integer matchResultSize, Double[] feature, Person person, int type, Boolean agg) {
// BoolQuery.Builder builder = getSearchSourceBuilder(feature, person, type); // BoolQuery.Builder builder = getSearchSourceBuilder(feature, person, type);
Query scriptScoreQuery = getScriptScoreQuery(feature, person, type); Query scriptScoreQuery = getScriptScoreQuery(feature, person, type);
if (agg) {
// AggregationBuilders.max("max_score", s -> s.field("_score").script(new Script.Builder().source("_score").build()));
// Aggregation aggregation = AggregationBuilders.max(s -> s.field("max_score").script(new Script.Builder().source("_score").build()));
// Aggregation termsAgg = AggregationBuilders.terms("by_personId", TermsAggregationDefinition.builder("by_personId").field("personId").order("max_score", false));
// Aggregation termsAgg2 = AggregationBuilders.terms(s -> s.field("personId").size(matchResultSize).size(matchResultSize));
// MaxAggregation.Builder maxBucketAggregation = new MaxAggregation.Builder().field("max_score").script(new Script.Builder().source("_score").build());
// TermsAggregation termsAgg = new TermsAggregation.Builder().field("personId").format("by_personId").size(matchResultSize).child(maxBucketAggregation);
// MetricAg
// termsAgg2.subAggregation(aggregation);
// termsAgg2.order(BucketOrder.aggregation("max_score", false));
// termsAgg.size(matchResultSize);
// MaxAggregationBuilder maxScoreAgg = AggregationBuilders.max("max_score").script(new Script("_score"));
// TermsAggregationBuilder personIdAgg = AggregationBuilders.terms("by_personId").field("personId");
// personIdAgg.subAggregation(maxScoreAgg);
// personIdAgg.order(BucketOrder.aggregation("max_score", false));
// personIdAgg.size(matchResultSize);
// builder.aggregation(personIdAgg);
} else {
// builder.size(matchResultSize);
}
SourceConfig sourceConfig = new SourceConfig.Builder().filter(s -> s.includes(Arrays.asList(FETCH_SOURCE))).build(); SourceConfig sourceConfig = new SourceConfig.Builder().filter(s -> s.includes(Arrays.asList(FETCH_SOURCE))).build();
// log.debug("rid:{} poolId:{} 匹配时参数:{}", rid, poolId, scriptScoreQuery.toString());
// KnnSearch knnQuery = getKnnSearch(feature, person, type);
// ScriptScoreQuery scriptScoreQuery = getScriptScoreQuery(feature, person, type);
double minScore = 0.0; double minScore = 0.0;
if (type == 0) { if (type == 0) {
minScore = person.getFaceMinScore().doubleValue()/100; minScore = person.getFaceMinScore().doubleValue()/100;
} else if (type == 1) { } else if (type == 1) {
minScore = person.getBodyMinScore().doubleValue()/100; minScore = person.getBodyMinScore().doubleValue()/100;
} }
return new SearchRequest.Builder().index(poolId).query(scriptScoreQuery).source(sourceConfig).size(matchResultSize).minScore(minScore).build(); return new SearchRequest.Builder().index(poolId).query(scriptScoreQuery).source(sourceConfig)
.size(matchResultSize)
.minScore(minScore)
.timeout("50ms")
.build();
} }
private List<Person> match0(SearchRequest searchRequest, Boolean agg) throws Exception { private List<Person> match0(SearchRequest searchRequest, Boolean agg) throws Exception {
String poolId = searchRequest.index().get(0); String poolId = searchRequest.index().get(0);
List<Person> persons = new ArrayList<>(); List<Person> persons = new ArrayList<>();
// log.info("request:{}", JSON.tojsonS); // log.info("request:{}", JSON.tojsonS);
SearchResponse<SearchResultHit> search = client.search(searchRequest, SearchResultHit.class); try {
if (agg) { SearchResponse<SearchResultHit> search = client.search(searchRequest, SearchResultHit.class);
Map aggregations = search.aggregations(); if (agg) {
TermsAggregation byPersonId = (TermsAggregation) aggregations.get("by_personId"); Map aggregations = search.aggregations();
log.info("aggregation:{}", byPersonId.toString()); TermsAggregation byPersonId = (TermsAggregation) aggregations.get("by_personId");
log.info("aggregation:{}", byPersonId.toString());
// List<Bucket> buckets = byPersonId.; // List<Bucket> buckets = byPersonId.;
// for (Terms.Bucket bucket : buckets) { // for (Terms.Bucket bucket : buckets) {
// String personId = (String) bucket.getKey(); // String personId = (String) bucket.getKey();
...@@ -627,34 +609,37 @@ public class PersonService { ...@@ -627,34 +609,37 @@ public class PersonService {
// Person person = new Person().setPersonId(personId).setScore((float) value).setPersonPoolId(poolId); // Person person = new Person().setPersonId(personId).setScore((float) value).setPersonPoolId(poolId);
// persons.add(person); // persons.add(person);
// } // }
} else { } else {
HitsMetadata hits = search.hits(); HitsMetadata hits = search.hits();
// log.info("match0:{}", hits.toString()); // log.info("match0:{}", hits.toString());
List<Hit<SearchResultHit>> hits1 = hits.hits(); List<Hit<SearchResultHit>> hits1 = hits.hits();
for (Hit item : hits1) { for (Hit item : hits1) {
SearchResultHit hit = (SearchResultHit) item.source(); SearchResultHit hit = (SearchResultHit) item.source();
Person p = new Person(); Person p = new Person();
p.setPersonId((String) hit.getPersonId()); p.setPersonId((String) hit.getPersonId());
p.setAge(hit.getAge()); p.setAge(hit.getAge());
p.setGender((String) hit.getGender()); p.setGender((String) hit.getGender());
p.setChannelSerialNum(hit.getChannelSerialNum()); p.setChannelSerialNum(hit.getChannelSerialNum());
p.setBodyType(hit.getBody_type()); p.setBodyType(hit.getBody_type());
p.setCaptureUnid(hit.getUnid()); p.setCaptureUnid(hit.getUnid());
p.setCounttime(Optional.ofNullable((String)hit.getCounttime()) p.setCounttime(Optional.ofNullable((String)hit.getCounttime())
.map(x -> { .map(x -> {
try { try {
return Constant.DATE_FORMAT.get().parse(x); return Constant.DATE_FORMAT.get().parse(x);
} catch (ParseException e) { } catch (ParseException e) {
return null; return null;
} }
}) })
.orElse(null)); .orElse(null));
p.setScore(item.score().floatValue() * 100); p.setScore(item.score().floatValue() * 100);
p.setPersonPoolId(item.index()); p.setPersonPoolId(item.index());
persons.add(p); persons.add(p);
}
} }
} catch (Exception e) {
log.error("match0 excepton", e.getMessage());
} }
return persons; return persons;
......
...@@ -27,6 +27,7 @@ import org.elasticsearch.client.RequestOptions; ...@@ -27,6 +27,7 @@ import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response; import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
...@@ -34,6 +35,8 @@ import java.io.ByteArrayInputStream; ...@@ -34,6 +35,8 @@ import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/** /**
* . * .
...@@ -64,6 +67,8 @@ public class PoolService { ...@@ -64,6 +67,8 @@ public class PoolService {
@Value("${vion.index.merge.scheduler.max_thread_count}") @Value("${vion.index.merge.scheduler.max_thread_count}")
private Integer mergeThreadCount; private Integer mergeThreadCount;
private final static ConcurrentMap<String, Boolean> poolMap = new ConcurrentHashMap<>();
/** /**
* 添加特征池 * 添加特征池
* *
...@@ -81,9 +86,12 @@ public class PoolService { ...@@ -81,9 +86,12 @@ public class PoolService {
String poolId = requestVo.getPoolId(); String poolId = requestVo.getPoolId();
// log.info("特征池创建操作开始:[{}},IML:{}", poolId, requestVo.isUseILMPolicy()); // log.info("特征池创建操作开始:[{}},IML:{}", poolId, requestVo.isUseILMPolicy());
try { try {
String shardsStr = String.valueOf(shards);
if (poolId.contains("staff") || poolId.contains("person")) {
shardsStr = "1";
}
IndexSettings.Builder settings = new IndexSettings.Builder() IndexSettings.Builder settings = new IndexSettings.Builder()
.numberOfShards(String.valueOf(shards)) .numberOfShards(shardsStr)
.numberOfReplicas(String.valueOf(replicas)); .numberOfReplicas(String.valueOf(replicas));
if (StringUtils.isNotEmpty(translogDurability)) { if (StringUtils.isNotEmpty(translogDurability)) {
settings.translog(t -> t.durability(TranslogDurability.Async)); settings.translog(t -> t.durability(TranslogDurability.Async));
...@@ -202,26 +210,6 @@ public class PoolService { ...@@ -202,26 +210,6 @@ public class PoolService {
} }
// public ResponseVo deletePoolData(RequestVo requestVo) throws Exception {
// String rid = requestVo.getRid();
// Integer flushPool = requestVo.getFlushPool();
// String poolId = requestVo.getPoolId();
// Long mallId = requestVo.getMallId();
// log.info("特征池删除mallId:{}数据操作开始:[{}]", mallId,poolId);
// try {
// personService.deletePersonByMallId(poolId, mallId);
// log.info("特征池删除mallId:{}操作完成:[{}]", mallId, poolId);
// return ResponseVo.success(rid);
// } catch (ElasticsearchException e) {
// if (e.status() == 404) {
// return ResponseVo.poolIdNotExists(rid);
// } else {
// return ResponseVo.error(rid, e.getMessage());
// }
// }
// }
/** /**
* 修改特征池(添加人员) * 修改特征池(添加人员)
* *
...@@ -235,7 +223,7 @@ public class PoolService { ...@@ -235,7 +223,7 @@ public class PoolService {
String poolId = requestVo.getPoolId(); String poolId = requestVo.getPoolId();
List<Person> personPool = requestVo.getPersonPool(); List<Person> personPool = requestVo.getPersonPool();
Integer updateType = requestVo.getUpdateType(); Integer updateType = requestVo.getUpdateType();
// log.info("特征池修改操作开始:[{}],updateType:[{}]", poolId, updateType); log.info("特征池修改操作开始:[{}],updateType:[{}],size:{}", poolId, updateType, personPool.size());
co.elastic.clients.elasticsearch.indices.ExistsRequest poolIdExists = new ExistsRequest.Builder().index(poolId).build(); co.elastic.clients.elasticsearch.indices.ExistsRequest poolIdExists = new ExistsRequest.Builder().index(poolId).build();
BooleanResponse poolIdExistsResponse = client.indices().exists(poolIdExists); BooleanResponse poolIdExistsResponse = client.indices().exists(poolIdExists);
if (!poolIdExistsResponse.value()) { if (!poolIdExistsResponse.value()) {
...@@ -246,6 +234,9 @@ public class PoolService { ...@@ -246,6 +234,9 @@ public class PoolService {
if (bulkItemResponses != null && bulkItemResponses.errors()) { if (bulkItemResponses != null && bulkItemResponses.errors()) {
log.info(bulkItemResponses.items().toString()); log.info(bulkItemResponses.items().toString());
return ResponseVo.error(rid, bulkItemResponses.items().toString()); return ResponseVo.error(rid, bulkItemResponses.items().toString());
} else if (bulkItemResponses == null) {
log.info("bulkItemResponses is null");
return ResponseVo.error(rid, bulkItemResponses.items().toString());
} }
log.info("特征池修改:[{}],updateType:[{}]", poolId, updateType); log.info("特征池修改:[{}],updateType:[{}]", poolId, updateType);
...@@ -319,8 +310,25 @@ public class PoolService { ...@@ -319,8 +310,25 @@ public class PoolService {
} }
public boolean existPool(String poolId) throws IOException { public boolean existPool(String poolId) throws IOException {
BooleanResponse exists = client.indices().exists(new ExistsRequest.Builder().index(poolId).build()); if (poolMap.containsKey(poolId)) {
return exists.value(); return true;
} else {
BooleanResponse exists = client.indices().exists(new ExistsRequest.Builder().index(poolId).build());
if (exists.value()) {
poolMap.put(poolId, true);
}
return exists.value();
}
}
@Scheduled(cron = "0 0/2 * * * ?")
public void checkPoolExist() throws IOException {
Map<String, Boolean> tmpMap = poolMap;
for (Map.Entry<String, Boolean> entry : tmpMap.entrySet()) {
if (!existPool(entry.getKey())) {
poolMap.remove(entry.getKey());
}
}
} }
public List<PoolInfo> queryPoolInfo(String poolId) throws IOException { public List<PoolInfo> queryPoolInfo(String poolId) throws IOException {
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!