当前位置: 代码迷 >> 综合 >> Netty(WebSocket聊天器)
  详细解决方案

Netty(WebSocket聊天器)

热度:108   发布时间:2023-11-20 01:26:51.0

处理 HTTP请求    

    如果被请求的URL以/ws结尾,那么将会把该协议升级为WebSocket;否则,服务器将使用基本的HTTP/S。在连接已经升级完成之后,所有数据都将会使用WebSocket进行传输。

package netty.in.action.websocket;import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedNioFile;import java.io.File;
import java.io.RandomAccessFile;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.Charset;public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {private final String wsUri;public HttpRequestHandler(String wsUri) {this.wsUri = wsUri;}@Overridepublic void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {if (wsUri.equalsIgnoreCase(request.getUri())) {//如果请求了WebSocket协议升级,则增加引用计数(调用retain()方 法 ),并将它传递给下一个ChannelInboundHandlerctx.fireChannelRead(request.retain());} else {if (HttpHeaders.is100ContinueExpected(request)) {//处理100 Continue请求以符合HTTP1.1规范send100Continue(ctx);}HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");boolean keepAlive = HttpHeaders.isKeepAlive(request);if (keepAlive) {//如果请求了keep-alive,则添加所需要的HTTP头信息response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 1024);response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);}ctx.write(response);//将HttpResponse写到客户端ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);//写LastHttpContent并冲刷至客户端if (!keepAlive) {//如果没有请求keep-alive,则在写操作完成后关闭Channelfuture.addListener(ChannelFutureListener.CLOSE);}}}private static void send100Continue(ChannelHandlerContext ctx) {FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);ctx.writeAndFlush(response);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}

    如果该HTTP请求指向了地址为/ws的URI,那么HttpRequestHandler将调用FullHttpRequest对象上的retain()方法,并通过调用fireChannelRead(msg)方法将它转发给下一个ChannelInboundHandler。之所以需要调用retain()方法,是因为调用channelRead()方法完成之后,它将调用FullHttpRequest对象上的release()方法以释放它的资源。

    如果客户端发送了HTTP1.1的HTTP头信息Expect: 100-continue,那么HttpRequestHandler将会发送一个100 Continue响应。在该HTTP头信息被设置之后,HttpRequestHandler将会写回一个HttpResponse给客户端。这不是一个FullHttpResponse,因为它只是响应的第一个部分。此外,这里也不会调用writeAndFlush()方法,在结束的时候才会调用。

    如果不需要加密和压缩,那么可以通过将index.html的内容存储到DefaultFileRegion中来达到最佳效率。这将会利用零拷贝特性来进行内容的传输。否则,使用ChunkedNioFile。

    HttpRequestHandler将写一个LastHttpContent来标记响应的结束。如果没有请求keep-alive,那么HttpRequestHandler将会添加一个ChannelFutureListener到最后一次写出动作的ChannelFuture,并关闭该连接。在这里,你将调用writeAndFlush()方法以冲刷所有之前写入的消息。这部分代码代表了聊天服务器的第一个部分,它管理纯粹的HTTP请求和响应。

处理 WebSocket帧 

帧类型 描述
BinaryWebSocketFrame 包含了二进制数据
TextWebSocketFrame 包含了文本数据
ContinuationWebSocketFrame 包含属于上一个BinaryWebSocketFrame或TextWebSocket-Frame的文本数据或者二进制数据
ContinuationWebSocketFrame 表示一个CLOSE请求,包含一个关闭的状态码和关闭的原因
PingWebSocketFrame 请求传输一个PongWebSocketFrame
PongWebSocketFrame 作为一个对于PingWebSocketFrame的响应被发送

    TextWebSocketFrame是我们唯一真正需要处理的帧类型。为了符合WebSocket  RFC,Netty提供了WebSocketServerProtocolHandler来处理其他类型的帧。

package netty.in.action.websocket;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {private final ChannelGroup group;public TextWebSocketFrameHandler(ChannelGroup group) {this.group = group;}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {ctx.pipeline().remove(HttpRequestHandler.class);//如果该事件表示握手成功,则从该Channelipeline中移除HttpRequestHandler,因为将不会接收到任何HTTP消息了group.writeAndFlush(new TextWebSocketFrame("客户端 " + ctx.channel() + " 连接"));//通知所有已经连接的WebSocket客户端新的客户端已经连接上了group.add(ctx.channel());//将新的WebSocket Channel添加到ChannelGroup中,以便它可以接收到所有的消息} else {super.userEventTriggered(ctx, evt);}}@Overridepublic void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {group.writeAndFlush(msg.retain());//增加消息的引用计数,并将它写到ChannelGroup中所有已经连接的客户端}
}

    TextWebSocketFrameHandler只有一组非常少量的责任。当和新客户端的WebSocket握手成功完成之后,它将通过把通知消息写到ChannelGroup中的所有Channel来通知所有已经连接的客户端,然后它将把这个新Channel加入到该ChannelGroup中。

    如果接收到了TextWebSocketFrame消息,TextWebSocketFrameHandler将调用TextWebSocketFrame消息上的retain()方法,并使用writeAndFlush()方法来将它传输给ChannelGroup,以便所有已经连接的WebSocketChannel都将接收到它。

    和之前一样,对于retain()方法的调用是必需的,因为当channelRead0()方法返回时,TextWebSocketFrame的引用计数将会被减少。由于所有的操作都是异步的,因此,writeAndFlush()方法可能会在channelRead0()方法返回之后完成,而且它绝对不能访问一个已经失效的引用。

初始化ChannelPipeline

package netty.in.action.websocket;import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;public class ChatServerInitializer extends ChannelInitializer<Channel> {private final ChannelGroup group;public ChatServerInitializer(ChannelGroup group) {this.group = group;}@Overrideprotected void initChannel(Channel ch) throwsException {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new HttpServerCodec());//创建HttpServer编解码器pipeline.addLast(new ChunkedWriteHandler());//添加了对异步编写大型数据流的支持pipeline.addLast(new HttpObjectAggregator(64 * 1024));//聚合 HTTP消息pipeline.addLast(new HttpRequestHandler("/zzf"));//处理Http请求pipeline.addLast(new WebSocketServerProtocolHandler("/zzf"));//它处理websocket握手以及控制帧的处理(关闭,Ping, Pong)。pipeline.addLast(new TextWebSocketFrameHandler(group));//处理文本消息}
}
ChannelHandler 职责
HttpServerCodec 将字节解码为HttpRequest、HttpContent和LastHttpContent。并将HttpRequest、HttpContent和LastHttpContent编码为字节
ChunkedWriteHandler 写入一个文件的内容
HttpObjectAggregator 将一个HttpMessage和跟随它的多个HttpContent聚合为单个FullHttpRequest或者FullHttpResponse(取决于它是被用来处理请求还是响应)。安装了这个之后,ChannelPipeline中的下一个ChannelHandler将只会收到完整的HTTP请求或响应
HttpRequestHandler 处理FullHttpRequest(那些不发送到/ws URI的请求)
WebSocketServerProtocolHandler 按照WebSocket规范的要求,处理WebSocket升级握手、PingWebSocketFrame、PongWebSocketFrame和CloseWebSocketFrame
TextWebSocketFrameHandler 处理TextWebSocketFrame和握手完成事件

    Netty的WebSocketServerProtocolHandler处理了所有委托管理的WebSocket帧类型以及升级握手本身。如果握手成功,那么所需的ChannelHandler将会被添加到ChannelPipeline中,而那些不再需要的ChannelHandler则将会被移除。

WebSocket协议升级之前的ChannelPipeline的状态如图:

    当WebSocket协议升级完成之后,WebSocketServerProtocolHandler将会把Http  -RequestDecoder替换为WebSocketFrameDecoder,把HttpResponseEncoder替换为WebSocketFrameEncoder。为了性能最大化,它将移除任何不再被WebSocket连接所需要的ChannelHandler。

引导

package netty.in.action.websocket;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.ImmediateEventExecutor;import java.net.InetSocketAddress;public class ChatServer {private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);private final EventLoopGroup group = new NioEventLoopGroup();private Channel channel;public ChannelFuture start(InetSocketAddress address) {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(group).channel(NioServerSocketChannel.class).childHandler(createInitializer(channelGroup));ChannelFuture future = bootstrap.bind(address);future.syncUninterruptibly();channel = future.channel();return future;}protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {return new ChatServerInitializer(group);}public void destroy() {if (channel != null) {channel.close();}channelGroup.close();group.shutdownGracefully();}public static void main(String[] args) throws Exception {int port = Integer.parseInt("8888");final ChatServer endpoint = new ChatServer();ChannelFuture future = endpoint.start(new InetSocketAddress(port));Runtime.getRuntime().addShutdownHook(new Thread() {@Overridepublic voidrun() {endpoint.destroy();}});future.channel().closeFuture().syncUninterruptibly();}
}

然后需要一个html

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body><script type="text/javascript">var socket;if (!window.WebSocket) {window.WebSocket = window.MozWebSocket;}if (window.WebSocket) {socket = new WebSocket("ws://localhost:8888/zzf");socket.onopen = function(event) {var ta = document.getElementById('responseText');ta.value = "连接开启!";};socket.onclose = function(event) {var ta = document.getElementById('responseText');ta.value = ta.value + "连接被关闭";};socket.onmessage = function(event) {var ta = document.getElementById('responseText');ta.value = ta.value + '\n' + event.data;};} else {alert("你的浏览器不支持 WebSocket!");}function send(message) {if (!window.WebSocket) {return;}if (socket.readyState == WebSocket.OPEN) {socket.send(message);} else {alert("连接没有开启.");}}</script><form onsubmit="return false;"><h3>WebSocket 聊天室:</h3><textarea id="responseText" style="width: 500px; height: 300px;"></textarea><br> <input type="text" name="message"  style="width: 300px" value="Welcome to www.waylau.com"><input type="button" value="发送消息" onclick="send(this.form.message.value)"><input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天记录"></form><br> <br> 
</body>
</html>

进行加密

package netty.in.action.websocket;import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;import javax.net.ssl.SSLEngine;public class SecureChatServerInitializer extends ChatServerInitializer {private final SslContext context;public SecureChatServerInitializer(ChannelGroup group, SslContext context) {super(group);this.context = context;}@Overrideprotected void initChannel(Channel ch) throws Exception {super.initChannel(ch);//调用父类的initChannel()方法SSLEngine engine = context.newEngine(ch.alloc());engine.setUseClientMode(false);ch.pipeline().addFirst(new SslHandler(engine));//将SslHandler添加到ChannelPipeline中}
}
package netty.in.action.websocket;import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.SelfSignedCertificate;import java.net.InetSocketAddress;public class SecureChatServer extends ChatServer {private final SslContext context;public SecureChatServer(SslContext context) {this.context = context;}@Overrideprotected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {return new SecureChatServerInitializer(group, context);//返回之前创建的SecureChatServer-Initializer以启用加密}public static void main(String[] args) throws Exception {int port = Integer.parseInt("8888");SelfSignedCertificate cert = new SelfSignedCertificate();SslContext context = SslContext.newServerContext(cert.certificate(), cert.privateKey());final SecureChatServer endpoint = new SecureChatServer(context);ChannelFuture future = endpoint.start(new InetSocketAddress(port));Runtime.getRuntime().addShutdownHook(new Thread() {@Overridepublic voidrun() {endpoint.destroy();}});future.channel().closeFuture().syncUninterruptibly();}
}

参考《Netty实战》

  相关解决方案