mq.c

来自「xenomai 很好的linux实时补丁」· C语言 代码 · 共 879 行 · 第 1/2 页

C
879
字号
/* * Written by Gilles Chanteperdrix <gilles.chanteperdrix@laposte.net>. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License as * published by the Free Software Foundation; either version 2 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *//** * @ingroup posix * @defgroup posix_mq Message queues services. * * Message queues services. * * A message queue allow exchanging data between real-time threads. Maximum * message length and maximum number of messages are fixed when the queue is * created (see mq_open()). * *@{*/#include <stdarg.h>#include <nucleus/queue.h>#include <posix/registry.h>#include <posix/internal.h>     /* Magics, time conversion */#include <posix/thread.h>       /* errno. */#include <posix/sig.h>          /* pse51_siginfo_t. *//* Temporary definitions. */struct pse51_mq {    pse51_node_t nodebase;#define node2mq(naddr) \    ((pse51_mq_t *) (((char *)naddr) - offsetof(pse51_mq_t, nodebase)))    unsigned long flags;    xnpqueue_t queued;    xnsynch_t receivers;    xnsynch_t senders;    size_t memsize;    char *mem;    xnqueue_t avail;    /* mq_notify */    pse51_siginfo_t si;    pthread_t target;    struct mq_attr attr;    xnholder_t link;            /* link in mqq */#define link2mq(laddr) \    ((pse51_mq_t *) (((char *)laddr) - offsetof(pse51_mq_t, link)))};typedef struct pse51_mq pse51_mq_t;typedef struct pse51_msg {    xnpholder_t link;    size_t len;#define link2msg(laddr) \    ((pse51_msg_t *)(((char *)laddr) - offsetof(pse51_msg_t, link)))    char data[0];} pse51_msg_t;typedef struct pse51_direct_msg {    char *buf;    size_t *lenp;    unsigned *priop;    int used;} pse51_direct_msg_t;static xnqueue_t pse51_mqq;static pse51_msg_t *pse51_mq_msg_alloc(pse51_mq_t *mq){    xnpholder_t *holder = (xnpholder_t *) getq(&mq->avail);    if(!holder)        return NULL;    initph(holder);    return link2msg(holder);}static void pse51_mq_msg_free(pse51_mq_t *mq, pse51_msg_t *msg){    xnholder_t *holder = (xnholder_t *) (&msg->link);    inith(holder);    prependq(&mq->avail, holder); /* For earliest re-use of the block. */}static int pse51_mq_init(pse51_mq_t *mq, const struct mq_attr *attr){    unsigned i, msgsize, memsize;    char *mem;    if (xnpod_asynch_p() || !xnpod_root_p())        return EPERM;    if(!attr->mq_maxmsg)        return EINVAL;    msgsize = attr->mq_msgsize + sizeof(pse51_msg_t);    /* Align msgsize on natural boundary. */    if ((msgsize % sizeof(unsigned long)))        msgsize += sizeof(unsigned long) - (msgsize % sizeof(unsigned long));    memsize = msgsize * attr->mq_maxmsg;    memsize = PAGE_ALIGN(memsize);    mem = (char *) xnarch_sysalloc(memsize);    if (!mem)        return ENOSPC;    mq->flags = 0;    mq->memsize = memsize;    initpq(&mq->queued, xnqueue_down, 0);    xnsynch_init(&mq->receivers, XNSYNCH_PRIO | XNSYNCH_NOPIP);    xnsynch_init(&mq->senders, XNSYNCH_PRIO | XNSYNCH_NOPIP);    mq->mem = mem;    /* Fill the pool. */    initq(&mq->avail);    for (i = 0; i < attr->mq_maxmsg; i++)        {        pse51_msg_t *msg = (pse51_msg_t *) (mem + i * msgsize);        pse51_mq_msg_free(mq, msg);        }    mq->attr = *attr;    mq->target = NULL;    return 0;}static void pse51_mq_destroy(pse51_mq_t *mq){    int need_resched;    spl_t s;    xnlock_get_irqsave(&nklock, s);    need_resched = (xnsynch_destroy(&mq->receivers) == XNSYNCH_RESCHED);    need_resched =        (xnsynch_destroy(&mq->senders) == XNSYNCH_RESCHED) || need_resched;    removeq(&pse51_mqq, &mq->link);    xnlock_put_irqrestore(&nklock, s);    xnarch_sysfree(mq->mem, mq->memsize);    if (need_resched)        xnpod_schedule();}/** * Get a attribute object of a message queue. * * @see http://www.opengroup.org/onlinepubs/000095399/functions/mq_getattr.html *  */int mq_getattr(mqd_t fd, struct mq_attr *attr){    pse51_desc_t *desc;    pse51_mq_t *mq;    spl_t s;    int err;    xnlock_get_irqsave(&nklock, s);    err = pse51_desc_get(&desc, fd, PSE51_MQ_MAGIC);    if(err)        {        xnlock_put_irqrestore(&nklock, s);        thread_set_errno(err);        return -1;        }        mq = node2mq(pse51_desc_node(desc));    *attr = mq->attr;    attr->mq_flags = pse51_desc_getflags(desc);    attr->mq_curmsgs = countpq(&mq->queued);    xnlock_put_irqrestore(&nklock, s);    return 0;}/** * Set flags of a message queue. * * @see http://www.opengroup.org/onlinepubs/000095399/functions/mq_setattr.html *  */int mq_setattr(mqd_t fd,               const struct mq_attr *__restrict__ attr,               struct mq_attr *__restrict__ oattr){    pse51_desc_t *desc;    pse51_mq_t *mq;    long flags;    spl_t s;    int err;    xnlock_get_irqsave(&nklock, s);    err = pse51_desc_get(&desc, fd, PSE51_MQ_MAGIC);    if(err)        {        xnlock_put_irqrestore(&nklock, s);        thread_set_errno(err);        return -1;        }    mq = node2mq(pse51_desc_node(desc));    if(oattr)        {        *oattr = mq->attr;        oattr->mq_flags = pse51_desc_getflags(desc);        oattr->mq_curmsgs = countpq(&mq->queued);        }    flags = (pse51_desc_getflags(desc) & PSE51_PERMS_MASK)        | (attr->mq_flags & ~PSE51_PERMS_MASK);    pse51_desc_setflags(desc, flags);    xnlock_put_irqrestore(&nklock, s);    return 0;}static int pse51_mq_trysend(pse51_desc_t *desc,                            const char *buffer,                            size_t len,                            unsigned prio){    xnthread_t *reader;    pthread_t thread;    pse51_mq_t *mq;    unsigned flags;    mq = node2mq(pse51_desc_node(desc));    flags = pse51_desc_getflags(desc) & PSE51_PERMS_MASK;    if(flags != O_WRONLY && flags != O_RDWR)        return EPERM;    if(len > mq->attr.mq_msgsize)        return EMSGSIZE;    reader = xnsynch_wakeup_one_sleeper(&mq->receivers);    thread = thread2pthread(reader);    if(thread)        {        pse51_direct_msg_t *msg = (pse51_direct_msg_t *) thread->arg;        memcpy(msg->buf, buffer, len);        *(msg->lenp) = len;        if(msg->priop)            *(msg->priop) = prio;        msg->used = 1;        }    else        {        pse51_msg_t *msg = pse51_mq_msg_alloc(mq);        if(!msg)            return EAGAIN;        memcpy(&msg->data[0], buffer, len);        msg->len = len;        insertpqf(&mq->queued, &msg->link, prio);        /* First message and no pending reader, attempt to send a signal if           mq_notify was called. */        if (!reader && mq->target && countpq(&mq->queued) == 1)            {            pse51_sigqueue_inner(mq->target, &mq->si);            mq->target = NULL;            }        }    if(reader)        xnpod_schedule();    return 0;}static int pse51_mq_tryrcv(pse51_desc_t *desc,                           char *__restrict__ buffer,                           size_t *__restrict__ lenp,                           unsigned *__restrict__ priop){    xnpholder_t *holder;    pse51_msg_t *msg;    pse51_mq_t *mq;    unsigned flags;    mq = node2mq(pse51_desc_node(desc));    flags = pse51_desc_getflags(desc) & PSE51_PERMS_MASK;    if(flags != O_RDONLY && flags != O_RDWR)        return EPERM;    if(*lenp < mq->attr.mq_msgsize)        return EMSGSIZE;    if(!(holder = getpq(&mq->queued)))        return EAGAIN;    msg = link2msg(holder);    if(priop)        *priop = holder->prio;    *lenp = msg->len;    memcpy(buffer, &msg->data[0], msg->len);    pse51_mq_msg_free(mq, msg);    if(xnsynch_wakeup_one_sleeper(&mq->senders))        xnpod_schedule();        return 0;}static int pse51_mq_timedsend_inner(mqd_t fd,                                    const char * buffer,                                    size_t len,                                    unsigned prio,                                    xnticks_t abs_to){    int rc;    for (;;) {        xnticks_t to = abs_to;        pse51_desc_t *desc;        pse51_mq_t *mq;        xnthread_t *cur;                if ((rc = pse51_desc_get(&desc, fd, PSE51_MQ_MAGIC)))            return rc;        if ((rc = pse51_mq_trysend(desc, buffer, len, prio)) != EAGAIN)            return rc;        if ((pse51_desc_getflags(desc) & O_NONBLOCK))            return rc;        if (xnpod_unblockable_p())            return EPERM;        if ((rc = clock_adjust_timeout(&to, CLOCK_REALTIME)))            return rc;        mq = node2mq(pse51_desc_node(desc));        cur = xnpod_current_thread();        thread_cancellation_point(cur);        xnsynch_sleep_on(&mq->senders, to);        thread_cancellation_point(cur);        if (xnthread_test_flags(cur, XNBREAK))            return EINTR;        if (xnthread_test_flags(cur, XNTIMEO))            return ETIMEDOUT;        if (xnthread_test_flags(cur, XNRMID))            return EBADF;        }}static int pse51_mq_timedrcv_inner(mqd_t fd,                                   char *__restrict__ buffer,                                   size_t *__restrict__ lenp,                                   unsigned *__restrict__ priop,                                   xnticks_t abs_to){    xnthread_t *cur = xnpod_current_thread();    int rc;    for (;;)        {        pse51_direct_msg_t msg;        xnticks_t to = abs_to;        pse51_desc_t *desc;        pthread_t thread;        pse51_mq_t *mq;        int direct = 0;                if ((rc = pse51_desc_get(&desc, fd, PSE51_MQ_MAGIC)))            return rc;        if ((rc = pse51_mq_tryrcv(desc, buffer, lenp, priop)) != EAGAIN)            return rc;        if ((pse51_desc_getflags(desc) & O_NONBLOCK))            return rc;        if (xnpod_unblockable_p())            return EPERM;        if ((rc = clock_adjust_timeout(&to, CLOCK_REALTIME)))            return rc;        mq = node2mq(pse51_desc_node(desc));        thread = thread2pthread(cur);                if(thread)            {            msg.buf = buffer;            msg.lenp = lenp;            msg.priop = priop;            msg.used = 0;            thread->arg = &msg;            direct = 1;            }        thread_cancellation_point(cur);        xnsynch_sleep_on(&mq->receivers, to);        thread_cancellation_point(cur);

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?