📄 vqueue.c
字号:
** Send msg and block while any tasks are still pended on the queue */ sched_lock(); if ( queue->first_susp != (p2pthread_cb_t *)NULL ) { /* ** Lock mutex for queue broadcast completion */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->qbcst_lock) ); pthread_mutex_lock( &(queue->qbcst_lock) ); /* ** Lock mutex for urgent queue send */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->queue_lock)); pthread_mutex_lock( &(queue->queue_lock) ); /* ** Signal the condition variable for the queue */ pthread_cond_broadcast( &(queue->queue_send) ); /* ** Unlock the queue mutex. */ pthread_mutex_unlock( &(queue->queue_lock) ); pthread_cleanup_pop( 0 ); /* ** Wait for all pended tasks to receive broadcast message. ** The last task to receive the message will signal the ** broadcast-complete condition variable. */ while ( queue->first_susp != (p2pthread_cb_t *)NULL ) { pthread_cond_wait( &(queue->qbcst_cmplt), &(queue->qbcst_lock) ); } /* ** Unlock the queue broadcast completion mutex. */ pthread_mutex_unlock( &(queue->qbcst_lock) ); pthread_cleanup_pop( 0 ); } delete_vqueue( queue ); sched_unlock(); } else { error = ERR_OBJDEL; } return( error );}/******************************************************************************* waiting_on_vqueue - returns a nonzero result unless a qualifying event** occurs on the specified queue which should cause the** pended task to be awakened. The qualifying events** are:** (1) a message is sent to the queue and the current** task is selected to receive it** (2) a broadcast message is sent to the queue** (3) the queue is deleted*****************************************************************************/static int waiting_on_vqueue( p2pt_vqueue_t *queue, struct timespec *timeout, int *retcode ){ int result; struct timeval now; ULONG usec; if ( queue->send_type & KILLD ) { /* ** Queue has been killed... waiting is over. */ result = 0; *retcode = 0; } else { /* ** Queue still in service... check for message availability. ** Initially assume no message for our task */ result = 1; /* ** Multiple messages sent to the queue may be represented by only ** a single signal to the condition variable, so continue ** checking for a message for our task as long as more messages ** are available. */ while ( queue->msg_count > 0 ) { /* ** Message arrived... see if it's for our task. */ if ( (queue->send_type & BCAST) || (signal_for_my_task( &(queue->first_susp), (queue->flags & Q_PRIOR) )) ) { /* ** Message was either broadcast for all tasks or was ** destined for our task specifically... waiting is over. */ result = 0; *retcode = 0; break; } else { /* ** Message isn't for our task... continue waiting. ** Sleep awhile to allow other tasks ahead of ours in the ** list of tasks waiting on the queue to get their ** messages, bringing our task to the head of the list. */ pthread_mutex_unlock( &(queue->queue_lock) ); tm_wkafter( 1 ); pthread_mutex_lock( &(queue->queue_lock) ); } /* ** If a timeout was specified, make sure we respect it and ** exit this loop if it expires. */ if ( timeout != (struct timespec *)NULL ) { gettimeofday( &now, (struct timezone *)NULL ); if ( timeout->tv_nsec > (now.tv_usec * 1000) ) { usec = (timeout->tv_nsec - (now.tv_usec * 1000)) / 1000; if ( timeout->tv_sec < now.tv_sec ) usec = 0; else usec += ((timeout->tv_sec - now.tv_sec) * 1000000); } else { usec = ((timeout->tv_nsec + 1000000000) - (now.tv_usec * 1000)) / 1000; if ( (timeout->tv_sec - 1) < now.tv_sec ) usec = 0; else usec += (((timeout->tv_sec - 1) - now.tv_sec) * 1000000); } if ( usec == 0 ) break; } } } return( result );}/******************************************************************************* q_vreceive - blocks the calling task until a message is available in the** specified p2pthread queue.*****************************************************************************/ULONG q_vreceive( ULONG qid, ULONG opt, ULONG max_wait, void *msgbuf, ULONG buflen, ULONG *msglen ){ p2pthread_cb_t *our_tcb; struct timeval now; struct timespec timeout; int retcode; long sec, usec; p2pt_vqueue_t *queue; ULONG error; error = ERR_NO_ERROR; if ( (queue = qcb_for( qid )) != (p2pt_vqueue_t *)NULL ) { /* ** Return with error if caller's buffer is smaller than max message ** size specified for queue. */ if ( buflen < queue->msg_len ) { error = ERR_BUFSIZ; return( error ); } /* ** Lock mutex for queue receive */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->queue_lock)); pthread_mutex_lock( &(queue->queue_lock) ); /* ** If a broadcast is in progress, wait for it to complete ** before adding our TCB to the queue's waiting list. */ while ( queue->send_type != SEND ) { pthread_mutex_unlock( &(queue->queue_lock) ); tm_wkafter( 1 ); pthread_mutex_lock( &(queue->queue_lock) ); } /* ** Add tcb for task to list of tasks waiting on queue */ our_tcb = my_tcb();#ifdef DIAG_PRINTFS printf( "\r\ntask @ %p wait on queue list @ %p", our_tcb, &(queue->first_susp) );#endif link_susp_tcb( &(queue->first_susp), our_tcb ); retcode = 0; if ( opt & Q_NOWAIT ) { /* ** Caller specified no wait on queue message... ** Check the condition variable with an immediate timeout. */ gettimeofday( &now, (struct timezone *)NULL ); timeout.tv_sec = now.tv_sec; timeout.tv_nsec = now.tv_usec * 1000; while ( (waiting_on_vqueue( queue, &timeout, &retcode )) && (retcode != ETIMEDOUT) ) { retcode = pthread_cond_timedwait( &(queue->queue_send), &(queue->queue_lock), &timeout ); } } else { /* ** Caller expects to wait on queue, with or without a timeout. */ if ( max_wait == 0L ) { /* ** Infinite wait was specified... wait without timeout. */ while ( waiting_on_vqueue( queue, 0, &retcode ) ) { pthread_cond_wait( &(queue->queue_send), &(queue->queue_lock) ); } } else { /* ** Wait on queue message arrival with timeout... ** Calculate timeout delay in seconds and microseconds. */ sec = 0; usec = max_wait * P2PT_TICK * 1000; gettimeofday( &now, (struct timezone *)NULL ); usec += now.tv_usec; if ( usec > 1000000 ) { sec = usec / 1000000; usec = usec % 1000000; } timeout.tv_sec = now.tv_sec + sec; timeout.tv_nsec = usec * 1000; /* ** Wait for a queue message for the current task or for the ** timeout to expire. The loop is required since the task ** may be awakened by signals for messages which are ** not ours, or for signals other than from a message send. */ while ( (waiting_on_vqueue( queue, &timeout, &retcode )) && (retcode != ETIMEDOUT) ) { retcode = pthread_cond_timedwait( &(queue->queue_send), &(queue->queue_lock), &timeout ); } } } /* ** Remove the calling task's tcb from the pended task list ** for the queue. */ unlink_susp_tcb( &(queue->first_susp), our_tcb ); /* ** See if we were awakened due to a q_vdelete on the queue. */ if ( queue->send_type & KILLD ) { fetch_msg_from( queue, (char *)msgbuf, msglen ); error = ERR_QKILLD; *((char *)msgbuf) = (char)NULL;#ifdef DIAG_PRINTFS printf( "...queue deleted" );#endif } else { /* ** See if we timed out or if we got a message */ if ( retcode == ETIMEDOUT ) { /* ** Timed out without a message */ if ( opt & Q_NOWAIT ) error = ERR_NOMSG; else error = ERR_TIMEOUT; *((char *)msgbuf) = (char)NULL;#ifdef DIAG_PRINTFS printf( "...timed out" );#endif } else { /* ** A message was sent to the queue for this task... ** Retrieve the message and clear the queue contents. */ fetch_msg_from( queue, (char *)msgbuf, msglen );#ifdef DIAG_PRINTFS printf( "...rcvd queue msg @ %p len %lx", msgbuf, *msglen );#endif } } /* ** Unlock the mutex for the condition variable and clean up. */ pthread_mutex_unlock( &(queue->queue_lock) ); pthread_cleanup_pop( 0 ); } else { error = ERR_OBJDEL; /* Invalid queue specified */ *((char *)msgbuf) = (char)NULL; } return( error );}/******************************************************************************* q_vident - identifies the specified p2pthread queue*****************************************************************************/ULONG q_vident( char name[4], ULONG node, ULONG *qid ){ p2pt_vqueue_t *current_qcb; ULONG error; error = ERR_NO_ERROR; /* ** Validate the node specifier... only zero is allowed here. */ if ( node != 0L ) error = ERR_NODENO; else { /* ** If queue name string is a NULL pointer, return with error. ** We'll ASSUME the qid pointer isn't NULL! */ if ( name == (char *)NULL ) { *qid = (ULONG)NULL; error = ERR_OBJNF; } else { /* ** Scan the task list for a name matching the caller's name. */ for ( current_qcb = vqueue_list; current_qcb != (p2pt_vqueue_t *)NULL; current_qcb = current_qcb->nxt_queue ) { if ( (strncmp( name, current_qcb->qname, 4 )) == 0 ) { /* ** A matching name was found... return its QID */ *qid = current_qcb->qid; break; } } if ( current_qcb == (p2pt_vqueue_t *)NULL ) { /* ** No matching name found... return caller's QID with error. */ *qid = (ULONG)NULL; error = ERR_OBJNF; } } } return( error );}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -