📄 queue.c
字号:
pthread_mutex_lock( &(queue->queue_lock) ); /* ** See how many messages are already sent into the queue */ if ( queue->msg_count > (queue->total_extents * queue->msgs_per_extent) ) { /* ** Queue is already full... try to add another extent. ** (First extent has msgs_per_extent + 1 messages... ** any additional extents have only msgs_per_extent. Extent ** header contains one message, so decrement msg count by one ** before allocating space for extent.) */ qsize = queue->msgs_per_extent - 1; if ( (!(queue->flags & Q_LIMIT)) && ((new_extent = new_extent_for( queue, qsize )) != (q_extent_t *)NULL) ) { /* ** The queue_tail would have wrapped to the head of ** the first extent... change it to the first message ** in the newly-added extent. */ queue->queue_tail = &(new_extent->msgs[0]); /* ** Send the new message. */ send_msg_to( queue, msg ); } else /* ** No memory for more extents... return QUEUE FULL error */ error = ERR_QFULL; } else { /* ** Message count is equal or below maximum... */ if ( queue->msg_count == (queue->total_extents * queue->msgs_per_extent) ) { /* ** Queue is already full... try to add another extent. */ qsize = queue->msgs_per_extent - 1; if ( !(queue->flags & Q_LIMIT) ) { /* ** Try to add another extent. */ if ( (new_extent = new_extent_for( queue, qsize )) != (q_extent_t *)NULL ) { /* ** The queue_tail would have wrapped to the head of ** the first extent... change it to the first message ** in the newly-added extent. */ queue->queue_tail = &(new_extent->msgs[0]); /* ** Send the new message. */ send_msg_to( queue, msg ); } else /* ** No memory for more extents... return QUEUE FULL */ error = ERR_QFULL; } else { if ( (queue->msgs_per_extent == 0) && (queue->first_susp != (p2pthread_cb_t *)NULL) ) { /* ** Special case... This case represents the zero size
** queue, send the new message. */ send_msg_to( queue, msg ); } else { /* ** No more extents allowed... return QUEUE FULL error */ error = ERR_QFULL; } } } else { /* ** Message count below maximum... Send the new message. */ send_msg_to( queue, msg ); } } /* ** Unlock the queue mutex. */ pthread_mutex_unlock( &(queue->queue_lock) ); pthread_cleanup_pop( 0 ); /* ** 'Unlock the p2pthread scheduler' to enable a possible context switch ** to a task made runnable by this call. */ sched_unlock(); } else { error = ERR_OBJDEL; } return( error );}/******************************************************************************* q_broadcast - sends the specified message to all tasks pending on the** specified p2pthread queue and awakens the tasks.*****************************************************************************/ULONG q_broadcast( ULONG qid, q_msg_t msg, ULONG *count ){ p2pt_queue_t *queue; q_extent_t *new_extent; ULONG error; int qsize; error = ERR_NO_ERROR; if ( (queue = qcb_for( qid )) != (p2pt_queue_t *)NULL ) { /* ** Lock mutex for urgent queue send */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->queue_lock)); pthread_mutex_lock( &(queue->queue_lock) ); if ( queue->first_susp != (p2pthread_cb_t *)NULL ) { /* ** See how many messages are already sent into the queue */ if ( queue->msg_count > (queue->total_extents * queue->msgs_per_extent) ) { /* ** Queue is already full... try to add another extent. ** (First extent has msgs_per_extent + 1 messages... ** any additional extents have only msgs_per_extent. Extent ** header contains one message, so decrement msg count by one ** before allocating space for extent.) */ qsize = queue->msgs_per_extent - 1; if ( (!(queue->flags & Q_LIMIT)) && ((new_extent = new_extent_for( queue, qsize )) != (q_extent_t *)NULL) ) { /* ** The queue_tail would have wrapped to the start of ** the first extent... change it to the first message ** in the newly-added extent. */ queue->queue_tail = &(new_extent->msgs[0]); /* ** Stuff the new message onto the front of the queue. */ urgent_msg_to( queue, msg ); /* ** Declare the send type */ queue->send_type = BCAST; } else /* ** No memory for more extents... return QUEUE FULL error */ error = ERR_QFULL; } else { /* ** Stuff the new message onto the front of the queue. */ urgent_msg_to( queue, msg ); /* ** Declare the send type */ queue->send_type = BCAST; } } /* ** Unlock the queue mutex. */ pthread_mutex_unlock( &(queue->queue_lock) ); pthread_cleanup_pop( 0 ); /* ** Send msg and block while any tasks are still pended on the queue. */ queue->bcst_tasks_awakened = 0;
/*
** Note: when i call pthread_cond_wait() below, the task is in it's ideal
** state, and give up it's scheduler control, so the task other than me
** can go on the way, and send cont_t queue->queue_bcplt back. That's
** why it has no effect i call sched_lock() below. */ sched_lock(); if ( (error == ERR_NO_ERROR) && (queue->first_susp != (p2pthread_cb_t *)NULL) ) { /* ** Lock mutex for queue broadcast completion */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->qbcst_lock) ); pthread_mutex_lock( &(queue->qbcst_lock) ); /* ** Lock mutex for urgent queue send, so while i was broadcasting
** the urgent message, no one can disturb me( send another message
** to the queue), after broadcasting, release the queue_lock, and
** the pthread call pthread_cond_wait() will get the message and
** mutex lock queue_lock again. */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->queue_lock)); pthread_mutex_lock( &(queue->queue_lock) ); /* ** Signal the condition variable for the queue, wake up the task
** block on the ev_receive() call. */ pthread_cond_broadcast( &(queue->queue_send) ); /* ** Unlock the queue mutex. */ pthread_mutex_unlock( &(queue->queue_lock) ); pthread_cleanup_pop( 0 ); /* ** Wait for all pended tasks to receive broadcast message. ** The last task to receive the message will signal the ** broadcast-complete condition variable. */ while ( queue->first_susp != (p2pthread_cb_t *)NULL ) pthread_cond_wait( &(queue->queue_bcplt), &(queue->qbcst_lock) ); /* ** Unlock the queue broadcast completion mutex. */ pthread_mutex_unlock( &(queue->qbcst_lock) ); pthread_cleanup_pop( 0 ); } *count = queue->bcst_tasks_awakened; sched_unlock(); } else { error = ERR_OBJDEL; } return( error );}/******************************************************************************* delete_queue - takes care of destroying the specified queue and freeing** any resources allocated for that queue*****************************************************************************/static void delete_queue( p2pt_queue_t *queue ){ q_extent_t * current_extent; q_extent_t * next_extent; /* ** First remove the queue from the queue list */ unlink_qcb( queue->qid ); /* ** Next delete all extents allocated for queue data. */ next_extent = (q_extent_t *)NULL; for ( current_extent = queue->first_extent; current_extent != (q_extent_t *)NULL; current_extent = next_extent ) { next_extent = (q_extent_t *)current_extent->nxt_extent; ts_free( (void *)current_extent ); } /* ** Finally delete the queue control block itself; */ ts_free( (void *)queue );}/******************************************************************************* q_delete - removes the specified queue from the queue list and frees** the memory allocated for the queue control block and extents.*****************************************************************************/ULONG q_delete( ULONG qid ){ p2pt_queue_t *queue; ULONG error; static char deleted_msg[16] = "Queue Deleted!\n"; error = ERR_NO_ERROR; if ( (queue = qcb_for( qid )) != (p2pt_queue_t *)NULL ) { /* ** Lock mutex for queue delete */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->queue_lock)); pthread_mutex_lock( &(queue->queue_lock) ); if ( queue->msg_count ) error = ERR_MATQDEL; if ( queue->first_susp != (p2pthread_cb_t *)NULL ) { /* ** Stuff the new message onto the front of the queue. */ urgent_msg_to( queue, (ULONG *)deleted_msg ); /* ** Declare the send type */ queue->send_type = KILLD; error = ERR_TATQDEL; } /* ** Unlock the queue mutex. */ pthread_mutex_unlock( &(queue->queue_lock) ); pthread_cleanup_pop( 0 ); /* ** Send msg and block while any tasks are still pended on the queue */ sched_lock(); if ( queue->first_susp != (p2pthread_cb_t *)NULL ) { /* ** Lock mutex for queue broadcast completion */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->qbcst_lock) ); pthread_mutex_lock( &(queue->qbcst_lock) ); /* ** Lock mutex for urgent queue send */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&(queue->queue_lock)); pthread_mutex_lock( &(queue->queue_lock) ); /* ** Signal the condition variable for the queue
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -