线程模型:Reactor与Proactor对比
线程模型:Reactor与Proactor对比
Reactor模型
Reactor模型通过事件驱动和非阻塞IO处理并发连接,是大多数网络框架的基础。
/**
 * Single-Reactor, single-thread model: one thread owns the selector and performs
 * accept, read and business processing for every connection.
 *
 * <p>NOTE(review): {@code handleWrite} and {@code process} are called below but are not
 * defined in this excerpt; they are assumed to be supplied elsewhere — confirm.
 */
public class SingleReactorSingleThread {
    private final Selector selector;
    private final ServerSocketChannel serverChannel;

    /**
     * Opens the selector and a non-blocking listening channel bound to {@code port}.
     *
     * @param port TCP port to listen on
     * @throws IOException if the selector or channel cannot be opened or bound
     */
    public SingleReactorSingleThread(int port) throws IOException {
        this.selector = Selector.open();
        this.serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(port));
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Runs the event loop on the calling thread until that thread is interrupted.
     *
     * @throws java.io.UncheckedIOException if the selector or the listening channel fails
     */
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                selector.select();
            } catch (IOException e) {
                throw new java.io.UncheckedIOException("Selector failed; event loop cannot continue", e);
            }
            Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                // Selected keys must be removed by hand or they are reported again.
                keyIterator.remove();
                dispatch(key);
            }
        }
    }

    /** Routes one ready key to its handler and confines a connection failure to that connection. */
    private void dispatch(SelectionKey key) {
        // A key cancelled earlier in this pass would throw CancelledKeyException below.
        if (!key.isValid()) {
            return;
        }
        try {
            if (key.isAcceptable()) {
                handleAccept(key);
            } else if (key.isReadable()) {
                handleRead(key);
            } else if (key.isWritable()) {
                handleWrite(key);
            }
        } catch (IOException e) {
            if (key.channel() == serverChannel) {
                // The listening socket itself failed; there is nothing left to serve.
                throw new java.io.UncheckedIOException("Accept failed on listening channel", e);
            }
            // An error on one client (e.g. connection reset) must not stop the loop.
            key.cancel();
            closeQuietly(key.channel());
        }
    }

    /**
     * Accepts a pending connection and registers it for read events.
     *
     * @throws IOException if {@code accept()} itself fails
     */
    private void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel listeningChannel = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = listeningChannel.accept();
        if (clientChannel == null) {
            // Non-blocking accept returns null when no connection is pending.
            return;
        }
        try {
            clientChannel.configureBlocking(false);
            clientChannel.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            // Setup failed for this client only; drop it and keep serving others.
            closeQuietly(clientChannel);
        }
    }

    /**
     * Reads available bytes from the client and passes them to {@code process}.
     * Closes the channel when the peer has reached end-of-stream.
     *
     * @throws IOException if the read fails
     */
    private void handleRead(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int bytesRead = channel.read(buffer);
        if (bytesRead == -1) {
            // Closing the channel also cancels its key.
            channel.close();
            return;
        }
        if (bytesRead == 0) {
            return;
        }
        // Switch the buffer from writing to reading before handing it over.
        buffer.flip();
        process(buffer);
    }

    /** Closes a resource during error cleanup, where a secondary failure has no useful handling. */
    private static void closeQuietly(java.io.Closeable resource) {
        try {
            resource.close();
        } catch (IOException ignored) {
            // Already on a failure path; the resource is being discarded either way.
        }
    }
}
/**
 * Main/sub Reactor, multi-thread model: the main reactor only accepts connections,
 * sub-reactors perform socket reads, and a worker pool runs the business processing.
 *
 * <p>NOTE(review): {@code process} is called below but is not defined in this excerpt;
 * it is assumed to be supplied elsewhere — confirm.
 */
public class MainSubReactorMultiThread {
    private final Selector mainSelector;
    private final Selector[] subSelectors;
    private final SubReactor[] subReactors;
    private final ExecutorService workerPool;
    /** Index of the sub-reactor that gets the next connection; used only by the thread in {@link #run()}. */
    private int nextSubReactor;

    /**
     * Binds the listening socket and starts one thread per sub-reactor.
     *
     * @param port TCP port to listen on
     * @param subReactorCount number of sub-reactor threads; must be positive
     * @throws IOException if a selector or the listening channel cannot be opened or bound
     * @throws IllegalArgumentException if {@code subReactorCount} is not positive
     */
    public MainSubReactorMultiThread(int port, int subReactorCount) throws IOException {
        if (subReactorCount <= 0) {
            throw new IllegalArgumentException(
                    "subReactorCount must be positive: " + subReactorCount);
        }
        this.mainSelector = Selector.open();
        this.subSelectors = new Selector[subReactorCount];
        this.subReactors = new SubReactor[subReactorCount];
        this.workerPool = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors() * 2);

        // Bind before starting any thread so a bind failure leaves no sub-reactor running.
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(port));
        serverChannel.register(mainSelector, SelectionKey.OP_ACCEPT);

        for (int i = 0; i < subReactorCount; i++) {
            subSelectors[i] = Selector.open();
            subReactors[i] = new SubReactor(subSelectors[i]);
            new Thread(subReactors[i]).start();
        }
    }

    /**
     * Runs the accept loop on the calling thread until that thread is interrupted.
     *
     * @throws java.io.UncheckedIOException if the main selector or the listening channel fails
     */
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                mainSelector.select();
                Iterator<SelectionKey> keyIterator =
                        mainSelector.selectedKeys().iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    keyIterator.remove();
                    if (key.isValid() && key.isAcceptable()) {
                        handleAccept(key);
                    }
                }
            }
        } catch (IOException e) {
            throw new java.io.UncheckedIOException("Main reactor failed", e);
        }
    }

    /**
     * Accepts a pending connection and hands it to the next sub-reactor in round-robin order.
     *
     * @throws IOException if {@code accept()} itself fails
     */
    private void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel =
                (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = serverChannel.accept();
        if (clientChannel == null) {
            // Non-blocking accept returns null when no connection is pending.
            return;
        }
        try {
            clientChannel.configureBlocking(false);
        } catch (IOException e) {
            // Setup failed for this client only; drop it and keep accepting.
            closeQuietly(clientChannel);
            return;
        }
        // A counter gives an even spread; a clock-based index sends a burst of
        // connections arriving in the same millisecond to a single sub-reactor.
        SubReactor target = subReactors[nextSubReactor];
        nextSubReactor = (nextSubReactor + 1) % subReactors.length;
        target.adopt(clientChannel);
    }

    /** Closes a resource during error cleanup, where a secondary failure has no useful handling. */
    private static void closeQuietly(java.io.Closeable resource) {
        try {
            resource.close();
        } catch (IOException ignored) {
            // Already on a failure path; the resource is being discarded either way.
        }
    }

    /** Sub-reactor: owns one selector and performs the reads for the connections assigned to it. */
    private class SubReactor implements Runnable {
        private final Selector selector;
        /** Connections handed over by the main reactor and not yet registered with {@link #selector}. */
        private final java.util.Queue<SocketChannel> pendingChannels =
                new java.util.concurrent.ConcurrentLinkedQueue<>();

        SubReactor(Selector selector) {
            this.selector = selector;
        }

        /**
         * Called from the main reactor thread. The channel is queued and registered by the
         * sub-reactor thread itself, so registration never races with an in-progress select().
         */
        void adopt(SocketChannel channel) {
            pendingChannels.add(channel);
            // If the sub-reactor is not currently selecting, its next select() returns at once.
            selector.wakeup();
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    selector.select();
                    registerPendingChannels();
                    Iterator<SelectionKey> keyIterator =
                            selector.selectedKeys().iterator();
                    while (keyIterator.hasNext()) {
                        SelectionKey key = keyIterator.next();
                        keyIterator.remove();
                        if (key.isValid() && key.isReadable()) {
                            handleRead(key);
                        }
                    }
                }
            } catch (IOException e) {
                throw new java.io.UncheckedIOException("Sub-reactor selector failed", e);
            }
        }

        /** Registers every queued connection for read events on this sub-reactor's selector. */
        private void registerPendingChannels() {
            SocketChannel channel;
            while ((channel = pendingChannels.poll()) != null) {
                try {
                    channel.register(selector, SelectionKey.OP_READ);
                } catch (IOException e) {
                    // The channel was closed before it could be registered.
                    closeQuietly(channel);
                }
            }
        }

        /** Reads from one connection and submits the received bytes to the worker pool. */
        private void handleRead(SelectionKey key) {
            SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int bytesRead;
            try {
                bytesRead = channel.read(buffer);
            } catch (IOException e) {
                // An error on one client (e.g. connection reset) ends only that connection.
                key.cancel();
                closeQuietly(channel);
                return;
            }
            if (bytesRead > 0) {
                // Switch the buffer from writing to reading before handing it over.
                buffer.flip();
                // Business processing runs on the worker pool, not on the I/O thread.
                workerPool.submit(() -> process(buffer));
            } else if (bytesRead < 0) {
                // End-of-stream: without closing, the key stays readable and the loop spins.
                key.cancel();
                closeQuietly(channel);
            }
        }
    }
}
Proactor模型
Proactor模型将IO操作完全异步化,由内核完成IO操作后通知应用程序。
/**
 * Proactor model built on asynchronous channels: accept and read operations are started
 * asynchronously and their completion handlers are invoked when the operation finishes.
 *
 * <p>NOTE(review): {@code process} is called below but is not defined in this excerpt;
 * it is assumed to be supplied elsewhere — confirm.
 */
public class ProactorModel {
    private final AsynchronousChannelGroup channelGroup;
    private final CompletionHandler<AsynchronousSocketChannel, Void> acceptHandler;

    /**
     * Creates the channel group, binds the listening channel and starts the first accept.
     *
     * @param port TCP port to listen on
     * @throws IOException if the group or channel cannot be opened or bound
     */
    public ProactorModel(int port) throws IOException {
        AsynchronousChannelGroup group = AsynchronousChannelGroup
                .withFixedThreadPool(
                        Runtime.getRuntime().availableProcessors(),
                        Executors.defaultThreadFactory());
        AsynchronousServerSocketChannel serverChannel;
        try {
            serverChannel = AsynchronousServerSocketChannel.open(group);
            serverChannel.bind(new InetSocketAddress(port));
        } catch (IOException | RuntimeException e) {
            // Do not leave the group (and any channel opened in it) behind when
            // construction fails; shutdownNow also closes the group's open channels.
            try {
                group.shutdownNow();
            } catch (IOException shutdownFailure) {
                e.addSuppressed(shutdownFailure);
            }
            throw e;
        }
        this.channelGroup = group;
        this.acceptHandler = new AcceptCompletionHandler(serverChannel);
        // Start accepting connections asynchronously.
        serverChannel.accept(null, acceptHandler);
    }

    /** Completion handler for accept operations. */
    private class AcceptCompletionHandler
            implements CompletionHandler<AsynchronousSocketChannel, Void> {
        private final AsynchronousServerSocketChannel serverChannel;

        AcceptCompletionHandler(AsynchronousServerSocketChannel serverChannel) {
            this.serverChannel = serverChannel;
        }

        @Override
        public void completed(AsynchronousSocketChannel result, Void attachment) {
            // Re-arm the accept first so further connections are not held up
            // while this one is being set up.
            serverChannel.accept(null, this);
            // Then start serving the connection that was just accepted.
            handleConnection(result);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            // NOTE(review): the failure is ignored and accept is not re-armed here, so no
            // further connections are accepted after a failed accept — confirm this is intended.
        }

        /** Starts the first asynchronous read on a newly accepted connection. */
        private void handleConnection(AsynchronousSocketChannel channel) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            // The buffer is both the read target and the attachment passed to the handler.
            channel.read(buffer, buffer,
                    new ReadCompletionHandler(channel));
        }
    }

    /** Completion handler for read operations on one connection. */
    private class ReadCompletionHandler
            implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel channel;

        ReadCompletionHandler(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }

        @Override
        public void completed(Integer bytesRead, ByteBuffer buffer) {
            if (bytesRead == -1) {
                // End-of-stream: the peer closed the connection.
                close();
                return;
            }
            // Switch the buffer from writing to reading before handing it over.
            buffer.flip();
            process(buffer);
            // Reuse the same buffer for the next read.
            buffer.clear();
            channel.read(buffer, buffer, this);
        }

        @Override
        public void failed(Throwable exc, ByteBuffer buffer) {
            close();
        }

        private void close() {
            try {
                channel.close();
            } catch (IOException ignored) {
                // The connection is being discarded; a failure to close has no further handling.
            }
        }
    }
}
Reactor vs Proactor对比
# Thread model comparison
thread_models:
  reactor:
    name: "Reactor"
    description: "事件驱动,同步IO"
    io_type: "同步非阻塞"
    pros:
      - 实现简单
      - 资源占用少
      - 适合大多数场景
    cons:
      - 需要应用层处理IO
      - 并发度受限于线程数
    use_cases:
      - Web服务器
      - 聊天服务器
      - 游戏服务器
    examples:
      - "Netty"
      - "NIO"
      - "Mina"
  proactor:
    name: "Proactor"
    description: "完全异步,异步IO"
    io_type: "异步非阻塞"
    pros:
      - 并发度高
      - 资源利用率高
      - 适合高并发场景
    cons:
      - 实现复杂
      - 依赖操作系统支持
    use_cases:
      - 高性能文件服务器
      - 数据库服务器
      - 分布式存储
    examples:
      - "Boost.Asio"
      - "IOCP"
      - "libuv"
Netty线程模型
/**
 * Netty server using the main/sub Reactor layout: a boss group accepts connections
 * and a worker group handles I/O events for the accepted channels.
 */
@Component
public class NettyServer {
    private static final int PORT = 8080;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private Channel serverChannel;

    /**
     * Creates the event loop groups, configures the pipeline and binds the listening port.
     * If startup does not complete, everything created so far is released again.
     *
     * @throws InterruptedException if interrupted while waiting for the bind to complete
     */
    @PostConstruct
    public void start() throws InterruptedException {
        // Boss group: accepts new connections.
        bossGroup = new NioEventLoopGroup(1);
        // Worker group: handles I/O events of accepted connections.
        workerGroup = new NioEventLoopGroup(
                Runtime.getRuntime().availableProcessors() * 2);
        // BusinessHandler is annotated @Sharable, so one instance is installed in every
        // pipeline instead of constructing a handler (and whatever it allocates) per connection.
        final BusinessHandler businessHandler = new BusinessHandler();

        boolean started = false;
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new IdleStateHandler(60, 0, 0));
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(
                                    1024 * 1024, 0, 4, 0, 4));
                            pipeline.addLast(new MessageDecoder());
                            pipeline.addLast(new MessageEncoder());
                            pipeline.addLast(businessHandler);
                        }
                    });
            // Bind the port and wait for the bind to finish.
            serverChannel = bootstrap.bind(PORT).sync().channel();
            started = true;
        } finally {
            // A flag plus finally (rather than a catch) releases the groups whatever the
            // failure type, including interruption while waiting.
            if (!started) {
                stop();
            }
        }
    }

    /** Closes the listening channel and shuts down both event loop groups, if they were created. */
    @PreDestroy
    public void stop() {
        if (serverChannel != null) {
            serverChannel.close();
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }
}
/**
 * Inbound handler that moves business processing off the I/O thread onto a
 * dedicated thread pool and writes the result back to the connection.
 *
 * <p>NOTE(review): {@code processBusinessLogic} is called below but is not defined in this
 * excerpt; it is assumed to be supplied elsewhere — confirm.
 */
@Sharable
public class BusinessHandler extends ChannelInboundHandlerAdapter {
    /**
     * One pool for all handler instances. A pool per instance is never shut down and
     * multiplies threads whenever a handler is constructed per connection.
     */
    private static final ExecutorService BUSINESS_EXECUTOR = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors() * 2);

    public BusinessHandler() {
        // No per-instance state: every instance uses the shared business pool.
    }

    /**
     * Submits the message to the business pool; the result is written back through {@code ctx}.
     *
     * <p>NOTE(review): {@code msg} is not released here. If it can be a reference-counted
     * object, confirm that {@code processBusinessLogic} or a later stage releases it.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        BUSINESS_EXECUTOR.submit(() -> {
            try {
                Object result = processBusinessLogic(msg);
                // Write the result back to the connection.
                ctx.writeAndFlush(result);
            } catch (Exception e) {
                // Route the failure through the pipeline so exceptionCaught handles it.
                ctx.fireExceptionCaught(e);
            }
        });
    }

    /** Prints the failure and closes the connection it occurred on. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
线程模型的选择需要根据应用场景、性能要求和实现复杂度综合考虑:Reactor实现成熟、生态完善,适合包括高并发在内的大多数网络服务场景(如Netty);Proactor依赖操作系统提供的异步IO支持(如Windows的IOCP),适合平台异步IO能力完善、追求更高IO吞吐的高性能场景。