📄 lmsgqlib.c
字号:
** Indicate type of send operation last performed on queue */ queue->send_type = URGNT;}/******************************************************************************* send_msg_to - sends the specified message to the tail of the specified queue*****************************************************************************/static void send_msg_to( v2pt_mqueue_t *queue, char *msg, uint msglen ){ uint i; char *element; /* ** It is assumed when we enter this function that the queue has space ** to accept the message about to be sent. Start by sending the ** message. */ if ( msg != (char *)NULL ) { element = (char *)&((queue->queue_tail)->msgbuf); for ( i = 0; i < msglen; i++ ) { *(element + i) = *(msg + i); } } (queue->queue_tail)->msglen = msglen;#ifdef DIAG_PRINTFS printf( "\r\nsent msg %p len %x to queue_tail @ %p", msg, msglen, queue->queue_tail );#endif /* ** Now increment the queue_tail (send) pointer, adjusting for ** possible wrap to the beginning of the queue. */ element = (char *)queue->queue_tail; element += queue->vmsg_len; queue->queue_tail = (q_msg_t *)element; if ( queue->queue_tail > queue->last_msg_in_queue ) { /* ** Wrap the queue_tail pointer to the first message address ** in the queue. */ queue->queue_tail = queue->first_msg_in_queue; }#ifdef DIAG_PRINTFS printf( " new queue_tail @ %p", queue->queue_tail );#endif /* ** Increment the message counter for the queue */ queue->msg_count++; /* ** Indicate type of send operation last performed on queue */ queue->send_type = SEND; /* ** Signal the condition variable for the queue */ pthread_cond_broadcast( &(queue->queue_send) );}/******************************************************************************* notify_if_delete_complete - indicates if all tasks waiting on specified** queue have successfully been awakened. *****************************************************************************/static void notify_if_delete_complete( v2pt_mqueue_t *queue ){ /* ** All tasks pending on the specified queue are being awakened... ** If the calling task was the last task pending on the queue, ** signal the deletion-complete condition variable. */ if ( (queue->first_susp == (v2pthread_cb_t *)NULL) && (queue->first_write_susp == (v2pthread_cb_t *)NULL) ) { /* ** Lock mutex for queue delete completion */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->qdlet_lock) ); pthread_mutex_lock( &(queue->qdlet_lock) ); /* ** Signal the deletion-complete condition variable for the queue */ pthread_cond_broadcast( &(queue->qdlet_cmplt) ); /* ** Unlock the queue delete completion mutex. */ pthread_mutex_unlock( &(queue->qdlet_lock) ); pthread_cleanup_pop( 0 ); }}/******************************************************************************* fetch_msg_from - fetches the next message from the specified queue*****************************************************************************/static uint fetch_msg_from( v2pt_mqueue_t *queue, char *msg ){ char *element; uint i; uint msglen; /* ** It is assumed when we enter this function that the queue contains ** one or more messages to be fetched. ** Fetch the message from the queue_head message location. */ if ( msg != (char *)NULL ) { element = (char *)&((queue->queue_head)->msgbuf); msglen = (queue->queue_head)->msglen; for ( i = 0; i < msglen; i++ ) { *(msg + i) = *(element + i); } } else msglen = 0;#ifdef DIAG_PRINTFS printf( "\r\nfetched msg of len %x from queue_head @ %p", msglen, queue->queue_head );#endif /* ** Clear the message from the queue */ element = (char *)&((queue->queue_head)->msgbuf); *element = (char)NULL; (queue->queue_head)->msglen = 0; /* ** Now increment the queue_head (send) pointer, adjusting for ** possible wrap to the beginning of the queue. */ element = (char *)queue->queue_head; element += queue->vmsg_len; queue->queue_head = (q_msg_t *)element; if ( queue->queue_head > queue->last_msg_in_queue ) { /* ** New queue_head pointer overflowed end of queue... ** Wrap the queue_head pointer to the first message address ** in the queue. */ queue->queue_head = queue->first_msg_in_queue; }#ifdef DIAG_PRINTFS printf( " new queue_head @ %p", queue->queue_head );#endif /* ** Decrement the message counter for the queue */ queue->msg_count--; /* ** Now see if adequate space was freed in the queue and alert any tasks ** waiting for message space if adequate space now exists. */ if ( queue->first_write_susp != (v2pthread_cb_t *)NULL ) { if ( queue->msg_count <= (queue->msgs_per_queue - 1) ) {#ifdef DIAG_PRINTFS printf( "\r\nqueue @ %p freed msg space for queue list @ %p", queue, &(queue->first_write_susp) );#endif /* ** Lock mutex for queue space */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->qfull_lock)); pthread_mutex_lock( &(queue->qfull_lock) ); /* ** Alert the waiting tasks that message space is available. */ pthread_cond_broadcast( &(queue->queue_space) ); /* ** Unlock the queue space mutex. */ pthread_cleanup_pop( 1 ); } } return( msglen );}/******************************************************************************* data_extent_for - allocates space for queue data. Data is allocated in** a block large enough to hold (max_msgs + 1) messages.*****************************************************************************/static q_msg_t * data_extent_for( v2pt_mqueue_t *queue ){ char *new_extent; char *last_msg; size_t alloc_size; /* ** Calculate the number of bytes of memory needed for this extent. ** Start by calculating the size of each element of the extent array. ** Each (q_msg_t) element will contain an unsigned int byte length followed ** by a character array of queue->msg_len bytes. First get the size ** of the q_msg_t 'header' excluding the start of the data array. ** Then add the size of the maximum-length message data. */ queue->vmsg_len = sizeof( q_msg_t ) - sizeof( char * ); queue->vmsg_len += (sizeof( char ) * queue->msg_len); /* ** The size of each array element is now known... ** Multiply it by the number of elements to get allocation size. */ alloc_size = queue->vmsg_len * (queue->msgs_per_queue + 1); /* ** Now allocate a block of memory to contain the extent. */ if ( (new_extent = (char *)ts_malloc( alloc_size )) != (char *)NULL ) { /* ** Clear the memory block. Note that this creates a NULL pointer ** for the nxt_extent link as well as zeroing the message array. */ bzero( (void *)new_extent, (int)alloc_size ); /* ** Link new data extent into the queue control block */ last_msg = new_extent + (queue->vmsg_len * queue->msgs_per_queue); queue->first_msg_in_queue = (q_msg_t *)new_extent; queue->last_msg_in_queue = (q_msg_t *)last_msg; }#ifdef DIAG_PRINTFS printf( "\r\nnew extent @ %p for queue @ %p vmsg_len %x", new_extent, queue, queue->vmsg_len );#endif return( (q_msg_t *)new_extent );}/******************************************************************************* msgQCreate - creates a v2pthread message queue*****************************************************************************/v2pt_mqueue_t * msgQCreate( int max_msgs, uint msglen, int opt ){ v2pt_mqueue_t *queue; STATUS error; error = OK; /* ** First allocate memory for the queue control block */ queue = (v2pt_mqueue_t *)ts_malloc( sizeof( v2pt_mqueue_t ) ); if ( queue != (v2pt_mqueue_t *)NULL ) { /* ** Ok... got a control block. */ /* ** Total messages in memory allocation block (extent) ** (Extent has one extra for urgent message.) */ queue->msgs_per_queue = max_msgs; /* ** Maximum size of messages sent to queue */ queue->msg_len = msglen; /* ** Now allocate memory for the first queue data extent. */ if ( data_extent_for( queue ) != (q_msg_t *)NULL ) { /* ** Got both a control block and a data extent... ** Initialize the control block. */ /* ** Mutex and Condition variable for queue send/pend */ pthread_mutex_init( &(queue->queue_lock), (pthread_mutexattr_t *)NULL ); pthread_cond_init( &(queue->queue_send), (pthread_condattr_t *)NULL ); /* ** Mutex and Condition variable for queue delete */ pthread_mutex_init( &(queue->qdlet_lock), (pthread_mutexattr_t *)NULL ); pthread_cond_init( &(queue->qdlet_cmplt), (pthread_condattr_t *)NULL ); /* ** Mutex and Condition variable for queue-full pend */ pthread_mutex_init( &(queue->qfull_lock), (pthread_mutexattr_t *)NULL ); pthread_cond_init( &(queue->queue_space), (pthread_condattr_t *)NULL ); /* ** Pointer to next message pointer to be fetched from queue */ queue->queue_head = queue->first_msg_in_queue; /* ** Pointer to last message pointer sent to queue */ queue->queue_tail = queue->first_msg_in_queue; /* ** Type of send operation last performed on queue */ queue->send_type = SEND; /* ** First task control block in list of tasks waiting to receive ** a message from queue */ queue->first_susp = (v2pthread_cb_t *)NULL; /* ** First task control block in list of tasks waiting for space to ** post messages to queue */ queue->first_write_susp = (v2pthread_cb_t *)NULL; /* ** Total number of messages currently sent to queue */ queue->msg_count = 0; /* ** Task pend order (FIFO or Priority) for queue */ if ( opt & MSG_Q_PRIORITY ) queue->order = 1; else queue->order = 0; /* ** If no errors thus far, we have a new queue ready to link into ** the queue list. */ if ( error == OK ) { link_qcb( queue ); } else { /* ** Oops! Problem somewhere above. Release control block ** and data memory and return. */ ts_free( (void *)queue->first_msg_in_queue ); ts_free( (void *)queue ); } } else { /* ** No memory for queue data... free queue control block & return */ ts_free( (void *)queue ); error = S_memLib_NOT_ENOUGH_MEMORY; } } else { error = S_memLib_NOT_ENOUGH_MEMORY; } if ( error != OK ) { errno = (int)error; queue = (v2pt_mqueue_t *)NULL; } return( queue );}/******************************************************************************* waiting_on_q_space - returns a nonzero result unless a qualifying event** occurs on the specified queue which should cause the** pended task to be awakened. The qualifying events** are:** (1) message space is freed in the queue and the ** current task is selected to receive it** (2) the queue is deleted*****************************************************************************/static int waiting_on_q_space( v2pt_mqueue_t *queue, struct timespec *timeout, int *retcode ){ int result; struct timeval now; ulong usec; if ( queue->send_type & KILLD ) { /* ** Queue has been killed... waiting is over. */ result = 0; *retcode = 0; } else { /* ** Queue still in service... check for message space availability. ** Initially assume no message space available for our task */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -