📄 queue.c
字号:
/***************************************************************************** * queue.c - defines the wrapper functions and data structures needed * to implement a Wind River pSOS+ (R) standard queue API * in a POSIX Threads environment. ****************************************************************************/#include <errno.h>#include <unistd.h>#include <stdlib.h>#include <stdio.h>#include <string.h>#include <signal.h>#include <sys/time.h>#include "p2pthread.h"#undef DIAG_PRINTFS#define SEND 0#define BCAST 1#define KILLD 2#define Q_NOWAIT 0x01#define Q_PRIOR 0x02#define Q_LIMIT 0x04#define ERR_TIMEOUT 0x01#define ERR_NODENO 0x04#define ERR_OBJDEL 0x05#define ERR_OBJTFULL 0x08#define ERR_OBJNF 0x09#define ERR_NOQCB 0x33#define ERR_NOMGB 0x34#define ERR_QFULL 0x35#define ERR_QKILLD 0x36#define ERR_NOMSG 0x37#define ERR_TATQDEL 0x38#define ERR_MATQDEL 0x39/******************************************************************************* p2pthread queue message type*****************************************************************************/typedef ULONG q_msg_t[4];/******************************************************************************* p2pthread queue extent type - this is the header for a dynamically allocated** memory block of a size determined at runtime.** An array of (qsize + 1) messages immediately ** follows the nxt_extent pointer. The first** element of the array is included in the header** to guarantee proper alignment for the additional** (qsize) array elements which will be allocated** and appended to it.*****************************************************************************/typedef struct queue_extent_header{ void * nxt_extent; /* Points to next extent block (if any, else NULL)*/ q_msg_t msgs[1]; /* Array of qsize + 1 q_msg_t messages */} q_extent_t;/******************************************************************************* Control block for p2pthread queue**** The message list for a queue is organized into a series of one or** more arrays called extents. Actual send and fetch operations are** done using a queue_head and queue_tail pointer. These pointers** must 'rotate' through the arrays in the extent list to create a** single large logical circular buffer. A single extra location is added** to ensure room for urgent messages or broadcasts even when the queue is** 'full' for normal messages.*******************************************************************************/typedef struct p2pt_queue{ /* ** ID for queue */ ULONG qid; /* ** Queue Name */ char qname[4]; /* ** Option Flags for queue */ ULONG flags; /* ** Mutex and Condition variable for queue send/pend */ pthread_mutex_t queue_lock; pthread_cond_t queue_send; /* ** Mutex and Condition variable for queue broadcast/delete */ pthread_mutex_t qbcst_lock; pthread_cond_t queue_bcplt; /* ** Pointer to next message pointer to be fetched from queue */ q_msg_t * queue_head; /* ** Pointer to last message pointer sent to queue */ q_msg_t * queue_tail; /* ** Type of send operation last performed on queue */ int send_type; /* ** Pointer to first extent in list of extents allocated for queue */ q_extent_t * first_extent; /* ** Pointer to last message in last extent in extent list */ q_msg_t * last_msg_in_queue; /* ** Pointer to next queue control block in queue list. */ struct p2pt_queue * nxt_queue; /* ** First task control block in list of tasks waiting on queue.
** Note: i thought this member should be catagoried to
** q_first_susp, sem4_first_susp, etc., but it's wrong,since
** there is only one object can one task suspend on at one time. */ p2pthread_cb_t * first_susp; /* ** Count of tasks awakened by q_broadcast call */ ULONG bcst_tasks_awakened; /* ** Total number of messages currently sent to queue */ int msg_count; /* ** Total messages per memory allocation block (extent) */ int msgs_per_extent; /* ** Total number of extents currently allocated for queue */ int total_extents; /* ** Task pend order (FIFO or Priority) for queue */ int order;} p2pt_queue_t;/******************************************************************************* External function and data references*****************************************************************************/extern void *ts_malloc( size_t blksize );extern void ts_free( void *blkaddr );extern p2pthread_cb_t * my_tcb( void );extern void sched_lock( void );extern void sched_unlock( void );extern ULONG tm_wkafter( ULONG interval );extern void link_susp_tcb( p2pthread_cb_t **list_head, p2pthread_cb_t *new_entry );extern void unlink_susp_tcb( p2pthread_cb_t **list_head, p2pthread_cb_t *entry );extern int signal_for_my_task( p2pthread_cb_t **list_head, int pend_order );/******************************************************************************* p2pthread Global Data Structures*****************************************************************************//*** queue_list is a linked list of queue control blocks. It is used to locate** queues by their ID numbers.*/static p2pt_queue_t * queue_list;/*** queue_list_lock is a mutex used to serialize access to the queue list*/static pthread_mutex_t queue_list_lock = PTHREAD_MUTEX_INITIALIZER;/******************************************************************************* qcb_for - returns the address of the queue control block for the queue** idenified by qid*****************************************************************************/static p2pt_queue_t * qcb_for( ULONG qid ){ p2pt_queue_t *current_qcb; int found_qid; if ( queue_list != (p2pt_queue_t *)NULL ) { /* ** One or more queues already exist in the queue list... ** Scan the existing queues for a matching ID. */ found_qid = FALSE; for ( current_qcb = queue_list; current_qcb != (p2pt_queue_t *)NULL; current_qcb = current_qcb->nxt_queue ) { if ( current_qcb->qid == qid ) { found_qid = TRUE; break; } } if ( found_qid == FALSE ) /* ** No matching ID found */ current_qcb = (p2pt_queue_t *)NULL; } else current_qcb = (p2pt_queue_t *)NULL; return( current_qcb );}/******************************************************************************* new_qid - automatically returns a valid, unused queue ID*****************************************************************************/static ULONG new_qid( void ){ p2pt_queue_t *current_qcb; ULONG new_queue_id; /* ** Protect the queue list while we examine it. */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&queue_list_lock ); pthread_mutex_lock( &queue_list_lock ); /* ** Get the highest previously assigned queue id and add one. */ if ( queue_list != (p2pt_queue_t *)NULL ) { /* ** One or more queues already exist in the queue list... ** Find the highest queue ID number in the existing list. */ new_queue_id = queue_list->qid; for ( current_qcb = queue_list; current_qcb->nxt_queue != (p2pt_queue_t *)NULL; current_qcb = current_qcb->nxt_queue ) { if ( (current_qcb->nxt_queue)->qid > new_queue_id ) { new_queue_id = (current_qcb->nxt_queue)->qid; } } /* ** Add one to the highest existing queue ID */ new_queue_id++; } else { /* ** this is the first queue being added to the queue list. */ new_queue_id = 1; } /* ** Re-enable access to the queue list by other threads. */ pthread_mutex_unlock( &queue_list_lock ); pthread_cleanup_pop( 0 ); return( new_queue_id );}/******************************************************************************* link_qcb - appends a new queue control block pointer to the queue_list*****************************************************************************/static void link_qcb( p2pt_queue_t *new_queue ){ p2pt_queue_t *current_qcb; /* ** Protect the queue list while we examine and modify it. */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&queue_list_lock ); pthread_mutex_lock( &queue_list_lock ); new_queue->nxt_queue = (p2pt_queue_t *)NULL; if ( queue_list != (p2pt_queue_t *)NULL ) { /* ** One or more queues already exist in the queue list... ** Insert the new entry in ascending numerical sequence by qid. */ for ( current_qcb = queue_list; current_qcb->nxt_queue != (p2pt_queue_t *)NULL; current_qcb = current_qcb->nxt_queue ) { if ( (current_qcb->nxt_queue)->qid > new_queue->qid ) { new_queue->nxt_queue = current_qcb->nxt_queue; break; } } current_qcb->nxt_queue = new_queue;#ifdef DIAG_PRINTFS printf( "\r\nadd queue cb @ %p to list @ %p", new_queue, current_qcb );#endif } else { /* ** this is the first queue being added to the queue list. */ queue_list = new_queue;#ifdef DIAG_PRINTFS printf( "\r\nadd queue cb @ %p to list @ %p", new_queue, &queue_list );#endif } /* ** Re-enable access to the queue list by other threads. */ pthread_mutex_unlock( &queue_list_lock ); pthread_cleanup_pop( 0 );}/******************************************************************************* unlink_qcb - removes a queue control block pointer from the queue_list*****************************************************************************/static p2pt_queue_t * unlink_qcb( ULONG qid ){ p2pt_queue_t *current_qcb; p2pt_queue_t *selected_qcb; selected_qcb = (p2pt_queue_t *)NULL;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -