Message Queue

Introduction

  • Message queues allow for the exchange of messages between processes.
  • A common system message queue is maintained between the process for communications.
  • The sending process places a message onto a queue which can be read by the other process.
  • The system call associated with send and receive is blocking calls.
  • If the sender tries to put a message onto a full queue, is put to wait on the waiting queue, and similarly if the receiver tries to read the message from an empty queue is again blocked.
  • They can be made non-blocking by using the IPC_NOWAIT option in the option flag.

Diag-1: Message Queue

Message queue is a sequence of message, each of which has two parts:

  • The payload, which is an array of bytes.
  • A type, which is an integer value; helps in communication with different types of messages.
  struct msgbuf 
  {
     long msg_type;
     char msg_text[SIZE];
  };

Message Queue API:
These are the following system calls related to a Message Queue IPC.

/* Create and access a message queue */
int msgget (key_t key, int msg_flag);

/* Send a message */
int msgsnd (int msg_id, void *msg_ptr, size_t msg_size, int msg_flag);

/* Receive a message */
int msgrecv (int msg_id, void *msg_ptr, size_t msg_size, int msg_type, int msg_flag);

/* Control the message queue */
int msgctl (int msg_id, int cmd, struct msqid_ds *buf);

Create and access a message queue:

  • The message queue is created and is accessed by msgget() system call.
  • Syntax:
    • int msg_id = msgget(key_t key, int msg_flag);
  • The first parameter is the key, a kind of name that can be used by the other process to access the message queue.
  • The second parameter is the flag which defines the access permission flag and the creation control flags.
  • Few common flags are:
    • IPC_CREAT : If a message queue for the key does not exist, it is created else return the existing.
    • IPC_CREAT | IPC_EXCL : This is for creating a new message queue. if it exists with the given key, it fails.
  • Permission flag to indicate permission granted to the owner, group and all other processes.
  • It returns -1 on failure and message id in the case of success.
/* create a private message queue, with access only to the owner. */
key_t key =121;
int queue_id = msgget(key, IPC_PRIVATE| 0600); 
if (queue_id == -1) {
    printf ("\n Error in creating message queue: %d %s", errno, strerror (errno));
    exit(EXIT_FAILURE);
}

Send a message:

  • The message is appended onto to message queue by msgsnd() API.
  • Syntax:
    • int msgsnd ( int msg_id, void *msg_ptr, size_t msg_sz, int flag);
  • The first parameter(msg_id) is the id of the message queue returned by msgget() API.
  • The second parameter (msg_ptr) is the pointer to the message to be sent.
  • The third parameter (msg_sz) is the size of the message.
  • The fourth parameter(flag) defines whether this is blocking in case if the queue is full or systemwide limit on queued messages has been reached or it is non-blocking.
  • If the flag is IPC_NOWAIT, it makes the API non-blocking that it returns immediately if the queue is full and return -1.
  • If the flag is clear, it blocks the caller till the space is available in the queue.
  • This API return -1 on failure and 0 on success.
struct msgbuf {
    long mtype; 
    char mtext[1];
};

/* Populate the message queue structure */
char* msg_text = "hello world";
struct msgbuf* msg = (struct msgbuf*)malloc(sizeof(struct msgbuf) + strlen(msg_text));
msg->mtype = 1;
strcpy(msg->mtext, msg_text);

int rc = msgsnd (queue_id, msg, strlen(msg_text)+1, 0);
if (rc == -1) {
    printf ("\n Error in sending message queue: %d %s", errno, 
            strerror(errno));
    exit (EXIT_FAILURE);
}

Receiving a message:

  • The message is read from the message queue by msgrecv() API.
  • Syntax: int msgrecv( int msg_id, void *msg_ptr, size_t msg_sz, int msg_type, int flag);
  • The first parameter(msg_id) is the id of the message queue returned by msget() API.
  • The second parameter(msg_ptr) is the pointer to the buffer where a message is to stored.
  • The third parameter(msg_sz) is the size of the message.
  • The fourth parameter is the type that helps the receiver to receive a message of a particular type and is negotiated between sender and receiver.
  • It’s an integer value that can be following:
    • The message type is 0:It means read the first message, irrespective of type(read as it comes).
    • The message type is +ve : Read the first message of given type.
    • The message type is –ve : Means read the first message whose type is less than equal to an absolute value of type.
    • For example, if the type is -5 then it will read the first message whose type lies between 1 to 5
  • The fifth parameter(flag) defines whether this is blocking in case if the queue is empty that is no message of the appropriate type is there to read or a non-blocking call.
  • If the flag is IPC_NOWAIT, it makes the API non-blocking that it returns immediately if the queue is empty and return -1.
  • If the flag is clear, it blocks the caller till the message is there in the queue.
  • This API return -1 on failure and the number of bytes read on success.
struct msgbuf* recv_msg = (struct msgbuf*)malloc(sizeof(struct 
                           msgbuf)+ strlen("hello world"));
int rc = msgrcv(queue_id, recv_msg, strlen("hello world") + 1, 0, 0);
if (rc == -1) {
    printf("\n Error in receiving message queue: %d %s", errno, 
            strerror(errno));
    exit(EXIT_FAILURE);
}

Controlling the message queue:

  • The message queue can be controlled by using msgctl() API.
  • Syntax: int msgctl ( int msg_id, int cmd, struct msqid_ds *buf);
  • The first parameter is the message id.
  • The second parameter specifies the command, some of the common commands are:
    • IPC_RMID : Delete the message queue.
    • IPC_SET : We can set some fields(permissions) in the msqid_ds structure in the kernel.
    • IPC_STAT : It copies the data in the kernel data structure(msqid_ds) into the location pointed by a buffer.
  • The third parameter is the pointer to the buffer where data in the kernel structure msqid_ds is placed. It is used with command IPC_STAT and IPC_SET.
  • It returns 0 on success and -1 on failure.
key_t key =121;
struct msqid_ds msqid_ds, *buffer
int msg_id = msgget(key, 0666| IPC_CREAT)
buffer = &msqid_ds;
msgctl (msg_id, IPC_STAT, buffer);
printf ("\n Number of message n Q is: %d\n", buffer->qnum);
printf ("\n No of bytes Q has: %lu:\n", buffer->__msg_cbytes);
msgctl (msg_id, IPC_RMID, 0)

Internal structure of Message Queue:

  • Linux maintains a linked list of message queue, identified by msg_id.
  • When message queues are created a new msqid_ds data structure is inserted at the end of the list.
  • If a new message queue is created, then its associated data structure msqid_ds is initialized.

Some of the information maintained in the msqid_ds are:

  • It also has a permission field(structure) that is compared with effective users and group identifier before writing a message into the queue.
  • It maintains the time when this queue was last modified that is written into.
  • It maintains the current number message in a queue.
  • It maintains the current number of bytes in a queue.
  • Time of last the message sent and received.

Diag-2: Message Queue internal structure

Data structure:

  • If a new message queue is created, then associated data structure msqid_ds is initialized.
  • This stores all the information about the message queue.

Diag-3: Message queue data structure

signals & message queues overview

Advantages:

One of the biggest plus point with message queue is that it is asynchronous in nature that is both sender and receiver does not need to wait for each other. They work independent of each other.

Example: Our email system, Pizza ordering.

Drawback of Message Queue:

  • A typical downside of this mechanism is that several data exchanges are to be done between user space and system space – which means, lot of copying between user -space buffers and system space buffers.
  • If amount of data copied is large and frequently copied, may increase latency. In a typical RTOS, such large data copying may be unacceptable – in certain RTOS environments it may still be acceptable.
  • In typical scenarios 4 copies of data is required:

At the sender side:

  • From input file to process buffer.
  • From process buffer to message queue.

At the receiver side:

  • From message queue to its buffer.
  • From buffer to its input file.

Thus there is a lot of data copying from user space to buffer space which is an overhead causing latency. More system calls are required.

Sample sender and receiver program

Sender Program

#include <stdio.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <string.h>
  
struct mesg_buffer {
    long type;
    char mesg_text[100];
};  
  
int main()
{
    key_t key;
    int msgid;
    struct mesg_buffer *mesg_buffer = (struct mesg_buffer *)
                                  malloc(sizeof(struct mesg_buffer));
    /* For generating a random key */
    key = ftok ("progfile", 70);
    
    msgid = msgget (key, 0666 | IPC_CREAT);
    mesg_buffer->type = 1;
    strcpy (mesg_buffer->mesg_text, "This is message queue example");
    
    msgsnd (msgid, mesg_buffer, sizeof(struct mesg_buffer), 0); 
    printf ("Data send is : %s \n", mesg_buffer->mesg_text);

    return 0;
}

Output:

[aprakash@wtl-lview-6 message_queue]$ gcc msg_sender.c 
[aprakash@wtl-lview-6 message_queue]$ ./a.out 
 Data send is : This is message queue example 

Receiver Program:

  #include <stdio.h>
 #include <sys/msg.h>
 
 struct mesg_buffer {
     long mesg_type;
     char mesg_text[100];   
 };   
       
 int main()
 {   
     key_t key; 
     int msgid;
     struct mesg_buffer *recv_buf;
 
     key = ftok ("progfile", 70);
     msgid = msgget (key, 0666 | IPC_CREAT);
     msgrcv (msgid, recv_buf, sizeof(struct mesg_buffer), 1, 0);
     printf ("Data Received is : %s \n", recv_buf->mesg_text);
     msgctl (msgid, IPC_RMID, NULL);
   
    return 0;
}

Output:

[aprakash@wtl-lview-6 message_queue]$ gcc msg_receiver.c 
[aprakash@wtl-lview-6 message_queue]$ ./a.out 
Data Received is : This is message queue example 

Relevant Posts:



Categories: Operating system (OS)

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: