📄 vqueue.c
字号:
/***************************************************************************** * vqueue.c - defines the wrapper functions and data structures needed * to implement a Wind River pSOS+ (R) variable length queue API * in a POSIX Threads environment. ****************************************************************************/#include <errno.h>#include <unistd.h>#include <stdlib.h>#include <stdio.h>#include <string.h>#include <signal.h>#include <sys/time.h>#include "p2pthread.h"#undef DIAG_PRINTFS#define SEND 0#define BCAST 1#define KILLD 2#define Q_NOWAIT 0x01#define Q_PRIOR 0x02#define Q_LIMIT 0x04#define ERR_TIMEOUT 0x01#define ERR_NODENO 0x04#define ERR_OBJDEL 0x05#define ERR_OBJTFULL 0x08#define ERR_OBJNF 0x09#define ERR_MSGSIZ 0x31#define ERR_BUFSIZ 0x32#define ERR_NOQCB 0x33#define ERR_NOMGB 0x34#define ERR_QFULL 0x35#define ERR_QKILLD 0x36#define ERR_NOMSG 0x37#define ERR_TATQDEL 0x38#define ERR_MATQDEL 0x39/******************************************************************************* p2pthread queue message type*****************************************************************************/typedef struct q_varlen_msg{ ULONG msglen; char *msgbuf;} q_vmsg_t;/******************************************************************************* Control block for p2pthread queue**** The message list for a queue is organized into an array called an extent.** Actual send and fetch operations are done using a queue_head and** queue_tail pointer. These pointers must 'rotate' through the extent to** create a logical circular buffer. A single extra location is added** to ensure room for urgent messages or broadcasts even when the queue is** 'full' for normal messages.*******************************************************************************/typedef struct p2pt_vqueue{ /* ** ID for queue */ ULONG qid; /* ** Queue Name */ char qname[4]; /* ** Option Flags for queue */ ULONG flags; /* ** Mutex and Condition variable for queue send/pend */ pthread_mutex_t queue_lock; pthread_cond_t queue_send; /* ** Mutex and Condition variable for queue broadcast/delete */ pthread_mutex_t qbcst_lock; pthread_cond_t qbcst_cmplt; /* ** Pointer to next message pointer to be fetched from queue */ q_vmsg_t * queue_head; /* ** Pointer to last message pointer sent to queue */ q_vmsg_t * queue_tail; /* ** Type of send operation last performed on queue */ int send_type; /* ** Pointer to first message in queue */ q_vmsg_t * first_msg_in_queue; /* ** Pointer to last message in queue */ q_vmsg_t * last_msg_in_queue; /* ** Pointer to next queue control block in queue list */ struct p2pt_vqueue * nxt_queue; /* ** First task control block in list of tasks waiting on queue */ p2pthread_cb_t * first_susp; /* ** Count of tasks awakened by q_broadcast call */ ULONG bcst_tasks_awakened; /* ** Total number of messages currently sent to queue */ int msg_count; /* ** Total messages per queue */ int msgs_per_queue; /* ** Maximum size of messages sent to queue */ ULONG msg_len; /* ** sizeof( each element in queue ) used for subscript incr/decr. */ size_t vmsg_len; /* ** Task pend order (FIFO or Priority) for queue */ int order;} p2pt_vqueue_t;/******************************************************************************* External function and data references*****************************************************************************/extern void * ts_malloc( size_t blksize );extern void ts_free( void *blkaddr );extern p2pthread_cb_t * my_tcb( void );extern void sched_lock( void );extern void sched_unlock( void );extern ULONG tm_wkafter( ULONG interval );extern void link_susp_tcb( p2pthread_cb_t **list_head, p2pthread_cb_t *new_entry );extern void unlink_susp_tcb( p2pthread_cb_t **list_head, p2pthread_cb_t *entry );extern int signal_for_my_task( p2pthread_cb_t **list_head, int pend_order );/******************************************************************************* p2pthread Global Data Structures*****************************************************************************//*** vqueue_list is a linked list of queue control blocks. It is used to locate** queues by their ID numbers.*/static p2pt_vqueue_t * vqueue_list;/*** vqueue_list_lock is a mutex used to serialize access to the queue list*/static pthread_mutex_t vqueue_list_lock = PTHREAD_MUTEX_INITIALIZER;/******************************************************************************* qcb_for - returns the address of the queue control block for the queue** idenified by qid*****************************************************************************/static p2pt_vqueue_t * qcb_for( ULONG qid ){ p2pt_vqueue_t *current_qcb; int found_qid; if ( vqueue_list != (p2pt_vqueue_t *)NULL ) { /* ** One or more queues already exist in the queue list... ** Scan the existing queues for a matching ID. */ found_qid = FALSE; for ( current_qcb = vqueue_list; current_qcb != (p2pt_vqueue_t *)NULL; current_qcb = current_qcb->nxt_queue ) { if ( current_qcb->qid == qid ) { found_qid = TRUE; break; } } if ( found_qid == FALSE ) /* ** No matching ID found */ current_qcb = (p2pt_vqueue_t *)NULL; } else current_qcb = (p2pt_vqueue_t *)NULL; return( current_qcb );}/******************************************************************************* new_vqid - automatically returns a valid, unused variable length queue ID*****************************************************************************/static ULONG new_vqid( void ){ p2pt_vqueue_t *current_qcb; ULONG new_queue_id; /* ** Protect the queue list while we examine it. */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&vqueue_list_lock ); pthread_mutex_lock( &vqueue_list_lock ); /* ** Get the highest previously assigned queue id and add one. */ if ( vqueue_list != (p2pt_vqueue_t *)NULL ) { /* ** One or more queues already exist in the queue list... ** Find the highest queue ID number in the existing list. */ new_queue_id = vqueue_list->qid; for ( current_qcb = vqueue_list; current_qcb->nxt_queue != (p2pt_vqueue_t *)NULL; current_qcb = current_qcb->nxt_queue ) { if ( (current_qcb->nxt_queue)->qid > new_queue_id ) { new_queue_id = (current_qcb->nxt_queue)->qid; } } /* ** Add one to the highest existing queue ID */ new_queue_id++; } else { /* ** this is the first queue being added to the queue list. */ new_queue_id = 0; } /* ** Re-enable access to the queue list by other threads. */ pthread_mutex_unlock( &vqueue_list_lock ); pthread_cleanup_pop( 0 ); return( new_queue_id );}/******************************************************************************* link_qcb - appends a new queue control block pointer to the vqueue_list*****************************************************************************/static void link_qcb( p2pt_vqueue_t *new_vqueue ){ p2pt_vqueue_t *current_qcb; /* ** Protect the queue list while we examine and modify it. */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&vqueue_list_lock ); pthread_mutex_lock( &vqueue_list_lock ); new_vqueue->nxt_queue = (p2pt_vqueue_t *)NULL; if ( vqueue_list != (p2pt_vqueue_t *)NULL ) { /* ** One or more queues already exist in the queue list... ** Insert the new entry in ascending numerical sequence by qid. */ for ( current_qcb = vqueue_list; current_qcb->nxt_queue != (p2pt_vqueue_t *)NULL; current_qcb = current_qcb->nxt_queue ) { if ( (current_qcb->nxt_queue)->qid > new_vqueue->qid ) { new_vqueue->nxt_queue = current_qcb->nxt_queue; break; } } current_qcb->nxt_queue = new_vqueue;#ifdef DIAG_PRINTFS printf( "\r\nadd queue cb @ %p to list @ %p", new_vqueue, current_qcb );#endif } else { /* ** this is the first queue being added to the queue list. */ vqueue_list = new_vqueue;#ifdef DIAG_PRINTFS printf( "\r\nadd queue cb @ %p to list @ %p", new_vqueue, &vqueue_list );#endif } /* ** Re-enable access to the queue list by other threads. */ pthread_mutex_unlock( &vqueue_list_lock ); pthread_cleanup_pop( 0 );}/******************************************************************************* unlink_qcb - removes a queue control block pointer from the vqueue_list*****************************************************************************/static p2pt_vqueue_t * unlink_qcb( ULONG qid ){ p2pt_vqueue_t *current_qcb; p2pt_vqueue_t *selected_qcb; selected_qcb = (p2pt_vqueue_t *)NULL; if ( vqueue_list != (p2pt_vqueue_t *)NULL ) { /* ** One or more queues exist in the queue list... ** Protect the queue list while we examine and modify it. */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&vqueue_list_lock ); pthread_mutex_lock( &vqueue_list_lock ); /* ** Scan the queue list for a qcb with a matching queue ID */ if ( vqueue_list->qid == qid ) { /* ** The first queue in the list matches the selected queue ID */ selected_qcb = vqueue_list; vqueue_list = selected_qcb->nxt_queue;#ifdef DIAG_PRINTFS printf( "\r\ndel queue cb @ %p from list @ %p", selected_qcb, &vqueue_list );#endif } else { /* ** Scan the next qcb for a matching qid while retaining a ** pointer to the current qcb. If the next qcb matches, ** select it and then unlink it from the queue list. */ for ( current_qcb = vqueue_list; current_qcb->nxt_queue != (p2pt_vqueue_t *)NULL; current_qcb = current_qcb->nxt_queue ) { if ( (current_qcb->nxt_queue)->qid == qid ) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -