消息队列,Linux进度间通讯

linux 进程间通信中消息传递主要分为管道,FIFO,消息队列
(1)管道
管道由pipe函数创建,提供一个单路(单向)数据流。pipe函数返回两个文件描述符:fd[0]和fd[1]。前者打开来读,后者打开来写。管道没有名字,所以只能由有亲缘关系的进程使用。尽管管道是由单个进程创建的,它却很少在单个进程内使用。管道的典型用途为两个不同进程(一个是父进程,一个是子进程)提供进程间的通信手段。首先,由一个进程(它将成为父进程)创建一个管道后调用fork派生一个自身的副本。接着,父进程关闭这个管道的读出端,子进程关闭同一管道的写入端。或者父进程关闭这个管道的写入端,子进程关闭同一管道的读出端。这就在父子进程间提供了一个单向数据流。

前面两篇文章分解介绍了匿名管道和命名管道方式的进程间通信,本文将介绍Linux消息队列(posix)的通信机制和特点。

POSIX消息队列与System V消息队列的主要区别:
1.对POSIX队列的读总数返回最高优先级到最早消息,对SV队列到读则可以返回任意指定优先级的消息
2.当往一个空队列放置一个消息时,POSIX允许产生一个信号或启动一个线程,System V不提供此机制

消息队列

(2)FIFO
FIFO指代先进先出(First in,First out),linux中的FIFO类似管道。它是一个单向(半双工)数据流。不同于管道的是,每个FIFO有一个路径名与之关联,从而允许无亲缘关系的进程访问同一个FIFO。FIFO也称为有名管道。FIFO由mkfifo函数创建。其中pathname是一个普通的Unix路径名,它是该FIFO的名字。mkfifo 函数已隐含指定 O_CREAT|O_EXCL. 也就是说,它那么创建一个新的FIFO,要么返回一个EEXIST错误(如果所指定的名字的FIFO已经存在)。如果不想希望创建一个新的FIFO,那就改为调用open而不是mkfifo.要打开一个已存在的FIFO或创建一个新的FIFO,应先调用mkfifo,再检查它是否返回EEXIST错误,若返回该错误则改为调用open.mkfifo 命令也能创建FIFO。可以从shell脚本或命令行中使用它。在创建出一个FIFO后,它必须或者打开来读,或者打开来写,所用的可以是open函数,也可以是某个标准I/O打开函数。FIFO不能打开来既读又写,因为它是半双工的。对管道或FIFO的write总是往末尾添加数据,对它们的read则总是从开头返回数据。如果对管道或FIFO调用lseek,那就返回ESPIPE错误。

1、消息队列

消息的属性:
1.一个无符号整数的优先级(POSIX)或一个长整数的类型(SV)
2.消息的数据部分长度(可以为0)
3.数据本身(如果长度大于0)

一、函数

(3)Posix 消息队列
消息队列可认为是一个消息链表。有足够写权限的线程可往队列中放置消息,有足够读权限的线程可从队列中取走消息。每个消息都是一个记录,它由发送者赋予一个优先级。在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达。这跟管道和FIFO是相反的,对后者来说,除非读出者已存在,否则先有写入者是没有意义的。一个进程可以往某个队列写入一些消息,然后终止,再让另外一个进程在以后某个时刻读出这些消息。消息队列具有随内核的持续性,这跟管道和FIFO不一样。Posix消息队列和System V消息队列。这两组函数间存在许多相似性,但也有主要的区别
1. 对Posix消息队列的读总是返回最高优先级的最早消息,对System V消息队列的读则可以返回任意指定优先级的消息。
2.当往一个空队列放置一个消息时,Posix 消息队列允许产生一个信号或启动一个线程。System V消息队列则不提供类似机制。

消息队列的实现分为两种,一种为System V的消息队列,一种是Posix消息队列;这篇文章将主要围绕Posix消息队列介绍;

POSIX消息队列总结:
mq_open创建一个新队列或者打开一个已经存在的队列
mq_close关闭队列
mq_unlink删除队列名,删除队列
mq_send往队列放置消息
mq_receive从一个队列中读出消息
mq_setattr和mq_getattr查询和设置队列的属性
mq_notify允许注册一个信号或者线程,在有一个消息被放置到空队列时,发送信号或者激活线程
每个消息被赋予一个小整数优先级,mq_receive总是返回最高优先级的最早消息

mq_open

队列中的每个消息具有如下属性:
1.一个无符号整数优先级(Posix)或一个长整数类型(System V).
2.消息的数据部分长度(可以为0).
3.数据本身(如果长度大于0)

消息队列可以认为是一个消息链表,某个进程往一个消息队列中写入消息之前,不需要另外某个进程在该队列上等待消息的达到,这一点与管道和FIFO相反。Posix消息队列与System V消息队列的区别如下:

限制:
/proc/sys/fs/mqueue/msg_max 10
/proc/sys/fs/mqueue/msgsize_max 8192
/proc/sys/fs/mqueue/queues_max 256

头文件

mqueue.h;

 

函数接口

(1) 对Posix消息队列的读总是返回最高优先级的最早消息,对System V消息队列的读则可以返回任意指定优先级的消息。

创建一个新的消息队列或者打开一个已经存在的消息队列
<mqueue.h> 注意:编译加-lrt
<fcntl.h>
<sys/stat.h>
mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode,  struct mq_attr *attr);
成功返回描述字,失败返回-1并设置errno
name: 必须为/开头!!!
oflag: O_RDONLY, O_WRONLY, O_RDWR, O_CREAT, O_EXCL, O_NONBLOCK

原型

mqd_t mq_open(const char *name, int oflag, .../*mode_t mode,struct mq_attr* attr*/);

 

  1. mqd_t mq_open(const char *name,int oflag,...)
    mq_open函数创建一个新的消息队列或打开一个已存在的消息队列
    2.int mq_close(mqd_t mqdes);
    mq_close函数关闭一个消息队列。
    3.int mq_unlink(const char *name);
    从系统中删除用作第一个参数的某个name.
  2. int mq_getattr(mqd_t mqdes,struct mq_attr *attr);
       int mq_setattr(mqd_t mqdes,const struct mq_attr *attr,struct mq_attr *oattr);
    每个消息队列有四个属性,mq_getattr返回所有这些属性,mq_setattr则设置其中某个属性。
    struct mq_attr{
       long mq_flags;
       long mq_maxmsg;
       long mq_msgsize;
       long mq_curmsgs;
    };

(2)当往一个空队列放置一个消息时,Posix消息队列允许产生一个信号或启动一个线程,System V消息队列则不提供类似的机制。

关闭消息队列,但不能删除它
mqd_t mq_close(mqd_t mqdes);
成功返回0,失败返回-1

函数功能

创建消息队列;

 

5.int mq_send(mqd_t mqdes,const char *ptr,size_t len,unsigned int prio);
 int mq_receive(mqd_t mqdes,char *ptr,size_t len,unsigned int *priop);

2、消息队列的基本操作

删除消息队列,不一定马上删除消息队列,但队列名会立即删除
mqd_t mq_unlink(const char *name);
成功返回0,失败返回-1
当某个进程还没有关闭此消息队列时,调用mq_unlink时,不会马上删除队列,当最后一个进程关闭队列时,该队列被删除
int flags;
mqd_t mqd;
flags = O_RDWR | O_CREAT | O_EXCL;
mqd = mq_open("/tmp.111", flags, 0644, NULL);
if (mqd == (mqd_t)-1) {
perror("mq_open");
return 1;
}

参数

name :消息队列的名字,根据消息队列的规则,为了更好的可移植性,该名字必须以‘/’开头,创建一个消息队列的时候无须路径,给出名字就好,其存放位置可有自己指定(创建前后都可以)。

 

oflag:O_RDONLY(只读) O_WRONLY(只写) O_RDWR(可读可写)O_CREAT(创建) O_EXCL (当消息已存在时,返回EEXIST错误到errno中)O_NONBLOCK(设置非阻塞)

 

mode:在oflag中指定O_CREAT时,此参数是需要的。表示创建消息队列的权限,S_IRUSR,S_IWUSR,S_IXUSR,S_IRGRP,S_IWGRP,S_IXGRP,S_IROTH,S_IWOTH,S_IXOTH相或组成或者写成0777(表示rwxrwxrwx)等用八进制表示也可以。

 

attr:在oflag中指定O_CREAT时,此参数是需要的。存放消息队列的属性。其中mq_flags为0,表示阻塞,为O_NONBLOCK为非阻塞。

(这里可以填写哪些参数具体看:二、Posix IPC)

 

mq_send函数往消息队列中写入消息,mq_receive函数从消息队列中读出消息。

 2.1 打开一个消息队列

消息队列的属性
mq_getattr mq_setattr
mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);
mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
成功返回0,失败返回-1
struct mq_attr {
long mq_flags;       /* Flags: 0 or O_NONBLOCK */
long mq_maxmsg;      /* Max. # of messages on queue */
long mq_msgsize;     /* Max. message size (bytes) */
long mq_curmsgs;     /* # of messages currently in queue */
};
mq_setattr只能修改mq_flags属性,maxmsg和msgsize在mq_open时设置
mqd_t mqd;
struct mq_attr attr;
mqd = mq_open(argv[1], O_RDONLY);
mq_getattr(mqd, &attr);
printf("maxmsg=%ld, msgsize=%ld, curmsgs=%ldn",
attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);
mq_close(mqd);

返回值

若创建成功则返回消息队列的描述符,否则返回-1。

 

 

 

6.int mq_notify(mqd_t mqdes,const struct sigevent *motification);
结构体:
union sigval{
    int sival_int;
    void *sival_ptr;
};

#include    <mqueue.h>

收发消息
mq_send mq_receive
mq_receive返回队列中最高优先级的最早消息,而且该优先级能随该消息的内容及其长度一起返回

mq_close函数

struct sigevent
{
    int sigev_notify;
    int sigev_signo;
    union sigval sigev_value;
    void  (*sigev_notify_function)(union sigval);
    pthread_attr_t *sigev_notify_attributes;
};

typedef int mqd_t;

ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
成功返回消息的长度,消息的实际长度,不包括消息头;失败返回-1
msg_len指示msg_ptr的长度,必须大于等于mq_msgsize
如果msg_prio不为NULL,函数返回消息的优先级
如果队列为空,调用将阻塞,如果队列设置0_NONBLOCK,调用立即返回EAGAIN

头文件

mqueue.h

mq_notify函数为指定队列建立或删除异步事件通知。一些普遍适用于该函数的若干规则
1).如果notification参数非空,那么当前进程希望在一个消息达到所指定的先前为空的队列时得到通知。我们说"该进程被注册为接收该队列的通知"。
2).如果notification参数为空指针,而且当前进程目前被注册为接收所指定队列的通知,那么已存在的注册将被撤销。
3).任意时刻只有一个进程可以被注册为接收某个给定队列的通知。
4).当有一个消息达到某个先前为空的队列,而且已有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的mq_receive调用中的前提下,通知才会发出。这就是说,在mq_receive调用中的阻塞比任何通知的注册都优先。
5).当该通知被发送给它的注册进程时,其注册即被撤销。该进程必须再次调用mq_notify以重新注册(想要的话)。

mqd_t mq_open(const char *name, int oflag, ... /* mode_t mode, struct mq_attr *attr */);

// 向队列加入一条消息
mqd_t mqd;
char *msg;
size_t len;
unsigned int prio;
len = 100;
prio = 5;
mqd = mq_open("/abc.123", O_WRONLY);
msg = (char *)malloc(len);
memset(msg, 0, len);
mq_send(mqd, msg, len, prio);

原型

int mq_close(mqd_t mqdes);

进程间通信中消息传递主要分为管道,FIFO,消息队列 (1)管道 管道由pipe函数创建,提供一个单路(单向)数据流。pipe函数返回两个文...

返回: 成功时为消息队列描述字,出错时为-1。 

// 从队列读入一条消息
mqd_t mqd;
char *msg;
size_t len;
int n;
unsigned int prio;
struct mq_attr attr;
mqd = mq_open("/abc.123", O_RDONLY);
mq_getattr(mqd, &attr);
len = attr.mq_msgsize;
msg = (char *)malloc(len);
memset(msg, 0, len);
n = mq_receive(mqd, msg, len, &prio);
printf("read %ld bytes, priority=%un", (long)n, prio);

函数功能

关闭已打开的消息队列,关闭后调用进程不可以再使用该描述符,但其消息队列并没有被删除。一个进程终止时,它的所有打开着的消息队列都关闭,就像调用了mq_close一样。

功能: 创建一个新的消息队列或打开一个已存在的消息的队列。

队列限制
long int open_max = sysconf(_SC_MQ_OPEN_MAX);  // -1
long int prio_max = sysconf(_SC_MQ_PRIO_MAX);  // 32768

参数

mqdes 消息队列的描述符,即消息队列创建成功后的返回值。

2.2 关闭一个消息队列

消息通告
当往空队列放置了一个消息时,通知进程
通告方式有2种:

返回值

成功返回0,失败返回-1。

 

 

#include    <mqueue.h>

  1. 产生一个信号
  2. 创建一个线程执行一个指定的函数
    mqd_t mq_notify(mqd_t mqdes, const struct sigevent *notification);
    成功返回0;失败返回-1
    给队列建立或者删除异步事件通知
    1.如果notification非空,那么当前进程希望在有一个消息到达而且队列先前为空时得到通知,该进程被注册为接收该队列的通知
    2.如果notification为空,而且当前进程目前被注册为接收该队列的通知,那么现有注册将被撤销
    3.任意时刻只有一个进程可以被注册为接收队列的通知
    4.当有一个消息到达一个空队列,而且已经有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的mq_receive调用的前提下,通知才会发送。即在mq_receive调用中的阻塞比任何通知的注册都优先
    5.当该通知已经发送给它的注册进程时,其注册即被撤销。该进程必须再次调用mq_notify以重新注册
    6.当调用mq_notify但是队列不为空时,通知不会发送;当队列变为空,并且有一个消息入队时,才发送通知

mq_unlink

int mq_close(mqd_t mqdes);

union sigval {                /* Data passed with notification */
int     sival_int;        /* Integer value */
void   *sival_ptr;        /* Pointer value */
};

头文件

mqueue.h

返回: 成功时为0,出错时为-1。

struct sigevent {
int    sigev_notify;      /* Notification method */
int    sigev_signo;       /* Notification signal */
union sigval sigev_value; /* Data passed with notification */
void (*sigev_notify_function) (union sigval);
/* Function for thread notification */
void  *sigev_notify_attributes;
/* Thread function attributes */
};
sigev_notify:SIGEV_NONE,SIGEV_SIGNAL,SIGEV_THREAD

原型

int mq_unlink(const char *name);

功能: 关闭已打开的消息队列。

// 使用非阻塞mq_receive的信号通知
volatile sig_atomic_t mqflag;
static void sig_usr1(int);
int main(int argc, char *argv[])
{
mqd_t mqd;
void *buf;
ssize_t n;
sigset_t zeromask, newmask, oldmask;
struct mq_attr attr;
struct sigevent sigev;

函数功能

从系统中删除名为name的消息队列,但删除的只是我们可以在系统中看见的文件的名字,但文件本身并没有被从磁盘上删除,除非该名称是文件的最后一个链接,并且该文件已关闭,才会将该文件真正从磁盘上删除。即如果某前该详细队列的文件还在其他进程中打开,那么不会将其从磁盘上删除,又或者这是最后一个链接,但它还为关闭,即未执行ma_close操作,或打开它的进程为结束就执行mq_unlink,它也不会从磁盘上删除。

2.3 删除一个消息队列

assert(argc == 2);
mqd = mq_open(argv[1], O_RDONLY|O_NONBLOCK);
mq_getattr(mqd, &attr);
buf = malloc(attr.mq_msgsize);
sigemptyset(&zeromask);
sigemptyset(&newmask);
sigemptyset(&oldmask);
sigaddset(&newmask, SIGUSR1);
signal(SIGUSR1, sig_usr1);
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
mq_notify(mqd, &sigev);

参数

name消息队列的名称,以‘/’开始。

#include    <mqueue.h>

for ( ; ; ) {
sigprocmask(SIG_BLOCK, &newmask, &oldmask);
while (mqflag == 0)
sigsuspend(&zeromask);
mqflag = 0;
mq_notify(mqd, &sigev);
while ((n = mq_receive(mqd, buf, attr.mq_msgsize, NULL)) >= 0) {
printf("read %ld bytesn", (long)n);
}
if (errno != EAGAIN)
die("mq_receive");
sigprocmask(SIG_UNBLOCK, &newmask, NULL);
}

返回值

成功返回0,出错返回-1。

 

 

int mq_unlink(const char *name)

return 0;
}
static void sig_usr1(int signo)
{
mqflag = 1;
return;
}

mq_getattr

返回: 成功时为0,出错时为-1

// 使用sigwait代替信号处理程序的信号通知
#include <signal.h>
int sigwait(const sigset_t *set, int *sig);
成功返回0,并设置sig为收到的信号;失败返回错误码

头文件

mqueue.h

功能: 从系统中删除消息队列。

int main(...)
{
...
sigemptyset(&newmask);
sigaddset(&newmask, SIGUSR1);
sigprocmask(SIGBLOCK, &newmask, NULL);

原型

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

这三个函数操作的代码如下:

sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
mq_notify(mqd, &sigev);
for ( ; ; ) {
sigwait(&newmask, &signo);
if (signo == SIGUSR1) {
mq_notify(mqd, &sigev);
while ((n = mq_receive(mqd, buf, len, NULL)) >=0) {
printf("read %ld bytesn", n);

if (errno != EAGAIN)
die("mq_receive");
}
}
...
}

函数功能

获取mqdes指的消息队列的属性,存放到attr结构体中。

struct mq_attr

{

 

long int mq_flags;    /* Message queue flags:0,O_NONBLOCK */

long int mq_maxmsg;   /* Maximum number of messages.  */

long int mq_msgsize;  /* Maximum message size.  */

long int mq_curmsgs;  /* Number of messages currently queued.  */

long int __pad[4];

 

};

#include <mqueue.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdio.h>

// 使用select的POSIX消息队列
int pfds[2];
static void sig_usr1(int);
int main(int argc, char *arg[])
{
int fds;
char c;
fd_set rfds;
mqd_t mqd;
void *buf;
ssize_t n;
size_t len;
struct mq_attr attr;
struct sigevent sigev;

参数

mqdes为消息队列描述符,attr为上面解释的存放消息队列属性的结构体。

int main(int argc, char* argv[])
{
    int flag = O_RDWR | O_CREAT | O_EXCL;
    int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
    mqd_t mqid = mq_open("/mq_test", flag, mode,NULL);
    if (mqid == -1)
    {
        printf("mqueue create failed!n");
        return 1;
    }
    else
    {
        printf("mqueue create success!n");
    }
    mq_close(mqid);  return 0;
}

asset(argc == 2);
mqd = mq_open(argv[1], O_RDONLY|O_NONBLOCK);
mq_getattr(mqd, &attr);
len = attr.mq_msgsize;
buf = malloc(len);
pipe(pfds);
// 设置信号处理程序,建立通知
signal(SIGUSR1, sig_usr1);
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
mq_notify(mqd, &sigev);
FD_ZERO(&rfds);
for ( ; ; ) {
FD_SET(pfds[0], &rfds);
nfds = select(pfds[0] 1, &rfds, NULL, NULL, NULL);
if (FD_ISSET(pfds[0], &rfds)) { // 管道可读
read(pfds[0], &c, 1);
mq_notify(mqd, &sigev);
while ((n = mq_receive(mqd, buf, len, NULL)) >= 0) {
printf("read %ld bytesn", (long)n);
}
if (errno != EAGAIN)
die("mq_receive");
}
}
return 0;
}

返回值

成功返回0,失败返回-1。

 

 

 

static void sig_usr1(int signo)
{
write(pfds[1], "", 1); // 异步信号处理安全的函数
return;
}

mq_setattr

#include <mqueue.h>
#include <unistd.h>

// 收到通知后,启动一个线程,接收消息,然后结束进程
#include <pthread.h>
#include <mqueue.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#define die(msg) { perror(msg); exit(EXIT_FAILURE); }
static void tfunc(union sigval sv)   /* Thread start function */
{
struct mq_attr attr;
ssize_t nr;
void *buf;
mqd_t mqdes = *((mqd_t *) sv.sival_ptr);
/* Determine max. msg size; allocate buffer to receive msg */
if (mq_getattr(mqdes, &attr) == -1) die("mq_getattr");
buf = malloc(attr.mq_msgsize);
if (buf == NULL) die("malloc");
nr = mq_receive(mqdes, buf, attr.mq_msgsize, NULL);
if (nr == -1) die("mq_receive");
printf("Read %ld bytes from MQ0n", (long) nr);
free(buf);
exit(EXIT_SUCCESS);         /* Terminate the process */
}

头文件

mqueue.h

int main(int argc, char* argv[])
{
    mq_unlink("/mq_test");   
    return 0;
}

int main(int argc, char *argv[])
{
mqd_t mqdes;
struct sigevent not;
assert(argc == 2);
mqdes = mq_open(argv[1], O_RDONLY);
if (mqdes == (mqd_t) -1) die("mq_open");
not.sigev_notify = SIGEV_THREAD;
not.sigev_notify_function = tfunc;
not.sigev_notify_attributes = NULL;
not.sigev_value.sival_ptr = &mqdes;   /* Arg. to thread func. */
if (mq_notify(mqdes, ¬) == -1) die("mq_notify");
pause();    /* Process will be terminated by thread function */
return 0;
}          

原型

int mq_setattr(mqd_t mqdes, const struct mq_attr *attr,

 struct mq_attr *oattr);

注意:编译posix mqueue时,要连接运行时库(runtime library),既-lrt选项,运行结果如下:

// 启动一个新线程
mqd_t mqd;
struct mq_attr attr;
struct sigevent sigev;
static void notify_thread(union sigval);
int main(int argc, char *argv[])
{
assert(argc == 2);
mqd = mq_open(argv[1], O_RDONLY|O_NONBLOCK);
mq_getattr(mqd, &attr);
sigev.sigev_notify = SIGEV_THREAD;
sigev.sigev_value.sival_ptr = NULL;
sigev.sigev_notify_function = notify_thread;
sigev.sigev_notify_attributes = NULL;
mq_notify(mqd, &sigev);
for ( ; ; )
pause();
return 0;
}
static void notify_thread(union sigval arg)
{
ssize_t n;
size_t len;
void *buf;
len = attr.mq_msgsize;
printf("notify_thread startedn");
buf = malloc(len);
mq_notify(mqd, &sigev);
while ((n = mq_receive(mqd, buf, len, NULL)) >= 0) {
printf("read %ld bytesn", (long)n);
}
if (errno != EAGAIN)
die("mq_receive");
free(buf);
pthread_exit(NULL);
}

函数功能

设置消息队列的属性,但是只使用attr结构体中的mq_flags属性,以设置(O_NONBLOCK)或清除(0)非阻塞标志。该结构体的另外三个属性被忽略,每个队列的最大消息数和每个消息的最大字节数都只能在创建时设置,当前队列中的消息数是随传送消息和读取消息的操作改变的,只能读取不能设置。如果oattr非空,那么指定队列的先前属性(4个)全将返回到由该指针指向的结构体中。

图片 1

POSIX实时信号
unix信号分为两大组:
实时信号:SIGRTMIN--SIGRTMAX
其他信号:SIGINT, SIGQUIT, SIGKILL, ...

参数

mqdes 消息队列的属性

attr 函数功能解释中

oattr 函数功能解释中

关于mqueue更多详细内容可以使用:man mq_overview命令查看,里面有一条需要注意的是,Linux下的Posix消息队列是在vfs中创建的,可以用

信号的实时行为取决于SA_SIGINFO
实时行为包含以下特征:
1.信号是排队的,即如果一个信号产生了3次,它就递交3次。以FIFO的顺序排队
2.当有多种SIGRTMIN到SIGRTMAX范围内的解阻塞信号排队时,值较小的信号先于值较大的信号递交(注意:linux与此相反)
3.当某个非实时信号递交时,传递给它的信号处理的唯一参数是该信号的值,实时信号比其他信号传递更多的信息
4.有些新函数使用实时信号工作,如sigqueue用来代替kill

返回值

成功返回0,失败返回-1。

 

 

 

mount -t mqueue none /dev/mqueue

// 查看实时信号的递交顺序
static void sig_rt(int, siginfo_t *, void *);
int main(void)
{
int i, j;
pid_t pid;
sigset_t newset;
union sigval val;
printf("SIGRTMIN=%d, SIGRTMAX=%dn", (int)SIGRTMIN, (int)SIGRTMAX);
pid = fork();
if (pid < 0) die("fork");
else if (pid == 0) {
/* 阻塞3个实时信号 */
sigemptyset(&newset);
sigaddset(&newset, SIGRTMIN);
sigaddset(&newset, SIGRTMIN 1);
sigaddset(&newset, SIGRTMIN 2);
sigprocmask(SIG_BLOCK, &newset, NULL);
signal_rt(SIGRTMIN, sig_rt);
signal_rt(SIGRTMIN 1, sig_rt);
signal_rt(SIGRTMIN 2, sig_rt);
sleep(6);
sigprocmask(SIG_UNBLOCK, &newset, NULL);
sleep(3);
exit(0);
}
else {
sleep(3);
for (i=SIGRTMIN; i<=SIGRTMIN 2; i ) {
for (j=0; j<=2; j ) {
val.sival_int = j;
sigqueue(pid, i, val);
printf("send signal signo=%d, val=%dn", i, j);
}
}
exit(0);
}
}

mq_send

将消息队列挂在在/dev/mqueue目录下,便于查看。

static void sig_rt(int signo, siginfo_t *info, void *context)
{
printf("receive signal signo=%d, code=%d, ival=%dn",
signo, info->si_code, info->si_value.sival_int);
}
typedef void sigfunc_rt(int, siginfo_t *, void *);
sigfunc_rt *signal_rt(int signo, sigfunc_rt *func)
{
struct sigaction act, oact;
act.sa_sigaction = func;
sigemptyset(&act.sa_mask);
act.sa_flags = SA_SIGINFO; /* 实时信号必须指定 */
if (signo == SIGALRM) {
#ifdef    SA_INTERRUPT
act.sa_flags |= SA_INTERRUPT;
#endif        
}
else {
#ifdef    SA_RESTART
act.sa_flags |= SA_RESTART;
#endif
}
if (sigaction(signo, &act, &oact) < 0)
return (sigfunc_rt *)SIG_ERR;
else
return oact.sa_sigaction;
}
输出如下:
[root@jiangkun unp]# ./rtsig 
SIGRTMIN=34, SIGRTMAX=64
send signal signo=34, val=0
send signal signo=34, val=1
send signal signo=34, val=2
send signal signo=35, val=0
send signal signo=35, val=1
send signal signo=35, val=2
send signal signo=36, val=0
send signal signo=36, val=1
send signal signo=36, val=2
receive signal signo=36, code=-1, ival=0
receive signal signo=36, code=-1, ival=1
receive signal signo=36, code=-1, ival=2
receive signal signo=35, code=-1, ival=0
receive signal signo=35, code=-1, ival=1
receive signal signo=35, code=-1, ival=2
receive signal signo=34, code=-1, ival=0
receive signal signo=34, code=-1, ival=1
receive signal signo=34, code=-1, ival=2

头文件

mqueue.h

2.4 mq_close()和mq_unlink()

struct sigaction {
void (*sa_handler)(int);
void (*sa_sigaction)(int, siginfo_t *, void *);
sigset_t sa_mask;
int sa_flags;
void (*sa_restorer)(void); /* 被遗弃了! */
};

原型

int mq_send(mqd_t mqdes, const char *ptr, size_t len,

 unsigned int prio);

mq_close()的功能是关闭消息队列的文件描述符,但消息队列并不从系统中删除,要删除一个消息队列,必须调用mq_unlink();这与文件系统的unlink()机制是一样的。

实时信号之所以是可靠的,因为在进程阻塞该信号的时间内,发给该进程的所有实时信号会排队,而非实时信号则会合并为一个信号。早期的kill函数只能向特 定的进程发送一个特定的信号,并且早期的信号处理函数也不能接受附加数据。siqueue和sigaction解决了这个问题。
下面这个例子中,进程先屏蔽SIGINT和SIGRTMIN两个信号,其中SIGINT是非实时信号,而SIGRTMIN为实时信号,接着进程睡眠,睡眠完成之后再接触对这两个信号的屏蔽,此时可以比较对两种信号的处理方式是否一样。
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
void sig_handler(int, siginfo_t*, void*);
int main(int argc,char *argv[])
{
struct sigaction act;
sigset_t newmask, oldmask;
int rc;    
sigemptyset(&newmask);
/* 往信号集中添加一个非实时信号 */
sigaddset(&newmask, SIGINT);
/* 往信号集中添加一个实时信号 */
sigaddset(&newmask, SIGRTMIN);
/* 屏蔽实时信号SIGRTMIN */
sigprocmask(SIG_BLOCK, &newmask, &oldmask);
act.sa_sigaction = sig_handler;
act.sa_flags = SA_SIGINFO;
if(sigaction(SIGINT, &act, NULL) < 0) {
printf("install signal errorn");
}
if(sigaction(SIGRTMIN, &act, NULL) < 0) {
printf("install signal errorn");
}
printf("pid = %dn", getpid());
/* 进程睡眠,在此时间内的发给该进程的所有实时信号 将排队,不会有信号丢失 */
sleep(20);    
/* 解除对SIGRTMIN信号的屏蔽,信号处理函数将会被调用 */
sigprocmask(SIG_SETMASK, &oldmask, NULL);
return 0;
}
void sig_handler(int signo, siginfo_t *info, void *context)
{
if(signo == SIGINT)
printf("Got a common signaln");
else
printf("Got a real time signaln");
}

函数功能

给描述符mqdes指向的消息队列发送消息,大小为len,内容存放在ptr中,prio为优先级。

3、消息队列的属性

将程序编译好之后,再开一个终端用于发送实时信号。
# ./sigqueue_receive 
pid = 8871
进程开始睡眠……
在新的终端输入:
ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
连续发送四个SIGRTMIN,接着回到之前的终端,连续四次按下"ctrl c"。
^C^C^C^C
最后进程终于醒来,整个输出如下:
pid = 8871
^C^C^C^CGot a real time signal
Got a real time signal
Got a real time signal
Got a real time signal
Got a common signal
果然接受到四个实时信号,并且四次调用了信号处理函数,而对于SIGINT,虽然也按下了四次"ctrl c",但是进程对其只做一次处理。这个例子中是先发实时信号后发非实时信号,所以信号处理函数先处理实
时信号,如果只是按照顺序注册信号的话,这很好理解,但是换一下,先按下了四次"ctrl c"然后使用kill发四次实时信号,结果发现输出的结果仍然 一样,这说明实时信号的优先级比非实时信号要高,内核每个进程的信号组成一个双向链表,实时信号插入的时候就不是随便插在尾部了。
信号的优先级:信号实质上是软中断,中断有优先级,信号也有优先级。如果一个进程有多个未决信号,则对于同一个未决的实时信号,内核将按照发送的顺序来递 交信号。如果存在多个未决的实时信号,则值(或者说编号)越大的越先被递送。如果既存在不可靠信号,又存在可靠信号(实时信号),虽然POSIX对这一情 况没有明确规定,但Linux系统和大多数遵循POSIX标准的操作系统一样,将优先递交可靠信号。一个进程如果处理 SIGQUIT(3),SIGINT(2),SIGHUP(1)(通过"kill -l" 可以查看信号的编号),那么先后给该进程发送SIGINT,SIGHUP,SIGQUIT,处理的顺序会是SIGQUIT,SIGINT,SIGHUP, 不论改变这个三个信号的发送顺序,处理的顺序都是一样的。

参数

mqdes为要发送消息的消息队列描述符;

ptr为要发送的数据;

len为消息的长度;

prio为消息的优先级;

#include    <mqueue.h>

返回值

成功返回0,失败返回-1。

 

 

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

mq_receive

int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *attr);

头文件

mqueue.h

均返回:成功时为0, 出错时为-1

原型

ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len,

 unsigned int *proip);

每个消息队列有四个属性:
struct mq_attr
{
    long mq_flags;       /* message queue flag : 0, O_NONBLOCK */
    long mq_maxmsg;  /* max number of messages allowed on queue*/
    long mq_msgsize;  /* max size of a message (in bytes)*/
    long mq_curmsgs;  /* number of messages currently on queue */
};

函数功能

从描述符mqdes指向的消息队列中读取消息存放ptr中。

4、消息收发

参数

mqdes为要从中读取消息的消息队列的描述符;

ptr为存放接受到的消息的指针;

len为接受的最大长度;

该值不能小于能加到该消息对列上的最大大小,如果len小于该值,就立即返回EMSGSIZE错误。

#include    <mqueue.h>

返回值

成功返回读取消息的内容的字节数,出错返回-1。

 

 

int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);

mq_notify

返回:成功时为0,出错为-1

头文件

mqueue.h;

signal.h;

ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop);

原型

int mq_notify(mqd_t mqdes, const struct sigevent *notification);

返回:成功时为消息中的字节数,出错为-1

函数功能

为指定队列建立或删除异步事件通知。

mqsend代码如下:

参数

在<signal.h>中:

 

union signal

{

``int  sival_int;  ``/*整数值*/

``void *sival_ptr; ``/*指针值*/

};

 

struct sigevent

{

``int sigev_notify; ``/*通知类型:SIGEV_NONE、SIGEV_SIGNAL、SIGEV_THREAD*/

``int sigev_signo; ``/*信号值*/

``union sigval sigev_value; ``/*传递给信号处理函数或线程的信号值*/

``void (*sigev_notify_function)(``union sigval); ``/*线程处理函数*/

``pthread_attr_t *sigev_notify_attributes; ``/*线程属性*/

};

 

(1).如果notification参数非空,那么当前进程希望在有一个消息到达所指定的先前为空的队列时得到通知。我们说“该进程被注册为接收该队列的通知”。

(2).如果notification参数为空指针,而且当前进程目前被注册为接收所指定队列的通知,那么已存在的注册将被撤销。

(3).任意时刻只有一个进程可以被注册为接收某个指定队列的通知。

(4).当有一个消息到达某个先前为空的队列,而且已有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的mq_receive调用中的前提下,通知才会发出。这就是说,在mq_receive调用中的阻塞比任何通知的注册都优先。

(5).当该通知被发送给它的注册进程时,其注册即被撤销。该进程必须再次调用mq_notify重新注册(如果想要的话)。

#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <sys/types.h>

返回值

成功返回0,失败返回-1。

 

 

 

 

int main(int argc, char* argv[])
{
    int flag = O_RDWR;
    int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
    mqd_t mqid = mq_open("/mq_test",flag,mode,NULL);
    if (mqid == -1)
    {
        printf("open mqueue failed!n");
        return 1;
    }

二、程序举例

mqsend.c

#include<stdio.h>

#include<stdlib.h>

#include<sys/types.h>

#include<mqueue.h>

#include<sys/stat.h>

#include<string.h>

#include<unistd.h>

#include<fcntl.h>

 

int main(int argc, char **argv)

{

    mqd_t mqd;

    char *ptr;

    size_t len;

    unsigned int prio;

 

    if(argc != 4)

        perror("./mqsend mqueue_name size prio!n");

    len = atoi(argv[2]);

    prio = atoi(argv[3]);

   

    mqd = mq_open(argv[1], O_WRONLY);

   

    ptr = (char *)calloc(len, sizeof(char));

    strcpy(ptr, "wjj_xyd");

    mq_send(mqd, ptr, len, prio);

   

    exit(0);

}

mqreceive.c

#include<stdio.h>

#include<stdlib.h>

#include<mqueue.h>

#include<sys/types.h>

#include<sys/stat.h>

#include<unistd.h>

 

int main(int argc, char **argv)

{

    int c, flags;

    mqd_t mqd;

    ssize_t n;

    unsigned int prio;

    char *buff;

    struct mq_attr attr;

   

    flags = O_RDONLY;

    while((c = getopt(argc,argv, "n")) != -1)

    {

        switch(c){

            case 'n':flags |= O_NONBLOCK;

                 break;

        }

    }

 

    if(optind != argc - 1)

        perror("mqreceive error!n");

    mqd = mq_open(argv[optind], flags);

    mq_getattr(mqd, &attr);

   

    buff = (char*)malloc(attr.mq_msgsize);

 

    n = mq_receive(mqd, buff, attr.mq_msgsize, &prio);

    printf("buff = %s, read %ld bytes, priority = %un",buff, (long)n, prio);

 

    exit(0);

}

 

异步通知事件:

#include <stdio.h>

#include <stdlib.h>

#include <mqueue.h>

#include <sys/types.h>

#include <sys/stat.h>

#include <unistd.h>

#include <fcntl.h>

#include <errno.h>

#include <signal.h>

 

mqd_t mqd;

struct mq_attr attr;

struct sigevent sigev;

char *ptr;

unsigned ``int prio;

size_t n;

int rc;

void sig_usr1(``int signo);

 

/*读取某消息队列,消息队列名通过参数传递*/

/*当有消息放置到某个空的队列中时产生SIGUSR1信号*/

int main(``int argc, ``char *argv[])

{

``if``(argc != 2)

``{

``printf``(``"Usage: mqnotifysig1 <name>n"``);

``exit``(1);

``}

 

``/*只读模式打开消息队列*/

``mqd = mq_open(argv[1], O_RDONLY);

``if``(mqd < 0)

``{

``perror``(``"打开消息队列失败"``);

``exit``(1);

``}

 

``// 取得消息队列属性,根据mq_msgsize动态申请内存

``rc = mq_getattr(mqd, &attr);

``if``(rc < 0)

``{

``perror``(``"取得消息队列属性失败"``);

``exit``(1);

``}

 

``/*动态申请保证能存放单条消息的内存*/

``ptr = ``calloc``(attr.mq_msgsize, ``sizeof``(``char``));

``if``(NULL == ptr)

``{

``printf``(``"动态申请内存失败n"``);

``mq_close(mqd);

``exit``(1);

``}

 

``//注册信号函数

``signal``(SIGUSR1, sig_usr1);

``sigev.sigev_notify = SIGEV_SIGNAL;

``sigev.sigev_signo = SIGUSR1;

 

``//注册通知

``rc = mq_notify(mqd, &sigev); ``// 读取前需要再次注册

``if``(rc < 0)

``{

``perror``(``"通知注册失败"``);

``mq_close(mqd);

``free``(ptr);

``exit``(1);

``}

 

``for``(;;)

``{

``pause();

``}

``return 0;

}

 

void sig_usr1(``int signo)

{

``rc = mq_notify(mqd, &sigev); ``// 读取前需要再次注册

``if``(rc < 0)

``{

``perror``(``"通知注册失败"``);

``mq_close(mqd);

``free``(ptr);

``exit``(1);

``}

``/*接收一条消息*/

``n = mq_receive(mqd, ptr, attr.mq_msgsize, &prio);

``if``(n < 0)

``{

``perror``(``"读取失败"``);

``mq_close(mqd);

``free``(ptr);

``exit``(1);

``}

``printf``(``"读取 %ld 字节n优先级为 %un"``, (``long``)n, prio);

}

 

 

    char *buf = "hello, i am sender!";
    mq_send(mqid,buf,strlen(buf),20);
    mq_close(mqid);
   
    return 0;
}

mqrecv代码如下:

#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <sys/types.h>

int main(int argc, char* argv[])
{
    int flag = O_RDWR;
    int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
    mqd_t mqid = mq_open("/mq_test",flag,mode,NULL);
    if (mqid == -1)
    {
        printf("open mqueue failed!n");
        return 1;
    }

    struct mq_attr attr;
    mq_getattr(mqid,&attr);
    char buf[256] = {0};
    int priority = 0;
    mq_receive(mqid,buf,attr.mq_msgsize,&priority);
    printf("%sn",buf);
    mq_close(mqid);   
    return 0;
}

运行结果如下:

图片 2

首先我们运行三次send,然后运行四次recv,可见recv的前三次是可以收到消息队列里的三个消息的,当运行第四次的时,系统消息队列里为空,recv就会阻塞;关于非阻塞式mqueue见下文。

5、mq_notify函数

如前文介绍,poxis消息队列运行异步通知,以告知何时有一个消息放置到某个空消息队列中,这种通知有两种方式可以选择:

(1)产生一个信号

(2)创建一个线程来执行一个指定的函数

这种通知通过mq_notify() 函数建立。该函数为指定的消息消息队列建立或删除异步事件通知,

#include <mqueue.h> 

int mq_notify(mqd_t mqdes, const struct sigevent* notification);

(1)如果notification参数为非空,那么当前进程希望在有一个消息到达所指定的先前为空的对列时得到通知。

(2)如果notification参数为空,而且当前进程被注册为接收指定队列的通知,那么已存在的注册将被撤销。

(3)任意时刻只有一个进程可以被注册为接收某个给定队列的通知。

(4)当有一个消息到达先前为空的消息队列,而且已有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的mq_receive调用中的前提下,通知才会发出。即说明,在mq_receive调用中的阻塞比任何通知的注册都优先。

(5)当前通知被发送给它的注册进程时,其注册即被撤销。该进程必须再次调用mq_notify以重新注册。

sigevent结构如下:

union sigval{ 
int    sival_int;          /*integer value*/ 
void    *sival_ptr;        /*pointer value*/ 
}; 
 
struct sigevent{ 
int    sigev_notify;      /*SIGEV_{NONE, SIGNAL, THREAD}*/ 
int    sigev_signo;        /*signal number if SIGEV_SIGNAL*/ 
 
union sigval    sigev_value; 
 
void    (*sigev_notify_function)(union sigval); 
pthread_attr_t  *sigev_notify_attributes; 
}; 

 5.1 mq_notify() 使用信号处理程序

一个正确的使用非阻塞mq_receive的信号通知的例子:

#include <unistd.h>
#include <stdio.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <signal.h>
#include <stdlib.h>
#include <errno.h>

void sig_usr1(int );
volatile sig_atomic_t mqflag;

int main(int argc, char* argv[])
{
    mqd_t mqid = 0;
    void *buff;
    struct mq_attr attr;
    struct sigevent sigev;
    sigset_t zeromask,newmask,oldmask;

    int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
    mqid = mq_open("/mq_test",O_RDONLY | O_NONBLOCK,mode,NULL);  // 非阻塞式打开mqueue
    mq_getattr(mqid,&attr);
    buff = malloc(attr.mq_msgsize);

    sigemptyset(&zeromask);
    sigemptyset(&newmask);
    sigemptyset(&oldmask);
    sigaddset(&newmask,SIGUSR1);    // 初始化信号集
    signal(SIGUSR1,sig_usr1);      // 信号处理程序
    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = SIGUSR1;
    int n = mq_notify(mqid,&sigev);  // 启用通知

    for (;;)
    {
        sigprocmask(SIG_BLOCK,&newmask,&oldmask);
        while(mqflag == 0)
            sigsuspend(&zeromask);

        mqflag = 0;
        ssize_t n;
        mq_notify(mqid, &sigev);    // 重新注册
        while( (n = mq_receive(mqid,buff,attr.mq_msgsize,NULL)) >=0)
            printf("SIGUSR1 received, read %ld bytes.n",(long)n);  //读取消息
        if(errno != EAGAIN)
            printf("mq_receive errorn");
        sigprocmask(SIG_UNBLOCK,&newmask,NULL);
    } 
    return 0;
}

void sig_usr1(int signo)
{
    mqflag = 1;
}

��里为什么使用的是非阻塞式mq_receive,为什么不在信号处理程序中打印接收到的字符请参阅《unp 第二卷》

5.2 mq_notify() 使用线程处理程序

异步事件通知的另一种方式是把sigev_notify设置成SIGEV_THREAD,这会创建一个新线程,该线程调用由sigev_notify_function指定的函数,所用的参数由sigev_value指定,新线程的属性由sigev_notify_attributes指定,要指定线程的默认属性的话,传空指针。新线程是作为脱离线程创建的。

#include <unistd.h>
#include <stdio.h>
#include <mqueue.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <signal.h>
#include <stdlib.h>
#include <errno.h>
#include <pthread.h>

mqd_t mqid = 0;
struct mq_attr attr;
struct sigevent sigev;

static void notify_thread(union sigval);

int main(int argc, char* argv[])
{

    int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
    mqid = mq_open("/mq_test",O_RDONLY | O_NONBLOCK,mode,NULL);
    mq_getattr(mqid,&attr);

    sigev.sigev_notify = SIGEV_THREAD;
    sigev.sigev_notify_function = notify_thread;
    sigev.sigev_value.sival_ptr = NULL;
    sigev.sigev_notify_attributes = NULL;
    int n = mq_notify(mqid,&sigev);

    for (;;)
        pause();
    return 0;
}

static void notify_thread(union sigval arg)
{
    ssize_t n;
    char* buff;
    printf("notify_thread_started!n");
    buff = malloc(attr.mq_msgsize);
    mq_notify(mqid, &sigev);
    while( (n = mq_receive(mqid,buff,attr.mq_msgsize,NULL)) >=0)
        printf("SIGUSR1 received, read %ld bytes.n",(long)n);
    if(errno != EAGAIN)
        printf("mq_receive errorn");
    free(buff);
    pthread_exit(0);
}

一、进程间通信-管道

二、进程间通信-命名管道 https://www.linuxidc.com/Linux/2018-04/151681.htm

三、进程间通信-消息队列 https://www.linuxidc.com/Linux/2018-04/151682.htm

四、进程间通信-共享内存 https://www.linuxidc.com/Linux/2018-04/151683.htm

本文永久更新链接地址

图片 3

本文由星彩网app下载发布于星彩网app下载,转载请注明出处:消息队列,Linux进度间通讯

TAG标签: 星彩网app下载
Ctrl+D 将本页面保存为书签,全面了解最新资讯,方便快捷。