📄 lmsgqlib.c
字号:
*/ pthread_cond_broadcast( &(queue->queue_send) ); /* ** Unlock the queue send mutex. */ pthread_mutex_unlock( &(queue->queue_lock) ); /* ** Lock mutex for queue space */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->qfull_lock)); pthread_mutex_lock( &(queue->qfull_lock) ); /* ** Signal the condition variable for tasks waiting on ** space to post messages into the queue */ pthread_cond_broadcast( &(queue->queue_space) ); /* ** Unlock the queue space mutex. */ pthread_cleanup_pop( 1 ); /* ** Wait for all pended tasks to receive deletion signal. ** The last task to receive the deletion signal will signal the ** deletion-complete condition variable. */ while ( (queue->first_susp != (v2pthread_cb_t *)NULL) && (queue->first_write_susp != (v2pthread_cb_t *)NULL) ) { pthread_cond_wait( &(queue->qdlet_cmplt), &(queue->qdlet_lock) ); } /* ** Unlock the queue delete completion mutex. */ pthread_cleanup_pop( 1 ); } else { /* ** Unlock the queue mutex. */ pthread_mutex_unlock( &(queue->queue_lock) ); } /* ** No other tasks are pending on the queue by this point... ** Now physically delete the queue. */ delete_mqueue( queue ); taskUnlock(); } else { error = S_objLib_OBJ_ID_ERROR; } /* ** Clean up the opening pthread_cleanup_push() */ pthread_cleanup_pop( 0 ); if ( error != OK ) { errno = (int)error; error = ERROR; } return( error );}/******************************************************************************* waiting_on_q_msg - returns a nonzero result unless a qualifying event** occurs on the specified queue which should cause the** pended task to be awakened. The qualifying events** are:** (1) a message is sent to the queue and the current** task is selected to receive it** (2) the queue is deleted*****************************************************************************/static int waiting_on_q_msg( v2pt_mqueue_t *queue, struct timespec *timeout, int *retcode ){ int result; struct timeval now; ulong usec; if ( queue->send_type & KILLD ) { /* ** Queue has been killed... waiting is over. */ result = 0; *retcode = 0; } else { /* ** Queue still in service... check for message availability. ** Initially assume no message for our task */ result = 1; /* ** Multiple messages sent to the queue may be represented by only ** a single signal to the condition variable, so continue ** checking for a message for our task as long as more messages ** are available. */ while ( queue->msg_count > 0 ) { /* ** Message arrived... see if it's for our task. */ if ( signal_for_my_task( &(queue->first_susp), queue->order ) ) { /* ** Message was destined for our task... waiting is over. */ result = 0; *retcode = 0; break; } else { /* ** Message isn't for our task... continue waiting. ** Sleep awhile to allow other tasks ahead of ours in the ** list of tasks waiting on the queue to get their ** messages, bringing our task to the head of the list. */ pthread_mutex_unlock( &(queue->queue_lock) ); taskDelay( 1 ); pthread_mutex_lock( &(queue->queue_lock) ); } /* ** If a timeout was specified, make sure we respect it and ** exit this loop if it expires. */ if ( timeout != (struct timespec *)NULL ) { gettimeofday( &now, (struct timezone *)NULL ); if ( timeout->tv_nsec > (now.tv_usec * 1000) ) { usec = (timeout->tv_nsec - (now.tv_usec * 1000)) / 1000; if ( timeout->tv_sec < now.tv_sec ) usec = 0; else usec += ((timeout->tv_sec - now.tv_sec) * 1000000); } else { usec = ((timeout->tv_nsec + 1000000000) - (now.tv_usec * 1000)) / 1000; if ( (timeout->tv_sec - 1) < now.tv_sec ) usec = 0; else usec += (((timeout->tv_sec - 1) - now.tv_sec) * 1000000); } if ( usec == 0 ) break; } } } return( result );}/******************************************************************************* msgQReceive - blocks the calling task until a message is available in the** specified v2pthread queue.*****************************************************************************/int msgQReceive( v2pt_mqueue_t *queue, char *msgbuf, uint buflen, int max_wait ){ v2pthread_cb_t *our_tcb; struct timeval now; struct timespec timeout; int retcode; int msglen; long sec, usec; STATUS error; error = OK; msglen = 0; /* ** First ensure that the specified queue exists and that we have ** exclusive access to it. */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->queue_lock)); if ( queue_valid( queue ) ) { /* ** Okay... the queue is legitimate and we have exclusive access... ** Ensure that caller's buffer is big enough for max message size ** specified for queue. */ if ( buflen < queue->msg_len ) { error = S_msgQLib_INVALID_MSG_LENGTH; } else { /* ** Add tcb for task to list of tasks waiting on queue */ our_tcb = my_tcb();#ifdef DIAG_PRINTFS printf( "\r\ntask @ %p wait on queue list @ %p", our_tcb, &(queue->first_susp) );#endif link_susp_tcb( &(queue->first_susp), our_tcb ); /* ** If tasks waiting to write to a zero-length queue, notify ** waiting task that we're ready to receive a message. */ if ( ((queue->msgs_per_queue == 0) && (queue->first_write_susp != (v2pthread_cb_t *)NULL)) ) { /* ** Lock mutex for queue space */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->qfull_lock)); pthread_mutex_lock( &(queue->qfull_lock) ); /* ** Alert the waiting tasks that message space is available. */ pthread_cond_broadcast( &(queue->queue_space) ); /* ** Unlock the queue space mutex. */ pthread_cleanup_pop( 1 ); } retcode = 0; if ( max_wait == NO_WAIT ) { /* ** Caller specified no wait on queue message... ** Check the condition variable with an immediate timeout. */ gettimeofday( &now, (struct timezone *)NULL ); timeout.tv_sec = now.tv_sec; timeout.tv_nsec = now.tv_usec * 1000; while ( (waiting_on_q_msg( queue, &timeout, &retcode )) && (retcode != ETIMEDOUT) ) { retcode = pthread_cond_timedwait( &(queue->queue_send), &(queue->queue_lock), &timeout ); } } else { /* ** Caller expects to wait on queue, with or without a timeout. */ if ( max_wait == WAIT_FOREVER ) { /* ** Infinite wait was specified... wait without timeout. */ while ( waiting_on_q_msg( queue, 0, &retcode ) ) { pthread_cond_wait( &(queue->queue_send), &(queue->queue_lock) ); } } else { /* ** Wait on queue message arrival with timeout... ** Calculate timeout delay in seconds and microseconds. */ sec = 0; usec = max_wait * V2PT_TICK * 1000; gettimeofday( &now, (struct timezone *)NULL ); usec += now.tv_usec; if ( usec > 1000000 ) { sec = usec / 1000000; usec = usec % 1000000; } timeout.tv_sec = now.tv_sec + sec; timeout.tv_nsec = usec * 1000; /* ** Wait for a queue message for the current task or for the ** timeout to expire. The loop is required since the task ** may be awakened by signals for messages which are ** not ours, or for signals other than from a message send. */ while ( (waiting_on_q_msg( queue, &timeout, &retcode )) && (retcode != ETIMEDOUT) ) { retcode = pthread_cond_timedwait( &(queue->queue_send), &(queue->queue_lock), &timeout ); } } } /* ** Remove the calling task's tcb from the waiting task list ** for the queue. Clear our TCB's suspend list pointer in ** case the queue was killed & its ctrl blk deallocated. */ unlink_susp_tcb( &(queue->first_susp), our_tcb ); our_tcb->suspend_list = (v2pthread_cb_t **)NULL; /* ** See if we were awakened due to a msgQDelete on the queue. */ if ( queue->send_type & KILLD ) { notify_if_delete_complete( queue ); error = S_objLib_OBJ_DELETED; *((char *)msgbuf) = (char)NULL;#ifdef DIAG_PRINTFS printf( "...queue deleted" );#endif } else { /* ** See if we timed out or if we got a message */ if ( retcode == ETIMEDOUT ) { /* ** Timed out without a message */ if ( max_wait == NO_WAIT ) error = S_objLib_OBJ_UNAVAILABLE; else error = S_objLib_OBJ_TIMEOUT; *((char *)msgbuf) = (char)NULL;#ifdef DIAG_PRINTFS printf( "...timed out" );#endif } else { /* ** A message was sent to the queue for this task... ** Retrieve the message and clear the queue contents. */ msglen = (int)fetch_msg_from( queue, (char *)msgbuf );#ifdef DIAG_PRINTFS printf( "...rcvd queue msg @ %p", msgbuf );#endif } } } /* ** Unlock the mutex for the condition variable. */ pthread_mutex_unlock( &(queue->queue_lock) ); } else { error = S_objLib_OBJ_ID_ERROR; /* Invalid queue specified */ *((char *)msgbuf) = (char)NULL; } /* ** Clean up the opening pthread_cleanup_push() */ pthread_cleanup_pop( 0 ); if ( error != OK ) { errno = (int)error; msglen = (int)ERROR; } return( msglen );}/******************************************************************************* msgQNumMsgs - returns the number of messages currently posted to the** specified queue.*****************************************************************************/int msgQNumMsgs( v2pt_mqueue_t *queue ){ int num_msgs; /* ** First ensure that the specified queue exists and that we have ** exclusive access to it. */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->queue_lock)); if ( queue_valid( queue ) ) { /* ** Okay... the queue is legitimate and we have exclusive access... ** Get the number of messages currently posted to the queue. */ num_msgs = queue->msg_count; /* ** Unlock the mutex for the condition variable. */ pthread_mutex_unlock( &(queue->queue_lock) ); } else { num_msgs = (int)ERROR; } /* ** Clean up the opening pthread_cleanup_push() */ pthread_cleanup_pop( 0 ); return( num_msgs );}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -