Netty高级特性详解
Netty高级特性详解
ByteBuf内存管理
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
public class ByteBufDemo {
public static void main(String[] args) {
// 创建ByteBuf
ByteBuf buf = Unpooled.buffer(256);
// 写入数据
buf.writeBytes("Hello".getBytes());
// 读取数据
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data);
// 池化ByteBuf(推荐)
ByteBuf pooled = PooledByteBufAllocator.DEFAULT.buffer(1024);
// 零拷贝切片
ByteBuf slice = buf.slice(0, 5);
// 复制
ByteBuf copy = buf.copy();
}
}
零拷贝
public class ZeroCopyDemo {
// 传统拷贝
public void traditionalCopy(String path) throws IOException {
FileChannel in = new FileInputStream(path).getChannel();
FileChannel out = new FileOutputStream("output.txt").getChannel();
// 经过用户空间和内核空间的多次拷贝
in.transferTo(0, in.size(), out);
}
// Netty零拷贝
public void nettyZeroCopy(Channel channel, String path) throws IOException {
FileRegion region = new DefaultFileRegion(
new FileInputStream(path).getChannel(), 0, 1024
);
channel.writeAndFlush(region);
}
// CompositeByteBuf组合多个Buffer
public void composite() {
CompositeByteBuf composite = Unpooled.compositeBuffer();
ByteBuf header = Unpooled.buffer(10);
ByteBuf body = Unpooled.buffer(100);
composite.addComponent(true, header);
composite.addComponent(true, body);
// 逻辑上是一个Buffer,实际不拷贝数据
}
}
自定义协议编解码
// 协议格式:魔数(4) + 版本(1) + 序列化类型(1) + 消息类型(1) + 请求ID(4) + 数据长度(4) + 数据(n)
public class NettyMessage {
private int magicNumber = 0x12345678;
private byte version = 1;
private byte serializeType = 1;
private byte messageType = 1;
private int requestId;
private byte[] body;
}
public class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf out) {
out.writeInt(msg.getMagicNumber());
out.writeByte(msg.getVersion());
out.writeByte(msg.getSerializeType());
out.writeByte(msg.getMessageType());
out.writeInt(msg.getRequestId());
out.writeInt(msg.getBody().length);
out.writeBytes(msg.getBody());
}
}
public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {
public NettyMessageDecoder() {
super(1024 * 1024, 12, 4, 0, 0);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, in);
if (frame == null) return null;
try {
int magic = frame.readInt();
byte version = frame.readByte();
byte serializeType = frame.readByte();
byte messageType = frame.readByte();
int requestId = frame.readInt();
int length = frame.readInt();
byte[] body = new byte[length];
frame.readBytes(body);
NettyMessage message = new NettyMessage();
message.setVersion(version);
message.setSerializeType(serializeType);
message.setMessageType(messageType);
message.setRequestId(requestId);
message.setBody(body);
return message;
} finally {
frame.release();
}
}
}
线程模型优化
public class OptimizedNettyServer {
public void start() throws InterruptedException {
// 主线程组:处理连接请求
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 工作线程组:处理IO事件
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 业务线程组:处理耗时业务
EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(16);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new NettyMessageDecoder())
.addLast(new NettyMessageEncoder())
.addLast(businessGroup, new BusinessHandler());
}
});
}
}
连接池
public class NettyClientPool {
private final Bootstrap bootstrap;
private final ChannelPool pool;
public NettyClientPool(String host, int port, int maxSize) {
EventLoopGroup group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new StringDecoder())
.addLast(new StringEncoder());
}
})
.remoteAddress(host, port);
pool = new FixedChannelPool(bootstrap, new SimpleChannelHealthChecker(), maxSize);
}
public CompletableFuture<String> send(String message) {
CompletableFuture<String> future = new CompletableFuture<>();
pool.acquire().addListener(future2 -> {
Channel channel = (Channel) future2.getNow();
channel.writeAndFlush(message).addListener(f -> {
pool.release(channel);
future.complete(message);
});
});
return future;
}
}
SSL/TLS支持
public class SSLServer {
public void start() throws Exception {
SslContext sslCtx = SslContextBuilder.forServer(
new File("server.crt"), new File("server.key")
).build();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(sslCtx.newHandler(ch.alloc()))
.addLast(new ServerHandler());
}
});
}
}
总结
Netty的高级特性包括ByteBuf内存管理、零拷贝、自定义协议和线程模型优化。这些特性是构建高性能网络应用的关键。