📄 queue.c
字号:
}again: holder = getq(&queue->inq); if (!holder) { if (flags & Q_NOWAIT) { xnmutex_unlock(&__imutex); return ERR_NOMSG; } xnarch_post_graph_if(&queue->synchbase, 1, /* PENDED */ xnsynch_nsleepers(&queue->synchbase) == 0); xnsynch_sleep_on(&queue->synchbase,timeout,&__imutex); if (xnthread_test_flags(&psos_current_task()->threadbase,XNRMID)) { xnmutex_unlock(&__imutex); return ERR_QKILLD; /* Queue deleted while pending. */ } if (xnthread_test_flags(&psos_current_task()->threadbase,XNTIMEO)) { xnmutex_unlock(&__imutex); return ERR_TIMEOUT; /* Timeout.*/ } mbuf = psos_current_task()->waitargs.qmsg; if (!mbuf) /* Rare, but spurious wakeups might */ goto again; /* occur during memory contention. */ psos_current_task()->waitargs.qmsg = NULL; } else { mbuf = link2psosmbuf(holder); xnarch_post_graph(&queue->synchbase, countq(&queue->inq) > 0 ? 2 : 0); /* POSTED or EMPTY */ } xnmutex_unlock(&__imutex); if (mbuf->len > buflen) rc = ERR_BUFSIZ; memcpy(msgbuf,mbuf->data,minval(buflen,mbuf->len)); if (msglen) *msglen = mbuf->len; if (testbits(queue->synchbase.status,Q_NOCACHE)) xnfree(mbuf); else { xnmutex_lock(&__imutex); appendq(&queue->freeq,&mbuf->link); xnmutex_unlock(&__imutex); } return rc;}static u_long q_send_internal (u_long qid, u_long flags, void *msgbuf, u_long msglen){ xnthread_t *sleeper; psosqueue_t *queue; psosmbuf_t *mbuf; xnmutex_lock(&__imutex); queue = psos_h2obj_active(qid,PSOS_QUEUE_MAGIC,psosqueue_t); if (!queue) { u_long err = psos_handle_error(qid,PSOS_QUEUE_MAGIC,psosqueue_t); xnmutex_unlock(&__imutex); return err; } if ((flags & Q_VARIABLE) && !xnsynch_test_flags(&queue->synchbase,Q_VARIABLE)) { xnmutex_unlock(&__imutex); return ERR_NOTVARQ; } if (!(flags & Q_VARIABLE) && xnsynch_test_flags(&queue->synchbase,Q_VARIABLE)) { xnmutex_unlock(&__imutex); return ERR_VARQ; } if (msglen > queue->maxlen) { xnmutex_unlock(&__imutex); return ERR_MSGSIZ; } mbuf = get_mbuf(queue,msglen); if (!mbuf) { xnmutex_unlock(&__imutex); return ERR_NOMGB; } sleeper = xnsynch_wakeup_one_sleeper(&queue->synchbase); mbuf->len = msglen; if (sleeper) { memcpy(mbuf->data,msgbuf,msglen); thread2psostask(sleeper)->waitargs.qmsg = mbuf; xnmutex_unlock(&__imutex); xnpod_schedule(NULL); return SUCCESS; } if (!xnsynch_test_flags(&queue->synchbase,Q_INFINITE) && countq(&queue->inq) >= queue->maxnum) { xnmutex_unlock(&__imutex); return ERR_QFULL; } if (flags & Q_JAMMED) { prependq(&queue->inq,&mbuf->link); xnarch_post_graph(&queue->synchbase,4); /* JAMMED */ } else appendq(&queue->inq,&mbuf->link); memcpy(mbuf->data,msgbuf,msglen); xnarch_post_graph_if(&queue->synchbase, countq(&queue->inq) >= queue->maxnum ? 3 : 2, !xnsynch_test_flags(&queue->synchbase,Q_INFINITE)); /* FULL or POSTED */ xnmutex_unlock(&__imutex); return SUCCESS;}static u_long q_broadcast_internal (u_long qid, u_long flags, void *msgbuf, u_long msglen, u_long *count){ xnthread_t *sleeper; psosqueue_t *queue; xnmutex_lock(&__imutex); queue = psos_h2obj_active(qid,PSOS_QUEUE_MAGIC,psosqueue_t); if (!queue) { u_long err = psos_handle_error(qid,PSOS_QUEUE_MAGIC,psosqueue_t); xnmutex_unlock(&__imutex); return err; } if ((flags & Q_VARIABLE) && !xnsynch_test_flags(&queue->synchbase,Q_VARIABLE)) { xnmutex_unlock(&__imutex); return ERR_NOTVARQ; } if (!(flags & Q_VARIABLE) && xnsynch_test_flags(&queue->synchbase,Q_VARIABLE)) { xnmutex_unlock(&__imutex); return ERR_VARQ; } if (msglen > queue->maxlen) { xnmutex_unlock(&__imutex); return ERR_MSGSIZ; } *count = 0; while ((sleeper = xnsynch_wakeup_one_sleeper(&queue->synchbase)) != NULL) { psosmbuf_t *mbuf = get_mbuf(queue,msglen); if (!mbuf) { /* Will beget a spurious wakeup. */ thread2psostask(sleeper)->waitargs.qmsg = NULL; break; } mbuf->len = msglen; memcpy(mbuf->data,msgbuf,msglen); thread2psostask(sleeper)->waitargs.qmsg = mbuf; (*count)++; } xnmutex_unlock(&__imutex); return SUCCESS;}static u_long q_ident_internal (char name[4], u_long flags, u_long node, u_long *qid){ xnholder_t *holder; psosqueue_t *queue; xnpod_check_context(XNPOD_THREAD_CONTEXT); if (node > 1) return ERR_NODENO; if (!name) return ERR_OBJNF; xnmutex_lock(&__imutex); for (holder = getheadq(&psosqueueq); holder; holder = nextq(&psosqueueq,holder)) { queue = link2psosqueue(holder); if (queue->name[0] == name[0] && queue->name[1] == name[1] && queue->name[2] == name[2] && queue->name[3] == name[3]) { if (((flags & Q_VARIABLE) && testbits(queue->synchbase.status,Q_VARIABLE)) || (!(flags & Q_VARIABLE) && !testbits(queue->synchbase.status,Q_VARIABLE))) { *qid = (u_long)queue; xnmutex_unlock(&__imutex); return SUCCESS; } } } xnmutex_unlock(&__imutex); return ERR_OBJNF;}u_long q_create (char name[4], u_long maxnum, u_long flags, u_long *qid) { return q_create_internal(name, maxnum, sizeof(u_long[4]), flags & ~Q_VARIABLE, qid);}u_long q_vcreate (char name[4], u_long flags, u_long maxnum, u_long maxlen, u_long *qid) { return q_create_internal(name, maxnum, maxlen, flags|Q_VARIABLE, qid);}u_long q_delete (u_long qid) { return q_delete_internal(qid,0);}u_long q_vdelete (u_long qid) { return q_delete_internal(qid,Q_VARIABLE);}u_long q_ident (char name[4], u_long node, u_long *qid) { return q_ident_internal(name, 0, node, qid);}u_long q_vident (char name[4], u_long node, u_long *qid) { return q_ident_internal(name, Q_VARIABLE, node, qid);}u_long q_receive (u_long qid, u_long flags, u_long timeout, u_long msgbuf[4]) { return q_receive_internal(qid, flags & ~Q_VARIABLE, timeout, msgbuf, sizeof(msgbuf), NULL);}u_long q_vreceive(u_long qid, u_long flags, u_long timeout, void *msgbuf, u_long buflen, u_long *msglen) { return q_receive_internal(qid, flags|Q_VARIABLE, timeout, msgbuf, buflen, msglen);}u_long q_send (u_long qid, u_long msgbuf[4]) { return q_send_internal(qid, 0, msgbuf, sizeof(msgbuf));}u_long q_vsend (u_long qid, void *msgbuf, u_long msglen) { return q_send_internal(qid, Q_VARIABLE, msgbuf, msglen);}u_long q_broadcast (u_long qid, u_long msgbuf[4], u_long *count) { return q_broadcast_internal(qid, 0, msgbuf, sizeof(msgbuf), count);}u_long q_vbroadcast (u_long qid, void *msgbuf, u_long msglen, u_long *count) { return q_broadcast_internal(qid, Q_VARIABLE, msgbuf, msglen, count);}u_long q_urgent (u_long qid, u_long msgbuf[4]) { return q_send_internal(qid, Q_JAMMED, msgbuf, sizeof(msgbuf));}u_long q_vurgent (u_long qid, void *msgbuf, u_long msglen) { return q_send_internal(qid, Q_VARIABLE|Q_JAMMED, msgbuf, msglen);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -