📄 lmsgqlib.c
字号:
result = 1; /* ** Multiple messages removed from the queue may be represented by ** only a single signal to the condition variable, so continue ** checking for a message slot for our task as long as more space ** is available. Also note that for a 'zero-length' queue, the ** presence of a task waiting on the queue for our message will ** allow our message to be posted to the queue. */ while ( (queue->msg_count <= (queue->msgs_per_queue - 1)) || ((queue->msgs_per_queue == 0) && (queue->first_susp != (v2pthread_cb_t *)NULL)) ) { /* ** Message slot available... see if it's for our task. */ if ( signal_for_my_task( &(queue->first_write_susp), queue->order ) ) { /* ** Message slot was destined for our task... waiting is over. */ result = 0; *retcode = 0; break; } else { /* ** Message slot isn't for our task... continue waiting. ** Sleep awhile to allow other tasks ahead of ours in the ** list of tasks waiting on the queue to get their ** messages, bringing our task to the head of the list. */ pthread_mutex_unlock( &(queue->qfull_lock) ); taskDelay( 1 ); pthread_mutex_lock( &(queue->qfull_lock) ); } /* ** If a timeout was specified, make sure we respect it and ** exit this loop if it expires. */ if ( timeout != (struct timespec *)NULL ) { gettimeofday( &now, (struct timezone *)NULL ); if ( timeout->tv_nsec > (now.tv_usec * 1000) ) { usec = (timeout->tv_nsec - (now.tv_usec * 1000)) / 1000; if ( timeout->tv_sec < now.tv_sec ) usec = 0; else usec += ((timeout->tv_sec - now.tv_sec) * 1000000); } else { usec = ((timeout->tv_nsec + 1000000000) - (now.tv_usec * 1000)) / 1000; if ( (timeout->tv_sec - 1) < now.tv_sec ) usec = 0; else usec += (((timeout->tv_sec - 1) - now.tv_sec) * 1000000); } if ( usec == 0 ) break; } } } return( result );}/******************************************************************************* waitToSend - sends the queue message if sufficient space becomes available** within the allotted waiting interval.*****************************************************************************/STATUS waitToSend( v2pt_mqueue_t *queue, char *msg, uint msglen, int wait, int pri ){ v2pthread_cb_t *our_tcb; struct timeval now; struct timespec timeout; int retcode; long sec, usec; STATUS error; error = OK; if ( wait != NO_WAIT ) { /* ** Add tcb for task to list of tasks waiting on queue */ our_tcb = my_tcb();#ifdef DIAG_PRINTFS printf( "\r\ntask @ %p wait on queue space list @ %p", our_tcb, &(queue->first_write_susp) );#endif link_susp_tcb( &(queue->first_write_susp), our_tcb ); retcode = 0; /* ** Unlock the queue mutex so other tasks can receive messages. */ pthread_mutex_unlock( &(queue->queue_lock) ); /* ** Caller expects to wait for queue space, with or without a timeout. */ if ( wait == WAIT_FOREVER ) { /* ** Infinite wait was specified... wait without timeout. */ while ( waiting_on_q_space( queue, 0, &retcode ) ) { pthread_cond_wait( &(queue->queue_space), &(queue->qfull_lock) ); } } else { /* ** Wait on queue message space with timeout... ** Calculate timeout delay in seconds and microseconds. */ sec = 0; usec = wait * V2PT_TICK * 1000; gettimeofday( &now, (struct timezone *)NULL ); usec += now.tv_usec; if ( usec > 1000000 ) { sec = usec / 1000000; usec = usec % 1000000; } timeout.tv_sec = now.tv_sec + sec; timeout.tv_nsec = usec * 1000; /* ** Wait for queue message space for the current task or for the ** timeout to expire. The loop is required since the task ** may be awakened by signals for messages which are ** not ours, or for signals other than from a message send. */ while ( (waiting_on_q_space( queue, &timeout, &retcode )) && (retcode != ETIMEDOUT) ) { retcode = pthread_cond_timedwait( &(queue->queue_space), &(queue->qfull_lock), &timeout ); } } /* ** Re-lock the queue mutex before manipulating its control block. */ pthread_mutex_lock( &(queue->queue_lock) ); /* ** Remove the calling task's tcb from the pended task list ** for the queue. Clear our TCB's suspend list pointer in ** case the queue was killed & its ctrl blk deallocated. */ unlink_susp_tcb( &(queue->first_write_susp), our_tcb ); our_tcb->suspend_list = (v2pthread_cb_t **)NULL; /* ** See if we were awakened due to a msgQDelete on the queue. */ if ( queue->send_type & KILLD ) { notify_if_delete_complete( queue ); error = S_objLib_OBJ_DELETED;#ifdef DIAG_PRINTFS printf( "...queue deleted" );#endif } else { /* ** See if we timed out or if we got a message slot */ if ( retcode == ETIMEDOUT ) { /* ** Timed out without obtaining a message slot */ error = S_objLib_OBJ_TIMEOUT;#ifdef DIAG_PRINTFS printf( "...timed out" );#endif } else { /* ** A message slot was freed on the queue for this task... */#ifdef DIAG_PRINTFS printf( "...rcvd queue msg space" );#endif if ( pri == MSG_PRI_URGENT ) { /* ** Stuff the new message onto the front of the queue. */ urgent_msg_to( queue, msg, msglen ); /* ** Signal the condition variable for the queue */ pthread_cond_broadcast( &(queue->queue_send) ); } else /* ** Send the new message to the back of the queue. */ send_msg_to( queue, msg, msglen ); } } } else { /* ** Queue is full and no waiting allowed... return QUEUE FULL error */ error = S_objLib_OBJ_UNAVAILABLE; } return( error );}/******************************************************************************* msgQSend - posts a message to the tail of a v2pthread queue and awakens the** first selected task pended on the queue.*****************************************************************************/STATUS msgQSend( v2pt_mqueue_t *queue, char *msg, uint msglen, int wait, int pri ){#ifdef DIAG_PRINTFS v2pthread_cb_t *our_tcb;#endif STATUS error; error = OK; /* ** First ensure that the specified queue exists and that we have ** exclusive access to it. */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->queue_lock)); if ( queue_valid( queue ) ) { /* ** Okay... the queue is legitimate and we have exclusive access... ** Make sure caller's message is within max message size for queue. */ if ( msglen > queue->msg_len ) { error = S_msgQLib_INVALID_MSG_LENGTH; } else {#ifdef DIAG_PRINTFS our_tcb = my_tcb(); if ( pri == MSG_PRI_URGENT ) printf( "\r\ntask @ %p urgent send to queue list @ %p", our_tcb, &(queue->first_susp) ); else printf( "\r\ntask @ %p send to queue list @ %p", our_tcb, &(queue->first_susp) );#endif /* ** See how many messages are already sent into the queue */ if ( queue->msg_count > queue->msgs_per_queue ) { /* ** Queue is full... if waiting on space is allowed, wait ** until space becomes available or the timeout expires. ** If space becomes available, send the caller's message. */ error = waitToSend( queue, msg, msglen, wait, pri ); } else { if ( queue->msg_count == queue->msgs_per_queue ) { if ( (queue->msgs_per_queue == 0) && (queue->first_susp != (v2pthread_cb_t *)NULL) ) { /* ** Special case... Send the new message. */ send_msg_to( queue, msg, msglen ); } else { if ( pri == MSG_PRI_URGENT ) { /* ** Stuff the new message onto the queue. */ urgent_msg_to( queue, msg, msglen ); /* ** Signal the condition variable for the queue */ pthread_cond_broadcast( &(queue->queue_send) ); } else /* ** Queue is full... if waiting on space is ** allowed, wait until space becomes available ** or the timeout expires. If space becomes ** available, send the caller's message. */ error = waitToSend( queue, msg, msglen, wait, pri ); } } else { if ( pri == MSG_PRI_URGENT ) { /* ** Stuff the new message onto the front of the queue. */ urgent_msg_to( queue, msg, msglen ); /* ** Signal the condition variable for the queue */ pthread_cond_broadcast( &(queue->queue_send) ); } else /* ** Send the new message to the back of the queue. */ send_msg_to( queue, msg, msglen ); } } } /* ** Unlock the queue mutex. */ pthread_mutex_unlock( &(queue->queue_lock) ); } else { error = S_objLib_OBJ_ID_ERROR; } /* ** Clean up the opening pthread_cleanup_push() */ pthread_cleanup_pop( 0 ); if ( error != OK ) { errno = (int)error; error = ERROR; } return( error );}/******************************************************************************* delete_mqueue - takes care of destroying the specified queue and freeing** any resources allocated for that queue*****************************************************************************/static void delete_mqueue( v2pt_mqueue_t *queue ){ /* ** First remove the queue from the queue list */ unlink_qcb( queue ); /* ** Next delete extent allocated for queue data. */ ts_free( (void *)queue->first_msg_in_queue ); /* ** Finally delete the queue control block itself; */ ts_free( (void *)queue );}/******************************************************************************* msgQDelete - removes the specified queue from the queue list and frees** the memory allocated for the queue control block and extents.*****************************************************************************/STATUS msgQDelete( v2pt_mqueue_t *queue ){ STATUS error; error = OK; /* ** First ensure that the specified queue exists and that we have ** exclusive access to it. */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->queue_lock)); if ( queue_valid( queue ) ) { /* ** Okay... the queue is legitimate and we have exclusive access... ** Declare the send type */ queue->send_type = KILLD; /* ** Block while any tasks are still pended on the queue */ taskLock(); if ( (queue->first_susp != (v2pthread_cb_t *)NULL) || (queue->first_write_susp != (v2pthread_cb_t *)NULL) ) { /* ** Lock mutex for queue delete completion */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->qdlet_lock) ); pthread_mutex_lock( &(queue->qdlet_lock) ); /* ** Signal the condition variable for tasks waiting on ** messages in the queue
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -