← 返回首页
🏎️

Netty架构:EventLoop与Pipeline

📂 architecture ⏱ 4 min 655 words

Netty架构:EventLoop与Pipeline

EventLoop线程模型

EventLoop是Netty的核心组件:每个EventLoop绑定一个线程,负责处理注册到它上面的所有Channel的IO事件,并执行提交给它的普通任务和定时任务。

// 自定义EventLoop
public class CustomEventLoop extends SingleThreadEventLoop {
    
    private final Queue<Runnable> taskQueue;
    private final Set<SelectStrategy> strategies;
    
    public CustomEventLoop(EventLoopGroup parent) {
        super(parent);
        this.taskQueue = new ConcurrentLinkedQueue<>();
        this.strategies = new HashSet<>();
    }
    
    @Override
    protected void run() {
        for (;;) {
            // 执行任务
            runAllTasks();
            
            // 处理IO事件
            processSelectedKeys();
            
            // 检查是否需要退出
            if (isShuttingDown()) {
                confirmShutdown();
                break;
            }
        }
    }
    
    private void runAllTasks() {
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            try {
                task.run();
            } catch (Exception e) {
                // 处理异常
            }
        }
    }
    
    @Override
    public EventLoopGroup next() {
        return super.next();
    }
    
    @Override
    public ChannelFuture register(ChannelPromise promise) {
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}

// EventLoopGroup配置
/**
 * Declares the boss and worker {@code EventLoopGroup} beans used by the Netty server.
 *
 * <p>Both beans name {@code shutdownGracefully} as their destroy method so that the
 * event loop threads are stopped when the application context closes.
 */
@Component
public class EventLoopConfig {

    /**
     * Creates the boss group: a single thread named {@code netty-boss-N}.
     *
     * @return the boss event loop group
     */
    @Bean(name = "bossGroup", destroyMethod = "shutdownGracefully")
    public EventLoopGroup bossGroup() {
        return new NioEventLoopGroup(1, namedThreadFactory("netty-boss-"));
    }

    /**
     * Creates the worker group with two threads per available processor, named
     * {@code netty-worker-N}.
     *
     * @return the worker event loop group
     */
    @Bean(name = "workerGroup", destroyMethod = "shutdownGracefully")
    public EventLoopGroup workerGroup() {
        int workerThreads = Runtime.getRuntime().availableProcessors() * 2;
        return new NioEventLoopGroup(workerThreads, namedThreadFactory("netty-worker-"));
    }

    /**
     * Builds a thread factory whose threads are named {@code prefix + N}, with N
     * starting at 1 and counted separately for each factory.
     *
     * @param prefix the thread name prefix
     * @return a new thread factory with its own counter
     */
    private static ThreadFactory namedThreadFactory(final String prefix) {
        return new ThreadFactory() {
            private final AtomicInteger count = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, prefix + count.incrementAndGet());
            }
        };
    }
}

ChannelPipeline处理链

Pipeline是Netty的事件处理链,采用责任链模式处理IO事件。

// 自定义ChannelHandler
@Sharable
public class CustomChannelInboundHandler extends ChannelInboundHandlerAdapter {
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 连接激活
        System.out.println("连接已建立: " + ctx.channel().remoteAddress());
        
        // 继续传播事件
        super.channelActive(ctx);
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 处理读取事件
        System.out.println("收到消息: " + msg);
        
        // 修改消息并传播
        String modifiedMsg = processMessage(msg.toString());
        ctx.fireChannelRead(modifiedMsg);
    }
    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 连接关闭
        System.out.println("连接已关闭: " + ctx.channel().remoteAddress());
        
        super.channelInactive(ctx);
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 处理异常
        cause.printStackTrace();
        ctx.close();
    }
    
    private String processMessage(String msg) {
        return "Processed: " + msg;
    }
}

// 自定义ChannelOutboundHandler
@Sharable
public class CustomChannelOutboundHandler extends ChannelOutboundHandlerAdapter {
    
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) 
            throws Exception {
        // 处理写入事件
        System.out.println("发送消息: " + msg);
        
        // 修改消息并传播
        String modifiedMsg = processOutbound(msg.toString());
        ctx.write(modifiedMsg, promise);
    }
    
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        // 处理刷新事件
        System.out.println("刷新缓冲区");
        
        super.flush(ctx);
    }
    
    private String processOutbound(String msg) {
        return "Outbound: " + msg;
    }
}

// Pipeline配置
/**
 * Provides the {@code ChannelInitializer} that assembles the pipeline of every new
 * socket channel.
 */
@Component
public class PipelineConfig {

    /**
     * Builds the channel initializer.
     *
     * <p>Handler order matters: inbound events travel from the head of the pipeline to
     * the tail, outbound events from the tail to the head. The outbound handler is
     * therefore added <em>before</em> the business handler, so that writes issued
     * through the business handler's context also pass through it.
     *
     * @param businessHandler the business handler bean that is added to each pipeline
     * @return the initializer applied to each accepted channel
     */
    @Bean
    public ChannelInitializer<SocketChannel> channelInitializer(
            @Qualifier("businessHandler") final BusinessHandler businessHandler) {
        return new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                ChannelPipeline pipeline = ch.pipeline();

                // Codecs: the decoder handles inbound bytes, the encoder handles
                // outbound messages. A fresh instance is created for every channel.
                pipeline.addLast("decoder", new MessageDecoder());
                pipeline.addLast("encoder", new MessageEncoder());

                // Reader-idle detection after 60 seconds; writer and all-idle
                // detection are disabled (0).
                pipeline.addLast("idleState", new IdleStateHandler(60, 0, 0));

                // Outbound handler, placed ahead of the business handler (see above).
                pipeline.addLast("outboundHandler", new CustomChannelOutboundHandler());

                // NOTE(review): the same businessHandler instance is added to every
                // channel, so it presumably has to be @Sharable - confirm.
                pipeline.addLast("business", businessHandler);
            }
        };
    }
}

ByteBuf内存管理

ByteBuf是Netty的字节缓冲区。相比JDK的ByteBuffer,它使用独立的读索引和写索引(无需flip),支持容量自动扩展和池化分配,并通过引用计数来管理内存的释放。

// ByteBuf使用示例
/**
 * Demonstrates basic {@code ByteBuf} usage: allocation, reading and writing, slicing,
 * and reference counting.
 */
@Component
public class ByteBufExample {

    /**
     * Allocates a heap buffer and a direct buffer, writes text into each, reads it
     * back, and releases both buffers in {@code finally} blocks.
     */
    public void demonstrateByteBuf() {
        // Heap buffer.
        ByteBuf heapBuf = Unpooled.buffer(256);

        try {
            // An explicit charset keeps the bytes independent of the platform default.
            heapBuf.writeBytes("Hello, Netty!".getBytes(StandardCharsets.UTF_8));

            byte[] data = new byte[heapBuf.readableBytes()];
            heapBuf.readBytes(data);

            System.out.println("读取数据: " + new String(data, StandardCharsets.UTF_8));

        } finally {
            heapBuf.release();
        }

        // Direct buffer.
        ByteBuf directBuf = Unpooled.directBuffer(256);

        try {
            directBuf.writeBytes("Direct Buffer".getBytes(StandardCharsets.UTF_8));

            // A slice is a view over the first 6 bytes of the parent. It is not
            // retained separately, so releasing the parent below is sufficient.
            ByteBuf slice = directBuf.slice(0, 6);
            System.out.println("切片数据: " + slice.toString(StandardCharsets.UTF_8));

        } finally {
            directBuf.release();
        }
    }

    /**
     * Shows how the reference count of a buffer changes as a second holder retains
     * and releases it.
     */
    public void demonstrateReferenceCounting() {
        ByteBuf buf = Unpooled.buffer(256);

        System.out.println("初始引用计数: " + buf.refCnt()); // 1

        // A retained duplicate shares the reference count of the original, so
        // retaining it raises the count to 2. copy() is deliberately not used here:
        // it creates an independent buffer with its own count and would leave this
        // one at 1.
        ByteBuf shared = buf.duplicate().retain();
        System.out.println("复制后引用计数: " + buf.refCnt()); // 2

        // Releasing through the duplicate lowers the shared count again.
        shared.release();
        System.out.println("释放复制后引用计数: " + buf.refCnt()); // 1

        // The final release drops the count to 0 and deallocates the buffer.
        buf.release();
        System.out.println("释放后引用计数: " + buf.refCnt()); // 0
    }
}

// 自定义编解码器
@Component
public class MessageDecoder extends ByteToMessageDecoder {
    
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // 检查是否有足够的数据
        if (in.readableBytes() < 4) {
            return;
        }
        
        // 标记当前读取位置
        in.markReaderIndex();
        
        // 读取消息长度
        int length = in.readInt();
        
        // 检查数据是否完整
        if (in.readableBytes() < length) {
            in.resetReaderIndex();
            return;
        }
        
        // 读取消息体
        byte[] body = new byte[length];
        in.readBytes(body);
        
        // 解码消息
        Message message = deserializeMessage(body);
        
        // 传递给下一个处理器
        out.add(message);
    }
    
    private Message deserializeMessage(byte[] body) {
        // 实现反序列化逻辑
        return new Message(body);
    }
}

Netty性能优化

# Netty performance configuration
netty:
  server:
    # Boss thread group settings
    boss:
      thread_count: 1
    
    # Worker thread group settings
    worker:
      thread_count: 16
    
    # Channel options
    channel:
      so_backlog: 128
      so_keepalive: true
      tcp_nodelay: true
      write_buffer_water_mark: "32KB-64KB"
    
    # Idle-state detection
    idle_state:
      reader_idle_time: 60
      writer_idle_time: 30
      all_idle_time: 90
  
  client:
    # Connect timeout
    connect_timeout: 5000
    
    # Retry settings
    retry:
      max_attempts: 3
      delay: 1000
    
    # Connection pool settings
    pool:
      max_connections: 100
      acquire_timeout: 30000
  
  # Memory management
  memory:
    # Prefer direct (off-heap) buffers
    prefer_direct: true
    
    # Leak detection level. PARANOID has the highest overhead and is meant for
    # testing, so this performance-oriented configuration uses SIMPLE; switch to
    # PARANOID temporarily when hunting a leak.
    leak_detection: "SIMPLE"
    
    # Write buffer settings
    write_buffer:
      initial_capacity: 256
      max_capacity: 65536
      allocator: "pooled"

Netty通过EventLoop、Pipeline和ByteBuf三大核心组件,提供了高性能、可扩展的网络编程框架,是Java网络编程的首选框架。