Commit a4242f63 by wenshuaiying

增加异步:处理添加person到es操作

1 parent 41a3e859
package com.viontech.match.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class AsyncConfig {
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(50);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(10000);
executor.setThreadNamePrefix("Async-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
......@@ -46,8 +46,7 @@ public class MainController {
return poolService.queryPool(requestVo);
case MatchPerson:
ResponseVo matchPerson = personService.matchPerson(requestVo);
requestVo.setPersonPool(Arrays.asList(requestVo.getPerson()));
poolService.modifyPool(requestVo);
poolService.asyncModifyPool(requestVo.getPersonPoolId(), requestVo.getPerson());
return matchPerson;
case UpdatePerson:
return personService.updatePerson(requestVo);
......
......@@ -27,6 +27,7 @@ import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
......@@ -110,6 +111,7 @@ public class PoolService {
TypeMapping.Builder builder = getCreateIndexContentBuilder();
CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder().index(poolId).mappings(builder.build()).settings(settings.build()).build();
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest);
poolMap.put(poolId, true);
if (addPerson) {
List<Person> personPool = requestVo.getPersonPool();
......@@ -158,6 +160,7 @@ public class PoolService {
} else if (flushPool == 1) {
//删除索引
DeleteIndexResponse delete = client.indices().delete(new DeleteIndexRequest.Builder().index(poolId).build());
poolMap.remove(poolId);
}
log.info("特征池删除完成:[{}],FlushPool:[{}]", poolId, flushPool);
return ResponseVo.success(rid);
......@@ -224,9 +227,8 @@ public class PoolService {
List<Person> personPool = requestVo.getPersonPool();
Integer updateType = requestVo.getUpdateType();
log.info("特征池修改操作开始:[{}],updateType:[{}],size:{}", poolId, updateType, personPool.size());
co.elastic.clients.elasticsearch.indices.ExistsRequest poolIdExists = new ExistsRequest.Builder().index(poolId).build();
BooleanResponse poolIdExistsResponse = client.indices().exists(poolIdExists);
if (!poolIdExistsResponse.value()) {
if (!existPool(poolId)) {
return ResponseVo.poolIdNotExists(rid);
}
try {
......@@ -247,6 +249,25 @@ public class PoolService {
}
}
@Async
public void asyncModifyPool(String poolId, Person person) throws Exception {
if (!existPool(poolId)) {
return;
}
try {
BulkResponse bulkItemResponses = personService.addPerson(poolId, Arrays.asList(person));
if (bulkItemResponses != null && bulkItemResponses.errors()) {
log.info(bulkItemResponses.items().toString());
} else if (bulkItemResponses == null) {
log.info("bulkItemResponses is null");
}
log.info("特征池修改:[{}], bulkItemResponses: {}", poolId, bulkItemResponses);
} catch (ElasticsearchException e) {
log.error("modifyPool", e);
}
}
/**
* 查询特征池信息
*
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!