Commit 5607af53 by 姚冰

Merge branch 'refs/heads/develop_es8.15_merge_0624' into develop_es8.15_springboot3

# Conflicts:
#	src/main/java/com/viontech/match/MatchApp.java
2 parents 4f08f31d 6f6a25b2
......@@ -4,8 +4,10 @@ import com.ulisesbocchio.jasyptspringboot.annotation.EnableEncryptableProperties
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
@EnableAsync
@EnableEncryptableProperties
@ComponentScan(basePackages = {"com.viontech"})public class MatchApp {
......
package com.viontech.match.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class AsyncConfig {
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(100);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(10000);
executor.setThreadNamePrefix("Async-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
......@@ -8,6 +8,7 @@ import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.impl.client.BasicCredentialsProvider;
......@@ -17,6 +18,7 @@ import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.reactor.IOReactorException;
import org.apache.http.ssl.SSLContextBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
......@@ -54,12 +56,18 @@ public class ElasticsearchConfiguration {
connectionManager = new PoolingNHttpClientConnectionManager(new
DefaultConnectingIOReactor(ioReactorConfig));
// 设置最大连接数
connectionManager.setMaxTotal(100); // 整个连接池的最大连接数
connectionManager.setDefaultMaxPerRoute(20);
connectionManager.setMaxTotal(properties.getMaxConnTotal()); // 整个连接池的最大连接数
connectionManager.setDefaultMaxPerRoute(properties.getMaxConnPerRoute());
} catch (IOReactorException e) {
throw new RuntimeException(e);
}
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(5000) // 连接超时:50毫秒
.setSocketTimeout(5000) // 套接字超时:50毫秒
.setConnectionRequestTimeout(50) // 请求超时:50毫秒
.build();
PoolingNHttpClientConnectionManager finalConnectionManager = connectionManager;
RestClient restClient = RestClient.builder(esHosts)
.setRequestConfigCallback(requestConfigBuilder -> {
......@@ -73,6 +81,8 @@ public class ElasticsearchConfiguration {
httpClientBuilder.setMaxConnPerRoute(properties.getMaxConnPerRoute());
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
httpClientBuilder.setConnectionManager(finalConnectionManager);
// httpClientBuilder.setDefaultRequestConfig(requestConfig);
// try {
// // 创建一个信任所有证书的 TrustStrategy 策略
// TrustStrategy acceptTrustStrategy = new TrustStrategy() {
......
......@@ -13,6 +13,7 @@ import org.springframework.web.bind.annotation.RestController;
import jakarta.annotation.Resource;
import java.io.IOException;
import java.util.Arrays;
/**
* .
......@@ -44,7 +45,15 @@ public class MainController {
case QueryPersonPool:
return poolService.queryPool(requestVo);
case MatchPerson:
return personService.matchPerson(requestVo);
ResponseVo matchPerson = personService.matchPerson(requestVo);
if (matchPerson.getMatchBodies() != null && matchPerson.getMatchBodies().size() > 0) {
requestVo.getPerson().setPersonId(matchPerson.getMatchBodies().get(0).getPersonId());
} else if(matchPerson.getMatchPersons() != null && matchPerson.getMatchPersons().size() > 0) {
requestVo.getPerson().setPersonId(matchPerson.getMatchPersons().get(0).getPersonId());
}
poolService.asyncModifyPool(requestVo.getPersonPoolId(), requestVo.getPerson());
return matchPerson;
case UpdatePerson:
return personService.updatePerson(requestVo);
case CountMatchPerson:
......
......@@ -56,6 +56,7 @@ import java.util.stream.Stream;
@Slf4j
public class PersonService {
private static final String[] FETCH_SOURCE = new String[]{"personId", "age", "gender", "fid", "counttime", "channelSerialNum", "body_type","unid"};
public static final SourceConfig SOURCE_CONFIG = new SourceConfig.Builder().filter(s -> s.includes(Arrays.asList(FETCH_SOURCE))).build();
@Resource
private ElasticsearchClient client;
@Resource
......@@ -485,49 +486,31 @@ public class PersonService {
private SearchRequest getSearchRequest(String rid, String poolId, Integer matchResultSize, Double[] feature, Person person, int type, Boolean agg) {
// BoolQuery.Builder builder = getSearchSourceBuilder(feature, person, type);
Query scriptScoreQuery = getScriptScoreQuery(feature, person, type);
if (agg) {
// AggregationBuilders.max("max_score", s -> s.field("_score").script(new Script.Builder().source("_score").build()));
// Aggregation aggregation = AggregationBuilders.max(s -> s.field("max_score").script(new Script.Builder().source("_score").build()));
// Aggregation termsAgg = AggregationBuilders.terms("by_personId", TermsAggregationDefinition.builder("by_personId").field("personId").order("max_score", false));
// Aggregation termsAgg2 = AggregationBuilders.terms(s -> s.field("personId").size(matchResultSize).size(matchResultSize));
// MaxAggregation.Builder maxBucketAggregation = new MaxAggregation.Builder().field("max_score").script(new Script.Builder().source("_score").build());
// TermsAggregation termsAgg = new TermsAggregation.Builder().field("personId").format("by_personId").size(matchResultSize).child(maxBucketAggregation);
// MetricAg
// termsAgg2.subAggregation(aggregation);
// termsAgg2.order(BucketOrder.aggregation("max_score", false));
// termsAgg.size(matchResultSize);
// MaxAggregationBuilder maxScoreAgg = AggregationBuilders.max("max_score").script(new Script("_score"));
// TermsAggregationBuilder personIdAgg = AggregationBuilders.terms("by_personId").field("personId");
// personIdAgg.subAggregation(maxScoreAgg);
// personIdAgg.order(BucketOrder.aggregation("max_score", false));
// personIdAgg.size(matchResultSize);
// builder.aggregation(personIdAgg);
} else {
// builder.size(matchResultSize);
}
SourceConfig sourceConfig = new SourceConfig.Builder().filter(s -> s.includes(Arrays.asList(FETCH_SOURCE))).build();
// log.debug("rid:{} poolId:{} 匹配时参数:{}", rid, poolId, scriptScoreQuery.toString());
// KnnSearch knnQuery = getKnnSearch(feature, person, type);
// ScriptScoreQuery scriptScoreQuery = getScriptScoreQuery(feature, person, type);
double minScore = 0.0;
if (type == 0) {
minScore = person.getFaceMinScore().doubleValue()/100;
} else if (type == 1) {
minScore = person.getBodyMinScore().doubleValue()/100;
}
return new SearchRequest.Builder().index(poolId).query(scriptScoreQuery).source(sourceConfig).size(matchResultSize).minScore(minScore).build();
return new SearchRequest.Builder().index(poolId).query(scriptScoreQuery).source(SOURCE_CONFIG)
.size(matchResultSize)
.minScore(minScore)
.timeout("50ms")
.build();
}
private List<Person> match0(SearchRequest searchRequest, Boolean agg) throws Exception {
String poolId = searchRequest.index().get(0);
List<Person> persons = new ArrayList<>();
// log.info("request:{}", JSON.tojsonS);
SearchResponse<SearchResultHit> search = client.search(searchRequest, SearchResultHit.class);
if (agg) {
Map aggregations = search.aggregations();
TermsAggregation byPersonId = (TermsAggregation) aggregations.get("by_personId");
log.info("aggregation:{}", byPersonId.toString());
try {
log.info("人员匹配查es开始");
SearchResponse<SearchResultHit> search = client.search(searchRequest, SearchResultHit.class);
if (agg) {
Map aggregations = search.aggregations();
TermsAggregation byPersonId = (TermsAggregation) aggregations.get("by_personId");
log.info("aggregation:{}", byPersonId.toString());
// List<Bucket> buckets = byPersonId.;
// for (Terms.Bucket bucket : buckets) {
// String personId = (String) bucket.getKey();
......@@ -536,34 +519,37 @@ public class PersonService {
// Person person = new Person().setPersonId(personId).setScore((float) value).setPersonPoolId(poolId);
// persons.add(person);
// }
} else {
HitsMetadata hits = search.hits();
// log.info("match0:{}", hits.toString());
List<Hit<SearchResultHit>> hits1 = hits.hits();
for (Hit item : hits1) {
SearchResultHit hit = (SearchResultHit) item.source();
Person p = new Person();
p.setPersonId((String) hit.getPersonId());
p.setAge(hit.getAge());
p.setGender((String) hit.getGender());
p.setChannelSerialNum(hit.getChannelSerialNum());
p.setBodyType(hit.getBody_type());
p.setCaptureUnid(hit.getUnid());
p.setCounttime(Optional.ofNullable((String)hit.getCounttime())
.map(x -> {
try {
return Constant.DATE_FORMAT.get().parse(x);
} catch (ParseException e) {
return null;
}
})
.orElse(null));
p.setScore(item.score().floatValue() * 100);
p.setPersonPoolId(item.index());
persons.add(p);
} else {
HitsMetadata hits = search.hits();
log.info("人员匹配, 耗时:{}", search.took());
List<Hit<SearchResultHit>> hits1 = hits.hits();
for (Hit item : hits1) {
SearchResultHit hit = (SearchResultHit) item.source();
Person p = new Person();
p.setPersonId((String) hit.getPersonId());
p.setAge(hit.getAge());
p.setGender((String) hit.getGender());
p.setChannelSerialNum(hit.getChannelSerialNum());
p.setBodyType(hit.getBody_type());
p.setCaptureUnid(hit.getUnid());
p.setCounttime(Optional.ofNullable((String)hit.getCounttime())
.map(x -> {
try {
return Constant.DATE_FORMAT.get().parse(x);
} catch (ParseException e) {
return null;
}
})
.orElse(null));
p.setScore(item.score().floatValue() * 100);
p.setPersonPoolId(item.index());
persons.add(p);
}
}
} catch (Exception e) {
log.error(" n", e);
}
return persons;
......
......@@ -27,6 +27,8 @@ import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import jakarta.annotation.Resource;
......@@ -34,6 +36,8 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* .
......@@ -64,6 +68,8 @@ public class PoolService {
@Value("${vion.index.merge.scheduler.max_thread_count}")
private Integer mergeThreadCount;
private final static ConcurrentMap<String, Boolean> poolMap = new ConcurrentHashMap<>();
/**
* 添加特征池
*
......@@ -81,9 +87,12 @@ public class PoolService {
String poolId = requestVo.getPoolId();
// log.info("特征池创建操作开始:[{}},IML:{}", poolId, requestVo.isUseILMPolicy());
try {
String shardsStr = String.valueOf(shards);
if (poolId.contains("staff") || poolId.contains("person")) {
shardsStr = "1";
}
IndexSettings.Builder settings = new IndexSettings.Builder()
.numberOfShards(String.valueOf(shards))
.numberOfShards(shardsStr)
.numberOfReplicas(String.valueOf(replicas));
if (StringUtils.isNotEmpty(translogDurability)) {
settings.translog(t -> t.durability(TranslogDurability.Async));
......@@ -102,6 +111,7 @@ public class PoolService {
TypeMapping.Builder builder = getCreateIndexContentBuilder();
CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder().index(poolId).mappings(builder.build()).settings(settings.build()).build();
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest);
poolMap.put(poolId, true);
if (addPerson) {
List<Person> personPool = requestVo.getPersonPool();
......@@ -150,6 +160,7 @@ public class PoolService {
} else if (flushPool == 1) {
//删除索引
DeleteIndexResponse delete = client.indices().delete(new DeleteIndexRequest.Builder().index(poolId).build());
poolMap.remove(poolId);
}
log.info("特征池删除完成:[{}],FlushPool:[{}]", poolId, flushPool);
return ResponseVo.success(rid);
......@@ -202,26 +213,6 @@ public class PoolService {
}
// public ResponseVo deletePoolData(RequestVo requestVo) throws Exception {
// String rid = requestVo.getRid();
// Integer flushPool = requestVo.getFlushPool();
// String poolId = requestVo.getPoolId();
// Long mallId = requestVo.getMallId();
// log.info("特征池删除mallId:{}数据操作开始:[{}]", mallId,poolId);
// try {
// personService.deletePersonByMallId(poolId, mallId);
// log.info("特征池删除mallId:{}操作完成:[{}]", mallId, poolId);
// return ResponseVo.success(rid);
// } catch (ElasticsearchException e) {
// if (e.status() == 404) {
// return ResponseVo.poolIdNotExists(rid);
// } else {
// return ResponseVo.error(rid, e.getMessage());
// }
// }
// }
/**
* 修改特征池(添加人员)
*
......@@ -235,10 +226,9 @@ public class PoolService {
String poolId = requestVo.getPoolId();
List<Person> personPool = requestVo.getPersonPool();
Integer updateType = requestVo.getUpdateType();
// log.info("特征池修改操作开始:[{}],updateType:[{}]", poolId, updateType);
co.elastic.clients.elasticsearch.indices.ExistsRequest poolIdExists = new ExistsRequest.Builder().index(poolId).build();
BooleanResponse poolIdExistsResponse = client.indices().exists(poolIdExists);
if (!poolIdExistsResponse.value()) {
log.info("特征池修改操作开始:[{}],updateType:[{}],size:{}", poolId, updateType, personPool.size());
if (!existPool(poolId)) {
return ResponseVo.poolIdNotExists(rid);
}
try {
......@@ -246,6 +236,9 @@ public class PoolService {
if (bulkItemResponses != null && bulkItemResponses.errors()) {
log.info(bulkItemResponses.items().toString());
return ResponseVo.error(rid, bulkItemResponses.items().toString());
} else if (bulkItemResponses == null) {
log.info("bulkItemResponses is null");
return ResponseVo.error(rid, bulkItemResponses.items().toString());
}
log.info("特征池修改:[{}],updateType:[{}]", poolId, updateType);
......@@ -256,6 +249,25 @@ public class PoolService {
}
}
@Async
public void asyncModifyPool(String poolId, Person person) throws Exception {
if (!existPool(poolId)) {
return;
}
try {
BulkResponse bulkItemResponses = personService.addPerson(poolId, Arrays.asList(person));
if (bulkItemResponses != null && bulkItemResponses.errors()) {
log.info(bulkItemResponses.items().toString());
} else if (bulkItemResponses == null) {
log.info("bulkItemResponses is null");
}
log.info("特征池修改:[{}], bulkItemResponses: {}", poolId, bulkItemResponses);
} catch (ElasticsearchException e) {
log.error("modifyPool", e);
}
}
/**
* 查询特征池信息
*
......@@ -319,8 +331,26 @@ public class PoolService {
}
public boolean existPool(String poolId) throws IOException {
BooleanResponse exists = client.indices().exists(new ExistsRequest.Builder().index(poolId).build());
return exists.value();
if (poolMap.containsKey(poolId)) {
return true;
} else {
BooleanResponse exists = client.indices().exists(new ExistsRequest.Builder().index(poolId).build());
if (exists.value()) {
poolMap.put(poolId, true);
}
return exists.value();
}
}
@Scheduled(cron = "0 0/2 * * * ?")
public void checkPoolExist() throws IOException {
Map<String, Boolean> tmpMap = poolMap;
for (Map.Entry<String, Boolean> entry : tmpMap.entrySet()) {
BooleanResponse exists = client.indices().exists(new ExistsRequest.Builder().index(entry.getKey()).build());
if (!exists.value()) {
poolMap.remove(entry.getKey());
}
}
}
public List<PoolInfo> queryPoolInfo(String poolId) throws IOException {
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!