📄 vqueue.c
字号:
/* ** Queue ID of next qcb matches... ** Select the qcb and then unlink it by linking ** the selected qcb's next qcb into the current qcb. */ selected_qcb = current_qcb->nxt_queue; current_qcb->nxt_queue = selected_qcb->nxt_queue;#ifdef DIAG_PRINTFS printf( "\r\ndel queue cb @ %p from list @ %p", selected_qcb, current_qcb );#endif break; } } } /* ** Re-enable access to the queue list by other threads. */ pthread_mutex_unlock( &vqueue_list_lock ); pthread_cleanup_pop( 0 ); } return( selected_qcb );}/******************************************************************************* urgent_msg_to - sends a message to the front of the specified queue*****************************************************************************/static void urgent_msg_to( p2pt_vqueue_t *queue, char *msg, ULONG msglen ){ ULONG i; char *element; /* ** It is assumed when we enter this function that the queue has space ** to accept the message about to be sent. ** Pre-decrement the queue_head (fetch) pointer, adjusting for ** possible wrap to the end of the queue; ** (Urgent messages are placed at the queue head so they will be the ** next message fetched from the queue - ahead of any ** previously-queued messages.) */ element = (char *)queue->queue_head; element -= queue->vmsg_len; queue->queue_head = (q_vmsg_t *)element; if ( queue->queue_head < queue->first_msg_in_queue ) { /* ** New queue_head pointer underflowed beginning of the extent... ** Wrap the queue_head pointer to the last message address ** in the extent allocated for the queue. */ queue->queue_head = queue->last_msg_in_queue; }#ifdef DIAG_PRINTFS printf( " new queue_head @ %p", queue->queue_head );#endif if ( msg != (char *)NULL ) { element = (char *)&((queue->queue_head)->msgbuf); for ( i = 0; i < msglen; i++ ) { *(element + i) = *(msg + i); } } (queue->queue_head)->msglen = msglen;#ifdef DIAG_PRINTFS printf( "\r\nsent urgent msg %p len %lx to queue_head @ %p", msg, msglen, queue->queue_head );#endif /* ** Increment the message counter for the queue */ queue->msg_count++;}/******************************************************************************* send_msg_to - sends the specified message to the tail of the specified queue*****************************************************************************/static void send_msg_to( p2pt_vqueue_t *queue, char *msg, ULONG msglen ){ ULONG i; char *element; /* ** It is assumed when we enter this function that the queue has space ** to accept the message about to be sent. Start by sending the ** message. */ if ( msg != (char *)NULL ) { element = (char *)&((queue->queue_tail)->msgbuf); for ( i = 0; i < msglen; i++ ) { *(element + i) = *(msg + i); } } (queue->queue_tail)->msglen = msglen;#ifdef DIAG_PRINTFS printf( "\r\nsent msg %p len %lx to queue_tail @ %p", msg, msglen, queue->queue_tail );#endif /* ** Now increment the queue_tail (send) pointer, adjusting for ** possible wrap to the beginning of the queue. */ element = (char *)queue->queue_tail; element += queue->vmsg_len; queue->queue_tail = (q_vmsg_t *)element; if ( queue->queue_tail > queue->last_msg_in_queue ) { /* ** Wrap the queue_tail pointer to the first message address ** in the queue. */ queue->queue_tail = queue->first_msg_in_queue; }#ifdef DIAG_PRINTFS printf( " new queue_tail @ %p", queue->queue_tail );#endif /* ** Increment the message counter for the queue */ queue->msg_count++; /* ** Signal the condition variable for the queue */ pthread_cond_broadcast( &(queue->queue_send) );}/******************************************************************************* fetch_msg_from - fetches the next message from the specified queue*****************************************************************************/static void fetch_msg_from( p2pt_vqueue_t *queue, char *msg, ULONG *msglen ){ char *element; int i; /* ** It is assumed when we enter this function that the queue contains ** one or more messages to be fetched. ** Fetch the message from the queue_head message location. */ if ( msg != (char *)NULL ) { element = (char *)&((queue->queue_head)->msgbuf); for ( i = 0; i < (queue->queue_head)->msglen; i++ ) { *(msg + i) = *(element + i); } } if ( msglen != (ULONG *)NULL ) *msglen = (queue->queue_head)->msglen;#ifdef DIAG_PRINTFS printf( "\r\nfetched msg of len %lx from queue_head @ %p", (queue->queue_head)->msglen, queue->queue_head );#endif if ( queue->send_type == BCAST ) queue->bcst_tasks_awakened++; /* ** For normally sent messages, we will clear the message immediately... ** however, for queue_broadcast messages we will only clear the ** message after the last pended task has been awakened. */ if ( (queue->send_type == SEND) || (queue->first_susp == (p2pthread_cb_t *)NULL) ) { element = (char *)&((queue->queue_head)->msgbuf); *element = (char)NULL; (queue->queue_head)->msglen = 0L; /* ** Now increment the queue_head (send) pointer, adjusting for ** possible wrap to the beginning of the queue. */ element = (char *)queue->queue_head; element += queue->vmsg_len; queue->queue_head = (q_vmsg_t *)element; if ( queue->queue_head > queue->last_msg_in_queue ) { /* ** New queue_head pointer overflowed end of queue... ** Wrap the queue_head pointer to the first message address ** in the queue. */ queue->queue_head = queue->first_msg_in_queue; }#ifdef DIAG_PRINTFS printf( " new queue_head @ %p", queue->queue_head );#endif /* ** Decrement the message counter for the queue */ queue->msg_count--; /* ** If the message just fetched was a broadcast message, then ** this was the last task pending on the queue... Signal the ** broadcast-complete condition variable. */ if ( queue->send_type != SEND ) { /* ** Lock mutex for queue broadcast completion */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->qbcst_lock) ); pthread_mutex_lock( &(queue->qbcst_lock) ); /* ** Signal the broadcast-complete condition variable for the queue */ pthread_cond_broadcast( &(queue->qbcst_cmplt) ); queue->send_type = SEND; /* ** Unlock the queue broadcast completion mutex. */ pthread_mutex_unlock( &(queue->qbcst_lock) ); pthread_cleanup_pop( 0 ); } }}/******************************************************************************* data_extent_for - allocates space for queue data. Data is allocated in** a block large enough to hold (qsize + 1) messages.*****************************************************************************/static q_vmsg_t * data_extent_for( p2pt_vqueue_t *queue ){ char *new_extent; char *last_msg; size_t alloc_size; /* ** Calculate the number of bytes of memory needed for this extent. ** Start by calculating the size of each element of the extent array. ** Each (q_vmsg_t) element will contain a ULONG byte length followed ** by a character array of queue->msg_len bytes. First get the size ** of the q_vmsg_t 'header' excluding the start of the data array. ** Then add the size of the maximum-length message data. */ queue->vmsg_len = sizeof( q_vmsg_t ) - sizeof( char * ); queue->vmsg_len += (sizeof( char ) * queue->msg_len); /* ** The size of each array element is now known... ** Multiply it by the number of elements to get allocation size. */ alloc_size = queue->vmsg_len * (queue->msgs_per_queue + 1); /* ** Now allocate a block of memory to contain the extent. */ if ( (new_extent = (char *)ts_malloc( alloc_size )) != (char *)NULL ) { /* ** Clear the memory block. Note that this creates a NULL pointer ** for the nxt_extent link as well as zeroing the message array. */ bzero( (void *)new_extent, (int)alloc_size ); /* ** Link new data extent into the queue control block */ last_msg = new_extent + (queue->vmsg_len * queue->msgs_per_queue); queue->first_msg_in_queue = (q_vmsg_t *)new_extent; queue->last_msg_in_queue = (q_vmsg_t *)last_msg; }#ifdef DIAG_PRINTFS printf( "\r\nnew extent @ %p for queue @ %p vmsg_len %x", new_extent, queue, queue->vmsg_len );#endif return( (q_vmsg_t *)new_extent );}/******************************************************************************* q_vcreate - creates a p2pthread message queue*****************************************************************************/ULONG q_vcreate( char name[4], ULONG opt, ULONG qsize, ULONG msglen, ULONG *qid ){ p2pt_vqueue_t *queue; ULONG error; int i; error = ERR_NO_ERROR; /* ** First allocate memory for the queue control block */ queue = (p2pt_vqueue_t *)ts_malloc( sizeof( p2pt_vqueue_t ) ); if ( queue != (p2pt_vqueue_t *)NULL ) { /* ** Ok... got a control block. */ /* ** Total messages in memory allocation block (extent) ** (Extent has one extra for urgent message.) */ queue->msgs_per_queue = qsize; /* ** Maximum size of messages sent to queue */ queue->msg_len = msglen; /* ** Option Flags for queue */ queue->flags = opt; /* ** Now allocate memory for the first queue data extent. */ if ( data_extent_for( queue ) != (q_vmsg_t *)NULL ) { /* ** Got both a control block and a data extent... ** Initialize the control block. */ /* ** ID for queue */ queue->qid = new_vqid(); if ( qid != (ULONG *)NULL ) *qid = queue->qid; /* ** Name for queue */ for ( i = 0; i < 4; i++ ) queue->qname[i] = name[i]; /* ** Mutex and Condition variable for queue send/pend */ pthread_mutex_init( &(queue->queue_lock), (pthread_mutexattr_t *)NULL ); pthread_cond_init( &(queue->queue_send), (pthread_condattr_t *)NULL ); /* ** Mutex and Condition variable for queue broadcast/delete */ pthread_mutex_init( &(queue->qbcst_lock), (pthread_mutexattr_t *)NULL ); pthread_cond_init( &(queue->qbcst_cmplt), (pthread_condattr_t *)NULL ); /* ** Pointer to next message pointer to be fetched from queue */ queue->queue_head = queue->first_msg_in_queue; /* ** Pointer to last message pointer sent to queue */ queue->queue_tail = queue->first_msg_in_queue; /* ** Type of send operation last performed on queue */ queue->send_type = SEND; /* ** First task control block in list of tasks waiting on queue */ queue->first_susp = (p2pthread_cb_t *)NULL; /* ** Count of tasks awakened by q_broadcast call */ queue->bcst_tasks_awakened = 0; /* ** Total number of messages currently sent to queue */ queue->msg_count = 0; /* ** Task pend order (FIFO or Priority) for queue */ if ( opt & Q_PRIOR ) queue->order = 0; else queue->order = 1; /* ** If no errors thus far, we have a new queue ready to link into ** the queue list.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -