Netty架构:EventLoop与Pipeline
Netty架构:EventLoop与Pipeline
EventLoop线程模型
EventLoop是Netty的核心组件,负责处理所有IO事件和任务调度。
// 自定义EventLoop
public class CustomEventLoop extends SingleThreadEventLoop {
private final Queue<Runnable> taskQueue;
private final Set<SelectStrategy> strategies;
public CustomEventLoop(EventLoopGroup parent) {
super(parent);
this.taskQueue = new ConcurrentLinkedQueue<>();
this.strategies = new HashSet<>();
}
@Override
protected void run() {
for (;;) {
// 执行任务
runAllTasks();
// 处理IO事件
processSelectedKeys();
// 检查是否需要退出
if (isShuttingDown()) {
confirmShutdown();
break;
}
}
}
private void runAllTasks() {
Runnable task;
while ((task = taskQueue.poll()) != null) {
try {
task.run();
} catch (Exception e) {
// 处理异常
}
}
}
@Override
public EventLoopGroup next() {
return super.next();
}
@Override
public ChannelFuture register(ChannelPromise promise) {
promise.channel().unsafe().register(this, promise);
return promise;
}
}
// EventLoopGroup配置
@Component
public class EventLoopConfig {
@Bean("bossGroup")
public EventLoopGroup bossGroup() {
return new NioEventLoopGroup(1, new ThreadFactory() {
private final AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "netty-boss-" + count.incrementAndGet());
}
});
}
@Bean("workerGroup")
public EventLoopGroup workerGroup() {
return new NioEventLoopGroup(
Runtime.getRuntime().availableProcessors() * 2,
new ThreadFactory() {
private final AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "netty-worker-" + count.incrementAndGet());
}
});
}
}
ChannelPipeline处理链
Pipeline是Netty的事件处理链,采用责任链模式处理IO事件。
// 自定义ChannelHandler
@Sharable
public class CustomChannelInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 连接激活
System.out.println("连接已建立: " + ctx.channel().remoteAddress());
// 继续传播事件
super.channelActive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 处理读取事件
System.out.println("收到消息: " + msg);
// 修改消息并传播
String modifiedMsg = processMessage(msg.toString());
ctx.fireChannelRead(modifiedMsg);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 连接关闭
System.out.println("连接已关闭: " + ctx.channel().remoteAddress());
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 处理异常
cause.printStackTrace();
ctx.close();
}
private String processMessage(String msg) {
return "Processed: " + msg;
}
}
// 自定义ChannelOutboundHandler
@Sharable
public class CustomChannelOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
// 处理写入事件
System.out.println("发送消息: " + msg);
// 修改消息并传播
String modifiedMsg = processOutbound(msg.toString());
ctx.write(modifiedMsg, promise);
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
// 处理刷新事件
System.out.println("刷新缓冲区");
super.flush(ctx);
}
private String processOutbound(String msg) {
return "Outbound: " + msg;
}
}
// Pipeline配置
@Component
public class PipelineConfig {
@Bean
public ChannelInitializer<SocketChannel> channelInitializer(
@Qualifier("businessHandler") BusinessHandler businessHandler) {
return new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 入站处理器
pipeline.addLast("decoder", new MessageDecoder());
pipeline.addLast("encoder", new MessageEncoder());
pipeline.addLast("idleState", new IdleStateHandler(60, 0, 0));
pipeline.addLast("business", businessHandler);
// 出站处理器
pipeline.addLast("outboundHandler", new CustomChannelOutboundHandler());
}
};
}
}
ByteBuf内存管理
ByteBuf是Netty的字节缓冲区,提供更灵活的内存管理机制。
// ByteBuf使用示例
@Component
public class ByteBufExample {
// ByteBuf的分配与释放
public void demonstrateByteBuf() {
// 分配堆缓冲区
ByteBuf heapBuf = Unpooled.buffer(256);
try {
// 写入数据
heapBuf.writeBytes("Hello, Netty!".getBytes());
// 读取数据
byte[] data = new byte[heapBuf.readableBytes()];
heapBuf.readBytes(data);
System.out.println("读取数据: " + new String(data));
} finally {
// 释放缓冲区
heapBuf.release();
}
// 分配直接缓冲区
ByteBuf directBuf = Unpooled.directBuffer(256);
try {
// 写入数据
directBuf.writeBytes("Direct Buffer".getBytes());
// 使用切片
ByteBuf slice = directBuf.slice(0, 6);
System.out.println("切片数据: " + slice.toString(StandardCharsets.UTF_8));
} finally {
directBuf.release();
}
}
// ByteBuf引用计数
public void demonstrateReferenceCounting() {
ByteBuf buf = Unpooled.buffer(256);
System.out.println("初始引用计数: " + buf.refCnt()); // 1
// 复制缓冲区(引用计数+1)
ByteBuf copied = buf.copy();
System.out.println("复制后引用计数: " + buf.refCnt()); // 2
// 释放复制的缓冲区
copied.release();
System.out.println("释放复制后引用计数: " + buf.refCnt()); // 1
// 释放原始缓冲区
buf.release();
System.out.println("释放后引用计数: " + buf.refCnt()); // 0
}
}
// 自定义编解码器
@Component
public class MessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 检查是否有足够的数据
if (in.readableBytes() < 4) {
return;
}
// 标记当前读取位置
in.markReaderIndex();
// 读取消息长度
int length = in.readInt();
// 检查数据是否完整
if (in.readableBytes() < length) {
in.resetReaderIndex();
return;
}
// 读取消息体
byte[] body = new byte[length];
in.readBytes(body);
// 解码消息
Message message = deserializeMessage(body);
// 传递给下一个处理器
out.add(message);
}
private Message deserializeMessage(byte[] body) {
// 实现反序列化逻辑
return new Message(body);
}
}
Netty性能优化
# Netty性能配置
netty:
server:
# Boss线程组配置
boss:
thread_count: 1
# Worker线程组配置
worker:
thread_count: 16
# Channel配置
channel:
so_backlog: 128
so_keepalive: true
tcp_nodelay: true
write_buffer_water_mark: "32KB-64KB"
# Idle状态检测
idle_state:
reader_idle_time: 60
writer_idle_time: 30
all_idle_time: 90
client:
# 连接超时
connect_timeout: 5000
# 重试配置
retry:
max_attempts: 3
delay: 1000
# 连接池配置
pool:
max_connections: 100
acquire_timeout: 30000
# 内存管理
memory:
# 预分配缓冲区
prefer_direct: true
# 内存泄漏检测
leak_detection: "PARANOID"
# 写缓冲区配置
write_buffer:
initial_capacity: 256
max_capacity: 65536
allocator: "pooled"
Netty通过EventLoop、Pipeline和ByteBuf三大核心组件,提供了高性能、可扩展的网络编程框架,是Java网络编程的首选框架。