📄 queue.c
字号:
/* ** Signal the broadcast-complete condition variable for the queue */ pthread_cond_broadcast( &(queue->queue_bcplt) ); queue->send_type = SEND; /* ** Unlock the queue broadcast completion mutex. */ pthread_mutex_unlock( &(queue->qbcst_lock) ); pthread_cleanup_pop( 0 ); } }}/******************************************************************************* new_extent_for - allocates space for queue data. Data is allocated in** blocks large enough to hold (qsize) messages plus** the extent link pointer. If a queue needs more room,** additional extents of the same size are added.*****************************************************************************/static q_extent_t * new_extent_for( p2pt_queue_t *queue, ULONG qsize ){ q_extent_t *new_extent; q_extent_t *nxt_extent; size_t block_size; /* ** Calculate the number of bytes of memory needed for this extent. ** Start with space required for an array of qsize messages. ** Then add the size of the link to the next extent plus one extra ** message for urgent queue sends. */ block_size = sizeof( q_msg_t ) * qsize; /* ** The q_extent_t contains a void pointer to the next extent in the ** queue followed by a q_msg_t for the urgent message space. Any data ** alignment spaces in the header will be taken into account here, too. */ block_size += sizeof( q_extent_t ); /* ** Now allocate a block of memory to contain the extent. */ if ( (new_extent = (q_extent_t *)ts_malloc( block_size )) != (q_extent_t *)NULL ) { /* ** Clear the memory block. Note that this creates a NULL pointer ** for the nxt_extent link as well as zeroing the message array. */ bzero( (void *)new_extent, (int)block_size ); /* ** Increment total number of extents currently allocated for queue */ queue->total_extents++; /* ** Find the last extent in the extent list */ if ( queue->first_extent == (q_extent_t *)NULL ) { /* Extent list is empty... new_extent is first_extent */ queue->first_extent = new_extent; } else { /* Find end of extent list and append new_extent onto it */ for ( nxt_extent = queue->first_extent; nxt_extent->nxt_extent != (q_extent_t *)NULL; nxt_extent = (q_extent_t *)(nxt_extent->nxt_extent) ); nxt_extent->nxt_extent = new_extent; } queue->last_msg_in_queue = &(new_extent->msgs[qsize]); }#ifdef DIAG_PRINTFS printf( "\r\nnew extent @ %p for queue @ %p", new_extent, queue );#endif return( new_extent );}/******************************************************************************* q_create - creates a p2pthread message queue*****************************************************************************/ULONG q_create( char name[4], ULONG qsize, ULONG opt, ULONG *qid ){ p2pt_queue_t *queue; ULONG error; int i; error = ERR_NO_ERROR; /* ** First allocate memory for the queue control block */ queue = (p2pt_queue_t *)ts_malloc( sizeof( p2pt_queue_t ) ); if ( queue != (p2pt_queue_t *)NULL ) { /* ** Ok... got a control block. ** Now allocate memory for the first queue data extent. */ /* ** Option Flags for queue */ queue->flags = opt; queue->total_extents = 0; if ( new_extent_for( queue, qsize ) != (q_extent_t *)NULL ) { /* ** Got both a control block and a data extent... ** Initialize the control block. */ /* ** ID for queue */ queue->qid = new_qid(); if ( qid != (ULONG *)NULL ) *qid = queue->qid; /* ** Name for queue */ for ( i = 0; i < 4; i++ ) queue->qname[i] = name[i]; /* ** Mutex and Condition variable for queue send/pend */ pthread_mutex_init( &(queue->queue_lock), (pthread_mutexattr_t *)NULL ); pthread_cond_init( &(queue->queue_send), (pthread_condattr_t *)NULL ); /* ** Mutex and Condition variable for queue broadcast/delete */ pthread_mutex_init( &(queue->qbcst_lock), (pthread_mutexattr_t *)NULL ); pthread_cond_init( &(queue->queue_bcplt), (pthread_condattr_t *)NULL ); if ( qsize > 0 ) { /* ** Pointer to next message pointer to be fetched from queue */ queue->queue_head = &(queue->first_extent->msgs[1]); /* ** Pointer to last message pointer sent to queue */ queue->queue_tail = &(queue->first_extent->msgs[1]); } else { /* ** Pointer to next message pointer to be fetched from queue */ queue->queue_head = &(queue->first_extent->msgs[0]); /* ** Pointer to last message pointer sent to queue */ queue->queue_tail = &(queue->first_extent->msgs[0]); } /* ** Type of send operation last performed on queue */ queue->send_type = SEND; /* ** First task control block in list of tasks waiting on queue */ queue->first_susp = (p2pthread_cb_t *)NULL; /* ** Count of tasks awakened by q_broadcast call */ queue->bcst_tasks_awakened = 0; /* ** Total messages per memory allocation block (extent) ** (First extent has one extra for urgent message.) */ queue->msgs_per_extent = qsize; /* ** Total number of messages currently sent to queue */ queue->msg_count = 0; /* ** If no errors thus far, we have a new queue ready to link into ** the queue list. */ if ( error == ERR_NO_ERROR ) { link_qcb( queue ); } else { /* ** Oops! Problem somewhere above. Release control block ** and data memory and return. */ ts_free( (void *)queue->first_extent ); ts_free( (void *)queue ); } } else { /* ** No memory for queue data... free queue control block & return */ ts_free( (void *)queue ); error = ERR_NOMGB; } } else { error = ERR_NOQCB; } return( error );}/******************************************************************************* q_urgent - sends a message to the front of a p2pthread queue and awakens the** first selected task waiting on the queue.*****************************************************************************/ULONG q_urgent( ULONG qid, q_msg_t msg ){#ifdef DIAG_PRINTFS p2pthread_cb_t *our_tcb;#endif p2pt_queue_t *queue; q_extent_t *new_extent; ULONG error; int qsize; error = ERR_NO_ERROR; if ( (queue = qcb_for( qid )) != (p2pt_queue_t *)NULL ) {#ifdef DIAG_PRINTFS our_tcb = my_tcb(); printf( "\r\ntask @ %p urgent send to queue list @ %p", our_tcb, &(queue->first_susp) );#endif /* ** 'Lock the p2pthread scheduler' to defer any context switch to a ** higher priority task until after this call has completed its work. */ sched_lock(); /* ** Lock mutex for queue send */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->queue_lock)); pthread_mutex_lock( &(queue->queue_lock) ); /* ** See how many messages are already sent into the queue */ if ( queue->msg_count > (queue->total_extents * queue->msgs_per_extent) ) { /* ** Queue is already full... try to add another extent. ** (First extent has msgs_per_extent + 1 messages... ** any additional extents have only msgs_per_extent. Extent ** header contains one message, so decrement msg count by one ** before allocating space for extent.) */ qsize = queue->msgs_per_extent - 1; if ( (!(queue->flags & Q_LIMIT)) && ((new_extent = new_extent_for( queue, qsize )) != (q_extent_t *)NULL) ) { /* ** The queue_tail would have wrapped to the head of ** the first extent... change it to the first message ** in the newly-added extent. */ queue->queue_tail = &(new_extent->msgs[0]); /* ** Stuff the new message onto the front of the queue. */ urgent_msg_to( queue, msg ); /* ** Signal the condition variable for the queue */ pthread_cond_broadcast( &(queue->queue_send) ); } else /* ** No memory for more extents... return QUEUE FULL error */ error = ERR_QFULL; } else { /* ** Stuff the new message onto the front of the queue. */ urgent_msg_to( queue, msg ); /* ** Signal the condition variable for the queue */ pthread_cond_broadcast( &(queue->queue_send) ); } /* ** Unlock the queue mutex. */ pthread_mutex_unlock( &(queue->queue_lock) ); pthread_cleanup_pop( 0 ); /* ** 'Unlock the p2pthread scheduler' to enable a possible context switch ** to a task made runnable by this call. */ sched_unlock(); } else { error = ERR_OBJDEL; } return( error );}/******************************************************************************* q_send - posts a message to the tail of a p2pthread queue and awakens the** first selected task waiting on the queue.*****************************************************************************/ULONG q_send( ULONG qid, q_msg_t msg ){#ifdef DIAG_PRINTFS p2pthread_cb_t *our_tcb;#endif p2pt_queue_t *queue; q_extent_t *new_extent; int qsize; ULONG error; error = ERR_NO_ERROR; if ( (queue = qcb_for( qid )) != (p2pt_queue_t *)NULL ) {#ifdef DIAG_PRINTFS our_tcb = my_tcb(); printf( "\r\ntask @ %p send to queue list @ %p", our_tcb, &(queue->first_susp) );#endif /* ** 'Lock the p2pthread scheduler' to defer any context switch to a ** higher priority task until after this call has completed its work. */ sched_lock(); /* ** Lock mutex for queue send. Note: since i call pthread_cleanup_push()
** before pthread_mutex_lock in every thread, so even though i call
** sched_lock() above and get the scheduler controller, the queue->queue_lock
** has been unlocked by other thread. */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->queue_lock));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -