📄 queue.c
字号:
if ( queue_list != (p2pt_queue_t *)NULL ) { /* ** One or more queues exist in the queue list... ** Protect the queue list while we examine and modify it. */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&queue_list_lock ); pthread_mutex_lock( &queue_list_lock ); /* ** Scan the queue list for a qcb with a matching queue ID */ if ( queue_list->qid == qid ) { /* ** The first queue in the list matches the selected queue ID */ selected_qcb = queue_list; queue_list = selected_qcb->nxt_queue;#ifdef DIAG_PRINTFS printf( "\r\ndel queue cb @ %p from list @ %p", selected_qcb, &queue_list );#endif } else { /* ** Scan the next qcb for a matching qid while retaining a ** pointer to the current qcb. If the next qcb matches, ** select it and then unlink it from the queue list. */ for ( current_qcb = queue_list; current_qcb->nxt_queue != (p2pt_queue_t *)NULL; current_qcb = current_qcb->nxt_queue ) { if ( (current_qcb->nxt_queue)->qid == qid ) { /* ** Queue ID of next qcb matches... ** Select the qcb and then unlink it by linking ** the selected qcb's next qcb into the current qcb. */ selected_qcb = current_qcb->nxt_queue; current_qcb->nxt_queue = selected_qcb->nxt_queue;#ifdef DIAG_PRINTFS printf( "\r\ndel queue cb @ %p from list @ %p", selected_qcb, current_qcb );#endif break; } } } /* ** Re-enable access to the queue list by other threads. */ pthread_mutex_unlock( &queue_list_lock ); pthread_cleanup_pop( 0 ); } return( selected_qcb );}/******************************************************************************* urgent_msg_to - sends a message to the front of the specified queue*****************************************************************************/static void urgent_msg_to( p2pt_queue_t *queue, q_msg_t msg ){ q_extent_t *cur_extent; q_extent_t *prv_extent; q_msg_t *first_msg_in_extent; q_msg_t *last_msg_in_extent; q_msg_t *last_msg_in_prv_extent; int i, max_msg; /* ** It is assumed when we enter this function that the queue has space ** to accept the message about to be sent. ** Locate the extent containing the queue_head (Most of the time ** there will be only one extent) and establish the range of ** valid message pointers in that extent. */ prv_extent = (q_extent_t *)NULL; last_msg_in_prv_extent = (q_msg_t *)NULL; max_msg = queue->msgs_per_extent - 1; cur_extent = queue->first_extent; first_msg_in_extent = &(cur_extent->msgs[0]); last_msg_in_extent = &(cur_extent->msgs[max_msg + 1]); if ( (queue->queue_head < first_msg_in_extent) || (queue->queue_head > last_msg_in_extent) ) { /* ** queue_head is not in the first extent... find the right extent. */ prv_extent = cur_extent; last_msg_in_prv_extent = last_msg_in_extent; for ( cur_extent = (q_extent_t *)(queue->first_extent)->nxt_extent; cur_extent != (q_extent_t *)NULL; cur_extent = (q_extent_t *)(cur_extent->nxt_extent) ) { first_msg_in_extent = &(cur_extent->msgs[0]); last_msg_in_extent = &(cur_extent->msgs[max_msg]); if ( (queue->queue_head >= first_msg_in_extent) && (queue->queue_head <= last_msg_in_extent) ) break; prv_extent = cur_extent; last_msg_in_prv_extent = last_msg_in_extent; } } /* ** Found the extent containing the queue_head. ** Now decrement the queue_head (fetch) pointer, adjusting for ** possible wrap either to the end of the previous extent or to ** the end of the last extent. (Urgent messages are placed at ** the queue head so they will be the next message fetched from ** the queue - ahead of any previously-queued messages.) */ queue->queue_head--; if ( queue->queue_head < first_msg_in_extent ) { /* ** New queue_head pointer underflowed beginning of current extent... ** see if there's another extent preceding the current one. */ if ( prv_extent != (q_extent_t *)NULL ) { /* ** Another extent precedes this one in the extent list... ** Wrap the queue_head pointer to the last message address ** in the previous extent. */ queue->queue_head = last_msg_in_prv_extent; } else { /* ** The current extent was the first (or only) extent in the list... ** Wrap the queue_head pointer to the last message address ** in the last extent allocated for the queue. */ queue->queue_head = queue->last_msg_in_queue; } }#ifdef DIAG_PRINTFS printf( " new queue_head @ %p", queue->queue_head );#endif for ( i = 0; i < 4; i++ ) (*(queue->queue_head))[i] = msg[i];#ifdef DIAG_PRINTFS printf( "\r\nsent urgent msg %p to queue_head @ %p", msg, queue->queue_head );#endif /* ** Increment the message counter for the queue */ queue->msg_count++;}/******************************************************************************* send_msg_to - sends the specified message to the tail of the specified queue*****************************************************************************/static void send_msg_to( p2pt_queue_t *queue, q_msg_t msg ){ q_extent_t *cur_extent; q_msg_t *first_msg_in_extent; q_msg_t *last_msg_in_extent; int i, max_msg; /* ** It is assumed when we enter this function that the queue has space ** to accept the message about to be sent. Start by sending the ** message. */ for ( i = 0; i < 4; i++ ) (*(queue->queue_tail))[i] = msg[i];#ifdef DIAG_PRINTFS printf( "\r\nsent msg %lx%lx%lx%lx to queue_tail @ %p", msg[0], msg[1], msg[2], msg[3], queue->queue_tail );#endif /* ** Locate the extent containing the queue_tail just sent into. ** (Most of the time there will be only one extent.) ** Establish the range of valid message pointers in the current extent. */ max_msg = queue->msgs_per_extent - 1; cur_extent = queue->first_extent; first_msg_in_extent = &(cur_extent->msgs[0]); last_msg_in_extent = &(cur_extent->msgs[max_msg + 1]); if ( (queue->queue_tail < first_msg_in_extent) || (queue->queue_tail > last_msg_in_extent) ) { /* ** queue_tail is not in the first extent... find the right extent. */ for ( cur_extent = (q_extent_t *)(queue->first_extent)->nxt_extent; cur_extent != (q_extent_t *)NULL; cur_extent = (q_extent_t *)(cur_extent->nxt_extent) ) { first_msg_in_extent = &(cur_extent->msgs[0]); last_msg_in_extent = &(cur_extent->msgs[max_msg]); if ( (queue->queue_tail >= first_msg_in_extent) && (queue->queue_tail <= last_msg_in_extent) ) break; } } /* ** Found the extent containing the queue_tail just sent into. ** Now increment the queue_tail (send) pointer, adjusting for ** possible wrap either to the beginning of the next extent or to ** the beginning of the first extent. */ queue->queue_tail++; if ( queue->queue_tail > last_msg_in_extent ) { /* ** New queue_tail pointer overflowed end of current extent... ** see if there's another extent following the current one. */ if ( cur_extent->nxt_extent != (void *)NULL ) { /* ** Another extent follows in the extent list... ** Wrap the queue_tail pointer to the first message address ** in the next extent. */ cur_extent = (q_extent_t *)(cur_extent->nxt_extent); } else { /* ** The current extent was the last (or only) extent in the list... ** Wrap the queue_tail pointer to the first message address ** in the first extent. */ cur_extent = queue->first_extent; } queue->queue_tail = &(cur_extent->msgs[0]); }#ifdef DIAG_PRINTFS printf( " new queue_tail @ %p", queue->queue_tail );#endif /* ** Increment the message counter for the queue */ queue->msg_count++; /* ** Signal the condition variable for the queue */ pthread_cond_broadcast( &(queue->queue_send) );}/******************************************************************************* fetch_msg_from - fetches the next message from the specified queue*****************************************************************************/static void fetch_msg_from( p2pt_queue_t *queue, q_msg_t msg ){ q_extent_t *cur_extent; q_msg_t *first_msg_in_extent; q_msg_t *last_msg_in_extent; int i, max_msg; /* ** It is assumed when we enter this function that the queue contains ** one or more messages to be fetched. ** Fetch the message from the queue_head message location. */ if ( msg != (ULONG *)NULL ) { for ( i = 0; i < 4; i++ ) msg[i] = (*(queue->queue_head))[i]; }#ifdef DIAG_PRINTFS printf( "\r\nfetched msg %lx%lx%lx%lx from queue_head @ %p", msg[0], msg[1], msg[2], msg[3], queue->queue_head );#endif if ( queue->send_type == BCAST ) queue->bcst_tasks_awakened++; /* ** For normally sent messages, we will clear the message immediately... ** however, for queue_broadcast messages we will only clear the ** message after the last pended task has been awakened. */ if ( (queue->send_type == SEND) || (queue->first_susp == (p2pthread_cb_t *)NULL) ) { for ( i = 0; i < 4; i++ ) (*(queue->queue_head))[i] = (ULONG)(NULL); /* ** Locate the extent containing the queue_head just fetched from. ** (Most of the time there will be only one extent.) ** Establish the range of valid message pointers in the extent. */ max_msg = queue->msgs_per_extent - 1; cur_extent = queue->first_extent; first_msg_in_extent = &(cur_extent->msgs[0]); last_msg_in_extent = &(cur_extent->msgs[max_msg + 1]); if ( (queue->queue_head < first_msg_in_extent) || (queue->queue_head > last_msg_in_extent) ) { /* ** queue_head is not in the first extent... find the right extent. */ for ( cur_extent = (q_extent_t *)(queue->first_extent)->nxt_extent; cur_extent != (q_extent_t *)NULL; cur_extent = (q_extent_t *)(cur_extent->nxt_extent) ) { first_msg_in_extent = &(cur_extent->msgs[0]); last_msg_in_extent = &(cur_extent->msgs[max_msg]); if ( (queue->queue_head >= first_msg_in_extent) && (queue->queue_head <= last_msg_in_extent) ) break; } } /* ** Found the extent containing the queue_head just sent into. ** Now increment the queue_head (send) pointer, adjusting for ** possible wrap either to the beginning of the next extent or to ** the beginning of the first extent. */ queue->queue_head++; if ( queue->queue_head > last_msg_in_extent ) { /* ** New queue_head pointer overflowed end of current extent... ** see if there's another extent following the current one. */ if ( cur_extent->nxt_extent != (void *)NULL ) { /* ** Another extent follows in the extent list... ** Wrap the queue_head pointer to the first message address ** in the next extent. */ cur_extent = (q_extent_t *)(cur_extent->nxt_extent); } else { /* ** The current extent was the last (or only) one in the list... ** Wrap the queue_head pointer to the first message address ** in the first extent. */ cur_extent = queue->first_extent; } queue->queue_head = &(cur_extent->msgs[0]); }#ifdef DIAG_PRINTFS printf( " new queue_head @ %p", queue->queue_head );#endif /* ** Decrement the message counter for the queue */ queue->msg_count--; /* ** If the message just fetched was a broadcast message, then ** this was the last task pending on the queue... Signal the ** broadcast-complete condition variable. */ if ( queue->send_type != SEND ) { /* ** Lock mutex for queue broadcast completion */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->qbcst_lock) ); pthread_mutex_lock( &(queue->qbcst_lock) );
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -