Commit a21d76bc by HlQ

[add]

1.mqtt client 使用虚拟线程
2.LogAspect 截断过长的请求参数
3.RestClient 添加连接池配置
4.agent 绑定指标下发,改为分批下发
5.agent 批量绑定指标添加通过 mqtt server 下发操作
[chg] Event 实体类的 source 字段解释修改
1 parent e77da2e7
......@@ -55,13 +55,17 @@ public class LogAspect {
MDC.put("requestId", (String) request.getAttribute("requestId"));
var ip = ServletUtil.getClientIP(request);
var args = objectMapper.writeValueAsString(request.getParameterMap());
if (args.length() > 2000) {
args = args.substring(0, 2000);
}
log.info("Request URL:{}, Method:{}, IP:{}, {}, Operator:{}, Args:{}, Body:{}",
request.getRequestURI(),
request.getMethod(),
ip,
IPUtil.getIp(ip),
username,
objectMapper.writeValueAsString(request.getParameterMap()),
args,
StrUtil.replaceChars(ServletUtil.getBody(request), " \r\n", "")
);
}
......
package vion.config;
import net.dreamlu.iot.mqtt.spring.client.MqttClientCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.Executors;
/**
* mqtt客户端自定义配置
*/
@Configuration(proxyBeanMethods = false)
public class MqttClientCustomizerConfig {
@Bean
public MqttClientCustomizer mqttClientCustomizer() {
return creator -> creator.mqttExecutor(Executors.newVirtualThreadPerTaskExecutor());
}
}
......@@ -3,9 +3,14 @@ package vion.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.JdkClientHttpRequestFactory;
import org.springframework.web.client.RestClient;
import org.springframework.web.util.DefaultUriBuilderFactory;
import java.net.http.HttpClient;
import java.time.Duration;
import java.util.concurrent.Executors;
/**
* @author vion
* @date 2024/7/19
......@@ -18,6 +23,7 @@ public class RestClientConfig {
public RestClient getBongRestClient() {
return RestClient.builder()
.baseUrl("https://proapi.xbongbong.com")
.requestFactory(new JdkClientHttpRequestFactory(httpClient()))
.build();
}
......@@ -29,6 +35,7 @@ public class RestClientConfig {
uriBuilderFactory.setEncodingMode(DefaultUriBuilderFactory.EncodingMode.NONE);
return RestClient.builder()
.uriBuilderFactory(uriBuilderFactory)
.requestFactory(new JdkClientHttpRequestFactory(httpClient()))
.build();
}
......@@ -36,6 +43,14 @@ public class RestClientConfig {
public RestClient getdingRestClient() {
return RestClient.builder()
.baseUrl("https://api.dingtalk.com")
.requestFactory(new JdkClientHttpRequestFactory(httpClient()))
.build();
}
private HttpClient httpClient() {
return HttpClient.newBuilder()
.executor(Executors.newVirtualThreadPerTaskExecutor()) // 使用虚拟线程执行器
.connectTimeout(Duration.ofSeconds(5))
.build();
}
}
......@@ -35,7 +35,7 @@ public class EventDTO extends BaseDTO {
private String eventType;
/**
* 来源 1:系统 2:设备 3:平台
* 类别 设备类1 客流数据类2 聚类精度类3 其他4
*/
private Short source;
......
......@@ -8,6 +8,7 @@ import net.dreamlu.iot.mqtt.spring.client.MqttClientSubscribe;
import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.dromara.hutool.core.collection.CollUtil;
import org.dromara.hutool.core.text.StrUtil;
import org.dromara.hutool.core.thread.ThreadUtil;
import org.springframework.stereotype.Service;
import vion.constant.MqttMessageType;
import vion.model.monitor.EventRecord;
......@@ -94,8 +95,13 @@ public class MqttClientMessageListener {
return;
}
log.info("agent:{} topic:{} 开始下发事件监测", agentUid, topic);
for (RAgentEvent agentEvent : list) {
client.publish(topic, JsonUtil.toJsonByte(agentEvent), MqttQoS.QOS2);
// agent绑定事件过多,分批下发
var partition = CollUtil.partition(list, 5000);
for (List<RAgentEvent> rAgentEvents : partition) {
for (RAgentEvent agentEvent : rAgentEvents) {
client.publish(topic, JsonUtil.toJsonByte(agentEvent), MqttQoS.QOS2);
}
ThreadUtil.sleep(1000L);
}
log.info("agent:{} topic:{} 结束下发事件监测", agentUid, topic);
}
......
......@@ -51,7 +51,7 @@ public class Event extends BaseDTO {
private String eventType;
/**
* 来源 1:系统 2:设备 3:平台
* 类别 设备类1 客流数据类2 聚类精度类3 其他4
*/
@TableField(value = "\"source\"")
private Short source;
......
......@@ -94,7 +94,10 @@ public class AgentServiceImpl extends MPJBaseServiceImpl<AgentMapper, Agent> imp
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().stream()
.map(rec -> (rec.getStatus() != 200 || rec.getBodyStatus() != 200 || rec.getResponseTime() > 1000L) ? rec.getName() : "")
.map(rec -> (rec.getStatus() != 200
|| rec.getBodyStatus() != 200
|| rec.getResponseTime() > 1000L) ? rec.getName() : "success")
.distinct()
.filter(StrUtil::isNotBlank)
.collect(Collectors.joining(","))
));
......@@ -342,6 +345,7 @@ public class AgentServiceImpl extends MPJBaseServiceImpl<AgentMapper, Agent> imp
accountList.forEach(a -> {
if (uid2AccMap.containsKey(a.getUid())) {
var acc = uid2AccMap.get(a.getUid());
a.setAgentUid(agentUid);
a.setId(acc.getId());
updAccList.add(a);
} else {
......@@ -356,6 +360,7 @@ public class AgentServiceImpl extends MPJBaseServiceImpl<AgentMapper, Agent> imp
mallList.forEach(m -> {
if (uid2MallMap.containsKey(m.getUid())) {
var mall = uid2MallMap.get(m.getUid());
m.setAgentUid(agentUid);
m.setId(mall.getId());
updMallList.add(m);
} else {
......@@ -403,8 +408,6 @@ public class AgentServiceImpl extends MPJBaseServiceImpl<AgentMapper, Agent> imp
}
var obj = JsonUtil.createObj();
obj.put("result", "allow")
// todo expire_at 用法暂不清楚
// obj.put("expire_at", Instant.now().getEpochSecond() + 15);
.put("superuser", false)
.putPOJO("acl", JsonUtil.createArr()
.addPOJO(JsonUtil.createObj()
......
......@@ -9,6 +9,7 @@ import net.dreamlu.iot.mqtt.spring.client.MqttClientTemplate;
import org.dromara.hutool.core.collection.CollUtil;
import org.dromara.hutool.core.text.StrUtil;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import vion.mapper.monitor.MallMapper;
import vion.mapper.monitor.RAgentEventMapper;
import vion.model.monitor.Mall;
......@@ -77,16 +78,24 @@ public class RAgentEventServiceImpl extends MPJBaseServiceImpl<RAgentEventMapper
log.error(msg);
return msg;
}
agentEvents.forEach(agentEvent -> client.publish(topic, JsonUtil.toJsonByte(agentEvent),
MqttQoS.QOS2));
agentEvents.forEach(agentEvent -> client.publish(topic, JsonUtil.toJsonByte(agentEvent), MqttQoS.QOS2));
return this.removeBatchByIds(idList) ? "解绑成功" : "解绑失败";
}
@Override
@Transactional(rollbackFor = Exception.class)
public String bindBatch(String agentUid, RAgentEvent dto) {
var topic = TopicUtil.getEventTopic(agentUid);
if (StrUtil.isBlank(topic)) {
var msg = StrUtil.format("agent:{} 获取topic失败", agentUid);
log.error(msg);
return msg;
}
if (StrUtil.isNotBlank(dto.getAccountUid())) {
var accountUid = dto.getAccountUid();
var mallList = mallMapper.selectList(Wrappers.<Mall>lambdaQuery().eq(Mall::getAccountUid, accountUid));
mallList.forEach(m -> m.setAttention((short) 1));
mallMapper.updateById(mallList);
var agentEventList = mallList.stream().map(m -> {
var rAgentEvent = new RAgentEvent();
rAgentEvent.setAgentUid(agentUid);
......@@ -100,10 +109,13 @@ public class RAgentEventServiceImpl extends MPJBaseServiceImpl<RAgentEventMapper
return rAgentEvent;
}).toList();
this.saveBatch(agentEventList);
agentEventList.forEach(agentEvent -> client.publish(topic, JsonUtil.toJsonByte(agentEvent), MqttQoS.QOS2));
}
if (CollUtil.isNotEmpty(dto.getMallUidList())) {
var mallUidList = dto.getMallUidList();
var mallList = mallMapper.selectList(Wrappers.<Mall>lambdaQuery().in(Mall::getUid, mallUidList));
mallList.forEach(m -> m.setAttention((short) 1));
mallMapper.updateById(mallList);
var agentEventList = mallList.stream().map(m -> {
var rAgentEvent = new RAgentEvent();
rAgentEvent.setAgentUid(agentUid);
......@@ -117,6 +129,7 @@ public class RAgentEventServiceImpl extends MPJBaseServiceImpl<RAgentEventMapper
return rAgentEvent;
}).toList();
this.saveBatch(agentEventList);
agentEventList.forEach(agentEvent -> client.publish(topic, JsonUtil.toJsonByte(agentEvent), MqttQoS.QOS2));
}
return "批量绑定成功";
}
......
......@@ -36,7 +36,7 @@ public class EventVO {
private String eventType;
/**
* 来源 1:系统 2:设备 3:平台
* 类别 设备类1 客流数据类2 聚类精度类3 其他4
*/
private Short source;
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!