当前位置: 代码迷 >> Android >> (2)androidpn-server tomcat版源码解析之-push消息处理
  详细解决方案

(2)androidpn-server tomcat版源码解析之-push消息处理

热度:104   发布时间:2016-04-28 00:07:57.0
(二)androidpn-server tomcat版源码解析之--push消息处理

在?(一)androidpn-server tomcat版源码解析之--项目启动这篇中,已经描述了整个推送服务器的启动过程,并且把握到了消息的入口即XmppIoHandler这个类,今天我将继续往下分析下面的核心代码,主要分为3大块,链接创建,消息的发送,链接关闭。

先贴一段XmppIoHandler的部分代码

/**     * Invoked from an I/O processor thread when a new connection has been created.     */    public void sessionCreated(IoSession session) throws Exception {        log.debug("sessionCreated()...");        session.getConfig().setBothIdleTime(IDLE_TIME);;    }    /**     * Invoked when a connection has been opened.     */    public void sessionOpened(IoSession session) throws Exception {        log.debug("sessionOpened()...");        log.debug("remoteAddress=" + session.getRemoteAddress());        // Create a new XML parser        XMLLightweightParser parser = new XMLLightweightParser("UTF-8");        session.setAttribute(XML_PARSER, parser);        // Create a new connection        Connection connection = new Connection(session);        session.setAttribute(CONNECTION, connection);        session.setAttribute(STANZA_HANDLER, new StanzaHandler(serverName,                connection));    }    /**     * Invoked when a connection is closed.     */    public void sessionClosed(IoSession session) throws Exception {        log.debug("sessionClosed()...");        Connection connection = (Connection) session.getAttribute(CONNECTION);        connection.close();    }    /**     * Invoked with the related IdleStatus when a connection becomes idle.     */    public void sessionIdle(IoSession session, IdleStatus status)            throws Exception {        log.debug("sessionIdle()...");        Connection connection = (Connection) session.getAttribute(CONNECTION);        if (log.isDebugEnabled()) {            log.debug("Closing connection that has been idle: " + connection);        }        connection.close();    }    /**     * Invoked when any exception is thrown.     */    public void exceptionCaught(IoSession session, Throwable cause)            throws Exception {        log.debug("exceptionCaught()...");        log.error(cause);    }    /**     * Invoked when a message is received.     */    public void messageReceived(IoSession session, Object message)            throws Exception {        log.debug("messageReceived()...");        log.debug("RCVD: " + message);        // Get the stanza handler        StanzaHandler handler = (StanzaHandler) session                .getAttribute(STANZA_HANDLER);        // Get the XMPP packet parser        int hashCode = Thread.currentThread().hashCode();        XMPPPacketReader parser = parsers.get(hashCode);        if (parser == null) {            parser = new XMPPPacketReader();            parser.setXPPFactory(factory);            parsers.put(hashCode, parser);        }        // The stanza handler processes the message        try {            handler.process((String) message, parser);        } catch (Exception e) {            log.error(                    "Closing connection due to error while processing message: "                            + message, e);            Connection connection = (Connection) session                    .getAttribute(CONNECTION);            connection.close();        }    }    /**     * Invoked when a message written by IoSession.write(Object) is sent out.     */    public void messageSent(IoSession session, Object message) throws Exception {        log.debug("messageSent()...");    }
  • ?链接创建

androidpn-server的消息处理步奏为:?sessionCreated->sessionOpened->Router->handle

对应的处理类为:XmppIoHandler- >StanzaHandler->PacketRouterIQAuthHandler.java->IQHandler.java

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? IQRegisterHandler.java

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? IQRosterHandler.java

?

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? PresenceUpdateHandler.java

androidpn是以xmpp为协议的,要看懂以下的处理步奏也就必须补充点xmpp的一个相关知识

xmpp的三种消息类型:

message:message是一种基本推送消息方法,它不要求响应。

presence:presence用来表明用户的状态,如:online、away、dnd(请勿打扰)等

iq:一种请求/响应机制,从一个实体从发送请求,另外一个实体接受请求,并进行响应。

XmppIoHandler.java-> ?sessionCreated:

public void sessionCreated(IoSession session) throws Exception {        log.debug("sessionCreated()...");        session.getConfig().setBothIdleTime(IDLE_TIME);;    }

?client的请求第一次到达时候为session设置了一个idletime

?

XmppIoHandler.java->?sessionOpened

public void sessionOpened(IoSession session) throws Exception {        log.debug("sessionOpened()...");        log.debug("remoteAddress=" + session.getRemoteAddress());        // Create a new XML parser        XMLLightweightParser parser = new XMLLightweightParser("UTF-8");        session.setAttribute(XML_PARSER, parser);        // Create a new connection        Connection connection = new Connection(session);        session.setAttribute(CONNECTION, connection);        session.setAttribute(STANZA_HANDLER, new StanzaHandler(serverName,                connection));    }

?初始化了xml解析器,并且创建了StanzaHandler实例,StanzaHandler这个类时后续处理的关键

StanzaHandler.java->?StanzaHandler

    /**     * Constructor.     *      * @param serverName the server name     * @param connection the connection     */    public StanzaHandler(String serverName, Connection connection) {        this.serverName = serverName;        this.connection = connection;        this.router = new PacketRouter();//该构造中初始化了三个router实现,分别是 MessageRouter,PresenceRouter,IQRouter,分别处理xmpp的三种消息类型        notificationService = ServiceLocator.getNotificationService();        notificationManager = new NotificationManager();    }

?XmppIoHandler.java->?messageReceived

    public void messageReceived(IoSession session, Object message)            throws Exception {        log.debug("messageReceived()...");        log.debug("RCVD: " + message);        // Get the stanza handler        StanzaHandler handler = (StanzaHandler) session                .getAttribute(STANZA_HANDLER);        // Get the XMPP packet parser        int hashCode = Thread.currentThread().hashCode();        XMPPPacketReader parser = parsers.get(hashCode);        if (parser == null) {            parser = new XMPPPacketReader();            parser.setXPPFactory(factory);            parsers.put(hashCode, parser);        }        // The stanza handler processes the message        try {            handler.process((String) message, parser);//###这个方法中,程序会根据xmpp的xml头来判断消息类型,并且传递到对应的Router处理类        } catch (Exception e) {            log.error(                    "Closing connection due to error while processing message: "                            + message, e);            Connection connection = (Connection) session                    .getAttribute(CONNECTION);            connection.close();        }    }

?因为当前做的需求是消息推送,而MessageRouter实用在IM这种对话场合,所以MessageRouter并没有实现处理PresenceRouter和IQRouter都有对应的功能实现。

IQRoute.java ->?IQRouter

?

public IQRouter() {        sessionManager = SessionManager.getInstance();        iqHandlers.add(new IQAuthHandler());        iqHandlers.add(new IQRegisterHandler());        iqHandlers.add(new IQRosterHandler());        notificationService = ServiceLocator.getNotificationService();    }

?构造中又实现了3和handler,用户建立好链接后又会对用户进行IQAuthHandler(鉴权),IQRegisterHandler(解析用户携带参数,往数据库中插入用户信息,将链接保存在sessionManager中),IQRosterHandler(没进行什么操作)

?

?

?

  • 消息的发送

以广播消息为例

SessionManager.java

/**     * Returns a list that contains all authenticated client sessions.     *      * @return a list that contains all client sessions     */    public Collection<ClientSession> getSessions() {        return clientSessions.values();    }

?获得链接池中所有链接

Session.java

?

    /**     * Delivers the packet to the associated connection.     *      * @param packet the packet to deliver     */    public void deliver(Packet packet) {        if (connection != null && !connection.isClosed()) {            connection.deliver(packet);        }    }

?Connection.java

/**     * Delivers the packet to this connection (without checking the recipient).     *      * @param packet the packet to deliver     */    public void deliver(Packet packet) {        log.debug("SENT: " + packet.toXML());        if (!isClosed()) {            IoBuffer buffer = IoBuffer.allocate(4096);            buffer.setAutoExpand(true);            boolean errorDelivering = false;            try {                XMLWriter xmlSerializer = new XMLWriter(new IoBufferWriter(                        buffer, (CharsetEncoder) encoder.get()),                        new OutputFormat());                xmlSerializer.write(packet.getElement());                xmlSerializer.flush();                buffer.flip();                ioSession.write(buffer);//###通过mina的isSession对象传递            } catch (Exception e) {                log.debug("Connection: Error delivering packet" + "\n"                        + this.toString(), e);                errorDelivering = true;            }            if (errorDelivering) {                close();            } else {                session.incrementServerPacketCount();            }        }    }

?封装层次为isSession->Connection->Session

?

?

?

  • 链接关闭

XmppIoHandler.java

/**     * Invoked when a connection is closed.     */    public void sessionClosed(IoSession session) throws Exception {        log.debug("sessionClosed()...");        Connection connection = (Connection) session.getAttribute(CONNECTION);        connection.close();    }

?Connection.java

/**     * Closes the session including associated socket connection,     * notifing all listeners that the channel is shutting down.     */    public void close() {        boolean closedSuccessfully = false;        synchronized (this) {            if (!isClosed()) {                try {                    deliverRawText("</stream:stream>", false);                } catch (Exception e) {                    // Ignore                }                if (session != null) {                    session.setStatus(Session.STATUS_CLOSED);                }                ioSession.close(false);                closed = true;                closedSuccessfully = true;            }        }        if (closedSuccessfully) {            notifyCloseListeners();        }    }

?此处使用了观察者模式,首先传递了Xmpp的结束xml“</stream:stream>”,再关闭了mina最低层的ioSession,再通知观察者,也就是androidpn封装的ClientSession在内存中删除该会话。

?

?

?

?

?原创文章,转载请声名出处 ?http://spjich.iteye.com/blog/2226149
  相关解决方案