POSIX 消息队列(上):API编程实例
POSIX 消息队列
- 相关API
- mq_open: 创建或打开一个消息队列
- mq_send: 向消息队列写入一条消息
- mq_receive:从消息队列读取一条消息
- mq_close: 关闭进程的打开消息队列
- mq_unlink: 删除一个消息队列
- mq_setattr:设置消息队列一些额外的属性
- mq_getattr:获取消息队列一些额外的属性
- mq_nofity: 异步通知
POSIX消息队列的相关API
创建或打开IPC对象
- 函数原型:
- mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
- mqd_t mq_open(const char *name, int oflag);
- 函数功能:使用指定名字创建或打开一个对象,返回该对象的句柄
- 函数参数:
- name:用来标识要创建或打开的对象
- oflag:O_CREAT/O_EXCL /O_READONLY/O_WRONLY/O_RDWR/O_NONBLOCK
- mode:位掩码,权限设置
- attr:设置消息队列的属性,若为NULL,使用默认属性,Linux3.5以后版本也可以通过/proc查看设置
- 函数返回值
- 成功:返回消息队列的IPC对象描述符
- 失败:返回-1,并设置errno
关闭POSIX消息队列
- 函数原型:
- int mq_close(mqd_t mqdes);
- 函数功能:通过描述符关闭消息队列
- TIPS:
- POSIX消息队列在进程终止或执行exec()时会自动被关闭
删除一个POSIX消息队列
- 函数原型:int mq_unlink(const char *name);
- 函数功能:
- 删除通过那么标识的消息队列
- 在所有进程使用完该队列之后销毁队列
- 若打开该队列的所有进程已经关闭该队列,立即删除
往POSIX消息队列写入消息
- 函数原型:int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);
- 函数功能:将msg_ptr指向的缓冲区中的消息添加到描述符mqdes所引用的消息队列中
- 函数参数:
- mqdes: 消息队列描述符
- msg_ptr: 指向存放消息的缓冲区指针
- msg_len: 消息的长度 [10,8192]
- msg_prio:消息队列中按优先级排序,设置为0表示无需优先级
从POSIX消息队列读取数据
- ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
- 函数功能:
- 从mqdes引用的消息队列中删除一条优先级最高,存放时间最长的消息
- 将删除的消息保存在msg_ptr指针指向的缓冲区
- 函数参数:
- mqdes: 消息队列描述符
- msg_ptr:指向存放消息的缓冲区指针
- msg_len:msg_ptr所指向缓冲区长度,要大于消息队列的mq_msgsize
- msg_prio:如不为空,接收到的消息的优先级会被复制到指针指向处
- 函数返回值:
- 成功:返回接收的消息的字节数
- 失败:-1,并设置errno
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);
亲缘进程之间的通信
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <mqueue.h>#define handle_error(s) \{perror(s);exit(EXIT_FAILURE);}int main(int argc, char *argv[])
{mqd_t mq_id;mq_id = mq_open("/posix_msg_queue", O_RDWR | O_CREAT, 0644, NULL);//handle_error("mq_open")struct mq_attr my_attribute;if (mq_getattr(mq_id, &my_attribute) == -1)handle_error("mq_getattr")printf("mq_flags: %ld\n", my_attribute.mq_flags);printf("mq_maxmsg: %ld\n", my_attribute.mq_maxmsg);printf("mq_msgsize: %ld\n", my_attribute.mq_msgsize);printf("mq_curmsgs: %ld\n", my_attribute.mq_curmsgs);int ret_from_fork;ret_from_fork = fork();if (ret_from_fork == 0) //Print out the input data{char msg_buf[my_attribute.mq_msgsize];memset(msg_buf, 0, my_attribute.mq_msgsize);int count = 0;while(1){if (mq_receive(mq_id, msg_buf, my_attribute.mq_msgsize, NULL) == -1)handle_error("mq_receive");printf("child process received msg: %s\n", msg_buf);sleep(1);if (++count % 10 == 0)break;}}else if (ret_from_fork > 0) //Send data to child process{int count = 0;while(1){if (mq_send(mq_id, "hello kiki", sizeof("hello kiki"), 1) == -1)handle_error("mq_send")printf("parent process: send msg to queue success\n");sleep(1);if (++count % 10 == 0)break;}}elsehandle_error("fork");mq_close(mq_id);sleep(5);if (mq_unlink("/posix_msg_queue") == -1)handle_error("mq_unlink");return 0;
}
没有亲缘关系进程间通信
mq_rcv.c
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <mqueue.h>
int main(int argc, char *argv[])
{mqd_t mq_id;if ((mq_id = mq_open("/posix_msg_queue1", O_RDONLY | O_CREAT, 0644, NULL)) == -1){perror("mq_open");exit(EXIT_FAILURE);}struct mq_attr my_attribute;if (mq_getattr(mq_id, &my_attribute) == -1){perror("mq_getattr");exit(EXIT_FAILURE);}char msg_buf[my_attribute.mq_msgsize];memset(msg_buf, 0, my_attribute.mq_msgsize);while(1){if (mq_receive(mq_id, msg_buf, my_attribute.mq_msgsize, NULL) == -1){perror("mq_receive");exit(EXIT_FAILURE);}printf("%s\n", msg_buf);sleep(1);}mq_close(mq_id);if (mq_unlink("/posix_msg_queue1") == -1){perror("mq_unlink");exit(EXIT_FAILURE);}return 0;
}
mq_rcv.c
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <mqueue.h>int main(int argc, char *argv[])
{mqd_t mq_id;if((mq_id = mq_open("/posix_msg_queue1", O_WRONLY | O_CREAT , 0644, NULL)) == -1){perror("mq_open");exit(-1);}while(1){if (mq_send(mq_id, "hello kiki", sizeof("hello kiki"), 1) == -1){perror("mq_send");exit(-1);}printf("msg send success-------------------------");sleep(1);}mq_close(mq_id);return 0;
}