Netty :Netty介绍 & 实现简易多人聊天室
博主在去年二月份就介绍了Java
原生网络编程API
,并用它们实现了多种I/O
网络编程模型的简易多人聊天室:
-
Java网络编程-Socket编程初涉一(简易客户端-服务器)
-
Java网络编程-Socket编程初涉二(基于BIO模型的简易多人聊天室)
-
Java网络编程-Socket编程初涉三(伪异步I/O模型的简易多人聊天室)
-
Java网络编程-Socket编程初涉四(NIO模型的简易多人聊天室)
-
Java网络编程-Socket编程初涉五(AIO模型的简易客户端-服务器)
-
Java网络编程-Socket编程初涉六(AIO模型的简易多人聊天室)
使用Java
原生网络编程API
还是比较繁琐的,如果需要更换I/O
网络编程模型来适应不同的业务场景,重构工作量还是很大的,而Netty
对Java
原生网络编程API
进行了封装和扩展,使得不同I/O
网络编程模型之间的转换需要的代码改动非常少,并且性能非常高,Netty
的实现原理留到以后再介绍,将Netty
提供的组件使用熟练后,它的实现原理才能理解的更透彻,Netty
的实现原理涉及到了Linux
内核部分,比如零拷贝、I/O
多路复用等,以及DMA
等硬件。
Netty介绍
EventLoopGroup、EventLoop
Netty
的调度模块称为EventLoopGroup
,Netty
提供了NioEventLoopGroup
、OioEventLoopGroup
、EpollEventLoopGroup
(在Linux
下可用)等多种实现。
EventLoopGroup
是一组EventLoop
的抽象,一个EventLoopGroup
当中会包含一个或多个EventLoop
,如下图所示(图来自《Netty In Action》
):
Channel
EventLoop
在它的整个生命周期中只会与一个Thread
(真正的I/O
线程)绑定。所有由EventLoop
处理的I/O
事件都将在它所关联的Thread
上进行处理。一个Channel
在它的整个生命周期中只会注册在一个EventLoop
上。也就是说一个Channel
上绑定的所有方法只会由同一个线程执行。一个EventLoop
在运行过程当中会被分配给一个或多个Channel
。
Channel
可以有一个父Channel
,这取决于它是如何创建的。 例如,被ServerSocketChannel
接受的SocketChannel
(当客户端与服务端建立连接时,在服务端创建与客户端通信的Channel
)将在parent()
上返回ServerSocketChannel
作为其父Channel
。Channel
用于连接字节缓冲区和另一端的实体,这个实体可以是Socket
,也可以是File
,在NIO
网络编程模型中,服务端和客户端进行IO
通信的媒介就是Channel
。Netty
对Java
原生的ServerSocketChannel
进行了封装和增强,相对于原生的Channel
, Netty
的Channel
增加了如下组件(但不限于这些组件):
ChannelId
:标识唯一身份信息。ChannelPipeline
:处理或拦截Channel
入站事件和出站操作的ChannelHandler
的列表。ChannelPipeline
实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式以及ChannelPipeline
中的ChannelHandler
如何互相交互。每个Channel
都有自己的ChannelPipeline
,并在创建新Channel
时自动创建。
EventLoop
:用来处理Channel I/O
事件的EventLoop
。
ChannelConfig
:Channel
的配置参数(例如接收缓冲区大小)。
ChannelHandler
ChannelHandler
会处理I/O
事件或拦截I/O
操作,并将其转发到ChannelPipeline
中的下一个ChannelHandler
。ChannelHandler
本身并没有提供很多方法,通常需要实现其子类型之一:
ChannelInboundHandler
:处理入站I/O
事件的抽象。ChannelOutboundHandler
:处理出站I/O
操作的抽象。
为了方便,Netty
提供了以下适配器类:
ChannelInboundHandlerAdapter
:处理入站I/O
事件的一种简单实现。ChannelOutboundHandlerAdapter
:处理出站I/O
操作的一种简单实现。
Bootstrap、ServerBootStrap
Netty
的启动类分为客户端启动类和服务端启动类,分别是BootStrap
和ServerBootStrap
。它们都是AbstractBootStrap
的子类,总的来说它们都是Netty
中的辅助类,提供了链式配置方法,方便了Channel
的引导和启动。
简易多人聊天室
博主接下来用Netty
实现一个NIO
网络编程模型的简易多人聊天室,来介绍Netty
的基本使用。
首先需要导入Netty
的依赖(博主使用4.1.70.Final
版本):
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.70.Final</version></dependency>
服务端
package com.kaven.netty.nio;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.GlobalEventExecutor;import java.util.concurrent.atomic.AtomicReference;public class Server {
private static final int PORT = 8080;public static void main(String[] args) throws InterruptedException {
final ServerHandler serverHandler = new ServerHandler();// 主线程组,用于接受客户端的连接,但是不做任何处理,跟老板一样EventLoopGroup bossGroup = new NioEventLoopGroup();// 从线程组,主线程组会把任务丢给它,让从线程组去做相应的处理EventLoopGroup workerGroup = new NioEventLoopGroup();try{
ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup , workerGroup).channel(NioServerSocketChannel.class).localAddress(PORT).childHandler(new ChannelInitializer<SocketChannel>() {
@Overrideprotected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(serverHandler);}});ChannelFuture channelFuture = serverBootstrap.bind().sync();channelFuture.channel().closeFuture().sync();} finally {
bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}@ChannelHandler.Sharablestatic class ServerHandler extends ChannelInboundHandlerAdapter {
private static ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();clientChannels.add(channel);String sendMsg = "客户[" + channel.remoteAddress() + "]上线\n";System.out.print(sendMsg);clientChannels.forEach(clientChannel -> {
if(clientChannel != channel) {
clientChannel.writeAndFlush(sendMsg);}else {
clientChannel.writeAndFlush("欢迎您上线\n");}});}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();if(clientChannels.contains(channel)) {
clientChannels.remove(channel);String sendMsg = "客户[" + channel.remoteAddress() + "]异常下线\n";System.out.print(sendMsg);clientChannels.forEach(clientChannel -> clientChannel.writeAndFlush(sendMsg));}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel channel = ctx.channel();AtomicReference<String> sendMsg = new AtomicReference<>("客户[" + channel.remoteAddress() + "]消息: " + msg + "\n");if(msg instanceof String && msg.equals("quit")) {
clientChannels.remove(channel);channel.close();sendMsg.set("客户[" + channel.remoteAddress() + "]下线\n");System.out.print(sendMsg.get());}clientChannels.forEach(clientChannel -> {
if(clientChannel != channel) {
clientChannel.writeAndFlush(sendMsg.get());}});}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();clientChannels.remove(channel);String msg = cause.getMessage();String sendMsg = "客户[" + channel.remoteAddress() + "]异常: " + msg + "\n";System.out.print(sendMsg);clientChannels.forEach(clientChannel -> clientChannel.writeAndFlush(sendMsg));}}
}
当有客户端连接时,服务端会使用ChannelInitializer
初始化与客户端通信的Channel
,并且在Channel
的ChannelPipeline
中添加ChannelHandler
,这里主要是添加StringDecoder
(将接收到的ByteBuf
解码为String
,是一种ChannelInboundHandlerAdapter
)、StringEncoder
(将请求的String
编码为ByteBuf
,是一种ChannelOutboundHandlerAdapter
)以及自实现的ChannelInboundHandlerAdapter
(处理与客户端之间的通信)。
.childHandler(new ChannelInitializer<SocketChannel>() {
@Overrideprotected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(serverHandler);}});
@ChannelHandler.Sharable
注解表示可以将该ChannelHandler
的同一实例多次添加到一个或多个ChannelPipeline
中,而不会出现竞争条件。如果未指定此注解,则每次将ChannelHandler
实例添加到ChannelPipeline
中时都必须创建一个新的实例,因为它具有成员变量等非共享状态。
ChannelGroup
是一个线程安全的Set
,包含开放的Channel
并提供对它们的各种批量操作。 使用ChannelGroup
,可以将Channel
分类为一个有意义的组(例如,基于每个服务或每个状态来分组),以便实现广播的功能,关闭的Channel
会自动从集合中删除(博主是手动删除的,只是为了演示怎么删除Channel
),因此无需担心ChannelGroup
中Channel
的生命周期。 一个Channel
可以属于多个ChannelGroup
。
private static ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
客户端
package com.kaven.netty.nio;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;import java.net.InetSocketAddress;
import java.util.Scanner;public class Client {
private static final int PORT = 8080;public static void main(String[] args) throws InterruptedException {
final ClientHandler clientHandler = new ClientHandler();EventLoopGroup workerGroup = new NioEventLoopGroup();try{
Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(PORT)).handler(new ChannelInitializer<SocketChannel>() {
@Overrideprotected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(clientHandler);}});ChannelFuture channelFuture = bootstrap.connect().sync();Channel channel = channelFuture.channel();//客户端发送消息Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){
String msg = scanner.nextLine();//通过客户端把输入内容发送到服务端channel.writeAndFlush(msg).sync();if(msg.equals("quit")) {
channel.close().sync();break;}}channelFuture.channel().closeFuture().sync();} finally {
workerGroup.shutdownGracefully();}}@ChannelHandler.Sharablestatic class ClientHandler extends SimpleChannelInboundHandler<String> {
@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.print(msg);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();String msg = cause.getMessage();System.out.print("群聊[" + channel.remoteAddress() + "]异常: " + msg);}}
}
SimpleChannelInboundHandler
抽象类继承了ChannelInboundHandlerAdapter
类,并且默认会自动释放所有处理的消息,在这种情况下,如果需要将消息传递给ChannelPipeline
中的下一个ChannelHandler
,需要使用ReferenceCountUtil.retain(Object)
(Netty
资源管理部分,博主以后也会详细介绍)。
测试
更换I/O网络编程模型
在Netty
中更换I/O
网络编程模型非常方便,只需要修改服务端和客户端相应的地方即可,如下图所示:
OioEventLoopGroup OioServerSocketChannel OioSocketChannel
NioEventLoopGroup NioServerSocketChannel NioSocketChannel
EpollEventLoopGroup EpollServerSocketChannel EpollSocketChannel
KQueueEventLoopGroup KQueueServerSocketChannel KQueueSocketChannel
Oio
(Old-Blocking-IO
)就是BIO
网络编程模型。
Epoll
:
KQueue
:
到这里就结束了,源码分析和实现原理留到以后再进行介绍,之后博主会对Netty
中比较重要的组件进行详细介绍和源码分析。如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。