
Android vold Architecture in Detail (2): How Kernel-Layer Messages Are Sent Upward

Following 罗老师's approach, let's start with a sequence diagram of the flow in which the Kernel layer sends messages upward, then analyze it piece by piece.



Everything before Step.17 was covered in the previous article and will not be repeated here; we pick up the analysis at Step.17.
Step.17 In main(), nm->start() opens a socket that listens for messages sent up from the Kernel layer
int NetlinkManager::start() {
    struct sockaddr_nl nladdr;
    int sz = 64 * 1024;
    int on = 1;

    memset(&nladdr, 0, sizeof(nladdr));
    nladdr.nl_family = AF_NETLINK;
    nladdr.nl_pid = getpid();
    nladdr.nl_groups = 0xffffffff;  // subscribe to all uevent multicast groups

    // Create a socket in the PF_NETLINK family, of type SOCK_DGRAM, on the
    // NETLINK_KOBJECT_UEVENT protocol; its file descriptor is kept in mSock.
    if ((mSock = socket(PF_NETLINK,
                        SOCK_DGRAM, NETLINK_KOBJECT_UEVENT)) < 0) {
        SLOGE("Unable to create uevent socket: %s", strerror(errno));
        return -1;
    }

    if (setsockopt(mSock, SOL_SOCKET, SO_RCVBUFFORCE, &sz, sizeof(sz)) < 0) {
        SLOGE("Unable to set uevent socket SO_RCVBUFFORCE option: %s", strerror(errno));
        return -1;
    }

    if (setsockopt(mSock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on)) < 0) {
        SLOGE("Unable to set uevent socket SO_PASSCRED option: %s", strerror(errno));
        return -1;
    }

    if (bind(mSock, (struct sockaddr *) &nladdr, sizeof(nladdr)) < 0) {
        SLOGE("Unable to bind uevent socket: %s", strerror(errno));
        return -1;
    }

    mHandler = new NetlinkHandler(mSock);
    if (mHandler->start()) {
        SLOGE("Unable to start NetlinkHandler: %s", strerror(errno));
        return -1;
    }
    return 0;
}
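To get a feel for what this socket delivers, here is a minimal standalone sketch (not vold code; written for this article) that opens the same NETLINK_KOBJECT_UEVENT socket and dumps raw uevents to stdout. It must run as root:

// uevent_dump.cpp -- illustrative sketch only, not part of vold.
#include <linux/netlink.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main() {
    struct sockaddr_nl addr;
    memset(&addr, 0, sizeof(addr));
    addr.nl_family = AF_NETLINK;
    addr.nl_pid = getpid();
    addr.nl_groups = 0xffffffff;  // all uevent multicast groups, as in vold

    int s = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_KOBJECT_UEVENT);
    if (s < 0 || bind(s, (struct sockaddr *) &addr, sizeof(addr)) < 0)
        return 1;

    char buf[64 * 1024];
    for (;;) {
        ssize_t n = recv(s, buf, sizeof(buf) - 1, 0);
        if (n <= 0) break;
        buf[n] = '\0';
        // A uevent is a block of NUL-separated strings; print each one.
        for (char *p = buf; p < buf + n; p += strlen(p) + 1)
            printf("%s\n", p);
        printf("----\n");
    }
    close(s);
    return 0;
}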


Step.18~26 The inheritance chain is NetlinkHandler → NetlinkListener → SocketListener, so constructing a NetlinkHandler runs the NetlinkListener and SocketListener constructors in turn.
NetlinkHandler's constructor
NetlinkHandler::NetlinkHandler(int listenerSocket) :
                NetlinkListener(listenerSocket) {
}


NetlinkListener's constructor
NetlinkListener::NetlinkListener(int socket) :
                SocketListener(socket, false) {
    mFormat = NETLINK_FORMAT_ASCII;
}


SocketListener's constructor
SocketListener::SocketListener(int socketFd, bool listen) {
    mListen = listen;
    mSocketName = NULL;
    mSock = socketFd;
    pthread_mutex_init(&mClientsLock, NULL);
    mClients = new SocketClientCollection();
}

Finally, a SocketClientCollection is instantiated to hold the connected clients.
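For orientation, SocketClientCollection is nothing more than a list of SocketClient pointers; in the sysutils headers of this era it is declared roughly as follows (the exact List template and namespace vary across AOSP releases, so treat this as a sketch):

// Sketch of the typedef in sysutils/SocketClient.h; verify the exact
// container and namespace in your AOSP tree.
typedef android::sysutils::List<SocketClient *> SocketClientCollection;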

Step.27 Next, let's see what the start() method does.
NetlinkHandler::start()
int NetlinkHandler::start() {
    // Delegate to the parent class's startListener() method.
    return this->startListener();
}


Step.28 SocketListener's startListener() method
int SocketListener::startListener() {

    if (!mSocketName && mSock == -1) {
        SLOGE("Failed to start unbound listener");
        errno = EINVAL;
        return -1;
    } else if (mSocketName) {
        if ((mSock = android_get_control_socket(mSocketName)) < 0) {
            SLOGE("Obtaining file descriptor socket '%s' failed: %s",
                 mSocketName, strerror(errno));
            return -1;
        }
    }

    // Put mSock (created in NetlinkManager::start()) into the listening
    // state, so that this process can accept requests from other processes
    // and act as a server.
    if (mListen && listen(mSock, 4) < 0) {
        SLOGE("Unable to listen on socket (%s)", strerror(errno));
        return -1;
    } else if (!mListen)
        // Not a listening socket (our netlink case): wrap the fd in a
        // SocketClient and add it to the SocketClientCollection created
        // in the constructor.
        mClients->push_back(new SocketClient(mSock, false));

    // Create a pipe: mCtrlPipe[0] is the read end, mCtrlPipe[1] the write end.
    if (pipe(mCtrlPipe)) {
        SLOGE("pipe failed (%s)", strerror(errno));
        return -1;
    }

    // Start the worker thread with SocketListener::threadStart as its entry
    // point; we follow that function next.
    if (pthread_create(&mThread, NULL, SocketListener::threadStart, this)) {
        SLOGE("pthread_create (%s)", strerror(errno));
        return -1;
    }

    return 0;
}
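The mCtrlPipe created here is the classic self-pipe trick: its read end is added to the select() set in the loop below, so another thread can wake the blocked listener at any time by writing one byte to the write end (which is how stopListener() shuts the thread down in the AOSP sources). A minimal sketch of the pattern, with illustrative names:

// Self-pipe sketch: waking a thread blocked in select().
#include <sys/select.h>
#include <unistd.h>

static int ctrlPipe[2];  // call pipe(ctrlPipe) once before starting the thread

void *loop(void *) {
    for (;;) {
        fd_set fds;
        FD_ZERO(&fds);
        FD_SET(ctrlPipe[0], &fds);
        // ... FD_SET the real data fds here, tracking the max fd ...
        select(ctrlPipe[0] + 1, &fds, NULL, NULL, NULL);
        if (FD_ISSET(ctrlPipe[0], &fds))
            break;  // another thread asked us to quit
        // ... service the data fds ...
    }
    return NULL;
}

void stopLoop() {
    char c = 0;
    write(ctrlPipe[1], &c, 1);  // wakes select() immediately
}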


Step.32 SocketListener's threadStart method
void *SocketListener::threadStart(void *obj) {
    SocketListener *me = reinterpret_cast<SocketListener *>(obj);

    // Hand off to runListener(), which is where the real work happens.
    me->runListener();
    pthread_exit(NULL);
    return NULL;
}



Step.33 The runListener() method
void SocketListener::runListener() {

    // A collection holding the SocketClients with pending data, i.e. the
    // clients on which a message from the Kernel layer has arrived.
    SocketClientCollection *pendingList = new SocketClientCollection();

    // Loop forever, watching for kernel messages.
    while(1) {
        SocketClientCollection::iterator it;
        fd_set read_fds;
        int rc = 0;
        int max = -1;

        // Rebuild the fd_set for this iteration: the listening socket (if
        // any), the control pipe, and every client socket.
        FD_ZERO(&read_fds);

        if (mListen) {
            max = mSock;
            FD_SET(mSock, &read_fds);
        }

        FD_SET(mCtrlPipe[0], &read_fds);
        if (mCtrlPipe[0] > max)
            max = mCtrlPipe[0];

        pthread_mutex_lock(&mClientsLock);
        for (it = mClients->begin(); it != mClients->end(); ++it) {
            int fd = (*it)->getSocket();
            FD_SET(fd, &read_fds);
            if (fd > max)
                max = fd;
        }
        pthread_mutex_unlock(&mClientsLock);

        if ((rc = select(max + 1, &read_fds, NULL, NULL, NULL)) < 0) {
            if (errno == EINTR)
                continue;
            SLOGE("select failed (%s)", strerror(errno));
            sleep(1);
            continue;
        } else if (!rc)
            continue;

        if (FD_ISSET(mCtrlPipe[0], &read_fds))
            break;

        if (mListen && FD_ISSET(mSock, &read_fds)) {
            struct sockaddr addr;
            socklen_t alen;
            int c;

            do {
                alen = sizeof(addr);
                c = accept(mSock, &addr, &alen);
            } while (c < 0 && errno == EINTR);
            if (c < 0) {
                SLOGE("accept failed (%s)", strerror(errno));
                sleep(1);
                continue;
            }
            pthread_mutex_lock(&mClientsLock);
            mClients->push_back(new SocketClient(c, true));
            pthread_mutex_unlock(&mClientsLock);
        }

        /* Add all active clients to the pending list first */
        pendingList->clear();
        pthread_mutex_lock(&mClientsLock);
        for (it = mClients->begin(); it != mClients->end(); ++it) {
            int fd = (*it)->getSocket();
            if (FD_ISSET(fd, &read_fds)) {
                // This client has data ready; queue it on pendingList.
                pendingList->push_back(*it);
            }
        }
        pthread_mutex_unlock(&mClientsLock);

        /* Process the pending list, since it is owned by the thread,
         * there is no need to lock it */
        // Drain pendingList: a non-empty list means there are messages from
        // the kernel layer waiting, each handled via onDataAvailable().
        while (!pendingList->empty()) {
            /* Pop the first item from the list */
            it = pendingList->begin();
            SocketClient* c = *it;
            pendingList->erase(it);

            /* Process it, if false is returned and our sockets are
             * connection-based, remove and destroy it */
            if (!onDataAvailable(c) && mListen) {

                /* Remove the client from our array */
                pthread_mutex_lock(&mClientsLock);
                for (it = mClients->begin(); it != mClients->end(); ++it) {
                    if (*it == c) {
                        mClients->erase(it);
                        break;
                    }
                }
                pthread_mutex_unlock(&mClientsLock);

                /* Remove our reference to the client */
                c->decRef();
            }
        }
    }
    delete pendingList;
}


Step.34 Having received a Kernel message, the next step is to process it.
NetlinkListener's onDataAvailable method
bool NetlinkListener::onDataAvailable(SocketClient *cli)
{
    int socket = cli->getSocket();
    ssize_t count;

    count = TEMP_FAILURE_RETRY(uevent_kernel_multicast_recv(socket, mBuffer, sizeof(mBuffer)));
    if (count < 0) {
        SLOGE("recvmsg failed (%s)", strerror(errno));
        return false;
    }

    NetlinkEvent *evt = new NetlinkEvent();
    if (!evt->decode(mBuffer, count, mFormat)) {
        SLOGE("Error decoding NetlinkEvent");
    } else {
        // Dispatch the message received from the kernel layer.
        onEvent(evt);
    }

    delete evt;
    return true;
}
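For reference, what uevent_kernel_multicast_recv() deposits in mBuffer is a block of NUL-separated strings: a header of the form ACTION@DEVPATH followed by KEY=VALUE pairs. A block-device insertion looks roughly like this, shown one string per line (device path and numbers are illustrative):

add@/devices/platform/sdhci.1/mmc_host/mmc1/mmc1:0001/block/mmcblk1
ACTION=add
DEVPATH=/devices/platform/sdhci.1/mmc_host/mmc1/mmc1:0001/block/mmcblk1
SUBSYSTEM=block
MAJOR=179
MINOR=8
DEVTYPE=disk
NPARTS=1
SEQNUM=1432

evt->decode() parses these pairs; that is what makes findParam("DEVPATH"), getAction(), and friends work in the steps below.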


Step.35 NetlinkHandler's onEvent method hands the decoded message to the VolumeManager for processing
void NetlinkHandler::onEvent(NetlinkEvent *evt) {
    VolumeManager *vm = VolumeManager::Instance();
    const char *subsys = evt->getSubsystem();

    if (!subsys) {
        SLOGW("No subsystem found in netlink event");
        return;
    }

    if (!strcmp(subsys, "block")) {
        vm->handleBlockEvent(evt);
    }
}


Step.36~41
After passing various checks, the message is written out to the vold socket.
----------------------------
void VolumeManager::handleBlockEvent(NetlinkEvent *evt) {
    const char *devpath = evt->findParam("DEVPATH");

    /* Lookup a volume to handle this device */
    VolumeCollection::iterator it;
    bool hit = false;
    for (it = mVolumes->begin(); it != mVolumes->end(); ++it) {
        if (!(*it)->handleBlockEvent(evt)) {
#ifdef NETLINK_DEBUG
            SLOGD("Device '%s' event handled by volume %s\n", devpath, (*it)->getLabel());
#endif
            hit = true;
            break;
        }
    }

    if (!hit) {
#ifdef NETLINK_DEBUG
        SLOGW("No volumes handled block event for '%s'", devpath);
#endif
    }
}
--------------------------------------------------------
int DirectVolume::handleBlockEvent(NetlinkEvent *evt) {
    const char *dp = evt->findParam("DEVPATH");

    PathCollection::iterator it;
    for (it = mPaths->begin(); it != mPaths->end(); ++it) {
        if (!strncmp(dp, *it, strlen(*it))) {
            /* We can handle this disk */
            int action = evt->getAction();
            const char *devtype = evt->findParam("DEVTYPE");

            if (action == NetlinkEvent::NlActionAdd) {
                int major = atoi(evt->findParam("MAJOR"));
                int minor = atoi(evt->findParam("MINOR"));
                char nodepath[255];

                snprintf(nodepath,
                         sizeof(nodepath), "/dev/block/vold/%d:%d",
                         major, minor);
                if (createDeviceNode(nodepath, major, minor)) {
                    SLOGE("Error making device node '%s' (%s)", nodepath,
                                                               strerror(errno));
                }
                if (!strcmp(devtype, "disk")) {
                    handleDiskAdded(dp, evt);
                } else {
                    handlePartitionAdded(dp, evt);
                }
            } else if (action == NetlinkEvent::NlActionRemove) {
                if (!strcmp(devtype, "disk")) {
                    handleDiskRemoved(dp, evt);
                } else {
                    handlePartitionRemoved(dp, evt);
                }
            } else if (action == NetlinkEvent::NlActionChange) {
                if (!strcmp(devtype, "disk")) {
                    handleDiskChanged(dp, evt);
                } else {
                    handlePartitionChanged(dp, evt);
                }
            } else {
                SLOGW("Ignoring non add/remove/change event");
            }

            return 0;
        }
    }
    errno = ENODEV;
    return -1;
}
--------------------------------------------------------
void DirectVolume::handleDiskAdded(const char *devpath, NetlinkEvent *evt) {
    mDiskMajor = atoi(evt->findParam("MAJOR"));
    mDiskMinor = atoi(evt->findParam("MINOR"));

    const char *tmp = evt->findParam("NPARTS");
    if (tmp) {
        mDiskNumParts = atoi(tmp);
    } else {
        SLOGW("Kernel block uevent missing 'NPARTS'");
        mDiskNumParts = 1;
    }

// FUJITSU TEN:2014-01-14 #55703 start
    snprintf(mDevPath, 1024, "/sys/%s", evt->findParam("DEVPATH"));
// FUJITSU TEN:2014-01-14 #55703 end

    char msg[255];

    int partmask = 0;
    int i;
    for (i = 1; i <= mDiskNumParts; i++) {
        partmask |= (1 << i);
    }
    mPendingPartMap = partmask;

    if (mDiskNumParts == 0) {
#ifdef PARTITION_DEBUG
        SLOGD("Dv::diskIns - No partitions - good to go son!");
#endif
        setState(Volume::State_Idle);
    } else {
#ifdef PARTITION_DEBUG
        SLOGD("Dv::diskIns - waiting for %d partitions (mask 0x%x)",
             mDiskNumParts, mPendingPartMap);
#endif
        setState(Volume::State_Pending);
    }

    snprintf(msg, sizeof(msg), "Volume %s %s disk inserted (%d:%d)",
             getLabel(), getMountpoint(), mDiskMajor, mDiskMinor);
    mVm->getBroadcaster()->sendBroadcast(ResponseCode::VolumeDiskInserted,
                                             msg, false);
}
--------------------------------------------------------
void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) {
    pthread_mutex_lock(&mClientsLock);
    SocketClientCollection::iterator i;

    for (i = mClients->begin(); i != mClients->end(); ++i) {
        if ((*i)->sendMsg(code, msg, addErrno)) {
            SLOGW("Error sending broadcast (%s)", strerror(errno));
        }
    }
    pthread_mutex_unlock(&mClientsLock);
}
--------------------------------------------------------
int SocketClient::sendMsg(int code, const char *msg, bool addErrno) {
    char *buf;
    const char* arg;
    const char* fmt;
    char tmp[1];
    int  len;

    if (addErrno) {
        fmt = "%.3d %s (%s)";
        arg = strerror(errno);
    } else {
        fmt = "%.3d %s";
        arg = NULL;
    }
    /* Measure length of required buffer */
    len = snprintf(tmp, sizeof tmp, fmt, code, msg, arg);
    /* Allocate in the stack, then write to it */
    buf = (char*)alloca(len+1);
    snprintf(buf, len+1, fmt, code, msg, arg);
    /* Send the zero-terminated message */
    return sendMsg(buf);
}

int SocketClient::sendMsg(const char *msg) {
    if (mSocket < 0) {
        errno = EHOSTUNREACH;
        return -1;
    }

    // Send the message including null character
    if (sendData(msg, strlen(msg) + 1) != 0) {
        SLOGW("Unable to send msg '%s'", msg);
        return -1;
    }
    return 0;
}

int SocketClient::sendData(const void* data, int len) {
    int rc = 0;
    const char *p = (const char*) data;
    int brtw = len;

    if (len == 0) {
        return 0;
    }

    pthread_mutex_lock(&mWriteMutex);
    while (brtw > 0) {
        rc = write(mSocket, p, brtw);
        if (rc > 0) {
            p += rc;
            brtw -= rc;
            continue;
        }
        if (rc < 0 && errno == EINTR)
            continue;

        pthread_mutex_unlock(&mWriteMutex);
        if (rc == 0) {
            SLOGW("0 length write :(");
            errno = EIO;
        } else {
            SLOGW("write error (%s)", strerror(errno));
        }
        return -1;
    }
    pthread_mutex_unlock(&mWriteMutex);
    return 0;
}
----------------------------
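Tracing one concrete message through this chain: handleDiskAdded() formats "Volume %s %s disk inserted (%d:%d)", and sendMsg() with addErrno == false prepends the code via "%.3d %s". Assuming ResponseCode::VolumeDiskInserted is 630 (its value in vold's ResponseCode.h of this era; the volume label and mount point here are illustrative), the NUL-terminated string that actually crosses the vold socket is:

630 Volume sdcard /mnt/sdcard disk inserted (179:8)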


That completes the journey of a Kernel-layer message through the Library layer on its way to the Application Framework layer.
Next is how the Application Framework layer picks this message up.

When MountService is instantiated, it creates a connector that listens on the vold socket
public MountService(Context context) {
    ......
    /*
     * Create the connection to vold with a maximum queue of twice the
     * amount of containers we'd ever expect to have. This keeps an
     * "asec list" from blocking a thread repeatedly.
     */
    // Create the connector used to listen on the vold socket.
    mConnector = new NativeDaemonConnector(this, "vold", MAX_CONTAINERS * 2, VOLD_TAG);
    mReady = false;

    // Start mConnector; NativeDaemonConnector's run() method calls
    // listenToSocket(), which begins listening on the vold socket.
    Thread thread = new Thread(mConnector, VOLD_TAG);
    thread.start();
    ......
}


NativeDaemonConnector's run() method calls listenToSocket(), which starts listening on the vold socket
private void listenToSocket() throws IOException {
    LocalSocket socket = null;

    try {
        ......
        while (true) {
            int count = inputStream.read(buffer, start, BUFFER_SIZE - start);
            if (count < 0) break;

            // Add our starting point to the count and reset the start.
            count += start;
            start = 0;

            for (int i = 0; i < count; i++) {
                if (buffer[i] == 0) {
                    String event = new String(buffer, start, i - start);
                    if (LOCAL_LOGD) Slog.d(TAG, String.format("RCV <- {%s}", event));

                    String[] tokens = event.split(" ", 2);
                    try {
                        int code = Integer.parseInt(tokens[0]);

                        if (code >= ResponseCode.UnsolicitedInformational) {
                            // Unsolicited events are posted to the handler.
                            mCallbackHandler.sendMessage(
                                    mCallbackHandler.obtainMessage(code, event));
                        } else {
                            try {
                                mResponseQueue.put(event);
                            } catch (InterruptedException ex) {
                                Slog.e(TAG, "Failed to put response onto queue", ex);
                            }
                        }
                    } catch (NumberFormatException nfe) {
                        Slog.w(TAG, String.format("Bad msg (%s)", event));
                    }
                    start = i + 1;
                }
            }
            ......
        }
    }
    ......
}
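Walking our example message through this loop (UnsolicitedInformational is 600 in the NativeDaemonConnector sources of this era; verify the value in your tree):

RCV <- {630 Volume sdcard /mnt/sdcard disk inserted (179:8)}
tokens[0] = "630"                      ->  code = 630
630 >= UnsolicitedInformational (600)  ->  mCallbackHandler.obtainMessage(630, event)

Codes below 600 are synchronous command responses and go to mResponseQueue instead.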


The callback handler's handleMessage then forwards each event to the registered callbacks
public boolean handleMessage(Message msg) {
    String event = (String) msg.obj;
    try {
        if (!mCallbacks.onEvent(msg.what, event, event.split(" "))) {
            Slog.w(TAG, String.format(
                    "Unhandled event '%s'", event));
        }
    } catch (Exception e) {
        Slog.e(TAG, String.format(
                "Error handling '%s'", event), e);
    }
    return true;
}


mCallbacks is the object that was passed in when NativeDaemonConnector was instantiated,
and it is in fact MountService itself. So we are back in MountService's onEvent method
/**
 * Callback from NativeDaemonConnector
 */
public boolean onEvent(int code, String raw, String[] cooked) {
    ......
    if (code == VoldResponseCode.VolumeStateChange) {
       ......
    } else if ((code == VoldResponseCode.VolumeDiskInserted) ||
               (code == VoldResponseCode.VolumeDiskRemoved) ||
               (code == VoldResponseCode.VolumeBadRemoval)) {
        ......
    } else {
        return false;
    }
    return true;
}


OK!!! From here the various messages are dispatched and handled.

Next up: the flow in which the Framework layer sends messages downward.