1.引入依赖
<!-- https://mvnrepository.com/artifact/org.apache.mina/mina-core -->
<dependency><groupId>org.apache.mina</groupId><artifactId>mina-core</artifactId><version>2.0.7</version>
</dependency>
2.创建连接客户端
NioSocketConnector connector = new NioSocketConnector(); // 创建连接客户端
connector.setConnectTimeoutMillis(30000); // 设置连接超时
TextLineCodecFactory factory = new TextLineCodecFactory(Charset.forName("UTF-8"));
factory.setDecoderMaxLineLength(Integer.MAX_VALUE);
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(factory));
connector.getSessionConfig().setReceiveBufferSize(Integer.MAX_VALUE); // 设置接收缓冲区的大小
connector.getSessionConfig().setSendBufferSize(Integer.MAX_VALUE);// 设置输出缓冲区的大小
connector.setDefaultRemoteAddress(new InetSocketAddress(IP, Port));// 设置默认访问地址
connector.getSessionConfig().setTcpNoDelay(true);
connector.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool()));
connector.setHandler(new DockHandler());
ConnectFuture future = connector.connect();
future.awaitUninterruptibly(); // 等待连接创建成功
IoSession session = future.getSession(); // 获取会话
消息处理器IoHandlerAdapter实现类:
public class DockHandler extends IoHandlerAdapter
{@Overridepublic void messageReceived(IoSession session, Object message)throws Exception{super.messageReceived(session, message);String msg = (String)message;log.info("收到消息");// 消息处理...}@Overridepublic void sessionCreated(IoSession session)throws Exception{super.sessionCreated(session);log.info("创建连接");}@Overridepublic void sessionOpened(IoSession session)throws Exception{super.sessionOpened(session);log.info("建立连接");}@Overridepublic void sessionClosed(IoSession session)throws Exception{super.sessionClosed(session);log.info("连接关闭");}@Overridepublic void sessionIdle(IoSession session, IdleStatus status)throws Exception{super.sessionIdle(session, status);log.info("重新连接");}@Overridepublic void exceptionCaught(IoSession session, Throwable cause)throws Exception{super.exceptionCaught(session, cause);log.info("会话异常!");if (session != null){session.closeNow();}}@Overridepublic void messageSent(IoSession session, Object message)throws Exception{super.messageSent(session, message);}@Overridepublic void inputClosed(IoSession session)throws Exception{super.inputClosed(session);}}
3.连接失败自动重连&断线重连功能
启动时连接失败自动重连:
for (;;)
{try{ConnectFuture future = connector.connect();future.awaitUninterruptibly(); // 等待连接创建成功IoSession session = future.getSession(); // 获取会话log.info("连接服务端[成功]");break;}catch (RuntimeIoException e){log.error("连接服务端[失败],5S后重新连接");Thread.sleep(5000);}
}
断线重连:
断线重连功能参考:https://blog.csdn.net/qq_34928194/article/details/105204583
这里采用过滤器的方式:
connector.getFilterChain().addFirst("reconnection", new IoFilterAdapter()
{@Overridepublic void sessionClosed(NextFilter nextFilter, IoSession ioSession)throws Exception{for (;;){try{Thread.sleep(3000);ConnectFuture future = connector.connect();future.awaitUninterruptibly();// 等待连接创建成功IoSession session = future.getSession();// 获取会话if (session.isConnected()){log.info("断线重连成功");break;}}catch (Exception ex){log.info("断线重连失败,3s再次连接");}}}
});
4.心跳设置
使用Mina的KeepAliveFilter实现心跳:(了解Mina的KeepAliveFilter心跳机制)
(1) 新建类实现KeepAliveMessageFactory
public class MyKeepAliveMessageFactory implements KeepAliveMessageFactory
{@Overridepublic boolean isRequest(IoSession session, Object message){return false;}@Overridepublic boolean isResponse(IoSession session, Object message){return false;}@Overridepublic Object getRequest(IoSession session){return "#";// 心跳内容为#}@Overridepublic Object getResponse(IoSession session, Object request){return null;}
}
(2) 会话管理中加入KeepAliveFilter
connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);// 设置会话属性读写通道10秒无操作则视为空闲状态
MyKeepAliveMessageFactory heartBeat = new MyKeepAliveMessageFactory();
KeepAliveFilter keepAliveFilter = new KeepAliveFilter(heartBeat, IdleStatus.READER_IDLE, KeepAliveRequestTimeoutHandler.NOOP);// 无心跳响应时不采取任何操作
keepAliveFilter.setForwardEvent(false);
keepAliveFilter.setRequestInterval(10);// 心跳间隔10s
keepAliveFilter.setRequestTimeout(1);// 超时时间1s
connector.getFilterChain().addLast("heart", keepAliveFilter);
5.Mina客户端完全断开连接的方法&加入断线重连功能后如何完全断开连接
断开连接:Mina2.0+版本:调用connector的dispose()方法
加入断线重连后如何彻底断开客户端连接:
经过尝试
(1) 如果使用过滤器方式的断线重连,断开连接(即使删除了重连过滤器)后依然会进行重连(当然重连失败,一直重连),代码如下:
connector.getFilterChain().remove("reconnection");
connector.dispose();
// 尝试无效
(2) 如果使用监听器方式的断线重连,删除监听器后断开连接就完全断开连接了,代码如下:
connector.removeListener(ioListener);// ioListener为创建的断线重连的监听器对象
connector.dispose();
6.粘包半包问题的解决
本文使用的Mina自带的基于文本的编解码器TextLineCodecFactory,根据回车换行(windows下是\r\n,linux下是\r)来断点传输数据,且在解码器中解决了半包粘包问题。
适用场景:报文为文本字符串类型的,且以换行符为数据分割符(当然可以利用TextLineCodecFactory的另外两个构造方法来自定义数据分割符)。
对于自定义报文(多为16进制报文),解决Mina数据接收的半包粘包问题需要我们自定义编解码器,在解码器中解决这一问题。
一个来自未来博文的url:Mina自定义解码器,解决半包粘包问题
完整项目代码联系获取