Commit f07d6fed by wenshuaiying

Merge branch 'develop_es8.15_merge_0624' into 'develop_es8.15_merge'

缓存 索引是否存在 & 查询接口 增加写入操作

See merge request !1
2 parents 8c5661a5 6f6a25b2
......@@ -3,8 +3,10 @@ package com.viontech.match;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
@EnableAsync
@ComponentScan(basePackages = {"com.viontech"})public class MatchApp {
public static void main(String[] args) {
......
package com.viontech.match.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class AsyncConfig {
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(100);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(10000);
executor.setThreadNamePrefix("Async-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
......@@ -56,8 +56,8 @@ public class ElasticsearchConfiguration {
connectionManager = new PoolingNHttpClientConnectionManager(new
DefaultConnectingIOReactor(ioReactorConfig));
// 设置最大连接数
connectionManager.setMaxTotal(200); // 整个连接池的最大连接数
connectionManager.setDefaultMaxPerRoute(40);
connectionManager.setMaxTotal(properties.getMaxConnTotal()); // 整个连接池的最大连接数
connectionManager.setDefaultMaxPerRoute(properties.getMaxConnPerRoute());
} catch (IOReactorException e) {
throw new RuntimeException(e);
}
......
......@@ -13,6 +13,7 @@ import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.Arrays;
/**
* .
......@@ -44,7 +45,15 @@ public class MainController {
case QueryPersonPool:
return poolService.queryPool(requestVo);
case MatchPerson:
return personService.matchPerson(requestVo);
ResponseVo matchPerson = personService.matchPerson(requestVo);
if (matchPerson.getMatchBodies() != null && matchPerson.getMatchBodies().size() > 0) {
requestVo.getPerson().setPersonId(matchPerson.getMatchBodies().get(0).getPersonId());
} else if(matchPerson.getMatchPersons() != null && matchPerson.getMatchPersons().size() > 0) {
requestVo.getPerson().setPersonId(matchPerson.getMatchPersons().get(0).getPersonId());
}
poolService.asyncModifyPool(requestVo.getPersonPoolId(), requestVo.getPerson());
return matchPerson;
case UpdatePerson:
return personService.updatePerson(requestVo);
case CountMatchPerson:
......
......@@ -56,6 +56,7 @@ import java.util.stream.Stream;
@Slf4j
public class PersonService {
private static final String[] FETCH_SOURCE = new String[]{"personId", "age", "gender", "fid", "counttime", "channelSerialNum", "body_type","unid"};
public static final SourceConfig SOURCE_CONFIG = new SourceConfig.Builder().filter(s -> s.includes(Arrays.asList(FETCH_SOURCE))).build();
@Resource
private ElasticsearchClient client;
@Resource
......@@ -577,14 +578,13 @@ public class PersonService {
// BoolQuery.Builder builder = getSearchSourceBuilder(feature, person, type);
Query scriptScoreQuery = getScriptScoreQuery(feature, person, type);
SourceConfig sourceConfig = new SourceConfig.Builder().filter(s -> s.includes(Arrays.asList(FETCH_SOURCE))).build();
double minScore = 0.0;
if (type == 0) {
minScore = person.getFaceMinScore().doubleValue()/100;
} else if (type == 1) {
minScore = person.getBodyMinScore().doubleValue()/100;
}
return new SearchRequest.Builder().index(poolId).query(scriptScoreQuery).source(sourceConfig)
return new SearchRequest.Builder().index(poolId).query(scriptScoreQuery).source(SOURCE_CONFIG)
.size(matchResultSize)
.minScore(minScore)
.timeout("50ms")
......@@ -596,6 +596,7 @@ public class PersonService {
List<Person> persons = new ArrayList<>();
// log.info("request:{}", JSON.tojsonS);
try {
log.info("人员匹配查es开始");
SearchResponse<SearchResultHit> search = client.search(searchRequest, SearchResultHit.class);
if (agg) {
Map aggregations = search.aggregations();
......@@ -611,7 +612,7 @@ public class PersonService {
// }
} else {
HitsMetadata hits = search.hits();
// log.info("match0:{}", hits.toString());
log.info("人员匹配, 耗时:{}", search.took());
List<Hit<SearchResultHit>> hits1 = hits.hits();
for (Hit item : hits1) {
SearchResultHit hit = (SearchResultHit) item.source();
......@@ -639,7 +640,7 @@ public class PersonService {
}
}
} catch (Exception e) {
log.error("match0 excepton", e.getMessage());
log.error(" n", e);
}
return persons;
......
......@@ -27,6 +27,7 @@ import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
......@@ -110,6 +111,7 @@ public class PoolService {
TypeMapping.Builder builder = getCreateIndexContentBuilder();
CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder().index(poolId).mappings(builder.build()).settings(settings.build()).build();
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest);
poolMap.put(poolId, true);
if (addPerson) {
List<Person> personPool = requestVo.getPersonPool();
......@@ -158,6 +160,7 @@ public class PoolService {
} else if (flushPool == 1) {
//删除索引
DeleteIndexResponse delete = client.indices().delete(new DeleteIndexRequest.Builder().index(poolId).build());
poolMap.remove(poolId);
}
log.info("特征池删除完成:[{}],FlushPool:[{}]", poolId, flushPool);
return ResponseVo.success(rid);
......@@ -224,9 +227,8 @@ public class PoolService {
List<Person> personPool = requestVo.getPersonPool();
Integer updateType = requestVo.getUpdateType();
log.info("特征池修改操作开始:[{}],updateType:[{}],size:{}", poolId, updateType, personPool.size());
co.elastic.clients.elasticsearch.indices.ExistsRequest poolIdExists = new ExistsRequest.Builder().index(poolId).build();
BooleanResponse poolIdExistsResponse = client.indices().exists(poolIdExists);
if (!poolIdExistsResponse.value()) {
if (!existPool(poolId)) {
return ResponseVo.poolIdNotExists(rid);
}
try {
......@@ -247,6 +249,25 @@ public class PoolService {
}
}
@Async
public void asyncModifyPool(String poolId, Person person) throws Exception {
if (!existPool(poolId)) {
return;
}
try {
BulkResponse bulkItemResponses = personService.addPerson(poolId, Arrays.asList(person));
if (bulkItemResponses != null && bulkItemResponses.errors()) {
log.info(bulkItemResponses.items().toString());
} else if (bulkItemResponses == null) {
log.info("bulkItemResponses is null");
}
log.info("特征池修改:[{}], bulkItemResponses: {}", poolId, bulkItemResponses);
} catch (ElasticsearchException e) {
log.error("modifyPool", e);
}
}
/**
* 查询特征池信息
*
......@@ -325,7 +346,8 @@ public class PoolService {
public void checkPoolExist() throws IOException {
Map<String, Boolean> tmpMap = poolMap;
for (Map.Entry<String, Boolean> entry : tmpMap.entrySet()) {
if (!existPool(entry.getKey())) {
BooleanResponse exists = client.indices().exists(new ExistsRequest.Builder().index(entry.getKey()).build());
if (!exists.value()) {
poolMap.remove(entry.getKey());
}
}
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!