Linux 报文队列
- Linux 报文队列
- 一IPC 概述
- 二报文队列
- 1 报文队列简述
- 2 代码分析
- 21 msgget 创建报文队列
- 22 msgsnd 报文发送
- 221 相关数据结构
- 222 sys_msgsnd源码分析
- 3 msgrcv 报文接收
- 4 msgctl 报文控制与设置
一、IPC 概述
早期的Unix系统进程件通信机制主要有管道和信号。管道开始只能在近亲之间通信,于是将pipe推广到VFS层面,形成了FIFO。但有两个显著的缺点:信号能传递的信息太少。而管道只能传递无格式的字节流。于是,为了应对OS的日益发展,IPC新的机制出现,包括了:
- 报文传递
- 共享内存
- 进程同步
- 信号量
- 互斥信号量
- rendezvous
以上三种IPC机制,称之为“system V IPC”Linux为 system V IPC提供了一个统一的系统调用ipc,其接口为:
int ipc(unsigned int call, int first, int second, int third, void *ptr, int forth);
其中第一个参数call为操作码,定义如下:
[include/asm-i386/ipc.h]
13 /* 14 *SEM为信号量设置的 15 *MSG为报文传递设置的 16 *SHM为共享内存设置的 17 * */ 18 #define SEMOP 1 19 #define SEMGET 2 20 #define SEMCTL 3 21 #define MSGSND 11 22 #define MSGRCV 12 23 #define MSGGET 13 24 #define MSGCTL 14 25 #define SHMAT 21 26 #define SHMDT 22 27 #define SHMGET 23 28 #define SHMCTL 24
C语言库函数为ipc提供了semget()、msgget()、msgsnd()等库函数,这些库函数最终都落实到统一的系统调用ipc()当中去。
二、报文队列
1.1 报文队列简述
进程可以调用库函数msgget()创建一个报文队列,实际上就是通过操作码为MSGGET的ipc系统调用建立报文队列。报文队列通过一个键值key来标示的,而非文件名。一旦报文队列建立之后,进程就可以用相同的键值通过msgget()来取得报文队列的访问,而发送报文的进程也可以通过msgsnd()发送报文到指定的队列中,接收进程则通过msgrcv()来取得报文。另外进程可以通msgctl()对报文队列进行额外的控制。
1.2 代码分析
1.2.1 msgget() 创建报文队列
库函数long sys_msgget(key_t key, int msgflag)
实际上是用MSGGET操作码调用ipc系统调用,该函数有两个作用:
+ 当msgflag的IPC_CREATE位置位时,就利用key创建一个报文队列。
+ 当msgflag的IPC_CREATE位清零时,就了用key查找一个报文队列。
sys_msgget的代码如下:
[ipc/msg.c]
306 asmlinkage long sys_msgget (key_t key, int msgflg)307 {308 int id, ret = -EPERM;309 struct msg_queue *msq;310 311 down(&msg_ids.sem);312 /*每个进程都可以建立一个私用的报文队列,其键值为IPC_PRAVETE.313 * 该报文队列只能用于进程自己收发*/314 if (key == IPC_PRIVATE)315 /*建立一个新的报文队列*/316 ret = newque(key, msgflg);317 else if ((id = ipc_findkey(&msg_ids, key)) == -1) { /* key not used */318 /*如果键值对应的报文队列已经建立,则创建失败*/319 if (!(msgflg & IPC_CREAT))320 ret = -ENOENT;321 else322 /*否则创建一个报文队列*/323 ret = newque(key, msgflg);324 } else if (msgflg & IPC_CREAT && msgflg & IPC_EXCL) {325 ret = -EEXIST;326 } else {327 /*如果查找到报文队列,且msgflag表明为查找报文队列,则返回队列id*/328 msq = msg_lock(id);329 if(msq==NULL)330 BUG();331 if (ipcperms(&msq->q_perm, msgflg))332 ret = -EACCES;333 else334 ret = msg_buildid(id, msq->q_perm.seq);335 msg_unlock(id);336 }337 up(&msg_ids.sem);338 return ret;339 }
IPC_PRIVATE作为私有报文队列可以无条件创建,这意味着key可以不唯一。如果不是IPC_PRIVATE则必须保证key的唯一性。首先查找key的报文队列:
- 如果已经存在且msgflag表明要创建报文队列,则创建失败。
- 如果已经存在,但表msgflag表明是获取队列,则返回队列id.
- 如果不存在,且msgflag表明要创建报文队列,则创建一个报文队列且返回id
(注:队列id和队列键值key是不同的概念,队列id类似于打开的文件描述符fd,而队列键值则类似于文件的路径名)
报文队列的创建由newqueue函数来完成。在读该函数之前,有必要了解几个数据结构及其关系。
内核中有一个全局数据结构msg_ids用于管理报文队列。该变量为struct ipc_ids类型的变量。定义于
[ipc/util.h]
15 struct ipc_ids { 16 int size; 17 int in_use; 18 int max_id; 19 unsigned short seq; 20 unsigned short seq_max; 21 struct semaphore sem; 22 spinlock_t ary; 23 struct ipc_id* entries; 24 }; 25 26 struct ipc_id { 27 struct kern_ipc_perm* p; 28 };
其中关键成员为entries,该成员指向一个struct ipc_id结构数组。struct ipc_id结构实际上是一个kern_ipc_perm *p,所以可以把entries实际上指向一个kern_ipc_perm数组。每个报文队列都具有一个报文队列头,用于管理该报文队列,也可以说每个报文队列头是一个报文队列对象。而kern_ipc_perm为报文队列头的第一个成员,用于记录该报文队列的用户id,组id,键值key等参数。报文队列头struct msg_queue定义于:
[msg/ipc.h]
68 struct msg_queue { 69 struct kern_ipc_perm q_perm; 70 time_t q_stime; /* last msgsnd time */ 71 time_t q_rtime; /* last msgrcv time */ 72 time_t q_ctime; /* last change time */ 73 unsigned long q_cbytes; /* current number of bytes on queue */ 74 unsigned long q_qnum; /* number of messages in queue */ 75 unsigned long q_qbytes; /* max number of bytes on queue */ 76 pid_t q_lspid; /* pid of last msgsnd */ 77 pid_t q_lrpid; /* last receive pid */ 78 79 struct list_head q_messages; 80 struct list_head q_receivers; 81 struct list_head q_senders; 82 };
现在梳理一下上面几个数据结构中的关系:
- msg_ids中有个entries成员指向了一个kern_ipc_perm类型的数组
- kern_ipc_perm结构记录了一个报文队列的重要参数
- kern_ipc_perm为报文队列头msg_queue的第一个成员,其起始地址与报文队列头一致。也就是说,只要找到了kern_ipc_perm就可以找到报文队列头,即报文队列。
现在看newque的代码。
[ipc/msg.c]
117 static int newque (key_t key, int msgflg)118 { 119 int id;120 /*报文队列头*/121 struct msg_queue *msq;122 123 msq = (struct msg_queue *) kmalloc (sizeof (*msq), GFP_KERNEL);124 if (!msq) 125 return -ENOMEM; 126 /*分配一个报文队列id,实际是上将msg_ids中的entry分配一个位置127 * 然后将该位置的指针指向msg->perm即可*/128 id = ipc_addid(&msg_ids, &msq->q_perm, msg_ctlmni);129 if(id == -1) {130 kfree(msq);131 return -ENOSPC;132 } 133 /*填充msg_queue中的各个成员*/134 msq->q_perm.mode = (msgflg & S_IRWXUGO);135 msq->q_perm.key = key;136 137 msq->q_stime = msq->q_rtime = 0;138 msq->q_ctime = CURRENT_TIME;139 msq->q_cbytes = msq->q_qnum = 0;140 msq->q_qbytes = msg_ctlmnb;141 msq->q_lspid = msq->q_lrpid = 0;142 INIT_LIST_HEAD(&msq->q_messages);143 INIT_LIST_HEAD(&msq->q_receivers);144 INIT_LIST_HEAD(&msq->q_senders);145 msg_unlock(id);146 147 /*根据msg_ids.entries数组的索引值生成一个id号并且返回*/148 return msg_buildid(id,msq->q_perm.seq);149 }
上述的代码中,报文队列对象分配出来后,要注册到msg_ids中,注册操作由函数ipc_addid完成。该函数的主要工作是在msg_ids->entries中查找一个空闲的位置,并且将struct kern_ipc_perm q_perm插入该文职,然后返回索引值id.最后根据索引之生成一个全局的id并且返回即可。可以看出来,报文队列的创建与打开,与进程的文件创建与打开类似。之后进程就可以使用返回的id从msg_ids中索引得到报文队列头,从而能堆报文队列执行发送,接受,控制等等操作了。
1.2.2 msgsnd() 报文发送
1.2.2.1 相关数据结构
调用msgget()创建了报文队列之后,就可以通过msgsnd向队列发送消息了。在读msgsnd的源码之前,有必要了解几个用到的数据结构。
用户空间的报文,报文头为struct msgbuf,其承载报文内容。struct msgbuf定义如下:
[include/linux/msg.h]
34 /* message buffer for msgsnd and msgrcv calls */ 35 struct msgbuf { 36 long mtype; /* type of message */ 37 char mtext[1]; /* message text */ 38 };
mtype标示了报文的类型。而mtext则为一个长度为1的数组,事实上mtext可以看作一个指针,指向了报文的起始地址。内核中使用的报文头为struct msg_msg结构,定义如下:
[ipc/msg.h]
56 struct msg_msg { 57 struct list_head m_list; 58 long m_type; 59 int m_ts; /* message text size */ 60 struct msg_msgseg* next; 61 /* the actual message follows immediately */ 62 };
内核中的报文头中并没有任何指针成员指向报文内容,而是直接将报文内容安排到了紧接着报文头的位置。有时候报文内容太多,体积过大,就将报文分成多个报文段存放,其中报文头中的struct msg_msgseg指针就用于指向各个报文段头。msg_msgseg定义如下:
51 struct msg_msgseg { 52 struct msg_msgseg* next; 53 /* the next part of the message follows immediately */ 54 };
报文段的存放也是将报文段内同安排到紧接着msg_msgseg的位置。报文头和报文段头的关系是这样的:
如果报文体积较小,那么直接安排到紧挨着报文头的位置即可。如果报文提及大于一个物理页的大小,就将其大于物理页的部分安排到一个新的页中,并且页的起始地址为一个msg_msgseg结构,也就是报文段头,报文内同则紧挨着msg_msgseg排列,然后msg_msg中的next指针指向了msg_msgseg结构。
1.2.2.2 sys_msgsnd源码分析
msgsnd的内核实现为sys_msgsnd,该函数的定义如下:
[ipc/msg.c]
645 asmlinkage long sys_msgsnd (int msqid, struct msgbuf *msgp, size_t msgsz, int msgflg)646 {647 struct msg_queue *msq;648 struct msg_msg *msg;649 long mtype;650 int err;651 /*参数检查*/652 if (msgsz > msg_ctlmax || (long) msgsz < 0 || msqid < 0)653 return -EINVAL;654 if (get_user(mtype, &msgp->mtype))655 return -EFAULT;656 if (mtype < 1)657 return -EINVAL;658 /*将msgbuf总的报文内容从用户空间复制过来,填充到msg_msg结构体当中去*/659 msg = load_msg(msgp->mtext, msgsz);660 if(IS_ERR(msg))661 return PTR_ERR(msg);662 663 msg->m_type = mtype;664 msg->m_ts = msgsz;665 666 /*根据id找到报文队列并且上锁*/667 msq = msg_lock(msqid);668 err=-EINVAL;669 if(msq==NULL)670 goto out_free;671 /*参数检查结束*/672 retry:673 err= -EIDRM;674 /*检查id的正确性*/675 if (msg_checkid(msq,msqid))676 goto out_unlock_free;677 678 /*权限检查,不是每个进程都有资格发送报文的*/679 err=-EACCES;680 if (ipcperms(&msq->q_perm, S_IWUGO))681 goto out_unlock_free;682 683 /* msq->q_cbytes为当前队列大小,msq->q_qbytes为当前队列的容量684 * msq->q_qnum为报文当前个数,最大报文个数不能超过msq->q_qbytes685 * 这里是检查容量是否足以容下报文,并且检查报文个数是否超过最大686 * 限制687 * */688 if(msgsz + msq->q_cbytes > msq->q_qbytes ||689 1 + msq->q_qnum > msq->q_qbytes) {690 struct msg_sender s;691 692 /*如果报文不能送达,检查msgflag是否使用无阻塞访问693 * 如果使用无阻塞访问,则立即返回出错码694 * 如果没有使用无阻塞访问,则进入睡眠状态*/695 if(msgflg&IPC_NOWAIT) {696 err=-EAGAIN;697 goto out_unlock_free;698 }699 700 /*将发送进程加入msq->sender链表中,这样就可以在接受者701 * 接收完毕之后,报文队列有了新的空间,接受这就唤醒702 * sender队列上的进程*/703 ss_add(msq, &s);704 /*解锁报文队列*/705 msg_unlock(msqid);706 /*主动调度,此时发送进程正式进入休眠*/707 schedule();708 current->state= TASK_RUNNING;709 710 msq = msg_lock(msqid);711 err = -EIDRM;712 /*进程醒来之后需要进行新的检查,因为各个条件可能在713 * 进程睡眠时期发生改变*/714 if(msq==NULL)715 goto out_free;716 ss_del(&s);717 718 /*如果睡眠期间有信号投递到进程,那就立即返回处理信号。719 * 很可能是信号唤醒的进程*/720 if (signal_pending(current)) {721 err=-EINTR;722 goto out_unlock_free;723 }724 /*如前文所述,需要再次检查*/725 goto retry;726 }727 728 /*如果没有阻塞发生,就可以发送消息了。如果有进程正在729 * 等待接受消息,那么就没有必要将消息挂入队列,直接调用730 * pipelined_send将消息递交给进程即可*/731 if(!pipelined_send(msq,msg)) {732 /* noone is waiting for this message, enqueue it */733 /*将消息msg_msg挂入消息队列头msq->q_messages队列中去*/734 list_add_tail(&msg->m_list,&msq->q_messages);735 msq->q_cbytes += msgsz;736 msq->q_qnum++;737 atomic_add(msgsz,&msg_bytes);738 atomic_inc(&msg_hdrs);739 }740 741 err = 0;742 msg = NULL;743 msq->q_lspid = current->pid;744 msq->q_stime = CURRENT_TIME;745 746 out_unlock_free:747 msg_unlock(msqid);748 out_free:749 if(msg!=NULL)750 free_msg(msg);751 return err;752 }
sys_msg完成的工作有:
(1) 通过id值查找到报文队列msgqueue
(2) 将报文拷贝到内核空间并且建立报文头msg_msg。
(3) 将报文头挂接到msgqueue
具体实现细节已经详细注释,这里不再赘述。
1.3 msgrcv 报文接收
msgrcv的代码如下:
[ipc/msg.c]
773 asmlinkage long sys_msgrcv (int msqid, struct msgbuf *msgp, size_t msgsz,774 long msgtyp, int msgflg)775 {776 struct msg_queue *msq;777 struct msg_receiver msr_d;778 struct list_head* tmp;779 struct msg_msg* msg, *found_msg;780 int err;781 int mode;782 /*开始参数检查*/783 if (msqid < 0 || (long) msgsz < 0)784 return -EINVAL;785 mode = convert_mode(&msgtyp,msgflg);786 787 /*获取报文队列*/788 msq = msg_lock(msqid);789 if(msq==NULL)790 return -EINVAL;791 /*参数检查结束*/792 retry:793 err=-EACCES;794 /*权限检查*/795 if (ipcperms (&msq->q_perm, S_IRUGO))796 goto out_unlock;797 798 tmp = msq->q_messages.next;799 found_msg=NULL;800 /*遍历队列取出报文*/801 while (tmp != &msq->q_messages) {802 msg = list_entry(tmp,struct msg_msg,m_list);803 /*测试条件是否满足804 * 如果msgtyp >= 0,则查找到一个msg->m_type == msgtyp的报文将其返回805 * 如果msgtyp < 0,则查找到一个msg->m_type < msgtype的最小值的报文将其返回806 * */807 if(testmsg(msg,msgtyp,mode)) {808 found_msg = msg;809 if(mode == SEARCH_LESSEQUAL && msg->m_type != 1) {810 found_msg=msg;811 msgtyp=msg->m_type-1;812 } else {813 found_msg=msg;814 break;815 }816 }817 tmp = tmp->next;818 }819 if(found_msg) {820 msg=found_msg;821 822 /*如果查找到了报文,且用户提供的缓冲区大小不够大,823 * 有设置了不能截断的标志,则返回出错码*/824 if ((msgsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) {825 err=-E2BIG;826 goto out_unlock;827 }828 829 /*从报文队列中将报文取出*/830 list_del(&msg->m_list);831 msq->q_qnum--;832 msq->q_rtime = CURRENT_TIME;833 msq->q_lrpid = current->pid;834 msq->q_cbytes -= msg->m_ts;835 atomic_sub(msg->m_ts,&msg_bytes);836 atomic_dec(&msg_hdrs);837 838 /*如果有进程正在等待发送报文,则唤醒进程*/839 ss_wakeup(&msq->q_senders,0);840 msg_unlock(msqid);841 out_success:842 msgsz = (msgsz > msg->m_ts) ? msg->m_ts : msgsz;843 if (put_user (msg->m_type, &msgp->mtype) ||844 store_msg(msgp->mtext, msg, msgsz)) {845 msgsz = -EFAULT;846 }847 free_msg(msg);848 return msgsz;849 } else850 {851 struct msg_queue *t;852 /* no message waiting. Prepare for pipelined853 * receive.854 */855 /*如果没有查找到报文,且设置了非阻塞方式访问,则立即返回出错码*/856 if (msgflg & IPC_NOWAIT) {857 err=-ENOMSG;858 goto out_unlock;859 }860 861 /*否则进程睡眠*/862 list_add_tail(&msr_d.r_list,&msq->q_receivers);863 msr_d.r_tsk = current;864 msr_d.r_msgtype = msgtyp;865 msr_d.r_mode = mode;866 if(msgflg & MSG_NOERROR)867 msr_d.r_maxsize = INT_MAX;868 else869 msr_d.r_maxsize = msgsz;870 msr_d.r_msg = ERR_PTR(-EAGAIN);871 current->state = TASK_INTERRUPTIBLE;872 msg_unlock(msqid);873 874 schedule();875 current->state = TASK_RUNNING;876
该函数主要作的工作就是:
+ 根据id值查找得到报文队列头。
+ 遍历报文队列头中的报文队列
+ 比对报文队列中的报文类型与传进来的参数msgtyp,判断是否查找成功
+ 如果报文查找成功,且用户提供的缓冲区足够大,则将报文复制到用户缓冲区内并且返回即可。
+ 如果报文查找成功,但用户提供的缓冲区不够大,且设置了不能截短报文,则返回出错码。
+ 如果报文查找失败,且用户设置了非阻塞访问标志,则返回出错码,否则进程睡眠
1.4 msgctl() 报文控制与设置
报文机制对比管道机制的一大优点就是报文队列可以通过msgctl获取其状态信息,和设置相关参数。内核实现为long sys_msgsctl(int msqid, int cmd, struct msqid_ds *buf)
其中msqid为报文队列的id, cmd 为具体命令码。定义为:
[include/linux/ipc.h]
34 /* 35 * Control commands used with semctl, msgctl and shmctl 36 * see also specific commands in sem.h, msg.h and shm.h 37 */ 38 #define IPC_RMID 0 /* remove resource */ 39 #define IPC_SET 1 /* set ipc_perm options */ 40 #define IPC_STAT 2 /* get ipc_perm options */ 41 #define IPC_INFO 3 /* see ipcs */
这些命令吗并不是专门为报文队列设置的。它适用于所有system V IPC.不过对于具体的机制还有其它具体的专用命令,对于报文队列而言,还有另外两个专用的命令码:
[include/linux/msg.h]
6 /* ipcs ctl commands */ 7 #define MSG_STAT 11 8 #define MSG_INFO 12
最后一个参数buf为一个msqid_ds结构指针,这个结构用于IPC_STAT和IPC_SET定义如下:
[include/linux/msg.h]
15 struct msqid_ds { 16 struct ipc_perm msg_perm; 17 struct msg *msg_first; /* first message on queue,unused */ 18 struct msg *msg_last; /* last message in queue,unused */ 19 __kernel_time_t msg_stime; /* last msgsnd time */ 20 __kernel_time_t msg_rtime; /* last msgrcv time */ 21 __kernel_time_t msg_ctime; /* last change time */ 22 unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit */ 23 unsigned long msg_lqbytes; /* ditto */ 24 unsigned short msg_cbytes; /* current number of bytes on queue */ 25 unsigned short msg_qnum; /* number of messages in queue */ 26 unsigned short msg_qbytes; /* max number of bytes on queue */ 27 __kernel_ipc_pid_t msg_lspid; /* pid of last msgsnd */ 28 __kernel_ipc_pid_t msg_lrpid; /* last receive pid */ 29 };
当命令码为IPC_INFO时,buf指向一个msginfo结构
[include/linux/msg.h]
40 /* buffer for msgctl calls IPC_INFO, MSG_INFO */ 41 struct msginfo { 42 int msgpool; 43 int msgmap; 44 int msgmax; 45 int msgmnb; 46 int msgmni; 47 int msgssz; 48 int msgtql; 49 unsigned short msgseg; 50 };
值得注意的是:
- 在使用SET命令设置报文队列时,如果有进程正在接收报文,则全部出错返回。
- IPC_RMID命令用来撤销一个报文队列id,相当与关闭文件。