当前位置: 代码迷 >> 综合 >> Netty :Netty介绍 实现简易多人聊天室
  详细解决方案

Netty :Netty介绍 实现简易多人聊天室

热度:40   发布时间:2023-12-01 18:27:32.0

Netty :Netty介绍 & 实现简易多人聊天室

博主在去年二月份就介绍了Java原生网络编程API,并用它们实现了多种I/O网络编程模型的简易多人聊天室:

  • Java网络编程-Socket编程初涉一(简易客户端-服务器)

  • Java网络编程-Socket编程初涉二(基于BIO模型的简易多人聊天室)

  • Java网络编程-Socket编程初涉三(伪异步I/O模型的简易多人聊天室)

  • Java网络编程-Socket编程初涉四(NIO模型的简易多人聊天室)

  • Java网络编程-Socket编程初涉五(AIO模型的简易客户端-服务器)

  • Java网络编程-Socket编程初涉六(AIO模型的简易多人聊天室)

使用Java原生网络编程API还是比较繁琐的,如果需要更换I/O网络编程模型来适应不同的业务场景,重构工作量还是很大的,而NettyJava原生网络编程API进行了封装和扩展,使得不同I/O网络编程模型之间的转换需要的代码改动非常少,并且性能非常高,Netty的实现原理留到以后再介绍,将Netty提供的组件使用熟练后,它的实现原理才能理解的更透彻,Netty的实现原理涉及到了Linux内核部分,比如零拷贝、I/O多路复用等,以及DMA等硬件。

Netty介绍

EventLoopGroup、EventLoop

Netty的调度模块称为EventLoopGroupNetty提供了NioEventLoopGroupOioEventLoopGroupEpollEventLoopGroup(在Linux下可用)等多种实现。
在这里插入图片描述
EventLoopGroup是一组EventLoop的抽象,一个EventLoopGroup当中会包含一个或多个EventLoop,如下图所示(图来自《Netty In Action》):
在这里插入图片描述
在这里插入图片描述

Channel

EventLoop在它的整个生命周期中只会与一个Thread(真正的I/O线程)绑定。所有由EventLoop处理的I/O事件都将在它所关联的Thread上进行处理。一个Channel在它的整个生命周期中只会注册在一个EventLoop上。也就是说一个Channel上绑定的所有方法只会由同一个线程执行。一个EventLoop在运行过程当中会被分配给一个或多个Channel

Channel可以有一个父Channel,这取决于它是如何创建的。 例如,被ServerSocketChannel接受的SocketChannel(当客户端与服务端建立连接时,在服务端创建与客户端通信的Channel)将在parent()上返回ServerSocketChannel作为其父ChannelChannel用于连接字节缓冲区和另一端的实体,这个实体可以是Socket,也可以是File,在NIO网络编程模型中,服务端和客户端进行IO通信的媒介就是ChannelNettyJava原生的ServerSocketChannel进行了封装和增强,相对于原生的ChannelNettyChannel增加了如下组件(但不限于这些组件):

  • ChannelId:标识唯一身份信息。
  • ChannelPipeline:处理或拦截Channel入站事件和出站操作的ChannelHandler的列表。 ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式以及 ChannelPipeline中的ChannelHandler如何互相交互。每个Channel都有自己的ChannelPipeline,并在创建新Channel时自动创建。
    在这里插入图片描述
  • EventLoop:用来处理Channel I/O事件的EventLoop
    在这里插入图片描述
  • ChannelConfigChannel的配置参数(例如接收缓冲区大小)。

ChannelHandler

ChannelHandler会处理I/O事件或拦截I/O操作,并将其转发到ChannelPipeline中的下一个ChannelHandlerChannelHandler本身并没有提供很多方法,通常需要实现其子类型之一:

  • ChannelInboundHandler:处理入站I/O事件的抽象。
  • ChannelOutboundHandler:处理出站I/O操作的抽象。

为了方便,Netty提供了以下适配器类:

  • ChannelInboundHandlerAdapter:处理入站I/O事件的一种简单实现。
  • ChannelOutboundHandlerAdapter:处理出站I/O操作的一种简单实现。

Bootstrap、ServerBootStrap

Netty的启动类分为客户端启动类和服务端启动类,分别是BootStrapServerBootStrap。它们都是AbstractBootStrap的子类,总的来说它们都是Netty中的辅助类,提供了链式配置方法,方便了Channel的引导和启动。

简易多人聊天室

博主接下来用Netty实现一个NIO网络编程模型的简易多人聊天室,来介绍Netty的基本使用。

首先需要导入Netty的依赖(博主使用4.1.70.Final版本):

        <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.70.Final</version></dependency>

服务端

package com.kaven.netty.nio;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.GlobalEventExecutor;import java.util.concurrent.atomic.AtomicReference;public class Server {
    private static final int PORT = 8080;public static void main(String[] args) throws InterruptedException {
    final ServerHandler serverHandler = new ServerHandler();// 主线程组,用于接受客户端的连接,但是不做任何处理,跟老板一样EventLoopGroup bossGroup = new NioEventLoopGroup();// 从线程组,主线程组会把任务丢给它,让从线程组去做相应的处理EventLoopGroup workerGroup = new NioEventLoopGroup();try{
    ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup , workerGroup).channel(NioServerSocketChannel.class).localAddress(PORT).childHandler(new ChannelInitializer<SocketChannel>() {
    @Overrideprotected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(serverHandler);}});ChannelFuture channelFuture = serverBootstrap.bind().sync();channelFuture.channel().closeFuture().sync();} finally {
    bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}@ChannelHandler.Sharablestatic class ServerHandler extends ChannelInboundHandlerAdapter {
    private static ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();clientChannels.add(channel);String sendMsg = "客户[" + channel.remoteAddress() + "]上线\n";System.out.print(sendMsg);clientChannels.forEach(clientChannel -> {
    if(clientChannel != channel) {
    clientChannel.writeAndFlush(sendMsg);}else {
    clientChannel.writeAndFlush("欢迎您上线\n");}});}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();if(clientChannels.contains(channel)) {
    clientChannels.remove(channel);String sendMsg = "客户[" + channel.remoteAddress() + "]异常下线\n";System.out.print(sendMsg);clientChannels.forEach(clientChannel -> clientChannel.writeAndFlush(sendMsg));}}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    Channel channel = ctx.channel();AtomicReference<String> sendMsg = new AtomicReference<>("客户[" + channel.remoteAddress() + "]消息: " + msg + "\n");if(msg instanceof String && msg.equals("quit")) {
    clientChannels.remove(channel);channel.close();sendMsg.set("客户[" + channel.remoteAddress() + "]下线\n");System.out.print(sendMsg.get());}clientChannels.forEach(clientChannel -> {
    if(clientChannel != channel) {
    clientChannel.writeAndFlush(sendMsg.get());}});}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    Channel channel = ctx.channel();clientChannels.remove(channel);String msg = cause.getMessage();String sendMsg = "客户[" + channel.remoteAddress() + "]异常: " + msg + "\n";System.out.print(sendMsg);clientChannels.forEach(clientChannel -> clientChannel.writeAndFlush(sendMsg));}}
}

当有客户端连接时,服务端会使用ChannelInitializer初始化与客户端通信的Channel,并且在ChannelChannelPipeline 中添加ChannelHandler,这里主要是添加StringDecoder(将接收到的ByteBuf解码为String,是一种ChannelInboundHandlerAdapter)、StringEncoder(将请求的String编码为ByteBuf,是一种ChannelOutboundHandlerAdapter)以及自实现的ChannelInboundHandlerAdapter(处理与客户端之间的通信)。

                    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Overrideprotected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(serverHandler);}});

@ChannelHandler.Sharable注解表示可以将该ChannelHandler的同一实例多次添加到一个或多个ChannelPipeline中,而不会出现竞争条件。如果未指定此注解,则每次将ChannelHandler实例添加到ChannelPipeline中时都必须创建一个新的实例,因为它具有成员变量等非共享状态。

ChannelGroup是一个线程安全的Set ,包含开放的Channel并提供对它们的各种批量操作。 使用ChannelGroup ,可以将Channel分类为一个有意义的组(例如,基于每个服务或每个状态来分组),以便实现广播的功能,关闭的Channel会自动从集合中删除(博主是手动删除的,只是为了演示怎么删除Channel),因此无需担心ChannelGroupChannel的生命周期。 一个Channel可以属于多个ChannelGroup

在这里插入图片描述

        private static ChannelGroup clientChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

客户端

package com.kaven.netty.nio;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;import java.net.InetSocketAddress;
import java.util.Scanner;public class Client {
    private static final int PORT = 8080;public static void main(String[] args) throws InterruptedException {
    final ClientHandler clientHandler = new ClientHandler();EventLoopGroup workerGroup = new NioEventLoopGroup();try{
    Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).remoteAddress(new InetSocketAddress(PORT)).handler(new ChannelInitializer<SocketChannel>() {
    @Overrideprotected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(clientHandler);}});ChannelFuture channelFuture = bootstrap.connect().sync();Channel channel = channelFuture.channel();//客户端发送消息Scanner scanner = new Scanner(System.in);while (scanner.hasNext()){
    String msg = scanner.nextLine();//通过客户端把输入内容发送到服务端channel.writeAndFlush(msg).sync();if(msg.equals("quit")) {
    channel.close().sync();break;}}channelFuture.channel().closeFuture().sync();} finally {
    workerGroup.shutdownGracefully();}}@ChannelHandler.Sharablestatic class ClientHandler extends SimpleChannelInboundHandler<String> {
    @Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    System.out.print(msg);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    Channel channel = ctx.channel();String msg = cause.getMessage();System.out.print("群聊[" + channel.remoteAddress() + "]异常: " + msg);}}
}

SimpleChannelInboundHandler抽象类继承了ChannelInboundHandlerAdapter类,并且默认会自动释放所有处理的消息,在这种情况下,如果需要将消息传递给ChannelPipeline中的下一个ChannelHandler,需要使用ReferenceCountUtil.retain(Object)Netty资源管理部分,博主以后也会详细介绍)。
在这里插入图片描述

测试

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

更换I/O网络编程模型

Netty中更换I/O网络编程模型非常方便,只需要修改服务端和客户端相应的地方即可,如下图所示:
在这里插入图片描述

在这里插入图片描述

OioEventLoopGroup        OioServerSocketChannel       OioSocketChannel
NioEventLoopGroup        NioServerSocketChannel       NioSocketChannel
EpollEventLoopGroup      EpollServerSocketChannel     EpollSocketChannel
KQueueEventLoopGroup     KQueueServerSocketChannel    KQueueSocketChannel

OioOld-Blocking-IO)就是BIO网络编程模型。

Epoll
在这里插入图片描述

KQueue
在这里插入图片描述

到这里就结束了,源码分析和实现原理留到以后再进行介绍,之后博主会对Netty中比较重要的组件进行详细介绍和源码分析。如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。