Commit ec46c70d by xmh

部分

1 parent eb5ca491
...@@ -3,6 +3,7 @@ package com.viontech.model; ...@@ -3,6 +3,7 @@ package com.viontech.model;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.experimental.Accessors;
/** /**
* . * .
...@@ -12,7 +13,8 @@ import lombok.Setter; ...@@ -12,7 +13,8 @@ import lombok.Setter;
*/ */
@Getter @Getter
@Setter @Setter
public abstract class BaseModel { @Accessors(chain = true)
public class BaseModel {
protected Long deviceId; protected Long deviceId;
protected Long length; protected Long length;
protected Long firstInt; protected Long firstInt;
...@@ -27,37 +29,62 @@ public abstract class BaseModel { ...@@ -27,37 +29,62 @@ public abstract class BaseModel {
* *
* @return model * @return model
*/ */
public void decode(ByteBuf byteBuf) { public static BaseModel from(ByteBuf byteBuf) {
this.deviceId = byteBuf.readUnsignedInt();
this.length = byteBuf.readUnsignedInt();
this.firstInt = byteBuf.readUnsignedInt(); long deviceId = byteBuf.readUnsignedInt();
this.secondInt = byteBuf.readUnsignedInt(); long length = byteBuf.readUnsignedInt();
this.data = new byte[(int) (length - 16)]; long firstInt = byteBuf.readUnsignedInt();
byteBuf.readBytes(data); long secondInt = byteBuf.readUnsignedInt();
decodeData(); BaseModel model;
switch ((int) firstInt) {
case 0x00010100:
model = new LoginData();
break;
case 0x0001FFFF:
model = new KeepAlive();
break;
default:
return null;
}
model.setDeviceId(deviceId);
model.setLength(length);
model.setFirstInt(firstInt);
model.setSecondInt(secondInt);
if (length > 16) {
model.data = new byte[(int) (length - 16)];
byteBuf.readBytes(model.data);
model.decodeData();
}
return model;
} }
; public static int castLong2Int(long data) {
return (int) (data & 0xFF);
}
/** /**
* 将 model 转换为 byteBuf * 将 model 转换为 byteBuf
* *
* @return byteBuf * @return byteBuf
*/ */
public void encode(ByteBuf byteBuf) { public void to(ByteBuf byteBuf) {
encodeData(); encodeData();
byteBuf.writeInt((int) (deviceId & 0xff)); byteBuf.writeInt(castLong2Int(deviceId));
byteBuf.writeInt((int) (length & 0xff)); byteBuf.writeInt(castLong2Int(length));
byteBuf.writeInt((int) (firstInt & 0xff)); byteBuf.writeInt(castLong2Int(firstInt));
byteBuf.writeInt((int) (secondInt & 0xff)); byteBuf.writeInt(castLong2Int(secondInt));
byteBuf.writeBytes(data); if (data != null && data.length > 0) {
byteBuf.writeBytes(data);
}
} }
; protected void encodeData() {
}
abstract protected void encodeData();
abstract protected void decodeData(); protected void decodeData() {
}
} }
package com.viontech.model;
/**
* .
*
* @author 谢明辉
* @date 2020/8/21
*/
public class KeepAlive extends BaseModel{
@Override
protected void encodeData() {
}
@Override
protected void decodeData() {
}
}
package com.viontech.netty;
import com.viontech.model.BaseModel;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import java.util.List;
/**
* .
*
* @author 谢明辉
* @date 2020/8/21
*/
public class ByteToMessageCodecHandler extends ByteToMessageCodec<BaseModel> {
@Override
protected void encode(ChannelHandlerContext ctx, BaseModel msg, ByteBuf out) throws Exception {
msg.to(out);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
out.add(BaseModel.from(in));
}
}
package com.viontech.netty;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author 谢明辉
*/
@Slf4j
public class ChannelGroup {
/** 设备序列号和最后心跳的映射 */
public static final Map<String, Long> HEART_BEAT_MAP = new LinkedHashMap<>();
private static final Lock LOCK = new ReentrantLock();
/** 设备序列号和通道id的映射 */
private static final Map<Long, Channel> DEVICE_CHANNEL_MAP = new LinkedHashMap<>();
/**
* 设备注册
*
* @param deviceId 设备ID
* @param channel 通道
*/
public static void registered(Long deviceId, Channel channel) {
LOCK.lock();
try {
DEVICE_CHANNEL_MAP.put(deviceId, channel);
} finally {
LOCK.unlock();
}
log.info("设备注册成功:{}", deviceId);
}
/**
* @param deviceId 设备ID
* @param msg 要发送的消息
*/
public static void sendMessage(Long deviceId, Object msg) {
Channel channel = DEVICE_CHANNEL_MAP.get(deviceId);
try {
channel.writeAndFlush(msg).await();
} catch (Exception ignored) {
}
}
/**
* 广播消息
*
* @param msg 消息
*/
public static void broadcast(Object msg) {
for (Channel channel : DEVICE_CHANNEL_MAP.values()) {
try {
channel.writeAndFlush(msg).await();
} catch (Exception ignored) {
}
}
}
}
package com.viontech.netty;
import com.viontech.model.BaseModel;
import com.viontech.model.KeepAlive;
import com.viontech.model.LoginData;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
/**
* @author 谢明辉
*/
@Slf4j
public class NettyReceiverHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
log.info("channelRegistered----------------");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof BaseModel) {
BaseModel message = (BaseModel) msg;
//注册命令
if (message instanceof LoginData) {
BaseModel result = new BaseModel().setDeviceId(0L).setLength(16L).setFirstInt(0x00020100L);
LoginData logindata = (LoginData) message;
Long deviceId = logindata.getDeviceId();
if ("admin".equals(logindata.getUsername()) && "admin".equals(logindata.getPassword())) {
ChannelGroup.registered(deviceId, ctx.channel());
result.setSecondInt(0L);
} else {
result.setSecondInt(1L);
}
ctx.writeAndFlush(result).await();
} else if (message instanceof KeepAlive) {
log.info("收到心跳,设备ID:[{}]", message.getDeviceId());
}
ReferenceCountUtil.release(msg);
ctx.fireChannelReadComplete();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
log.info("断开连接:{}", ctx.channel().id().toString());
}
}
...@@ -42,6 +42,8 @@ public class NettyServer implements CommandLineRunner { ...@@ -42,6 +42,8 @@ public class NettyServer implements CommandLineRunner {
protected void initChannel(SocketChannel ch) { protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO)); ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024 * 1024 * 1000, 4, 4, -16, 0)); ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024 * 1024 * 1000, 4, 4, -16, 0));
ch.pipeline().addLast(new ByteToMessageCodecHandler());
ch.pipeline().addLast(new NettyReceiverHandler());
} }
}) })
.option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.SO_BACKLOG, 128)
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!