← 返回首页
🏎️

线程模型:Reactor与Proactor对比

📂 architecture ⏱ 4 min 741 words

线程模型:Reactor与Proactor对比

Reactor模型

Reactor模型通过事件驱动和非阻塞IO处理并发连接,是大多数网络框架的基础。

// 单Reactor单线程模型
public class SingleReactorSingleThread {
    
    private final Selector selector;
    private final ServerSocketChannel serverChannel;
    
    public SingleReactorSingleThread(int port) throws IOException {
        this.selector = Selector.open();
        this.serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        serverChannel.bind(new InetSocketAddress(port));
    }
    
    public void run() {
        while (true) {
            selector.select();
            
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();
                
                if (key.isAcceptable()) {
                    handleAccept(key);
                } else if (key.isReadable()) {
                    handleRead(key);
                } else if (key.isWritable()) {
                    handleWrite(key);
                }
            }
        }
    }
    
    private void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = serverChannel.accept();
        clientChannel.configureBlocking(false);
        clientChannel.register(selector, SelectionKey.OP_READ);
    }
    
    private void handleRead(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        
        int bytesRead = channel.read(buffer);
        if (bytesRead == -1) {
            channel.close();
            return;
        }
        
        // 处理数据
        buffer.flip();
        process(buffer);
    }
}

// 主从Reactor多线程模型
public class MainSubReactorMultiThread {
    
    private final Selector mainSelector;
    private final Selector[] subSelectors;
    private final ExecutorService workerPool;
    
    public MainSubReactorMultiThread(int port, int subReactorCount) throws IOException {
        this.mainSelector = Selector.open();
        this.subSelectors = new Selector[subReactorCount];
        this.workerPool = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors() * 2);
        
        for (int i = 0; i < subReactorCount; i++) {
            subSelectors[i] = Selector.open();
            new Thread(new SubReactor(subSelectors[i])).start();
        }
        
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.register(mainSelector, SelectionKey.OP_ACCEPT);
        serverChannel.bind(new InetSocketAddress(port));
    }
    
    public void run() {
        while (true) {
            mainSelector.select();
            
            Iterator<SelectionKey> keyIterator = 
                mainSelector.selectedKeys().iterator();
            
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove();
                
                if (key.isAcceptable()) {
                    handleAccept(key);
                }
            }
        }
    }
    
    private void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = 
            (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = serverChannel.accept();
        clientChannel.configureBlocking(false);
        
        // 轮询分配给子Reactor
        int index = (int) (System.currentTimeMillis() % subSelectors.length);
        subSelectors[index].wakeup();
        clientChannel.register(subSelectors[index], SelectionKey.OP_READ);
    }
    
    // 子Reactor处理IO事件
    private class SubReactor implements Runnable {
        private final Selector selector;
        
        SubReactor(Selector selector) {
            this.selector = selector;
        }
        
        @Override
        public void run() {
            while (true) {
                selector.select();
                
                Iterator<SelectionKey> keyIterator = 
                    selector.selectedKeys().iterator();
                
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    keyIterator.remove();
                    
                    if (key.isReadable()) {
                        // 将业务处理提交给工作线程池
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        
                        int bytesRead = channel.read(buffer);
                        if (bytesRead > 0) {
                            workerPool.submit(() -> process(buffer));
                        }
                    }
                }
            }
        }
    }
}

Proactor模型

Proactor模型将IO操作完全异步化,由内核完成IO操作后通知应用程序。

// Proactor模型实现
public class ProactorModel {
    
    private final AsynchronousChannelGroup channelGroup;
    private final CompletionHandler<AsynchronousSocketChannel, Void> acceptHandler;
    
    public ProactorModel(int port) throws IOException {
        this.channelGroup = AsynchronousChannelGroup
            .withFixedThreadPool(
                Runtime.getRuntime().availableProcessors(),
                Executors.defaultThreadFactory());
        
        AsynchronousServerSocketChannel serverChannel = 
            AsynchronousServerSocketChannel.open(channelGroup);
        serverChannel.bind(new InetSocketAddress(port));
        
        this.acceptHandler = new AcceptCompletionHandler(serverChannel);
        
        // 异步接受连接
        serverChannel.accept(null, acceptHandler);
    }
    
    // 接受连接完成处理器
    private class AcceptCompletionHandler 
            implements CompletionHandler<AsynchronousSocketChannel, Void> {
        
        private final AsynchronousServerSocketChannel serverChannel;
        
        AcceptCompletionHandler(AsynchronousServerSocketChannel serverChannel) {
            this.serverChannel = serverChannel;
        }
        
        @Override
        public void completed(AsynchronousSocketChannel result, Void attachment) {
            // 继续接受新连接
            serverChannel.accept(null, this);
            
            // 处理新连接
            handleConnection(result);
        }
        
        @Override
        public void failed(Throwable exc, Void attachment) {
            // 处理失败
        }
        
        private void handleConnection(AsynchronousSocketChannel channel) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            
            // 异步读取数据
            channel.read(buffer, buffer, 
                new ReadCompletionHandler(channel));
        }
    }
    
    // 读取完成处理器
    private class ReadCompletionHandler 
            implements CompletionHandler<Integer, ByteBuffer> {
        
        private final AsynchronousSocketChannel channel;
        
        ReadCompletionHandler(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }
        
        @Override
        public void completed(Integer bytesRead, ByteBuffer buffer) {
            if (bytesRead == -1) {
                close();
                return;
            }
            
            buffer.flip();
            process(buffer);
            
            // 继续读取
            buffer.clear();
            channel.read(buffer, buffer, this);
        }
        
        @Override
        public void failed(Throwable exc, ByteBuffer buffer) {
            close();
        }
        
        private void close() {
            try {
                channel.close();
            } catch (IOException e) {
                // 处理关闭异常
            }
        }
    }
}

Reactor vs Proactor对比

# 线程模型对比
thread_models:
  reactor:
    name: "Reactor"
    description: "事件驱动,同步IO"
    io_type: "同步非阻塞"
    pros:
      - 实现简单
      - 资源占用少
      - 适合大多数场景
    cons:
      - 需要应用层处理IO
      - 并发度受限于线程数
    use_cases:
      - Web服务器
      - 聊天服务器
      - 游戏服务器
    examples:
      - "Netty"
      - "NIO"
      - "Mina"
  
  proactor:
    name: "Proactor"
    description: "完全异步,异步IO"
    io_type: "异步非阻塞"
    pros:
      - 并发度高
      - 资源利用率高
      - 适合高并发场景
    cons:
      - 实现复杂
      - 依赖操作系统支持
    use_cases:
      - 高性能文件服务器
      - 数据库服务器
      - 分布式存储
    examples:
      - "Boost.Asio"
      - "IOCP"
      - "libuv"

Netty线程模型

// Netty主从Reactor模型
@Component
public class NettyServer {
    
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private Channel serverChannel;
    
    @PostConstruct
    public void start() throws InterruptedException {
        // Boss线程组:处理新连接
        bossGroup = new NioEventLoopGroup(1);
        
        // Worker线程组:处理IO事件
        workerGroup = new NioEventLoopGroup(
            Runtime.getRuntime().availableProcessors() * 2);
        
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 128)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new IdleStateHandler(60, 0, 0));
                    pipeline.addLast(new LengthFieldBasedFrameDecoder(
                        1024 * 1024, 0, 4, 0, 4));
                    pipeline.addLast(new MessageDecoder());
                    pipeline.addLast(new MessageEncoder());
                    pipeline.addLast(new BusinessHandler());
                }
            });
        
        // 绑定端口
        serverChannel = bootstrap.bind(8080).sync().channel();
    }
    
    @PreDestroy
    public void stop() {
        if (serverChannel != null) {
            serverChannel.close();
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }
}

// 业务处理Handler
@Sharable
public class BusinessHandler extends ChannelInboundHandlerAdapter {
    
    private final ExecutorService businessExecutor;
    
    public BusinessHandler() {
        this.businessExecutor = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors() * 2);
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 将业务处理提交给业务线程池
        businessExecutor.submit(() -> {
            try {
                Object result = processBusinessLogic(msg);
                
                // 将结果写回连接
                ctx.writeAndFlush(result);
                
            } catch (Exception e) {
                ctx.fireExceptionCaught(e);
            }
        });
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

线程模型的选择需要根据应用场景、性能要求和实现复杂度综合考虑,Reactor适合大多数场景,Proactor适合高并发高性能场景。