学习罗老师,先上一张Kernel层向上发送消息处理流程的序列图,下面一点一点分析。
Step.17以前的步骤都已在上一篇分析文章里讲过,这里不再说明。下面从Step.17开始分析。
Step.17 在main方法中,nm->start()方法里,开启Socket,监听Kernel层向上发送的消息
int NetlinkManager::start() { struct sockaddr_nl nladdr; int sz = 64 * 1024; int on = 1; memset(&nladdr, 0, sizeof(nladdr)); nladdr.nl_family = AF_NETLINK; nladdr.nl_pid = getpid(); nladdr.nl_groups = 0xffffffff; // 创建协议族为PF_NETLINK,类型为SOCK_DGRAM的socket,返回该socket套接字的文件描述符 if ((mSock = socket(PF_NETLINK, SOCK_DGRAM,NETLINK_KOBJECT_UEVENT)) < 0) { SLOGE("Unable to create uevent socket: %s", strerror(errno)); return -1; } if (setsockopt(mSock, SOL_SOCKET, SO_RCVBUFFORCE, &sz, sizeof(sz)) < 0) { SLOGE("Unable to set uevent socket SO_RECBUFFORCE option: %s", strerror(errno)); return -1; } if (setsockopt(mSock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on)) < 0) { SLOGE("Unable to set uevent socket SO_PASSCRED option: %s", strerror(errno)); return -1; } if (bind(mSock, (struct sockaddr *) &nladdr, sizeof(nladdr)) < 0) { SLOGE("Unable to bind uevent socket: %s", strerror(errno)); return -1; } mHandler = new NetlinkHandler(mSock); if (mHandler->start()) { SLOGE("Unable to start NetlinkHandler: %s", strerror(errno)); return -1; } return 0;}
Step.18~26 NetlinkHandler的继承关系,NetlinkHandler→NetlinkListener→SocketListener。依次创建NetlinkHandler、NetlinkListener、SocketListener的实例
NetlinkHandler的构造函数
/**
 * Constructs the handler by forwarding listenerSocket to the
 * NetlinkListener base-class constructor; the body itself does nothing.
 */
NetlinkHandler::NetlinkHandler(int listenerSocket)
    : NetlinkListener(listenerSocket) {
}
NetlinkListener的构造函数
/**
 * Constructs the listener on top of SocketListener, passing `false` as the
 * base class's listen flag, and selects the ASCII netlink message format.
 */
NetlinkListener::NetlinkListener(int socket)
    : SocketListener(socket, false) {
    mFormat = NETLINK_FORMAT_ASCII;
}
SocketListener的构造函数
/**
 * Initializes the listener around an existing descriptor.
 *
 * @param socketFd descriptor stored in mSock.
 * @param listen   stored in mListen.
 *
 * No socket name is set (mSocketName is NULL). The client-list mutex is
 * initialized and an empty SocketClientCollection is allocated.
 */
SocketListener::SocketListener(int socketFd, bool listen) {
    mSock = socketFd;
    mSocketName = NULL;
    mListen = listen;

    pthread_mutex_init(&mClientsLock, NULL);
    mClients = new SocketClientCollection();
}
最后实例化了一个SocketClientCollection。
Step.27 接着看看start()方法都干了什么。
NetlinkHandler.start
int NetlinkHandler::start() { // 调用父类startListener()方法 return this->startListener();}
Step.28 SocketListener的startListener()方法
int SocketListener::startListener() { if (!mSocketName && mSock == -1) { SLOGE("Failed to start unbound listener"); errno = EINVAL; return -1; } else if (mSocketName) { if ((mSock = android_get_control_socket(mSocketName)) < 0) { SLOGE("Obtaining file descriptor socket '%s' failed: %s", mSocketName, strerror(errno)); return -1; } } // 将mSock套接字(NetlinkManager::start()中创建的)变为被连接套接口, // 使得一个进程可以接受其它进程的请求,从而成为一个服务器进程 if (mListen && listen(mSock, 4) < 0) { SLOGE("Unable to listen on socket (%s)", strerror(errno)); return -1; } else if (!mListen) // 创建SocketClient实例,并添加到前面构造函数中实例化的SocketClientCollection中 mClients->push_back(new SocketClient(mSock, false)); // 建立管道,得到管道的读取端和写入端 if (pipe(mCtrlPipe)) { SLOGE("pipe failed (%s)", strerror(errno)); return -1; } // 启动线程,指定线程的开始执行的函数threadStart,并进入该方法,继续分析 if (pthread_create(&mThread, NULL, SocketListener::threadStart, this)) { SLOGE("pthread_create (%s)", strerror(errno)); return -1; } return 0;}
Step.32 SocketListener的threadStart方法
void *SocketListener::threadStart(void *obj) { SocketListener *me = reinterpret_cast<SocketListener *>(obj); // 继续调用自己的runListener()方法, // 马上进入最最重要的部分 me->runListener(); pthread_exit(NULL); return NULL;}
Step.33 runListener()方法
void SocketListener::runListener() { // 创建一个SocketClientCollection,用来保存待处理的SocketClient,其实就是接受到Kernel层消息的SocketClient。 SocketClientCollection *pendingList = new SocketClientCollection(); // 开始while循环,时刻监听kernel的消息 while(1) { SocketClientCollection::iterator iterator; fd_set read_fds; int rc = 0; int max = -1; // 一连串对套接字的更新,重置,检查等处理 FD_ZERO(&read_fds); if (mListen) { max = mSock; FD_SET(mSock, &read_fds); } FD_SET(mCtrlPipe[0], &read_fds); if (mCtrlPipe[0] > max) max = mCtrlPipe[0]; pthread_mutex_lock(&mClientsLock); for (it = mClients->begin(); it != mClients->end(); ++it) { int fd = (*it)->getSocket(); FD_SET(fd, &read_fds); if (fd > max) max = fd; } pthread_mutex_unlock(&mClientsLock); if ((rc = select(max + 1, &read_fds, NULL, NULL, NULL)) < 0) { if (errno == EINTR) continue; SLOGE("select failed (%s)", strerror(errno)); sleep(1); continue; } else if (!rc) continue; if (FD_ISSET(mCtrlPipe[0], &read_fds)) break; if (mListen && FD_ISSET(mSock, &read_fds)) { struct sockaddr addr; socklen_t alen; int c; do { alen = sizeof(addr); c = accept(mSock, &addr, &alen); } while (c < 0 && errno == EINTR); if (c < 0) { SLOGE("accept failed (%s)", strerror(errno)); sleep(1); continue; } pthread_mutex_lock(&mClientsLock); mClients->push_back(new SocketClient(c, true)); pthread_mutex_unlock(&mClientsLock); } /* Add all active clients to the pending list first */ pendingList->clear(); pthread_mutex_lock(&mClientsLock); for (it = mClients->begin(); it != mClients->end(); ++it) { int fd = (*it)->getSocket(); if (FD_ISSET(fd, &read_fds)) { // 如果有消息,将当前SocketClient添加到pendingList中 pendingList->push_back(*it); } } pthread_mutex_unlock(&mClientsLock); /* Process the pending list, since it is owned by the thread, * there is no need to lock it */ // 遍历pendingList,判断是否不为空。 // 不为空表示,有kernel层发送过来的消息需要处理 // 调用onDataAvailable处理消息 while (!pendingList->empty()) { /* Pop the first item from the list */ it = pendingList->begin(); SocketClient* c = *it; pendingList->erase(it); /* Process it, if 
false is returned and our sockets are * connection-based, remove and destroy it */ if (!onDataAvailable(c) && mListen) { /* Remove the client from our array */ pthread_mutex_lock(&mClientsLock); for (it = mClients->begin(); it != mClients->end(); ++it) { if (*it == c) { mClients->erase(it); break; } } pthread_mutex_unlock(&mClientsLock); /* Remove our reference to the client */ c->decRef(); } } } delete pendingList;}
Step.34 取到Kernel的消息后,接下来处理该消息
NetlinkListener的onDataAvailable方法
/**
 * Reads one kernel uevent from the client's socket into mBuffer, decodes
 * it and, when decoding succeeds, passes it to onEvent().
 *
 * @param cli client whose socket has data available.
 * @return false if receiving fails; true otherwise, including when the
 *         message could not be decoded.
 */
bool NetlinkListener::onDataAvailable(SocketClient *cli)
{
    const int fd = cli->getSocket();

    const ssize_t received = TEMP_FAILURE_RETRY(
            uevent_kernel_multicast_recv(fd, mBuffer, sizeof(mBuffer)));
    if (received < 0) {
        SLOGE("recvmsg failed (%s)", strerror(errno));
        return false;
    }

    NetlinkEvent *event = new NetlinkEvent();
    const bool decoded = event->decode(mBuffer, received, mFormat);
    if (decoded) {
        // Hand the message received from the kernel to the subclass.
        onEvent(event);
    } else {
        SLOGE("Error decoding NetlinkEvent");
    }

    delete event;
    return true;
}
Step.35 NetlinkHandler的onEvent方法,将消息解析后,传给VolumeManager来处理
/**
 * Routes a decoded netlink event by subsystem: events of the "block"
 * subsystem go to VolumeManager::handleBlockEvent(); an event without a
 * subsystem is logged and dropped; any other subsystem is ignored.
 */
void NetlinkHandler::onEvent(NetlinkEvent *evt) {
    VolumeManager *volumeManager = VolumeManager::Instance();
    const char *subsystem = evt->getSubsystem();

    if (subsystem == NULL) {
        SLOGW("No subsystem found in netlink event");
        return;
    }

    if (strcmp(subsystem, "block") == 0) {
        volumeManager->handleBlockEvent(evt);
    }
}
Step.36~41
消息经过各种Check后,将消息发到vold Socket中.
----------------------------void VolumeManager::handleBlockEvent(NetlinkEvent *evt) { const char *devpath = evt->findParam("DEVPATH"); /* Lookup a volume to handle this device */ VolumeCollection::iterator it; bool hit = false; for (it = mVolumes->begin(); it != mVolumes->end(); ++it) { if (!(*it)->handleBlockEvent(evt)) {#ifdef NETLINK_DEBUG SLOGD("Device '%s' event handled by volume %s\n", devpath, (*it)->getLabel());#endif hit = true; break; } } if (!hit) {#ifdef NETLINK_DEBUG SLOGW("No volumes handled block event for '%s'", devpath);#endif }}--------------------------------------------------------int DirectVolume::handleBlockEvent(NetlinkEvent *evt) { const char *dp = evt->findParam("DEVPATH"); PathCollection::iterator it; for (it = mPaths->begin(); it != mPaths->end(); ++it) { if (!strncmp(dp, *it, strlen(*it))) { /* We can handle this disk */ int action = evt->getAction(); const char *devtype = evt->findParam("DEVTYPE"); if (action == NetlinkEvent::NlActionAdd) { int major = atoi(evt->findParam("MAJOR")); int minor = atoi(evt->findParam("MINOR")); char nodepath[255]; snprintf(nodepath, sizeof(nodepath), "/dev/block/vold/%d:%d", major, minor); if (createDeviceNode(nodepath, major, minor)) { SLOGE("Error making device node '%s' (%s)", nodepath, strerror(errno)); } if (!strcmp(devtype, "disk")) { handleDiskAdded(dp, evt); } else { handlePartitionAdded(dp, evt); } } else if (action == NetlinkEvent::NlActionRemove) { if (!strcmp(devtype, "disk")) { handleDiskRemoved(dp, evt); } else { handlePartitionRemoved(dp, evt); } } else if (action == NetlinkEvent::NlActionChange) { if (!strcmp(devtype, "disk")) { handleDiskChanged(dp, evt); } else { handlePartitionChanged(dp, evt); } } else { SLOGW("Ignoring non add/remove/change event"); } return 0; } } errno = ENODEV; return -1;}--------------------------------------------------------void DirectVolume::handleDiskAdded(const char *devpath, NetlinkEvent *evt) { mDiskMajor = atoi(evt->findParam("MAJOR")); mDiskMinor = 
atoi(evt->findParam("MINOR")); const char *tmp = evt->findParam("NPARTS"); if (tmp) { mDiskNumParts = atoi(tmp); } else { SLOGW("Kernel block uevent missing 'NPARTS'"); mDiskNumParts = 1; }// FUJITSU TEN:2014-01-14 #55703 start snprintf(mDevPath, 1024, "/sys/%s", evt->findParam("DEVPATH"));// FUJITSU TEN:2014-01-14 #55703 end char msg[255]; int partmask = 0; int i; for (i = 1; i <= mDiskNumParts; i++) { partmask |= (1 << i); } mPendingPartMap = partmask; if (mDiskNumParts == 0) {#ifdef PARTITION_DEBUG SLOGD("Dv::diskIns - No partitions - good to go son!");#endif setState(Volume::State_Idle); } else {#ifdef PARTITION_DEBUG SLOGD("Dv::diskIns - waiting for %d partitions (mask 0x%x)", mDiskNumParts, mPendingPartMap);#endif setState(Volume::State_Pending); } snprintf(msg, sizeof(msg), "Volume %s %s disk inserted (%d:%d)", getLabel(), getMountpoint(), mDiskMajor, mDiskMinor); mVm->getBroadcaster()->sendBroadcast(ResponseCode::VolumeDiskInserted, msg, false);}--------------------------------------------------------void SocketListener::sendBroadcast(int code, const char *msg, bool addErrno) { pthread_mutex_lock(&mClientsLock); SocketClientCollection::iterator i; for (i = mClients->begin(); i != mClients->end(); ++i) { if ((*i)->sendMsg(code, msg, addErrno)) { SLOGW("Error sending broadcast (%s)", strerror(errno)); } } pthread_mutex_unlock(&mClientsLock);}--------------------------------------------------------int SocketClient::sendMsg(int code, const char *msg, bool addErrno) { char *buf; const char* arg; const char* fmt; char tmp[1]; int len; if (addErrno) { fmt = "%.3d %s (%s)"; arg = strerror(errno); } else { fmt = "%.3d %s"; arg = NULL; } /* Measure length of required buffer */ len = snprintf(tmp, sizeof tmp, fmt, code, msg, arg); /* Allocate in the stack, then write to it */ buf = (char*)alloca(len+1); snprintf(buf, len+1, fmt, code, msg, arg); /* Send the zero-terminated message */ return sendMsg(buf);}int SocketClient::sendMsg(const char *msg) { if (mSocket < 0) { 
errno = EHOSTUNREACH; return -1; } // Send the message including null character if (sendData(msg, strlen(msg) + 1) != 0) { SLOGW("Unable to send msg '%s'", msg); return -1; } return 0;}int SocketClient::sendData(const void* data, int len) { int rc = 0; const char *p = (const char*) data; int brtw = len; if (len == 0) { return 0; } pthread_mutex_lock(&mWriteMutex); while (brtw > 0) { rc = write(mSocket, p, brtw); if (rc > 0) { p += rc; brtw -= rc; continue; } if (rc < 0 && errno == EINTR) continue; pthread_mutex_unlock(&mWriteMutex); if (rc == 0) { SLOGW("0 length write :("); errno = EIO; } else { SLOGW("write error (%s)", strerror(errno)); } return -1; } pthread_mutex_unlock(&mWriteMutex); return 0;}----------------------------
Library层从接收到Kernel层消息,到把消息发送给Application Framework层的处理流程,就分析到这里。
后面是Application Framework层如何获取到这个消息
MountService实例化时,创建了一个用来监听vold Socket的Connector
public MountService(Context context) { ...... /* * Create the connection to vold with a maximum queue of twice the * amount of containers we'd ever expect to have. This keeps an * "asec list" from blocking a thread repeatedly. */ // 创建用来监听vold Socket的Connector mConnector = new NativeDaemonConnector(this, "vold", MAX_CONTAINERS * 2, VOLD_TAG); mReady = false; // 启动mConnector,NativeDaemonConnector的run方法中调用了listenToSocket方法, // 开始监听这个vold Socket Thread thread = new Thread(mConnector, VOLD_TAG); thread.start(); ......}
NativeDaemonConnector的run方法中调用了listenToSocket方法,开始监听vold Socket
private void listenToSocket() throws IOException { LocalSocket socket = null; try { ...... while (true) { int count = inputStream.read(buffer, start, BUFFER_SIZE - start); if (count < 0) break; // Add our starting point to the count and reset the start. count += start; start = 0; for (int i = 0; i < count; i++) { if (buffer[i] == 0) { String event = new String(buffer, start, i - start); if (LOCAL_LOGD) Slog.d(TAG, String.format("RCV <- {%s}", event)); String[] tokens = event.split(" ", 2); try { int code = Integer.parseInt(tokens[0]); if (code >= ResponseCode.UnsolicitedInformational) { // 发送消息到Handler中 mCallbackHandler.sendMessage( mCallbackHandler.obtainMessage(code, event)); } else { try { mResponseQueue.put(event); } catch (InterruptedException ex) { Slog.e(TAG, "Failed to put response onto queue", ex); } } } catch (NumberFormatException nfe) { Slog.w(TAG, String.format("Bad msg (%s)", event)); } start = i + 1; } } ...... } } ......}
/**
 * Delivers one queued event to the registered callbacks.
 *
 * The event string is taken from msg.obj and the code from msg.what; the
 * callbacks also receive the event split on single spaces. An event the
 * callbacks do not handle is logged as a warning, and any exception thrown
 * while handling it is logged as an error.
 *
 * @return always true.
 */
public boolean handleMessage(Message msg) {
    final String rawEvent = (String) msg.obj;
    try {
        final String[] cooked = rawEvent.split(" ");
        final boolean handled = mCallbacks.onEvent(msg.what, rawEvent, cooked);
        if (!handled) {
            Slog.w(TAG, String.format(
                    "Unhandled event '%s'", rawEvent));
        }
    } catch (Exception e) {
        Slog.e(TAG, String.format(
                "Error handling '%s'", rawEvent), e);
    }
    return true;
}
mCallbacks就是NativeDaemonConnector实例化时传递进来的,
它其实就是MountService。所以回到MountService的onEvent方法
/** * Callback from NativeDaemonConnector */public boolean onEvent(int code, String raw, String[] cooked) { ...... if (code == VoldResponseCode.VolumeStateChange) { ...... } else if ((code == VoldResponseCode.VolumeDiskInserted) || (code == VoldResponseCode.VolumeDiskRemoved) || (code == VoldResponseCode.VolumeBadRemoval)) { ...... } else { return false; } return true;}
OK!!! 这里就开始分发处理各种消息了。
明天开始FrameWork层向下发送消息处理流程