Netty框架入门详解
Netty框架入门详解
什么是Netty
Netty是一个基于Java NIO的异步、事件驱动的网络应用框架,用于快速开发高性能、高可靠性的网络服务器和客户端。
核心概念
┌─────────────────────────────────────────┐
│ Netty架构 │
├─────────────────────────────────────────┤
│ Bootstrap (启动器) │
│ ├── EventLoopGroup (事件循环组) │
│ ├── Channel (通道) │
│ ├── ChannelPipeline (处理器链) │
│ │ ├── ChannelInboundHandler │
│ │ └── ChannelOutboundHandler │
│ └── ChannelHandler (处理器) │
└─────────────────────────────────────────┘
依赖配置
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.101.Final</version>
</dependency>
简单服务器
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Minimal Netty TCP server: binds to a fixed port and installs a fresh
 * {@code ServerHandler} on every accepted connection.
 */
public class NettyServer {

    private final int port;

    /**
     * @param port TCP port the server binds to
     */
    public NettyServer(int port) {
        this.port = port;
    }

    /**
     * Binds the server socket and then blocks until the server channel is closed.
     * Both event loop groups are asked to shut down on the way out, whether or not
     * the bind succeeded.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for the bind or for the channel to close
     */
    public void start() throws InterruptedException {
        // Single-threaded group for the server socket; the second group is
        // created without an explicit thread count and serves accepted channels.
        EventLoopGroup acceptGroup = new NioEventLoopGroup(1);
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap();
            server.group(acceptGroup, ioGroup);
            server.channel(NioServerSocketChannel.class);
            server.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel accepted) {
                    accepted.pipeline().addLast(new ServerHandler());
                }
            });
            server.option(ChannelOption.SO_BACKLOG, 128);
            server.childOption(ChannelOption.SO_KEEPALIVE, true);

            // sync() waits for the bind to finish before the banner is printed.
            Channel serverChannel = server.bind(port).sync().channel();
            System.out.println("服务器启动在端口: " + port);

            // Park here until something closes the server channel.
            serverChannel.closeFuture().sync();
        } finally {
            acceptGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }

    /** Starts the server on port 8080. */
    public static void main(String[] args) throws InterruptedException {
        NettyServer server = new NettyServer(8080);
        server.start();
    }
}
消息处理器
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
 * Inbound handler that logs new connections and echoes each received message
 * back to the peer, prefixed with {@code "Echo: "}.
 *
 * <p>Accepts either a raw {@link ByteBuf} (no decoder in the pipeline) or a
 * {@code String} (a string decoder sits in front of this handler). Any other
 * message type is passed on to the next inbound handler untouched.
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {

    /** Logs the remote address of a newly active connection. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("客户端连接: " + ctx.channel().remoteAddress());
    }

    /**
     * Decodes the incoming message as UTF-8 text, logs it, and writes the echo reply.
     *
     * @param ctx handler context used to allocate the reply buffer and write it
     * @param msg inbound message; a {@link ByteBuf} or a {@code String}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        String request;
        if (msg instanceof ByteBuf in) {
            try {
                request = in.toString(CharsetUtil.UTF_8);
            } finally {
                // This handler consumes the buffer instead of forwarding it, so it
                // must drop the reference itself; otherwise the buffer leaks.
                in.release();
            }
        } else if (msg instanceof String text) {
            request = text;
        } else {
            // Not something this handler understands; let a later handler deal with it.
            ctx.fireChannelRead(msg);
            return;
        }
        System.out.println("收到消息: " + request);

        // Reply as a ByteBuf: writing a bare String only works when a String
        // encoder is installed, and the basic server pipeline has none.
        String response = "Echo: " + request;
        ByteBuf out = ctx.alloc().buffer();
        out.writeCharSequence(response, CharsetUtil.UTF_8);
        ctx.writeAndFlush(out);
    }

    /** Prints the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
简单客户端
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.StandardCharsets;
/**
 * Minimal Netty TCP client: connects to a host/port, sends one greeting, and
 * then waits until the connection is closed.
 */
public class NettyClient {

    private final String host;
    private final int port;

    /**
     * @param host server host name or address
     * @param port server TCP port
     */
    public NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects, sends {@code "Hello Netty"} as UTF-8 bytes, and blocks until the
     * channel is closed. The event loop group is asked to shut down on the way out.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for the connect or for the channel to close
     */
    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new ClientHandler());
                    }
                });

            ChannelFuture future = bootstrap.connect(host, port).sync();
            Channel channel = future.channel();

            // The pipeline has no String encoder, so a bare String cannot be
            // written to the socket; wrap the text in a buffer first.
            byte[] payload = "Hello Netty".getBytes(StandardCharsets.UTF_8);
            channel.writeAndFlush(channel.alloc().buffer(payload.length).writeBytes(payload));

            channel.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    /** Connects to {@code localhost:8080}. */
    public static void main(String[] args) throws InterruptedException {
        new NettyClient("localhost", 8080).start();
    }
}
/**
 * Client-side inbound handler that prints every reply it receives.
 */
public class ClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Prints the reply. A {@link ByteBuf} is decoded as UTF-8 text and then
     * released; any other message type is printed via its {@code toString()}.
     *
     * @param ctx handler context (unused)
     * @param msg inbound message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof ByteBuf buf) {
            try {
                // Concatenating the ByteBuf itself would print the buffer's
                // description rather than the text it carries.
                System.out.println("收到回复: " + buf.toString(StandardCharsets.UTF_8));
            } finally {
                // The message is consumed here and not forwarded, so release it.
                buf.release();
            }
        } else {
            System.out.println("收到回复: " + msg);
        }
    }
}
编解码器
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.*;
import io.netty.util.CharsetUtil;
/**
 * String decoder: turns each inbound {@link ByteBuf} into a UTF-8 {@code String}
 * for the next handler in the pipeline.
 */
public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {

    /**
     * @param ctx handler context (unused)
     * @param msg inbound bytes to decode
     * @param out receives the decoded string
     */
    @Override
    // java.util.List is spelled out because this snippet's imports do not include it.
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, java.util.List<Object> out) {
        out.add(msg.toString(CharsetUtil.UTF_8));
    }
}
/**
 * String encoder: turns each outbound {@code String} into a UTF-8 encoded buffer.
 */
public class StringEncoder extends MessageToMessageEncoder<String> {

    /**
     * @param ctx handler context (unused)
     * @param msg outbound text to encode
     * @param out receives the encoded buffer
     */
    @Override
    // java.util.List is spelled out because this snippet's imports do not include it.
    protected void encode(ChannelHandlerContext ctx, String msg, java.util.List<Object> out) {
        out.add(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
    }
}
// Install the String codec pair ahead of the business handler. Handlers are
// added in the order listed, so ServerHandler sits behind both codecs.
// NOTE(review): the original comment says "Netty built-in codecs"; which
// StringDecoder/StringEncoder classes are used depends on the imports in scope.
ch.pipeline().addLast(
    new StringDecoder(),
    new StringEncoder(),
    new ServerHandler());
心跳机制
// Server-side heartbeat configuration: IdleStateHandler is configured with a
// 60-second reader-idle timeout (writer-idle and all-idle are set to 0).
// HeartbeatHandler must come after it in the pipeline so that it receives the
// idle events and can act on them.
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline()
            .addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS))
            .addLast(new HeartbeatHandler())
            .addLast(new ServerHandler());
    }
});
/**
 * Heartbeat handler: closes a connection when a reader-idle event is reported
 * for it.
 */
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {

    /**
     * Handles idle-state events; every other user event is delegated to the
     * superclass implementation.
     *
     * @param ctx handler context of the affected channel
     * @param evt the user event that was triggered
     * @throws Exception if the superclass implementation throws
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent idle)) {
            super.userEventTriggered(ctx, evt);
            return;
        }

        // Idle events other than READER_IDLE are consumed here without any action.
        if (idle.state() != IdleState.READER_IDLE) {
            return;
        }

        System.out.println("读空闲超时,关闭连接: " + ctx.channel().remoteAddress());
        ctx.channel().close();
    }
}
总结
Netty通过Reactor线程模型和事件驱动机制,提供了高性能的网络编程能力。掌握Bootstrap、Channel、Pipeline和Handler是使用Netty的基础。