当前位置: 代码迷 >> Linux/Unix >> Linux 报文行列
  详细解决方案

Linux 报文行列

热度:672   发布时间:2016-04-29 12:25:54.0
Linux 报文队列

Linux 报文队列

  • Linux 报文队列
    • 一IPC 概述
    • 二报文队列
      • 1 报文队列简述
      • 2 代码分析
        • 21 msgget 创建报文队列
        • 22 msgsnd 报文发送
          • 221 相关数据结构
          • 222 sys_msgsnd源码分析
      • 3 msgrcv 报文接收
      • 4 msgctl 报文控制与设置

一、IPC 概述

早期的Unix系统进程件通信机制主要有管道和信号。管道开始只能在近亲之间通信,于是将pipe推广到VFS层面,形成了FIFO。但有两个显著的缺点:信号能传递的信息太少。而管道只能传递无格式的字节流。于是,为了应对OS的日益发展,IPC新的机制出现,包括了:

  • 报文传递
  • 共享内存
  • 进程同步
    • 信号量
    • 互斥信号量
    • rendezvous

以上三种IPC机制,称之为“system V IPC”Linux为 system V IPC提供了一个统一的系统调用ipc,其接口为:

int ipc(unsigned int call, int first, int second, int third, void *ptr, int forth);

其中第一个参数call为操作码,定义如下:
[include/asm-i386/ipc.h]

 13 /*   14  *SEM为信号量设置的 15  *MSG为报文传递设置的 16  *SHM为共享内存设置的 17  * */        18 #define SEMOP        1 19 #define SEMGET       2 20 #define SEMCTL       3 21 #define MSGSND      11 22 #define MSGRCV      12 23 #define MSGGET      13 24 #define MSGCTL      14 25 #define SHMAT       21 26 #define SHMDT       22 27 #define SHMGET      23 28 #define SHMCTL      24

C语言库函数为ipc提供了semget()、msgget()、msgsnd()等库函数,这些库函数最终都落实到统一的系统调用ipc()当中去。

二、报文队列

1.1 报文队列简述

进程可以调用库函数msgget()创建一个报文队列,实际上就是通过操作码为MSGGET的ipc系统调用建立报文队列。报文队列通过一个键值key来标示的,而非文件名。一旦报文队列建立之后,进程就可以用相同的键值通过msgget()来取得报文队列的访问,而发送报文的进程也可以通过msgsnd()发送报文到指定的队列中,接收进程则通过msgrcv()来取得报文。另外进程可以通msgctl()对报文队列进行额外的控制。

1.2 代码分析

1.2.1 msgget() 创建报文队列

库函数long sys_msgget(key_t key, int msgflag)实际上是用MSGGET操作码调用ipc系统调用,该函数有两个作用:
+ 当msgflag的IPC_CREATE位置位时,就利用key创建一个报文队列。
+ 当msgflag的IPC_CREATE位清零时,就了用key查找一个报文队列。

sys_msgget的代码如下:
[ipc/msg.c]

306 asmlinkage long sys_msgget (key_t key, int msgflg)307 {308     int id, ret = -EPERM;309     struct msg_queue *msq;310     311     down(&msg_ids.sem);312     /*每个进程都可以建立一个私用的报文队列,其键值为IPC_PRAVETE.313      * 该报文队列只能用于进程自己收发*/314     if (key == IPC_PRIVATE)315         /*建立一个新的报文队列*/316         ret = newque(key, msgflg);317     else if ((id = ipc_findkey(&msg_ids, key)) == -1) { /* key not used */318         /*如果键值对应的报文队列已经建立,则创建失败*/319         if (!(msgflg & IPC_CREAT))320             ret = -ENOENT;321         else322             /*否则创建一个报文队列*/323             ret = newque(key, msgflg);324     } else if (msgflg & IPC_CREAT && msgflg & IPC_EXCL) {325         ret = -EEXIST;326     } else {327         /*如果查找到报文队列,且msgflag表明为查找报文队列,则返回队列id*/328         msq = msg_lock(id);329         if(msq==NULL)330             BUG();331         if (ipcperms(&msq->q_perm, msgflg))332             ret = -EACCES;333         else334             ret = msg_buildid(id, msq->q_perm.seq);335         msg_unlock(id);336     }337     up(&msg_ids.sem);338     return ret;339 } 

IPC_PRIVATE作为私有报文队列可以无条件创建,这意味着key可以不唯一。如果不是IPC_PRIVATE则必须保证key的唯一性。首先查找key的报文队列:

  • 如果已经存在且msgflag表明要创建报文队列,则创建失败。
  • 如果已经存在,但表msgflag表明是获取队列,则返回队列id.
  • 如果不存在,且msgflag表明要创建报文队列,则创建一个报文队列且返回id

(注:队列id和队列键值key是不同的概念,队列id类似于打开的文件描述符fd,而队列键值则类似于文件的路径名

报文队列的创建由newqueue函数来完成。在读该函数之前,有必要了解几个数据结构及其关系。
内核中有一个全局数据结构msg_ids用于管理报文队列。该变量为struct ipc_ids类型的变量。定义于
[ipc/util.h]

 15 struct ipc_ids { 16     int size; 17     int in_use; 18     int max_id; 19     unsigned short seq; 20     unsigned short seq_max; 21     struct semaphore sem; 22     spinlock_t ary; 23     struct ipc_id* entries; 24 }; 25  26 struct ipc_id { 27     struct kern_ipc_perm* p; 28 };

其中关键成员为entries,该成员指向一个struct ipc_id结构数组。struct ipc_id结构实际上是一个kern_ipc_perm *p,所以可以把entries实际上指向一个kern_ipc_perm数组。每个报文队列都具有一个报文队列头,用于管理该报文队列,也可以说每个报文队列头是一个报文队列对象。而kern_ipc_perm为报文队列头的第一个成员,用于记录该报文队列的用户id,组id,键值key等参数。报文队列头struct msg_queue定义于:
[msg/ipc.h]

 68 struct msg_queue { 69     struct kern_ipc_perm q_perm; 70     time_t q_stime;         /* last msgsnd time */ 71     time_t q_rtime;         /* last msgrcv time */ 72     time_t q_ctime;         /* last change time */ 73     unsigned long q_cbytes;     /* current number of bytes on queue */ 74     unsigned long q_qnum;       /* number of messages in queue */ 75     unsigned long q_qbytes;     /* max number of bytes on queue */ 76     pid_t q_lspid;          /* pid of last msgsnd */ 77     pid_t q_lrpid;          /* last receive pid */ 78      79     struct list_head q_messages; 80     struct list_head q_receivers; 81     struct list_head q_senders; 82 }; 

现在梳理一下上面几个数据结构中的关系:

  • msg_ids中有个entries成员指向了一个kern_ipc_perm类型的数组
  • kern_ipc_perm结构记录了一个报文队列的重要参数
  • kern_ipc_perm为报文队列头msg_queue的第一个成员,其起始地址与报文队列头一致。也就是说,只要找到了kern_ipc_perm就可以找到报文队列头,即报文队列。

现在看newque的代码。
[ipc/msg.c]

117 static int newque (key_t key, int msgflg)118 {   119     int id;120     /*报文队列头*/121     struct msg_queue *msq;122         123     msq  = (struct msg_queue *) kmalloc (sizeof (*msq), GFP_KERNEL);124     if (!msq) 125         return -ENOMEM; 126     /*分配一个报文队列id,实际是上将msg_ids中的entry分配一个位置127      * 然后将该位置的指针指向msg->perm即可*/128     id = ipc_addid(&msg_ids, &msq->q_perm, msg_ctlmni);129     if(id == -1) {130         kfree(msq);131         return -ENOSPC;132     }   133     /*填充msg_queue中的各个成员*/134     msq->q_perm.mode = (msgflg & S_IRWXUGO);135     msq->q_perm.key = key;136     137     msq->q_stime = msq->q_rtime = 0;138     msq->q_ctime = CURRENT_TIME;139     msq->q_cbytes = msq->q_qnum = 0;140     msq->q_qbytes = msg_ctlmnb;141     msq->q_lspid = msq->q_lrpid = 0;142     INIT_LIST_HEAD(&msq->q_messages);143     INIT_LIST_HEAD(&msq->q_receivers);144     INIT_LIST_HEAD(&msq->q_senders);145     msg_unlock(id);146     147     /*根据msg_ids.entries数组的索引值生成一个id号并且返回*/148     return msg_buildid(id,msq->q_perm.seq);149 }

上述的代码中,报文队列对象分配出来后,要注册到msg_ids中,注册操作由函数ipc_addid完成。该函数的主要工作是在msg_ids->entries中查找一个空闲的位置,并且将struct kern_ipc_perm q_perm插入该文职,然后返回索引值id.最后根据索引之生成一个全局的id并且返回即可。可以看出来,报文队列的创建与打开,与进程的文件创建与打开类似。之后进程就可以使用返回的id从msg_ids中索引得到报文队列头,从而能堆报文队列执行发送,接受,控制等等操作了。

1.2.2 msgsnd() 报文发送

1.2.2.1 相关数据结构

调用msgget()创建了报文队列之后,就可以通过msgsnd向队列发送消息了。在读msgsnd的源码之前,有必要了解几个用到的数据结构。
用户空间的报文,报文头为struct msgbuf,其承载报文内容。struct msgbuf定义如下:
[include/linux/msg.h]

 34 /* message buffer for msgsnd and msgrcv calls */ 35 struct msgbuf { 36     long mtype;         /* type of message */ 37     char mtext[1];      /* message text */ 38 };

mtype标示了报文的类型。而mtext则为一个长度为1的数组,事实上mtext可以看作一个指针,指向了报文的起始地址。内核中使用的报文头为struct msg_msg结构,定义如下:
[ipc/msg.h]

 56 struct msg_msg { 57     struct list_head m_list; 58     long  m_type;     59     int m_ts;           /* message text size */ 60     struct msg_msgseg* next; 61     /* the actual message follows immediately */ 62 }; 

内核中的报文头中并没有任何指针成员指向报文内容,而是直接将报文内容安排到了紧接着报文头的位置。有时候报文内容太多,体积过大,就将报文分成多个报文段存放,其中报文头中的struct msg_msgseg指针就用于指向各个报文段头。msg_msgseg定义如下:

 51 struct msg_msgseg { 52     struct msg_msgseg* next; 53     /* the next part of the message follows immediately */ 54 };  

报文段的存放也是将报文段内同安排到紧接着msg_msgseg的位置。报文头和报文段头的关系是这样的:
如果报文体积较小,那么直接安排到紧挨着报文头的位置即可。如果报文提及大于一个物理页的大小,就将其大于物理页的部分安排到一个新的页中,并且页的起始地址为一个msg_msgseg结构,也就是报文段头,报文内同则紧挨着msg_msgseg排列,然后msg_msg中的next指针指向了msg_msgseg结构。

1.2.2.2 sys_msgsnd源码分析

msgsnd的内核实现为sys_msgsnd,该函数的定义如下:
[ipc/msg.c]

645 asmlinkage long sys_msgsnd (int msqid, struct msgbuf *msgp, size_t msgsz, int msgflg)646 {647     struct msg_queue *msq;648     struct msg_msg *msg;649     long mtype;650     int err;651 /*参数检查*/652     if (msgsz > msg_ctlmax || (long) msgsz < 0 || msqid < 0)653         return -EINVAL;654     if (get_user(mtype, &msgp->mtype))655         return -EFAULT;656     if (mtype < 1)657         return -EINVAL;658     /*将msgbuf总的报文内容从用户空间复制过来,填充到msg_msg结构体当中去*/659     msg = load_msg(msgp->mtext, msgsz);660     if(IS_ERR(msg))661         return PTR_ERR(msg);662 663     msg->m_type = mtype;664     msg->m_ts = msgsz;665 666     /*根据id找到报文队列并且上锁*/667     msq = msg_lock(msqid);668     err=-EINVAL;669     if(msq==NULL)670         goto out_free;671 /*参数检查结束*/672 retry:673     err= -EIDRM;674     /*检查id的正确性*/675     if (msg_checkid(msq,msqid))676         goto out_unlock_free;677 678     /*权限检查,不是每个进程都有资格发送报文的*/679     err=-EACCES;680     if (ipcperms(&msq->q_perm, S_IWUGO))681         goto out_unlock_free;682 683     /* msq->q_cbytes为当前队列大小,msq->q_qbytes为当前队列的容量684      * msq->q_qnum为报文当前个数,最大报文个数不能超过msq->q_qbytes685      * 这里是检查容量是否足以容下报文,并且检查报文个数是否超过最大686      * 限制687      * */688     if(msgsz + msq->q_cbytes > msq->q_qbytes ||689         1 + msq->q_qnum > msq->q_qbytes) {690         struct msg_sender s;691 692         /*如果报文不能送达,检查msgflag是否使用无阻塞访问693          * 如果使用无阻塞访问,则立即返回出错码694          * 如果没有使用无阻塞访问,则进入睡眠状态*/695         if(msgflg&IPC_NOWAIT) {696             err=-EAGAIN;697             goto out_unlock_free;698         }699 700         /*将发送进程加入msq->sender链表中,这样就可以在接受者701          * 接收完毕之后,报文队列有了新的空间,接受这就唤醒702          * sender队列上的进程*/703         ss_add(msq, &s);704         /*解锁报文队列*/705         msg_unlock(msqid);706         /*主动调度,此时发送进程正式进入休眠*/707         schedule();708         current->state= TASK_RUNNING;709 710         msq = msg_lock(msqid);711         err = -EIDRM;712         /*进程醒来之后需要进行新的检查,因为各个条件可能在713          * 进程睡眠时期发生改变*/714         if(msq==NULL)715             goto out_free;716         ss_del(&s);717 718         /*如果睡眠期间有信号投递到进程,那就立即返回处理信号。719          * 很可能是信号唤醒的进程*/720         if (signal_pending(current)) {721             err=-EINTR;722             goto out_unlock_free;723         }724         /*如前文所述,需要再次检查*/725         goto retry;726     }727 728     /*如果没有阻塞发生,就可以发送消息了。如果有进程正在729      * 等待接受消息,那么就没有必要将消息挂入队列,直接调用730      * pipelined_send将消息递交给进程即可*/731     if(!pipelined_send(msq,msg)) {732         /* noone is waiting for this message, enqueue it */733         /*将消息msg_msg挂入消息队列头msq->q_messages队列中去*/734         list_add_tail(&msg->m_list,&msq->q_messages);735         msq->q_cbytes += msgsz;736         msq->q_qnum++;737         atomic_add(msgsz,&msg_bytes);738         atomic_inc(&msg_hdrs);739     }740 741     err = 0;742     msg = NULL;743     msq->q_lspid = current->pid;744     msq->q_stime = CURRENT_TIME;745 746 out_unlock_free:747     msg_unlock(msqid);748 out_free:749     if(msg!=NULL)750         free_msg(msg);751     return err;752 }

sys_msg完成的工作有:
(1) 通过id值查找到报文队列msgqueue
(2) 将报文拷贝到内核空间并且建立报文头msg_msg。
(3) 将报文头挂接到msgqueue

具体实现细节已经详细注释,这里不再赘述。

1.3 msgrcv 报文接收

msgrcv的代码如下:
[ipc/msg.c]

773 asmlinkage long sys_msgrcv (int msqid, struct msgbuf *msgp, size_t msgsz,774                 long msgtyp, int msgflg)775 {776     struct msg_queue *msq;777     struct msg_receiver msr_d;778     struct list_head* tmp;779     struct msg_msg* msg, *found_msg;780     int err;781     int mode;782 /*开始参数检查*/783     if (msqid < 0 || (long) msgsz < 0)784         return -EINVAL;785     mode = convert_mode(&msgtyp,msgflg);786 787     /*获取报文队列*/788     msq = msg_lock(msqid);789     if(msq==NULL)790         return -EINVAL;791 /*参数检查结束*/792 retry:793     err=-EACCES;794     /*权限检查*/795     if (ipcperms (&msq->q_perm, S_IRUGO))796         goto out_unlock;797 798     tmp = msq->q_messages.next;799     found_msg=NULL;800     /*遍历队列取出报文*/801     while (tmp != &msq->q_messages) {802         msg = list_entry(tmp,struct msg_msg,m_list);803         /*测试条件是否满足804          * 如果msgtyp >= 0,则查找到一个msg->m_type == msgtyp的报文将其返回805          * 如果msgtyp < 0,则查找到一个msg->m_type < msgtype的最小值的报文将其返回806          * */807         if(testmsg(msg,msgtyp,mode)) {808             found_msg = msg;809             if(mode == SEARCH_LESSEQUAL && msg->m_type != 1) {810                 found_msg=msg;811                 msgtyp=msg->m_type-1;812             } else {813                 found_msg=msg;814                 break;815             }816         }817         tmp = tmp->next;818     }819     if(found_msg) {820         msg=found_msg;821 822         /*如果查找到了报文,且用户提供的缓冲区大小不够大,823          * 有设置了不能截断的标志,则返回出错码*/824         if ((msgsz < msg->m_ts) && !(msgflg & MSG_NOERROR)) {825             err=-E2BIG;826             goto out_unlock;827         }828 829         /*从报文队列中将报文取出*/830         list_del(&msg->m_list);831         msq->q_qnum--;832         msq->q_rtime = CURRENT_TIME;833         msq->q_lrpid = current->pid;834         msq->q_cbytes -= msg->m_ts;835         atomic_sub(msg->m_ts,&msg_bytes);836         atomic_dec(&msg_hdrs);837 838         /*如果有进程正在等待发送报文,则唤醒进程*/839         ss_wakeup(&msq->q_senders,0);840         msg_unlock(msqid);841 out_success:842         msgsz = (msgsz > msg->m_ts) ? msg->m_ts : msgsz;843         if (put_user (msg->m_type, &msgp->mtype) ||844             store_msg(msgp->mtext, msg, msgsz)) {845                 msgsz = -EFAULT;846         }847         free_msg(msg);848         return msgsz;849     } else850     {851         struct msg_queue *t;852         /* no message waiting. Prepare for pipelined853          * receive.854          */855         /*如果没有查找到报文,且设置了非阻塞方式访问,则立即返回出错码*/856         if (msgflg & IPC_NOWAIT) {857             err=-ENOMSG;858             goto out_unlock;859         }860 861         /*否则进程睡眠*/862         list_add_tail(&msr_d.r_list,&msq->q_receivers);863         msr_d.r_tsk = current;864         msr_d.r_msgtype = msgtyp;865         msr_d.r_mode = mode;866         if(msgflg & MSG_NOERROR)867             msr_d.r_maxsize = INT_MAX;868          else869             msr_d.r_maxsize = msgsz;870         msr_d.r_msg = ERR_PTR(-EAGAIN);871         current->state = TASK_INTERRUPTIBLE;872         msg_unlock(msqid);873 874         schedule();875         current->state = TASK_RUNNING;876 

该函数主要作的工作就是:
+ 根据id值查找得到报文队列头。
+ 遍历报文队列头中的报文队列
+ 比对报文队列中的报文类型与传进来的参数msgtyp,判断是否查找成功
+ 如果报文查找成功,且用户提供的缓冲区足够大,则将报文复制到用户缓冲区内并且返回即可。
+ 如果报文查找成功,但用户提供的缓冲区不够大,且设置了不能截短报文,则返回出错码。
+ 如果报文查找失败,且用户设置了非阻塞访问标志,则返回出错码,否则进程睡眠

1.4 msgctl() 报文控制与设置

报文机制对比管道机制的一大优点就是报文队列可以通过msgctl获取其状态信息,和设置相关参数。内核实现为long sys_msgsctl(int msqid, int cmd, struct msqid_ds *buf)其中msqid为报文队列的id, cmd 为具体命令码。定义为:
[include/linux/ipc.h]

 34 /*  35  * Control commands used with semctl, msgctl and shmctl  36  * see also specific commands in sem.h, msg.h and shm.h 37  */ 38 #define IPC_RMID 0     /* remove resource */ 39 #define IPC_SET  1     /* set ipc_perm options */ 40 #define IPC_STAT 2     /* get ipc_perm options */ 41 #define IPC_INFO 3     /* see ipcs */

这些命令吗并不是专门为报文队列设置的。它适用于所有system V IPC.不过对于具体的机制还有其它具体的专用命令,对于报文队列而言,还有另外两个专用的命令码:
[include/linux/msg.h]

  6 /* ipcs ctl commands */  7 #define MSG_STAT 11  8 #define MSG_INFO 12

最后一个参数buf为一个msqid_ds结构指针,这个结构用于IPC_STAT和IPC_SET定义如下:
[include/linux/msg.h]

 15 struct msqid_ds { 16     struct ipc_perm msg_perm; 17     struct msg *msg_first;      /* first message on queue,unused  */ 18     struct msg *msg_last;       /* last message in queue,unused */ 19     __kernel_time_t msg_stime;  /* last msgsnd time */ 20     __kernel_time_t msg_rtime;  /* last msgrcv time */ 21     __kernel_time_t msg_ctime;  /* last change time */ 22     unsigned long  msg_lcbytes; /* Reuse junk fields for 32 bit */ 23     unsigned long  msg_lqbytes; /* ditto */ 24     unsigned short msg_cbytes;  /* current number of bytes on queue */ 25     unsigned short msg_qnum;    /* number of messages in queue */ 26     unsigned short msg_qbytes;  /* max number of bytes on queue */ 27     __kernel_ipc_pid_t msg_lspid;   /* pid of last msgsnd */ 28     __kernel_ipc_pid_t msg_lrpid;   /* last receive pid */ 29 };

当命令码为IPC_INFO时,buf指向一个msginfo结构
[include/linux/msg.h]

 40 /* buffer for msgctl calls IPC_INFO, MSG_INFO */ 41 struct msginfo { 42     int msgpool; 43     int msgmap; 44     int msgmax; 45     int msgmnb; 46     int msgmni; 47     int msgssz; 48     int msgtql; 49     unsigned short  msgseg; 50 };

值得注意的是:

  • 在使用SET命令设置报文队列时,如果有进程正在接收报文,则全部出错返回。
  • IPC_RMID命令用来撤销一个报文队列id,相当与关闭文件。