📄 mqueue.c
字号:
/* mqueue.c,v 1.6 2001/01/18 21:42:56 joeh Exp -*- C -*-
* ============================================================================
*
* = LIBRARY
* pace
*
* = FILENAME
* pace/emulation/mqueue.c
*
* = AUTHOR
* John Heitmann
*
* ============================================================================ */
#include "pace/sys/mman.h"
#include "pace/stdio.h"
#include "pace/fcntl.h"
#include "pace/string.h"
#include "pace/stdlib.h"
#include "pace/sys/types.h"
#include "pace/pthread.h"
#include "pace/sys/stat.h"
#include "pace/emulation/mqueue.h"
typedef struct
{
pace_mq_attr attr;
pace_size_t num_open; /* How many processes have a valid mqd_t to here */
pace_size_t rec_wait; /* How many processes are blocked on mq_receive */
pace_pid_t not_pid; /* Who is actually registered for notification */
pace_sigevent notification;
pace_pthread_mutex_t mutex;
pace_pthread_cond_t cond;
pace_size_t head;
pace_size_t freelist;
} mqfile;
typedef struct
{
pace_size_t next; /* Index of next element */
unsigned int priority;
pace_size_t length;
} message_header;
static struct mq_attr pace_attrdefault = { 0, 32, 256, 0 };
#define PACE_MQ_LOCKPOSTFIX "mqlock9587"
#define PACE_MQ_DATAPOSTFIX "mqdata2355"
/* This remains mq_open due to the macro in pace/mqueue.h */
#if (PACE_HAS_POSIX_NONUOF_FUNCS)
pace_mqd_t mq_open (const char* name,
int oflag,
pace_mode_t mode,
pace_mq_attr* attr)
{
int m_padding = sizeof (message_header); /* How much extra space per message do we need */
int f_padding = sizeof (mqfile); /* How much fixed padding is needed */
int mflags, mprot;
int fd;
int i;
pace_size_t mapsize;
char* mmaploc;
char* new_name;
char* lock_name;
int create_mmap = 0; /* 1 if the file has never be inited */
message_header* temp = 0; /*Used in initialization of mqueue*/
long index; /* index into the file */
pace_mqd_t result = (pace_mqd_t) pace_malloc (sizeof (struct mqd));
pace_stat_s statbuf;
retry:
if (attr == 0)
{
attr = &pace_attrdefault;
}
else
{
if (attr->mq_maxmsg < 0 || attr->mq_msgsize < 0)
{
errno = EBADF;
return (pace_mqd_t)-1;
}
}
/* Create a name that will go to /tmp with a unique name */
new_name = (char*) malloc (256);
lock_name = (char*) malloc (256);
snprintf (new_name, 256, "/tmp%s%s", name, PACE_MQ_DATAPOSTFIX);
snprintf (lock_name, 256, "/tmp%s%s", name, PACE_MQ_LOCKPOSTFIX);
/* Fix alignment */
if (attr->mq_msgsize % sizeof (long) != 0)
{
attr->mq_msgsize += 8 - (attr->mq_msgsize % sizeof (long));
}
if (oflag & PACE_O_CREAT)
{
/* We need to protect access without the help of O_RDONLY in the fs */
fd = pace_open (new_name, PACE_O_RDWR | PACE_O_CREAT | PACE_O_EXCL, mode);
if (fd == -1 && errno != EEXIST)
{
/* An error other than EEXIST has occurred. */
return (pace_mqd_t)-1;
}
else if (fd != -1)
{
/* If a new file was created successfully */
create_mmap = 1;
}
else if (oflag & PACE_O_EXCL)
{
/* If the file exists and we don't want it */
errno = EEXIST;
return (pace_mqd_t)-1;
}
else
{
/* We want the existing file */
fd = pace_open (new_name, PACE_O_RDWR);
if (fd == -1 && errno == ENOENT)
{
/* Something odd is going on */
goto retry;
}
else if (fd == -1)
{
return (pace_mqd_t)-1;
}
}
}
else
{
fd = pace_open (new_name, PACE_O_RDWR);
if (fd == -1)
{
return (pace_mqd_t)-1;
}
}
/*
The following loop makes shure that we haven't entered a race condition. If a file
has been created but not initialized, its IXUSR will not be set (see above).
*/
while (create_mmap == 0)
{
if (stat (lock_name, &statbuf) == -1)
{
close (fd);
if (errno == ENOENT && (oflag & O_CREAT))
{
goto retry;
}
return (pace_mqd_t)-1;
}
else
{
break;
}
}
mapsize = f_padding + (attr->mq_msgsize + m_padding) * (attr->mq_maxmsg);
mprot = PACE_PROT_READ | PACE_PROT_WRITE;
mflags = PACE_MAP_SHARED;
if (create_mmap)
{
/* Create and 0 out the file */
if (pace_lseek (fd, mapsize, PACE_SEEK_SET) == -1)
{
pace_unlink (new_name);
return (pace_mqd_t)-1;
}
if (pace_write (fd, "", 1) != 1)
{
pace_unlink (new_name);
return (pace_mqd_t)-1;
}
mmaploc = (char*) pace_mmap (0, mapsize, mprot, mflags, fd, 0);
pace_close (fd);
if (mmaploc == MAP_FAILED)
{
pace_unlink (new_name);
return (pace_mqd_t)-1;
}
pace_memset (mmaploc, 0, mapsize);
if ((errno = pace_pthread_mutex_init (&(((mqfile*)mmaploc)->mutex), 0)) != 0)
{
pace_unlink (new_name);
pace_munmap (mmaploc, mapsize);
return (pace_mqd_t)-1;
}
if ((errno = pace_pthread_mutex_lock (&(((mqfile*)mmaploc)->mutex))) != 0)
{
pace_unlink (new_name);
pace_munmap (mmaploc, mapsize);
return (pace_mqd_t)-1;
}
if ((errno = pace_pthread_cond_init (&(((mqfile*)mmaploc)->cond), 0)) != 0)
{
pace_unlink (new_name);
pace_munmap (mmaploc, mapsize);
return (pace_mqd_t)-1;
}
index = sizeof (mqfile);
((mqfile*)mmaploc)->freelist = index;
((mqfile*)mmaploc)->head = 0;
for (i = 0; i < attr->mq_maxmsg; ++i)
{
temp = (message_header *) &mmaploc[index];
index += sizeof (message_header) + attr->mq_msgsize;
temp->next = index;
}
temp->next = 0;
attr->mq_curmsgs = 0;
((mqfile*)mmaploc)->attr = *attr;
/* Create the lock file so that the file is known to be inited */
if (pace_open (lock_name, O_CREAT | O_EXCL) == -1)
{
pace_unlink (new_name);
pace_munmap (mmaploc, mapsize);
return (pace_mqd_t)-1;
}
}
else
{
/* Just open the existing map */
mmaploc = (char*) pace_mmap (0, mapsize, mprot, mflags, fd, 0);
if (mmaploc == MAP_FAILED)
{
return (pace_mqd_t)-1;
}
pace_close (fd);
/* ???? Test here for race */
if (pace_pthread_mutex_lock (&(((mqfile*)mmaploc)->mutex)) == -1)
{
pace_munmap (mmaploc, mapsize);
return (pace_mqd_t)-1;
}
((mqfile*)mmaploc)->attr.mq_flags = attr->mq_flags;
}
((mqfile*)mmaploc)->num_open++;
if (pace_pthread_mutex_unlock (&(((mqfile*)mmaploc)->mutex)) == -1)
{
pace_munmap (mmaploc, mapsize);
return (pace_mqd_t)-1;
}
result->mptr = mmaploc;
result->length = mapsize;
result->oflag = oflag;
return result;
}
#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */
#if (PACE_HAS_POSIX_NONUOF_FUNCS)
int mq_close (pace_mqd_t mqdes)
{
if (pace_pthread_mutex_lock (&( ((mqfile*)mqdes->mptr)->mutex)) == -1)
{
errno = EBADF;
return -1;
}
((mqfile*)mqdes->mptr)->num_open--;
if (pace_pthread_mutex_unlock (&(((mqfile*)mqdes->mptr)->mutex)) == -1)
{
errno = EBADF;
return -1;
}
if (munmap (mqdes->mptr, mqdes->length) == -1)
{
return -1;
}
free (mqdes);
return 0;
}
#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */
#if (PACE_HAS_POSIX_NONUOF_FUNCS)
int mq_unlink (const char* name)
{
int result1, result2;
char* new_name;
new_name = (char*) malloc (256);
snprintf (new_name, 256, "/tmp%s%s", name, PACE_MQ_DATAPOSTFIX);
result1 = pace_unlink (new_name);
snprintf (new_name, 256, "/tmp%s%s", name, PACE_MQ_LOCKPOSTFIX);
result2 = pace_unlink (new_name);
free (new_name);
return (result1 == -1 || result2 == -1 ? -1 : 0);
}
#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */
#if (PACE_HAS_POSIX_NONUOF_FUNCS)
int mq_send (pace_mqd_t mqdes,
const char* ptr,
pace_size_t length,
unsigned int priority)
{
mqfile* queue = ((mqfile*)mqdes->mptr);
long index, old_index;
if (mqdes->oflag & O_RDONLY)
{
/* Incorrect access priviledges */
errno = EBADF;
return -1;
}
if (queue->attr.mq_msgsize < (int) length)
{
/* Message too long */
errno = EMSGSIZE;
return -1;
}
if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0)
{
return -1;
}
/* If the queue is full... */
if (queue->attr.mq_curmsgs >= queue->attr.mq_maxmsg)
{
if (queue->attr.mq_flags & O_NONBLOCK)
{
errno = EAGAIN;
return -1;
}
while (queue->attr.mq_maxmsg <= queue->attr.mq_curmsgs)
{
pace_pthread_cond_wait (&queue->cond, &queue->mutex);
pace_printf ("Send Woke Up\n");
}
}
/* Fill in the fields of the header */
((message_header*)(&mqdes->mptr[queue->freelist]))->priority = priority;
((message_header*)(&mqdes->mptr[queue->freelist]))->length = length;
pace_memcpy (((void*)(&mqdes->mptr[queue->freelist + sizeof (message_header)])),
ptr, length);
/* Update the linked list */
old_index = 0;
index = queue->head;
while (index != 0 && ((message_header*)(&mqdes->mptr[index]))->priority >= priority)
{
old_index = index;
index = ((message_header*)(&mqdes->mptr[index]))->next;
}
/* If the msg goes at the head */
if (old_index == 0)
{
queue->head = queue->freelist;
queue->freelist = ((message_header*)(&mqdes->mptr[queue->freelist]))->next;
((message_header*)(&mqdes->mptr[queue->head]))->next = index;
}
else
{
((message_header*)(&mqdes->mptr[old_index]))->next = queue->freelist;
old_index = queue->freelist;
queue->freelist = ((message_header*)(&mqdes->mptr[queue->freelist]))->next;
((message_header*)(&mqdes->mptr[old_index]))->next = index;
}
queue->attr.mq_curmsgs++;
if ((errno = pace_pthread_mutex_unlock (&queue->mutex)) != 0)
{
return -1;
}
if (queue->attr.mq_curmsgs == 1)
{
/* If there is no one waiting and blocked */
if (queue->not_pid != 0 && queue->rec_wait == 0)
{
if (queue->notification.sigev_notify == SIGEV_SIGNAL)
{
sigqueue (queue->not_pid,
queue->notification.sigev_signo,
queue->notification.sigev_value);
}
queue->not_pid = 0;
}
else
{
pace_printf ("Send is Signalling\n");
/* Let other waiting threads know there is food on the table */
if ((errno = pace_pthread_cond_signal (&((mqfile*)mqdes->mptr)->cond)) != 0)
{
return -1;
}
}
}
return 0;
}
#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */
#if (PACE_HAS_POSIX_NONUOF_FUNCS)
pace_ssize_t mq_receive (pace_mqd_t mqdes,
char * msg_ptr,
pace_size_t msg_len,
unsigned int * nmsg_prio)
{
mqfile* queue = ((mqfile*)mqdes->mptr);
pace_size_t temp;
if (queue->attr.mq_msgsize > (long) msg_len)
{
errno = EMSGSIZE;
return -1;
}
if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0)
{
return -1;
}
/* If the queue is empty... */
if (queue->attr.mq_curmsgs <= 0)
{
if (queue->attr.mq_flags & O_NONBLOCK)
{
errno = EAGAIN;
return -1;
}
while (queue->attr.mq_curmsgs <= 0)
{
pace_printf ("Recv is going to sleep\n");
queue->rec_wait++;
pace_pthread_cond_wait (&(queue->cond), &(queue->mutex));
queue->rec_wait--;
pace_printf ("Recv is waking from sleep\n");
}
}
if (nmsg_prio != 0)
{
*nmsg_prio = ((message_header*)(&mqdes->mptr[queue->head]))->priority;
}
pace_memcpy (msg_ptr, ((void*)(&mqdes->mptr[queue->head + sizeof (message_header)])),
((message_header*)(&mqdes->mptr[queue->head]))->length);
temp = queue->head;
queue->head = ((message_header*)(&mqdes->mptr[queue->head]))->next;
((message_header*)(&mqdes->mptr[temp]))->next = queue->freelist;
queue->freelist = temp;
queue->attr.mq_curmsgs--;
if (pace_pthread_mutex_unlock (&queue->mutex) == -1)
{
errno = EBADMSG;
return -1;
}
if (queue->attr.mq_curmsgs == (queue->attr.mq_maxmsg-1))
{
pace_printf ("Recv is signalling\n");
/* Let other waiting threads know there is room available */
if ((errno = pace_pthread_cond_signal (&((mqfile*)mqdes->mptr)->cond)) != 0)
{
return -1;
}
}
return ((message_header*)(&mqdes->mptr[queue->head]))->length;
}
#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */
#if (PACE_HAS_POSIX_NONUOF_FUNCS)
int mq_getattr (pace_mqd_t mqdes, pace_mq_attr * mqstat)
{
mqfile* queue = ((mqfile*)mqdes->mptr);
if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0)
{
return -1;
}
*mqstat = queue->attr;
pace_pthread_mutex_unlock (&queue->mutex);
return 0;
}
#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */
#if (PACE_HAS_POSIX_NONUOF_FUNCS)
int mq_setattr(pace_mqd_t mqdes,
const pace_mq_attr * mqstat,
pace_mq_attr * omqstat)
{
mqfile* queue = ((mqfile*)(mqdes->mptr));
if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0)
{
return -1;
}
if (omqstat != 0)
{
*omqstat = ((mqfile*)mqdes->mptr)->attr;
}
if (mqstat == 0 || mqdes == 0)
{
/* You eediot*/
errno = EFAULT;
pace_pthread_mutex_unlock (&queue->mutex);
return -1;
}
((mqfile*)mqdes->mptr)->attr.mq_flags = mqstat->mq_flags;
pace_pthread_mutex_unlock (&queue->mutex);
return 0;
}
#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */
#if (PACE_HAS_POSIX_NONUOF_FUNCS)
int mq_notify (pace_mqd_t mqd, const pace_sigevent* notification)
{
mqfile* queue = ((mqfile*)(mqd->mptr));
pace_pid_t pid = pace_getpid ();
if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0)
{
return -1;
}
if (notification == 0)
{
/* Unregister if notification is null */
if (queue->not_pid == pid)
{
queue->not_pid = 0;
}
}
else
{
if (queue->not_pid && pace_kill (queue->not_pid, 0))
{
/* If another process is registered */
if (errno != ESRCH)
{
pace_pthread_mutex_unlock (&queue->mutex);
return -1;
}
}
queue->not_pid = pid;
queue->notification = *notification;
}
pthread_mutex_unlock (&queue->mutex);
return 0;
}
#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -