← 返回首页

Netty高级特性详解

📂 java ⏱ 3 min 489 words

Netty高级特性详解

ByteBuf内存管理

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;

import java.nio.charset.StandardCharsets;

/**
 * Walks through the basic {@code ByteBuf} operations: allocate, write, read,
 * slice, copy, and pooled allocation.
 *
 * <p>ByteBufs are reference counted, so every buffer that owns its own
 * reference count is released before {@code main} returns.
 */
public class ByteBufDemo {
    public static void main(String[] args) {
        // Create an unpooled buffer with an initial capacity of 256 bytes.
        ByteBuf buf = Unpooled.buffer(256);
        try {
            // Write data; the charset is explicit so the bytes do not depend
            // on the platform default.
            buf.writeBytes("Hello".getBytes(StandardCharsets.UTF_8));

            // Read everything that is readable. This advances readerIndex,
            // so afterwards the buffer has no readable bytes left.
            byte[] data = new byte[buf.readableBytes()];
            buf.readBytes(data);

            // Zero-copy slice: a view over the same memory, addressed by
            // absolute index. It shares buf's reference count, so it must
            // not be released separately.
            ByteBuf slice = buf.slice(0, data.length);

            // Deep copy. The no-arg copy() only copies *readable* bytes, which
            // would be none after the read above, so copy by absolute range.
            ByteBuf copy = buf.copy(0, data.length);
            // The copy is an independent buffer with its own reference count.
            copy.release();
        } finally {
            buf.release();
        }

        // Pooled ByteBuf (recommended). It must be released to go back to the pool.
        ByteBuf pooled = PooledByteBufAllocator.DEFAULT.buffer(1024);
        pooled.release();
    }
}

零拷贝

/**
 * Contrasts a conventional stream copy with Netty's zero-copy facilities
 * ({@code FileRegion} and {@code CompositeByteBuf}).
 */
public class ZeroCopyDemo {
    /** Size of the user-space buffer used by {@link #traditionalCopy(String)}. */
    private static final int COPY_BUFFER_SIZE = 8192;

    /**
     * Copies the file at {@code path} to {@code output.txt} the traditional way:
     * each chunk is read into a user-space byte array and then written back out.
     *
     * @param path file to copy
     * @throws IOException if either file cannot be opened, read, or written
     */
    public void traditionalCopy(String path) throws IOException {
        // try-with-resources closes both streams even when the copy fails.
        try (FileInputStream in = new FileInputStream(path);
             FileOutputStream out = new FileOutputStream("output.txt")) {
            byte[] buffer = new byte[COPY_BUFFER_SIZE];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        }
    }

    /**
     * Sends the whole file at {@code path} over {@code channel} using a
     * {@code FileRegion}.
     *
     * @param channel channel to write the file to
     * @param path    file to send
     * @throws IOException if the file cannot be opened or its size cannot be read
     */
    public void nettyZeroCopy(Channel channel, String path) throws IOException {
        FileChannel file = new FileInputStream(path).getChannel();
        long length;
        try {
            // Use the real file size instead of a fixed byte count so that
            // short files do not fail and long files are not truncated.
            length = file.size();
        } catch (IOException e) {
            // The region never took ownership, so close the channel here.
            file.close();
            throw e;
        }
        // From here on the region owns the file channel and closes it when
        // the region is released after the write.
        FileRegion region = new DefaultFileRegion(file, 0, length);
        channel.writeAndFlush(region);
    }

    /**
     * Combines two buffers into one logical buffer without copying their
     * contents.
     */
    public void composite() {
        CompositeByteBuf composite = Unpooled.compositeBuffer();
        try {
            ByteBuf header = Unpooled.buffer(10);
            ByteBuf body = Unpooled.buffer(100);

            // "true" advances the composite's writerIndex by each component's
            // readable bytes. Logically one buffer; the data is not copied.
            composite.addComponent(true, header);
            composite.addComponent(true, body);
        } finally {
            // Releasing the composite also releases the components it holds.
            composite.release();
        }
    }
}

自定义协议编解码

// 协议格式:魔数(4) + 版本(1) + 序列化类型(1) + 消息类型(1) + 请求ID(4) + 数据长度(4) + 数据(n)

/**
 * One message of the custom protocol.
 *
 * <p>Wire layout: magic number (4) + version (1) + serialization type (1)
 * + message type (1) + request id (4) + body length (4) + body (n).
 */
public class NettyMessage {
    private int magicNumber = 0x12345678;
    private byte version = 1;
    private byte serializeType = 1;
    private byte messageType = 1;
    private int requestId;
    private byte[] body;

    /** @return the magic number that marks the start of a frame */
    public int getMagicNumber() {
        return magicNumber;
    }

    /** @param magicNumber the magic number that marks the start of a frame */
    public void setMagicNumber(int magicNumber) {
        this.magicNumber = magicNumber;
    }

    /** @return the protocol version */
    public byte getVersion() {
        return version;
    }

    /** @param version the protocol version */
    public void setVersion(byte version) {
        this.version = version;
    }

    /** @return the serialization type code */
    public byte getSerializeType() {
        return serializeType;
    }

    /** @param serializeType the serialization type code */
    public void setSerializeType(byte serializeType) {
        this.serializeType = serializeType;
    }

    /** @return the message type code */
    public byte getMessageType() {
        return messageType;
    }

    /** @param messageType the message type code */
    public void setMessageType(byte messageType) {
        this.messageType = messageType;
    }

    /** @return the request id */
    public int getRequestId() {
        return requestId;
    }

    /** @param requestId the request id */
    public void setRequestId(int requestId) {
        this.requestId = requestId;
    }

    /** @return the payload bytes, or {@code null} if no body has been set */
    public byte[] getBody() {
        return body;
    }

    /** @param body the payload bytes; may be {@code null} */
    public void setBody(byte[] body) {
        this.body = body;
    }
}

/**
 * Serializes a {@code NettyMessage} into the custom protocol's wire format:
 * magic number (4) + version (1) + serialization type (1) + message type (1)
 * + request id (4) + body length (4) + body (n).
 */
public class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> {
    /** Written in place of a missing body so the length field is always valid. */
    private static final byte[] EMPTY_BODY = new byte[0];

    /**
     * Writes the header fields followed by the length-prefixed body.
     *
     * @param ctx handler context (unused)
     * @param msg message to encode
     * @param out buffer the encoded bytes are appended to
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf out) {
        // A message without a body is encoded with length 0 instead of
        // failing with a NullPointerException.
        byte[] body = msg.getBody();
        if (body == null) {
            body = EMPTY_BODY;
        }

        out.writeInt(msg.getMagicNumber());
        out.writeByte(msg.getVersion());
        out.writeByte(msg.getSerializeType());
        out.writeByte(msg.getMessageType());
        out.writeInt(msg.getRequestId());
        out.writeInt(body.length);
        out.writeBytes(body);
    }
}

/**
 * Splits the inbound byte stream into frames and decodes each frame into a
 * {@code NettyMessage}.
 *
 * <p>Wire layout: magic number (4) + version (1) + serialization type (1)
 * + message type (1) + request id (4) + body length (4) + body (n).
 */
public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {
    /** Largest frame accepted, in bytes (1 MiB). */
    private static final int MAX_FRAME_LENGTH = 1024 * 1024;

    /** Bytes before the length field: magic (4) + version (1) + serialize (1) + type (1) + request id (4). */
    private static final int LENGTH_FIELD_OFFSET = 4 + 1 + 1 + 1 + 4;

    /** The body length is written as a 4-byte int. */
    private static final int LENGTH_FIELD_LENGTH = 4;

    public NettyMessageDecoder() {
        // No length adjustment and nothing stripped: each frame handed to
        // decode() still contains the complete header.
        super(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH, 0, 0);
    }

    /**
     * Decodes one complete frame.
     *
     * @param ctx handler context
     * @param in  accumulated inbound bytes
     * @return the decoded message, or {@code null} if a full frame has not arrived yet
     * @throws IllegalArgumentException if the frame does not start with the expected magic number
     * @throws Exception if frame extraction fails in the superclass
     */
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null;
        }

        try {
            NettyMessage message = new NettyMessage();

            // Reject frames that do not belong to this protocol. The decoder
            // base class wraps this in a DecoderException for the pipeline.
            int magic = frame.readInt();
            if (magic != message.getMagicNumber()) {
                throw new IllegalArgumentException(
                    "Unexpected magic number: 0x" + Integer.toHexString(magic));
            }

            byte version = frame.readByte();
            byte serializeType = frame.readByte();
            byte messageType = frame.readByte();
            int requestId = frame.readInt();
            int length = frame.readInt();
            byte[] body = new byte[length];
            frame.readBytes(body);

            message.setVersion(version);
            message.setSerializeType(serializeType);
            message.setMessageType(messageType);
            message.setRequestId(requestId);
            message.setBody(body);
            return message;
        } finally {
            // The body was copied into a byte[], so the frame is no longer needed.
            frame.release();
        }
    }
}

线程模型优化

/**
 * Server with three separate thread groups: one accepting connections, one
 * handling I/O, and one running time-consuming business logic so that it does
 * not block the I/O threads.
 */
public class OptimizedNettyServer {
    /** Port used by {@link #start()}. NOTE(review): assumed default, confirm for your deployment. */
    private static final int DEFAULT_PORT = 8080;

    /** Number of threads in the business executor group. */
    private static final int BUSINESS_THREADS = 16;

    /**
     * Starts the server on the default port and blocks until its channel is closed.
     *
     * @throws InterruptedException if interrupted while binding or waiting for shutdown
     */
    public void start() throws InterruptedException {
        start(DEFAULT_PORT);
    }

    /**
     * Starts the server on {@code port} and blocks until its channel is closed.
     * All thread groups are shut down before this method returns.
     *
     * @param port TCP port to listen on
     * @throws InterruptedException if interrupted while binding or waiting for shutdown
     */
    public void start(int port) throws InterruptedException {
        // Boss group: accepts incoming connections.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);

        // Worker group: handles I/O events.
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        // Business group: runs time-consuming business logic.
        EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(BUSINESS_THREADS);

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline()
                            .addLast(new NettyMessageDecoder())
                            .addLast(new NettyMessageEncoder())
                            // Registered with businessGroup so the handler runs
                            // on that group's threads instead of the I/O thread.
                            .addLast(businessGroup, new BusinessHandler());
                    }
                });

            // Bind, then wait until the server channel is closed.
            bootstrap.bind(port).sync().channel().closeFuture().sync();
        } finally {
            // Release the threads and selectors held by each group.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            businessGroup.shutdownGracefully();
        }
    }
}

连接池

public class NettyClientPool {
    private final Bootstrap bootstrap;
    private final ChannelPool pool;

    public NettyClientPool(String host, int port, int maxSize) {
        EventLoopGroup group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ch.pipeline()
                        .addLast(new StringDecoder())
                        .addLast(new StringEncoder());
                }
            })
            .remoteAddress(host, port);

        pool = new FixedChannelPool(bootstrap, new SimpleChannelHealthChecker(), maxSize);
    }

    public CompletableFuture<String> send(String message) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pool.acquire().addListener(future2 -> {
            Channel channel = (Channel) future2.getNow();
            channel.writeAndFlush(message).addListener(f -> {
                pool.release(channel);
                future.complete(message);
            });
        });
        return future;
    }
}

SSL/TLS支持

/**
 * Shows how to put a TLS handler at the front of each accepted channel's
 * pipeline.
 *
 * <p>This snippet only covers the TLS wiring: the bootstrap is given a child
 * handler but no event loop groups or channel class, and it is never bound.
 */
public class SSLServer {
    /**
     * Builds a server-side SSL context from {@code server.crt} and
     * {@code server.key} and registers a child handler that uses it.
     *
     * @throws Exception if the SSL context cannot be built
     */
    public void start() throws Exception {
        File certChain = new File("server.crt");
        File privateKey = new File("server.key");
        SslContext sslCtx = SslContextBuilder.forServer(certChain, privateKey).build();

        ServerBootstrap bootstrap = new ServerBootstrap();
        ChannelInitializer<SocketChannel> tlsInitializer = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                // The SSL handler goes first so it sits in front of the
                // application handler in the pipeline.
                ch.pipeline().addLast(sslCtx.newHandler(ch.alloc()));
                ch.pipeline().addLast(new ServerHandler());
            }
        };
        bootstrap.childHandler(tlsInitializer);
    }
}

总结

Netty的高级特性包括ByteBuf内存管理、零拷贝、自定义协议编解码、线程模型优化、连接池以及SSL/TLS支持。这些特性是构建高性能网络应用的关键。