前言:
perl和C语言都提供了消息队列的机制,而且这两种语言中的消息队列可以相互兼容.
当涉及到系统架构时,
可以用C语言实现核心计算模块,用Perl实现业务逻辑层,
而这两层间可以通过消息队列来实现不同语言的进程间的通信.
本文先用两个示例程序讲解Perl对消息队列的实现。
再一个示例程序讲解Perl与C之间使用消息队列进行进程间的通信.
一. 使用IPC::SysV和IPC::Msg模块访问Unix消息队列
例一, 单个进程中实现消息队列的访问(Perl实现)
File: msg_single_process.pl
1. #!/usr/bin/perl
2. use strict;
3. use warnings;
4. use IPC::SysV qw(IPC_PRIVATE S_IRWXU S_IRWXG S_IRWXO IPC_CREAT IPC_NOWAIT);
5. use IPC::Msg;
6. my($key, $msg, $msgid, $msgtype, $buf);
7. $key = IPC::SysV::ftok(".",'1');
8. $msg = new IPC::Msg($key, 0666 | IPC_CREAT) or die "create message queue: $!";
9. $msgtype = 1;
10.$msgid = $msg->id();
11.print "MSG_ID: ",$msgid,"\n";
12.$msg->snd($msgtype, "test", IPC_NOWAIT) or die "send message failed: $!";
13.$msg->rcv($buf, 1024) or die "receive message failed: $!";
14.print "BUFFER: ",$buf,"\n";
15.$msg->remove();
代码解释:
4.使用IPC::SysV模块,
qw(IPC_PRIVATE S_IRWXU S_IRWXG S_IRWXO IPC_CREAT IPC_NOWAIT)
等同于("IPC_PRIVATE S_IRWXU S_IRWXG S_IRWXO IPC_CREAT IPC_NOWAIT"),
表示本程序将从该模块引入若干符号,
在后续的代码里面需要要指定模块名就可以使用这些符号;
5.使用IPC::Msg模块(消息队列);
6.定义私有变量;
7.用ftok通过文件名获得一个key_t值,这里使用是当前目录.
类似C语言中的ftok函数,这个文件必须实际存在;
8.获得Msg对象。类似msgget函数;
9.指定消息类型;
10.获得消息队列的标识符;
12.发送消息。类似msgsnd函数;
13.接收消息。类似msgrcv函数;
15.删除消息队列;
更多的信息,可以参考IPC::Msg模块定义:
http://search.cpan.org/~mhx/IPC-SysV-2.03/lib/IPC/Msg.pm
例二.在两个进程间使用消息队列进行通信(Perl实现)
File: msg_snd.pl
#!/usr/bin/perl
use strict;
use warnings;
use IPC::SysV qw(IPC_PRIVATE S_IRWXU S_IRWXG S_IRWXO IPC_CREAT IPC_NOWAIT);
use IPC::Msg;
my($key, $msg, $msgid, $msgtype, $buf);
$key = IPC::SysV::ftok(".",'1');
$msg = new IPC::Msg($key, 0666 | IPC_CREAT) or die "create message queue: $!";
$msgtype = 1;
$msgid = $msg->id();
print "MSG_ID: ",$msgid,"\n";
my $pal;
foreach $pal('Tom','Dick','Harry','Pete','Hank')
{
$msg->snd($msgtype, "Hi, $pal", IPC_NOWAIT) or die "send message failed: $!";
print "BUFFER: ",$pal,"\n";
}
# end
$msg->snd($msgtype, "end", IPC_NOWAIT) or die "send message failed: $!";
print "BUFFER: end\n";
File: msg_rcv.pl
#!/usr/bin/perl
use strict;
use warnings;
use IPC::SysV qw(IPC_PRIVATE S_IRWXU S_IRWXG S_IRWXO IPC_CREAT IPC_NOWAIT);
use IPC::Msg;
my($key, $msg, $msgid, $msgtype, $buf);
$key = IPC::SysV::ftok(".",'1');
$msg = new IPC::Msg($key, 0666 | IPC_CREAT) or die "create message queue: $!";
$msgtype = 1;
$msgid = $msg->id();
print "MSG_ID: ",$msgid,"\n";
my $running = 1;
while ($running)
{
$msg->rcv($buf, 1024) or die "receive message failed: $!";
print "BUFFER: ",$buf,"\n";
if($buf eq "end")
{
$running = 0;
}
}
$msg->remove();
二. C与Perl进程间的消息队列通信
接收端使用Perl实现
msg_rcv.pl
#!/usr/bin/perl
use strict;
use warnings;
use IPC::SysV qw(IPC_PRIVATE S_IRWXU S_IRWXG S_IRWXO IPC_CREAT IPC_NOWAIT);
use IPC::Msg;
my($key, $msg, $msgid, $msgtype, $buf);
#$key = IPC::SysV::ftok(".",'1');
$key = 1234;
$msg = new IPC::Msg($key, 0666 | IPC_CREAT) or die "create message queue: $!";
$msgtype = 1;
my $running = 1;
while ($running)
{
$msg->rcv($buf, 1024) or die "receive message failed: $!";
print "BUFFER: ",$buf,"\n";
my $pos = index($buf, "end", 0);
print "POS: ",$pos,"\n";
if ($pos != -1)
{
$running = 0;
}
}
$msg->remove();
发送端采用C代码实现
msg_snd.c
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/msg.h>
#define MAX_TEXT 512
struct my_msg_st
{
long int my_msg_type;
char some_text[MAX_TEXT];
};
int main()
{
int index;
struct my_msg_st some_data;
int msgid;
msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
if (msgid == -1)
{
fprintf(stderr, "msgget failed with error: %d\n", errno);
exit(EXIT_FAILURE);
}
for (index = 0; index < 10; index++)
{
memset(some_data.some_text, 0, MAX_TEXT);
some_data.my_msg_type = 1;
snprintf(some_data.some_text,MAX_TEXT, "%s~%d","index",index);
if (msgsnd(msgid, (void *)&some_data, MAX_TEXT, 0) == -1)
{
fprintf(stderr, "msgsnd failed\n");
exit(EXIT_FAILURE);
}
}
/* ending */
memset(some_data.some_text, 0, MAX_TEXT);
some_data.my_msg_type = 1;
snprintf(some_data.some_text,MAX_TEXT, "%s","end");
if (msgsnd(msgid, (void *)&some_data, MAX_TEXT, 0) == -1)
{
fprintf(stderr, "msgsnd failed\n");
exit(EXIT_FAILURE);
}
exit(EXIT_SUCCESS);
}
编译与运行:
$gcc -g -Wall -o msg_snd msg_snd_c.c
$./msg_rcv.pl &
$./msg_snd
输出:
MSG_ID: 393219
BUFFER: index~0
BUFFER: index~1
BUFFER: index~2
BUFFER: index~3
BUFFER: index~4
BUFFER: index~5
BUFFER: index~6
BUFFER: index~7
BUFFER: index~8
BUFFER: index~9
BUFFER: end