当前位置: 代码迷 >> 编程 >> 多进程编程:Perl与C历程间的消息队列通信
  详细解决方案

多进程编程:Perl与C历程间的消息队列通信

热度:7868   发布时间:2013-02-26 00:00:00.0
多进程编程:Perl与C进程间的消息队列通信
前言:
perl和C语言都提供了消息队列的机制,而且这两种语言中的消息队列可以相互兼容.
当涉及到系统架构时,
可以用C语言实现核心计算模块,用Perl实现业务逻辑层,
而这两层间可以通过消息队列来实现不同语言的进程间的通信.

本文先用两个示例程序讲解Perl对消息队列的实现。
再一个示例程序讲解Perl与C之间使用消息队列进行进程间的通信.

一. 使用IPC::SysV和IPC::Msg模块访问Unix消息队列 
例一, 单个进程中实现消息队列的访问(Perl实现)
File: msg_single_process.pl
1. #!/usr/bin/perl
2. use strict;
3. use warnings;
4. use IPC::SysV qw(IPC_PRIVATE S_IRWXU S_IRWXG S_IRWXO IPC_CREAT IPC_NOWAIT);
5. use IPC::Msg;

6. my($key, $msg, $msgid, $msgtype, $buf);
7. $key = IPC::SysV::ftok(".",'1');
8. $msg = new IPC::Msg($key, 0666 | IPC_CREAT) or die "create message queue: $!";
9. $msgtype = 1;
10.$msgid = $msg->id();
11.print "MSG_ID: ",$msgid,"\n";

12.$msg->snd($msgtype, "test", IPC_NOWAIT) or die "send message failed: $!";
13.$msg->rcv($buf, 1024) or die "receive message failed: $!";
14.print "BUFFER: ",$buf,"\n";

15.$msg->remove();

代码解释:
4.使用IPC::SysV模块,
  qw(IPC_PRIVATE S_IRWXU S_IRWXG S_IRWXO IPC_CREAT IPC_NOWAIT)
  等同于("IPC_PRIVATE S_IRWXU S_IRWXG S_IRWXO IPC_CREAT IPC_NOWAIT"),
  表示本程序将从该模块引入若干符号,
  在后续的代码里面需要要指定模块名就可以使用这些符号;
5.使用IPC::Msg模块(消息队列);
6.定义私有变量;
7.用ftok通过文件名获得一个key_t值,这里使用是当前目录.
  类似C语言中的ftok函数,这个文件必须实际存在;
8.获得Msg对象。类似msgget函数;
9.指定消息类型;
10.获得消息队列的标识符;
12.发送消息。类似msgsnd函数;
13.接收消息。类似msgrcv函数;
15.删除消息队列;


更多的信息,可以参考IPC::Msg模块定义:
http://search.cpan.org/~mhx/IPC-SysV-2.03/lib/IPC/Msg.pm

例二.在两个进程间使用消息队列进行通信(Perl实现)
File: msg_snd.pl
#!/usr/bin/perl
use strict;
use warnings;
use IPC::SysV qw(IPC_PRIVATE S_IRWXU S_IRWXG S_IRWXO IPC_CREAT IPC_NOWAIT);
use IPC::Msg;

my($key, $msg, $msgid, $msgtype, $buf);
$key = IPC::SysV::ftok(".",'1');
$msg = new IPC::Msg($key, 0666 | IPC_CREAT) or die "create message queue: $!";
$msgtype = 1;
$msgid = $msg->id();
print "MSG_ID: ",$msgid,"\n";

my $pal;
foreach $pal('Tom','Dick','Harry','Pete','Hank')
{
  $msg->snd($msgtype, "Hi, $pal", IPC_NOWAIT) or die "send message failed: $!";
  print "BUFFER: ",$pal,"\n";
}  
# end
$msg->snd($msgtype, "end", IPC_NOWAIT) or die "send message failed: $!";
print "BUFFER: end\n";

File: msg_rcv.pl
#!/usr/bin/perl
use strict;
use warnings;
use IPC::SysV qw(IPC_PRIVATE S_IRWXU S_IRWXG S_IRWXO IPC_CREAT IPC_NOWAIT);
use IPC::Msg;


my($key, $msg, $msgid, $msgtype, $buf);
$key = IPC::SysV::ftok(".",'1');
$msg = new IPC::Msg($key, 0666 | IPC_CREAT) or die "create message queue: $!";
$msgtype = 1;
$msgid = $msg->id();
print "MSG_ID: ",$msgid,"\n";

my $running = 1;
while ($running)
{
  $msg->rcv($buf, 1024) or die "receive message failed: $!";
  print "BUFFER: ",$buf,"\n";

  if($buf eq "end")
  {
    $running = 0;
  }
}
$msg->remove();

二. C与Perl进程间的消息队列通信
接收端使用Perl实现
msg_rcv.pl
#!/usr/bin/perl
use strict;
use warnings;
use IPC::SysV qw(IPC_PRIVATE S_IRWXU S_IRWXG S_IRWXO IPC_CREAT IPC_NOWAIT);
use IPC::Msg;

my($key, $msg, $msgid, $msgtype, $buf);
#$key = IPC::SysV::ftok(".",'1');
$key = 1234;
$msg = new IPC::Msg($key, 0666 | IPC_CREAT) or die "create message queue: $!";
$msgtype = 1;

my $running = 1;
while ($running)
{
  $msg->rcv($buf, 1024) or die "receive message failed: $!";
  print "BUFFER: ",$buf,"\n";

  my $pos = index($buf, "end", 0);
  print "POS: ",$pos,"\n";
  if ($pos != -1)
  {
    $running = 0;
  }
}

$msg->remove();

发送端采用C代码实现
msg_snd.c
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/msg.h>
#define MAX_TEXT 512
struct my_msg_st
{
  long int my_msg_type;
  char some_text[MAX_TEXT];
};

int main()
{
  int index;
  struct my_msg_st some_data;
  int msgid;

  msgid = msgget((key_t)1234, 0666 | IPC_CREAT);
  if (msgid == -1)
  {
    fprintf(stderr, "msgget failed with error: %d\n", errno);
    exit(EXIT_FAILURE);
  }

  for (index = 0; index < 10; index++)
  {
    memset(some_data.some_text, 0, MAX_TEXT);
    some_data.my_msg_type = 1;
    snprintf(some_data.some_text,MAX_TEXT, "%s~%d","index",index);


    if (msgsnd(msgid, (void *)&some_data, MAX_TEXT, 0) == -1)
    {
      fprintf(stderr, "msgsnd failed\n");
      exit(EXIT_FAILURE);
    }
  }

  /* ending */
  memset(some_data.some_text, 0, MAX_TEXT);
  some_data.my_msg_type = 1;
  snprintf(some_data.some_text,MAX_TEXT, "%s","end");

  if (msgsnd(msgid, (void *)&some_data, MAX_TEXT, 0) == -1)
  {
    fprintf(stderr, "msgsnd failed\n");
    exit(EXIT_FAILURE);
  }

  exit(EXIT_SUCCESS);
}

编译与运行:
$gcc -g -Wall -o msg_snd msg_snd_c.c 
$./msg_rcv.pl &
$./msg_snd

输出:
MSG_ID: 393219
BUFFER: index~0
BUFFER: index~1
BUFFER: index~2
BUFFER: index~3
BUFFER: index~4
BUFFER: index~5
BUFFER: index~6
BUFFER: index~7
BUFFER: index~8
BUFFER: index~9
BUFFER: end