转载自: PHP与C(或其它语言)通过消息队列进行通讯,完整代码
?
<?php/* * class msg * Use for communication between php and php; * Create at: 12:08 2012/10/31 * Author: leixun([email protected]) * version 1 - 14:01 2012/10/31 */class msg{ private $id; private $msg_id; private $_serialize = true; /** * @param $_id ID */ public function msg($_id, $_serialize = true){ if(!function_exists('msg_get_queue')) { die('msg queue function not installed, Reconfigure PHP with --enable-sysvmsg <br/>'); } $this->id = $_id; $this->msg_id = msg_get_queue ( $_id ); $this->_serialize = $_serialize; if ($this->msg_id === false) { die(basename(__FILE__).'->'.__LINE__.': Unable to create message quee'); } } /** * @data data to send * @type message type */ public function send( $data, $type = 1, $blocking = false ) { if (!msg_send ($this->msg_id, $type, $data, $this->_serialize, $blocking, $msg_err)) { return "Msg not sent because $msg_err\n"; } return true; } /** * @param $type message type * @param $maxsize The maximum size of message to be accepted, */ public function receive($no_wait = true, $type = 1 , $maxsize = 1024 ) { $rs = msg_receive ( $this->msg_id , $type , $type , $maxsize , $message , $this->_serialize, $no_wait?MSG_IPC_NOWAIT:NULL , $errorcode); if($rs) return $message; else return false; } public function remove() { msg_remove_queue($this->msg_id); } }
?
<?phpdefine('base_path' , dirname(__FILE__));//msg_write.phpinclude(base_path.'/msg.php');$msg = new msg(1, false);$msg1 = new msg(2, false);if($argv[1]=='del') $msg->remove(); $str = 'There are no user contributed notes for this page.';while(1){ $data = substr($str,0,rand(18,25)); $msg->send(rand().$data, rand(1,10)); echo $data." -> sent\n"; echo 'Get:'.$msg1->receive(false, 0).chr(10); sleep(3); //usleep(10000);}echo 'Done';
?
C, gcc -g -o m msg.c -lpthread;
#include <stdio.h>#include <errno.h>#include <stdlib.h>#include <fcntl.h>#include <string.h>#include <unistd.h>#include <sys/types.h>#include <sys/ipc.h>#include <sys/msg.h>#define MAX_MSG_LEN 512static int php_msg = -1;static int php_msg1 = -1;static int running = 1;static void *php_msg_handler_thread(void *arg);static int msg_send(int msg_id, int fd, char *data);struct msg_st { long mtype; char mtext[MAX_MSG_LEN];};int main(int argc,char **argv){ printf("go 1 \n"); if((php_msg= msgget((key_t)1,0666|IPC_CREAT)) == -1) { perror("php_msg create"); return 0; } if((php_msg1= msgget((key_t)2,0666|IPC_CREAT)) == -1) { perror("php_msg create"); return 0; } ///////////////////////////////////////////////////////////////////////////////// pthread_t php_msg_pthread; int rs = pthread_create(&php_msg_pthread, NULL, (void*(*)(void*))php_msg_handler_thread, (void *)NULL); if(rs!=0) { perror("php_msg_pthread create"); return 0; } pthread_join(php_msg_pthread, NULL); return 0;}static void *php_msg_handler_thread(void *arg){ struct msg_st php_data; printf("sizeof(struct msg_st)=%d\n",sizeof(struct msg_st)); char* data; data = malloc(MAX_MSG_LEN); char *pre = "You told me:"; while(running){ //ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg); if(msgrcv(php_msg,(void *) &php_data, MAX_MSG_LEN, 0 , 0) == -1) { perror("msgrcv"); if(errno==E2BIG) { if(msgctl(php_msg,IPC_RMID,0) == -1) { fprintf(stderr,"msgctl(IPC_RMID) failed \n"); } } else if(errno == EINVAL) { sleep(1); } }else{ printf("recevier mssage : %s , type= %d\n", php_data.mtext, php_data.mtype); memset(data, '\0', MAX_MSG_LEN); memcpy(data, pre, strlen(pre)); memcpy(data+strlen(pre), php_data.mtext, strlen(php_data.mtext)); msg_send(php_msg1, 2, data); bzero(php_data.mtext, strlen(php_data.mtext)); } //break; } free(data);}static int msg_send(int msg_id, int fd, char *data){ struct msg_st some_data; //some_data = malloc( sizeof(struct msg_st) ); memcpy(some_data.mtext, data, strlen(data) + 1); some_data.mtext[strlen(data)] = '\0'; some_data.mtype= fd; printf("will send %s \n", &some_data.mtext); //int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg); if((msgsnd(msg_id,(void *) & some_data, strlen(data), 0)) == -1) { perror("msgsnd"); return 0; } return 1;}
?
运行:./m
再运行:php msg_write.php
?
?
?