⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mqueue.c

📁 一个开源的网络开发库ACE
💻 C
字号:
/* mqueue.c,v 1.6 2001/01/18 21:42:56 joeh Exp -*- C -*-
 * ============================================================================
 *
 * = LIBRARY
 *    pace
 *
 * = FILENAME
 *    pace/emulation/mqueue.c
 *
 * = AUTHOR
 *    John Heitmann
 *
 * ============================================================================ */

#include "pace/sys/mman.h"
#include "pace/stdio.h"
#include "pace/fcntl.h"
#include "pace/string.h"
#include "pace/stdlib.h"
#include "pace/sys/types.h"
#include "pace/pthread.h"
#include "pace/sys/stat.h"
#include "pace/emulation/mqueue.h"

typedef struct
{
  pace_mq_attr attr;
  pace_size_t num_open; /* How many processes have a valid mqd_t to here */
  pace_size_t rec_wait; /* How many processes are blocked on mq_receive */
  pace_pid_t not_pid; /* Who is actually registered for notification */
  pace_sigevent notification;
  pace_pthread_mutex_t mutex;
  pace_pthread_cond_t cond;
  pace_size_t head;
  pace_size_t freelist;
} mqfile;

typedef struct
{
  pace_size_t next; /* Index of next element */
  unsigned int priority;
  pace_size_t length;
} message_header;

static struct mq_attr pace_attrdefault = { 0, 32, 256, 0 };
#define PACE_MQ_LOCKPOSTFIX "mqlock9587"
#define PACE_MQ_DATAPOSTFIX "mqdata2355"

/* This remains mq_open due to the macro in pace/mqueue.h */
#if (PACE_HAS_POSIX_NONUOF_FUNCS)
pace_mqd_t mq_open (const char* name,
                    int oflag,
                    pace_mode_t mode,
                    pace_mq_attr* attr)
{
  int m_padding = sizeof (message_header); /* How much extra space per message do we need */
  int f_padding = sizeof (mqfile); /* How much fixed padding is needed */
  int mflags, mprot;
  int fd;
  int i;
  pace_size_t mapsize;
  char* mmaploc;
  char* new_name;
  char* lock_name;
  int create_mmap = 0; /* 1 if the file has never be inited */
  message_header* temp = 0; /*Used in initialization of mqueue*/
  long index; /* index into the file */
  pace_mqd_t result = (pace_mqd_t) pace_malloc (sizeof (struct mqd));
  pace_stat_s statbuf;

retry:
  if (attr == 0)
  {
    attr = &pace_attrdefault;
  }
  else
  {
    if (attr->mq_maxmsg < 0 || attr->mq_msgsize < 0)
    {
      errno = EBADF;
      return (pace_mqd_t)-1;
    }
  }

  /* Create a name that will go to /tmp with a unique name */
  new_name = (char*) malloc (256);
  lock_name = (char*) malloc (256);
  snprintf (new_name, 256, "/tmp%s%s", name, PACE_MQ_DATAPOSTFIX);
  snprintf (lock_name, 256, "/tmp%s%s", name, PACE_MQ_LOCKPOSTFIX);

  /* Fix alignment */
  if (attr->mq_msgsize % sizeof (long) != 0)
  {
    attr->mq_msgsize += 8 - (attr->mq_msgsize % sizeof (long));
  }

  if (oflag & PACE_O_CREAT)
  {
    /* We need to protect access without the help of O_RDONLY in the fs */
    fd = pace_open (new_name, PACE_O_RDWR | PACE_O_CREAT | PACE_O_EXCL, mode);

    if (fd == -1 && errno != EEXIST)
    {
      /* An error other than EEXIST has occurred. */
      return (pace_mqd_t)-1;
    }
    else if (fd != -1)
    {
      /* If a new file was created successfully */
      create_mmap = 1;
    }
    else if (oflag & PACE_O_EXCL)
    {
      /* If the file exists and we don't want it */
      errno = EEXIST;
      return (pace_mqd_t)-1;
    }
    else
    {
      /* We want the existing file */
      fd = pace_open (new_name, PACE_O_RDWR);
      if (fd == -1 && errno == ENOENT)
      {
        /* Something odd is going on */
        goto retry;
      }
      else if (fd == -1)
      {
        return (pace_mqd_t)-1;
      }
    }
  }
  else
  {
    fd = pace_open (new_name, PACE_O_RDWR);
    if (fd == -1)
    {
      return (pace_mqd_t)-1;
    }
  }

  /*
  The following loop makes shure that we haven't entered a race condition. If a file
  has been created but not initialized, its IXUSR will not be set (see above).
  */
  while (create_mmap == 0)
  {
    if (stat (lock_name, &statbuf) == -1)
    {
      close (fd);
      if (errno == ENOENT && (oflag & O_CREAT))
      {
        goto retry;
      }
      return (pace_mqd_t)-1;
    }
    else
    {
      break;
    }
  }

  mapsize = f_padding + (attr->mq_msgsize + m_padding) * (attr->mq_maxmsg);
  mprot = PACE_PROT_READ | PACE_PROT_WRITE;
  mflags = PACE_MAP_SHARED;

  if (create_mmap)
  {
    /* Create and 0 out the file */
    if (pace_lseek (fd, mapsize, PACE_SEEK_SET) == -1)
    {
      pace_unlink (new_name);
      return (pace_mqd_t)-1;
    }
    if (pace_write (fd, "", 1) != 1)
    {
      pace_unlink (new_name);
      return (pace_mqd_t)-1;
    }

    mmaploc = (char*) pace_mmap (0, mapsize, mprot, mflags, fd, 0);
    pace_close (fd);
    if (mmaploc == MAP_FAILED)
    {
      pace_unlink (new_name);
      return (pace_mqd_t)-1;
    }

    pace_memset (mmaploc, 0, mapsize);

    if ((errno = pace_pthread_mutex_init (&(((mqfile*)mmaploc)->mutex), 0)) != 0)
    {
      pace_unlink (new_name);
      pace_munmap (mmaploc, mapsize);
      return (pace_mqd_t)-1;
    }
    if ((errno = pace_pthread_mutex_lock (&(((mqfile*)mmaploc)->mutex))) != 0)
    {
      pace_unlink (new_name);
      pace_munmap (mmaploc, mapsize);
      return (pace_mqd_t)-1;
    }

    if ((errno = pace_pthread_cond_init (&(((mqfile*)mmaploc)->cond), 0)) != 0)
    {
      pace_unlink (new_name);
      pace_munmap (mmaploc, mapsize);
      return (pace_mqd_t)-1;
    }

    index = sizeof (mqfile);
    ((mqfile*)mmaploc)->freelist = index;
    ((mqfile*)mmaploc)->head = 0;

    for (i = 0; i < attr->mq_maxmsg; ++i)
    {
      temp = (message_header *) &mmaploc[index];
      index += sizeof (message_header) + attr->mq_msgsize;
      temp->next = index;
    }
    temp->next = 0;
    attr->mq_curmsgs = 0;
    ((mqfile*)mmaploc)->attr = *attr;

    /* Create the lock file so that the file is known to be inited */
    if (pace_open (lock_name, O_CREAT | O_EXCL) == -1)
    {
      pace_unlink (new_name);
      pace_munmap (mmaploc, mapsize);
      return (pace_mqd_t)-1;
    }

  }
  else
  {
    /* Just open the existing map */
    mmaploc = (char*) pace_mmap (0, mapsize, mprot, mflags, fd, 0);
    if (mmaploc == MAP_FAILED)
    {
      return (pace_mqd_t)-1;
    }
    pace_close (fd);

    /* ???? Test here for race */

    if (pace_pthread_mutex_lock (&(((mqfile*)mmaploc)->mutex)) == -1)
    {
      pace_munmap (mmaploc, mapsize);
      return (pace_mqd_t)-1;
    }
    ((mqfile*)mmaploc)->attr.mq_flags = attr->mq_flags;
  }

  ((mqfile*)mmaploc)->num_open++;


  if (pace_pthread_mutex_unlock (&(((mqfile*)mmaploc)->mutex)) == -1)
  {
    pace_munmap (mmaploc, mapsize);
    return (pace_mqd_t)-1;
  }

  result->mptr = mmaploc;
  result->length = mapsize;
  result->oflag = oflag;

  return result;
}
#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */

#if (PACE_HAS_POSIX_NONUOF_FUNCS)
int mq_close (pace_mqd_t mqdes)
{
  if (pace_pthread_mutex_lock (&( ((mqfile*)mqdes->mptr)->mutex)) == -1)
  {
    errno = EBADF;
    return -1;
  }
  ((mqfile*)mqdes->mptr)->num_open--;
  if (pace_pthread_mutex_unlock (&(((mqfile*)mqdes->mptr)->mutex)) == -1)
  {
    errno = EBADF;
    return -1;
  }
  if (munmap (mqdes->mptr, mqdes->length) == -1)
  {
    return -1;
  }
  free (mqdes);
  return 0;
}
#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */

#if (PACE_HAS_POSIX_NONUOF_FUNCS)
int mq_unlink (const char* name)
{
  int result1, result2;
  char* new_name;
  new_name = (char*) malloc (256);
  snprintf (new_name, 256, "/tmp%s%s", name, PACE_MQ_DATAPOSTFIX);
  result1 = pace_unlink (new_name);
  snprintf (new_name, 256, "/tmp%s%s", name, PACE_MQ_LOCKPOSTFIX);
  result2 = pace_unlink (new_name);
  free (new_name);
  return (result1 == -1 || result2 == -1 ? -1 : 0);
}
#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */

#if (PACE_HAS_POSIX_NONUOF_FUNCS)
int mq_send (pace_mqd_t mqdes,
             const char* ptr,
             pace_size_t length,
             unsigned int priority)
{
  mqfile* queue = ((mqfile*)mqdes->mptr);
  long index, old_index;
  if (mqdes->oflag & O_RDONLY)
  {
    /* Incorrect access priviledges */
    errno = EBADF;
    return -1;
  }
  if (queue->attr.mq_msgsize < (int) length)
  {
    /* Message too long */
    errno = EMSGSIZE;
    return -1;
  }
  if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0)
  {
    return -1;
  }
  /* If the queue is full... */
  if (queue->attr.mq_curmsgs >= queue->attr.mq_maxmsg)
  {
    if (queue->attr.mq_flags & O_NONBLOCK)
    {
      errno = EAGAIN;
      return -1;
    }
    while (queue->attr.mq_maxmsg <= queue->attr.mq_curmsgs)
    {
      pace_pthread_cond_wait (&queue->cond, &queue->mutex);
      pace_printf ("Send Woke Up\n");
    }
  }

  /* Fill in the fields of the header */
  ((message_header*)(&mqdes->mptr[queue->freelist]))->priority = priority;
  ((message_header*)(&mqdes->mptr[queue->freelist]))->length = length;
  pace_memcpy (((void*)(&mqdes->mptr[queue->freelist + sizeof (message_header)])),
               ptr, length);

  /* Update the linked list */
  old_index = 0;
  index = queue->head;
  while (index != 0 && ((message_header*)(&mqdes->mptr[index]))->priority >= priority)
  {
    old_index = index;
    index = ((message_header*)(&mqdes->mptr[index]))->next;
  }

  /* If the msg goes at the head */
  if (old_index == 0)
  {
    queue->head = queue->freelist;
    queue->freelist = ((message_header*)(&mqdes->mptr[queue->freelist]))->next;
    ((message_header*)(&mqdes->mptr[queue->head]))->next = index;
  }
  else
  {
    ((message_header*)(&mqdes->mptr[old_index]))->next = queue->freelist;
    old_index = queue->freelist;
    queue->freelist = ((message_header*)(&mqdes->mptr[queue->freelist]))->next;
    ((message_header*)(&mqdes->mptr[old_index]))->next = index;
  }

  queue->attr.mq_curmsgs++;

  if ((errno = pace_pthread_mutex_unlock (&queue->mutex)) != 0)
  {
    return -1;
  }

  if (queue->attr.mq_curmsgs == 1)
  {
    /* If there is no one waiting and blocked */
    if (queue->not_pid != 0 && queue->rec_wait == 0)
    {
      if (queue->notification.sigev_notify == SIGEV_SIGNAL)
      {
        sigqueue (queue->not_pid,
                  queue->notification.sigev_signo,
                  queue->notification.sigev_value);
      }
      queue->not_pid = 0;
    }
    else
    {
      pace_printf ("Send is Signalling\n");
      /* Let other waiting threads know there is food on the table */
      if ((errno = pace_pthread_cond_signal (&((mqfile*)mqdes->mptr)->cond)) != 0)
      {
        return -1;
      }
    }
  }
  return 0;
}
#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */

#if (PACE_HAS_POSIX_NONUOF_FUNCS)
pace_ssize_t mq_receive (pace_mqd_t mqdes,
                         char * msg_ptr,
                         pace_size_t msg_len,
                         unsigned int * nmsg_prio)
{
  mqfile* queue = ((mqfile*)mqdes->mptr);
  pace_size_t temp;

  if (queue->attr.mq_msgsize > (long) msg_len)
  {
    errno = EMSGSIZE;
    return -1;
  }

  if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0)
  {
    return -1;
  }

  /* If the queue is empty... */
  if (queue->attr.mq_curmsgs <= 0)
  {
    if (queue->attr.mq_flags & O_NONBLOCK)
    {
      errno = EAGAIN;
      return -1;
    }
    while (queue->attr.mq_curmsgs <= 0)
    {
      pace_printf ("Recv is going to sleep\n");
      queue->rec_wait++;
      pace_pthread_cond_wait (&(queue->cond), &(queue->mutex));
      queue->rec_wait--;
      pace_printf ("Recv is waking from sleep\n");
    }
  }

  if (nmsg_prio != 0)
  {
    *nmsg_prio = ((message_header*)(&mqdes->mptr[queue->head]))->priority;
  }

  pace_memcpy (msg_ptr, ((void*)(&mqdes->mptr[queue->head + sizeof (message_header)])),
               ((message_header*)(&mqdes->mptr[queue->head]))->length);
  temp = queue->head;
  queue->head = ((message_header*)(&mqdes->mptr[queue->head]))->next;
  ((message_header*)(&mqdes->mptr[temp]))->next = queue->freelist;
  queue->freelist = temp;

  queue->attr.mq_curmsgs--;

  if (pace_pthread_mutex_unlock (&queue->mutex) == -1)
  {
    errno = EBADMSG;
    return -1;
  }

  if (queue->attr.mq_curmsgs == (queue->attr.mq_maxmsg-1))
  {
    pace_printf ("Recv is signalling\n");
    /* Let other waiting threads know there is room available */
    if ((errno = pace_pthread_cond_signal (&((mqfile*)mqdes->mptr)->cond)) != 0)
    {
      return -1;
    }
  }

  return ((message_header*)(&mqdes->mptr[queue->head]))->length;
}
#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */

#if (PACE_HAS_POSIX_NONUOF_FUNCS)
int mq_getattr (pace_mqd_t mqdes, pace_mq_attr * mqstat)
{
  mqfile* queue = ((mqfile*)mqdes->mptr);

  if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0)
  {
    return -1;
  }

  *mqstat = queue->attr;

  pace_pthread_mutex_unlock (&queue->mutex);

  return 0;
}
#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */

#if (PACE_HAS_POSIX_NONUOF_FUNCS)
int mq_setattr(pace_mqd_t mqdes,
               const pace_mq_attr * mqstat,
               pace_mq_attr * omqstat)
{
  mqfile* queue = ((mqfile*)(mqdes->mptr));

  if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0)
  {
    return -1;
  }
  if (omqstat != 0)
  {
    *omqstat = ((mqfile*)mqdes->mptr)->attr;
  }
  if (mqstat == 0 || mqdes == 0)
  {
    /* You eediot*/
    errno = EFAULT;
    pace_pthread_mutex_unlock (&queue->mutex);
    return -1;
  }

  ((mqfile*)mqdes->mptr)->attr.mq_flags = mqstat->mq_flags;

  pace_pthread_mutex_unlock (&queue->mutex);
  return 0;
}
#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */

#if (PACE_HAS_POSIX_NONUOF_FUNCS)
int mq_notify (pace_mqd_t mqd, const pace_sigevent* notification)
{
  mqfile* queue = ((mqfile*)(mqd->mptr));
  pace_pid_t pid = pace_getpid ();

  if ((errno = pace_pthread_mutex_lock (&queue->mutex)) != 0)
  {
    return -1;
  }

  if (notification == 0)
  {
    /* Unregister if notification is null */
    if (queue->not_pid == pid)
    {
      queue->not_pid = 0;
    }
  }
  else
  {
    if (queue->not_pid && pace_kill (queue->not_pid, 0))
    {
      /* If another process is registered */
      if (errno != ESRCH)
      {
        pace_pthread_mutex_unlock (&queue->mutex);
        return -1;
      }
    }
    queue->not_pid = pid;
    queue->notification = *notification;
  }

  pthread_mutex_unlock (&queue->mutex);

  return 0;
}
#endif /* PACE_HAS_POSIX_NONUOF_FUNCS */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -