mq.c
来自「xenomai 很好的linux实时补丁」· C语言 代码 · 共 879 行 · 第 1/2 页
C
879 行
/* * Written by Gilles Chanteperdrix <gilles.chanteperdrix@laposte.net>. * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License as * published by the Free Software Foundation; either version 2 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *//** * @ingroup posix * @defgroup posix_mq Message queues services. * * Message queues services. * * A message queue allow exchanging data between real-time threads. Maximum * message length and maximum number of messages are fixed when the queue is * created (see mq_open()). * *@{*/#include <stdarg.h>#include <nucleus/queue.h>#include <posix/registry.h>#include <posix/internal.h> /* Magics, time conversion */#include <posix/thread.h> /* errno. */#include <posix/sig.h> /* pse51_siginfo_t. *//* Temporary definitions. */struct pse51_mq { pse51_node_t nodebase;#define node2mq(naddr) \ ((pse51_mq_t *) (((char *)naddr) - offsetof(pse51_mq_t, nodebase))) unsigned long flags; xnpqueue_t queued; xnsynch_t receivers; xnsynch_t senders; size_t memsize; char *mem; xnqueue_t avail; /* mq_notify */ pse51_siginfo_t si; pthread_t target; struct mq_attr attr; xnholder_t link; /* link in mqq */#define link2mq(laddr) \ ((pse51_mq_t *) (((char *)laddr) - offsetof(pse51_mq_t, link)))};typedef struct pse51_mq pse51_mq_t;typedef struct pse51_msg { xnpholder_t link; size_t len;#define link2msg(laddr) \ ((pse51_msg_t *)(((char *)laddr) - offsetof(pse51_msg_t, link))) char data[0];} pse51_msg_t;typedef struct pse51_direct_msg { char *buf; size_t *lenp; unsigned *priop; int used;} pse51_direct_msg_t;static xnqueue_t pse51_mqq;static pse51_msg_t *pse51_mq_msg_alloc(pse51_mq_t *mq){ xnpholder_t *holder = (xnpholder_t *) getq(&mq->avail); if(!holder) return NULL; initph(holder); return link2msg(holder);}static void pse51_mq_msg_free(pse51_mq_t *mq, pse51_msg_t *msg){ xnholder_t *holder = (xnholder_t *) (&msg->link); inith(holder); prependq(&mq->avail, holder); /* For earliest re-use of the block. */}static int pse51_mq_init(pse51_mq_t *mq, const struct mq_attr *attr){ unsigned i, msgsize, memsize; char *mem; if (xnpod_asynch_p() || !xnpod_root_p()) return EPERM; if(!attr->mq_maxmsg) return EINVAL; msgsize = attr->mq_msgsize + sizeof(pse51_msg_t); /* Align msgsize on natural boundary. */ if ((msgsize % sizeof(unsigned long))) msgsize += sizeof(unsigned long) - (msgsize % sizeof(unsigned long)); memsize = msgsize * attr->mq_maxmsg; memsize = PAGE_ALIGN(memsize); mem = (char *) xnarch_sysalloc(memsize); if (!mem) return ENOSPC; mq->flags = 0; mq->memsize = memsize; initpq(&mq->queued, xnqueue_down, 0); xnsynch_init(&mq->receivers, XNSYNCH_PRIO | XNSYNCH_NOPIP); xnsynch_init(&mq->senders, XNSYNCH_PRIO | XNSYNCH_NOPIP); mq->mem = mem; /* Fill the pool. */ initq(&mq->avail); for (i = 0; i < attr->mq_maxmsg; i++) { pse51_msg_t *msg = (pse51_msg_t *) (mem + i * msgsize); pse51_mq_msg_free(mq, msg); } mq->attr = *attr; mq->target = NULL; return 0;}static void pse51_mq_destroy(pse51_mq_t *mq){ int need_resched; spl_t s; xnlock_get_irqsave(&nklock, s); need_resched = (xnsynch_destroy(&mq->receivers) == XNSYNCH_RESCHED); need_resched = (xnsynch_destroy(&mq->senders) == XNSYNCH_RESCHED) || need_resched; removeq(&pse51_mqq, &mq->link); xnlock_put_irqrestore(&nklock, s); xnarch_sysfree(mq->mem, mq->memsize); if (need_resched) xnpod_schedule();}/** * Get a attribute object of a message queue. * * @see http://www.opengroup.org/onlinepubs/000095399/functions/mq_getattr.html * */int mq_getattr(mqd_t fd, struct mq_attr *attr){ pse51_desc_t *desc; pse51_mq_t *mq; spl_t s; int err; xnlock_get_irqsave(&nklock, s); err = pse51_desc_get(&desc, fd, PSE51_MQ_MAGIC); if(err) { xnlock_put_irqrestore(&nklock, s); thread_set_errno(err); return -1; } mq = node2mq(pse51_desc_node(desc)); *attr = mq->attr; attr->mq_flags = pse51_desc_getflags(desc); attr->mq_curmsgs = countpq(&mq->queued); xnlock_put_irqrestore(&nklock, s); return 0;}/** * Set flags of a message queue. * * @see http://www.opengroup.org/onlinepubs/000095399/functions/mq_setattr.html * */int mq_setattr(mqd_t fd, const struct mq_attr *__restrict__ attr, struct mq_attr *__restrict__ oattr){ pse51_desc_t *desc; pse51_mq_t *mq; long flags; spl_t s; int err; xnlock_get_irqsave(&nklock, s); err = pse51_desc_get(&desc, fd, PSE51_MQ_MAGIC); if(err) { xnlock_put_irqrestore(&nklock, s); thread_set_errno(err); return -1; } mq = node2mq(pse51_desc_node(desc)); if(oattr) { *oattr = mq->attr; oattr->mq_flags = pse51_desc_getflags(desc); oattr->mq_curmsgs = countpq(&mq->queued); } flags = (pse51_desc_getflags(desc) & PSE51_PERMS_MASK) | (attr->mq_flags & ~PSE51_PERMS_MASK); pse51_desc_setflags(desc, flags); xnlock_put_irqrestore(&nklock, s); return 0;}static int pse51_mq_trysend(pse51_desc_t *desc, const char *buffer, size_t len, unsigned prio){ xnthread_t *reader; pthread_t thread; pse51_mq_t *mq; unsigned flags; mq = node2mq(pse51_desc_node(desc)); flags = pse51_desc_getflags(desc) & PSE51_PERMS_MASK; if(flags != O_WRONLY && flags != O_RDWR) return EPERM; if(len > mq->attr.mq_msgsize) return EMSGSIZE; reader = xnsynch_wakeup_one_sleeper(&mq->receivers); thread = thread2pthread(reader); if(thread) { pse51_direct_msg_t *msg = (pse51_direct_msg_t *) thread->arg; memcpy(msg->buf, buffer, len); *(msg->lenp) = len; if(msg->priop) *(msg->priop) = prio; msg->used = 1; } else { pse51_msg_t *msg = pse51_mq_msg_alloc(mq); if(!msg) return EAGAIN; memcpy(&msg->data[0], buffer, len); msg->len = len; insertpqf(&mq->queued, &msg->link, prio); /* First message and no pending reader, attempt to send a signal if mq_notify was called. */ if (!reader && mq->target && countpq(&mq->queued) == 1) { pse51_sigqueue_inner(mq->target, &mq->si); mq->target = NULL; } } if(reader) xnpod_schedule(); return 0;}static int pse51_mq_tryrcv(pse51_desc_t *desc, char *__restrict__ buffer, size_t *__restrict__ lenp, unsigned *__restrict__ priop){ xnpholder_t *holder; pse51_msg_t *msg; pse51_mq_t *mq; unsigned flags; mq = node2mq(pse51_desc_node(desc)); flags = pse51_desc_getflags(desc) & PSE51_PERMS_MASK; if(flags != O_RDONLY && flags != O_RDWR) return EPERM; if(*lenp < mq->attr.mq_msgsize) return EMSGSIZE; if(!(holder = getpq(&mq->queued))) return EAGAIN; msg = link2msg(holder); if(priop) *priop = holder->prio; *lenp = msg->len; memcpy(buffer, &msg->data[0], msg->len); pse51_mq_msg_free(mq, msg); if(xnsynch_wakeup_one_sleeper(&mq->senders)) xnpod_schedule(); return 0;}static int pse51_mq_timedsend_inner(mqd_t fd, const char * buffer, size_t len, unsigned prio, xnticks_t abs_to){ int rc; for (;;) { xnticks_t to = abs_to; pse51_desc_t *desc; pse51_mq_t *mq; xnthread_t *cur; if ((rc = pse51_desc_get(&desc, fd, PSE51_MQ_MAGIC))) return rc; if ((rc = pse51_mq_trysend(desc, buffer, len, prio)) != EAGAIN) return rc; if ((pse51_desc_getflags(desc) & O_NONBLOCK)) return rc; if (xnpod_unblockable_p()) return EPERM; if ((rc = clock_adjust_timeout(&to, CLOCK_REALTIME))) return rc; mq = node2mq(pse51_desc_node(desc)); cur = xnpod_current_thread(); thread_cancellation_point(cur); xnsynch_sleep_on(&mq->senders, to); thread_cancellation_point(cur); if (xnthread_test_flags(cur, XNBREAK)) return EINTR; if (xnthread_test_flags(cur, XNTIMEO)) return ETIMEDOUT; if (xnthread_test_flags(cur, XNRMID)) return EBADF; }}static int pse51_mq_timedrcv_inner(mqd_t fd, char *__restrict__ buffer, size_t *__restrict__ lenp, unsigned *__restrict__ priop, xnticks_t abs_to){ xnthread_t *cur = xnpod_current_thread(); int rc; for (;;) { pse51_direct_msg_t msg; xnticks_t to = abs_to; pse51_desc_t *desc; pthread_t thread; pse51_mq_t *mq; int direct = 0; if ((rc = pse51_desc_get(&desc, fd, PSE51_MQ_MAGIC))) return rc; if ((rc = pse51_mq_tryrcv(desc, buffer, lenp, priop)) != EAGAIN) return rc; if ((pse51_desc_getflags(desc) & O_NONBLOCK)) return rc; if (xnpod_unblockable_p()) return EPERM; if ((rc = clock_adjust_timeout(&to, CLOCK_REALTIME))) return rc; mq = node2mq(pse51_desc_node(desc)); thread = thread2pthread(cur); if(thread) { msg.buf = buffer; msg.lenp = lenp; msg.priop = priop; msg.used = 0; thread->arg = &msg; direct = 1; } thread_cancellation_point(cur); xnsynch_sleep_on(&mq->receivers, to); thread_cancellation_point(cur);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?