问题描述
我正在阅读“Netty In Action V5”。 当阅读第2.3章和第2.4章时,我尝试使用示例EchoServer和EchoClient,当我测试一个连接到服务器的客户端时,一切都运行良好...然后我将示例修改为多个客户端可以连接到服务器。 我的目的是运行压力测试:1000个客户端将连接到服务器,每个客户端将回复100条消息到服务器,当所有客户端完成后,我将获得所有进程的总时间。 服务器部署在Linux机器(VPS)上,客户端部署在窗口机器上。
当运行压力测试时,我遇到了两个问题:
一些客户收到错误消息:
java.io.IOException: An existing connection was forcibly closed by the remote host
at sun.nio.ch.SocketDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:447)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)\at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:110)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
但是有些客户没有收到来自服务器的消息
工作环境:
Netty-all-4.0.30.Final
JDK1.8.0_25
Echo客户端部署在Window 7 Ultimate上
Echo Server部署在Linux Centos 6上
NettyClient类:
public class NettyClient {
private Bootstrap bootstrap;
private EventLoopGroup group;
public NettyClient(final ChannelInboundHandlerAdapter handler) {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(handler);
}
});
}
public void start(String host, int port) throws Exception {
bootstrap.remoteAddress(new InetSocketAddress(host, port));
bootstrap.connect();
}
public void stop() {
try {
group.shutdownGracefully().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
NettyServer类:
public class NettyServer {
private EventLoopGroup parentGroup;
private EventLoopGroup childGroup;
private ServerBootstrap boopstrap;
public NettyServer(final ChannelInboundHandlerAdapter handler) {
parentGroup = new NioEventLoopGroup(300);
childGroup = new NioEventLoopGroup(300);
boopstrap = new ServerBootstrap();
boopstrap.group(parentGroup, childGroup);
boopstrap.channel(NioServerSocketChannel.class);
boopstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(handler);
}
});
}
public void start(int port) throws Exception {
boopstrap.localAddress(new InetSocketAddress(port));
ChannelFuture future = boopstrap.bind().sync();
System.err.println("Start Netty server on port " + port);
future.channel().closeFuture().sync();
}
public void stop() throws Exception {
parentGroup.shutdownGracefully().sync();
childGroup.shutdownGracefully().sync();
}
}
类EchoClient
public class EchoClient {
private static final String HOST = "203.12.37.22";
private static final int PORT = 3344;
private static final int NUMBER_CONNECTION = 1000;
private static final int NUMBER_ECHO = 10;
private static CountDownLatch counter = new CountDownLatch(NUMBER_CONNECTION);
public static void main(String[] args) throws Exception {
List<NettyClient> listClients = Collections.synchronizedList(new ArrayList<NettyClient>());
for (int i = 0; i < NUMBER_CONNECTION; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
NettyClient client = new NettyClient(new EchoClientHandler(NUMBER_ECHO) {
@Override
protected void onFinishEcho() {
counter.countDown();
System.err.println((NUMBER_CONNECTION - counter.getCount()) + "/" + NUMBER_CONNECTION);
}
});
client.start(HOST, PORT);
listClients.add(client);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}).start();
}
long t1 = System.currentTimeMillis();
counter.await();
long t2 = System.currentTimeMillis();
System.err.println("Totla time: " + (t2 - t1));
for (NettyClient client : listClients) {
client.stop();
}
}
private static class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final String ECHO_MSG = "Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo";
private int numberEcho;
private int curNumberEcho = 0;
public EchoClientHandler(int numberEcho) {
this.numberEcho = numberEcho;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_MSG, CharsetUtil.UTF_8));
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
curNumberEcho++;
if (curNumberEcho >= numberEcho) {
onFinishEcho();
} else {
ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_MSG, CharsetUtil.UTF_8));
}
}
protected void onFinishEcho() {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
}
EchoServer类:
public class EchoServer {
private static final int PORT = 3344;
public static void main(String[] args) throws Exception {
NettyServer server = new NettyServer(new EchoServerHandler());
server.start(PORT);
System.err.println("Start server on port " + PORT);
}
@Sharable
private static class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
}
1楼
你可以改变两件事:
只创建一个客户端引导程序,并为所有客户端重用它 ,而不是为每个客户端创建一个。 因此,从客户端部分中提取引导构建,并在开始时仅保留连接。 这将限制内部的线程数。
当达到乒乓数量时, 关闭客户端的连接 。 目前你只调用onFinishEcho上的空方法,这在客户端没有任何关闭,所以没有客户端停止...因此也没有关闭通道......
您可能已经对客户端上的线程数量有一些限制。
另外一个元素可能是一个问题: 您没有指定任何编解码器 (字符串编解码器或其他),这可能导致从客户端或服务器部分发送被视为完全响应。
例如,您可能有第一个“Echo Echo Echo”块发送一个包含缓冲区开头的数据包,而其他部分(更多“Echo”)将通过以后的数据包发送。
为了防止这种情况,您应该使用一个编解码器来确保最终处理程序获得真正的完整消息,而不是部分消息。 如果没有,您可能会陷入其他问题,例如服务器端的错误,试图发送额外的数据包,而客户端会尽快关闭该频道...