📄 msgqlib.c
字号:
/***************************************************************************** * msgQLib.c - defines the wrapper functions and data structures needed * to implement a VxWorks 'native' message queue API in a * POSIX Threads environment. * * Copyright (C) 2000 Monta Vista Software Inc. * * Author : Gary S. Robertson * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License * as published by the Free Software Foundation; either version 2.1 * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. ****************************************************************************/#include <errno.h>#include <unistd.h>#include <stdlib.h>#include <stdio.h>#include <string.h>#include <signal.h>#include <sys/time.h>#include "vxwk2pthread.h"#include "vxwkdefs.h"#undef DIAG_PRINTFS#define SEND 0#define URGNT 1#define KILLD 2/******************************************************************************* VxWorks queue message type*****************************************************************************/typedef struct q_msg{ uint msglen; char *msgbuf;} q_msg_t;/******************************************************************************* Control block for VxWorks queue**** The message list for a queue is organized into an array called an extent.** Actual send and fetch operations are done using a queue_head and** queue_tail pointer. These pointers must 'rotate' through the extent to** create a logical circular buffer. A single extra location is added** to ensure room for urgent messages even when the queue is 'full' for** normal messages.*******************************************************************************/typedef struct vxwk_mqueue{ /* ** Mutex and Condition variable for queue send/pend */ pthread_mutex_t queue_lock; pthread_cond_t queue_send; /* ** Mutex and Condition variable for queue delete */ pthread_mutex_t qdlet_lock; pthread_cond_t qdlet_cmplt; /* ** Mutex and Condition variable for queue-full pend */ pthread_mutex_t qfull_lock; pthread_cond_t queue_space; /* ** Pointer to next message pointer to be fetched from queue */ q_msg_t * queue_head; /* ** Pointer to last message pointer sent to queue */ q_msg_t * queue_tail; /* ** Type of send operation last performed on queue */ int send_type; /* ** Pointer to first message in queue */ q_msg_t * first_msg_in_queue; /* ** Pointer to last message in queue */ q_msg_t * last_msg_in_queue; /* ** Pointer to next queue control block in queue list */ struct vxwk_mqueue * nxt_queue; /* ** First task control block in list of tasks waiting to receive ** a message from queue */ vxwk2pthread_cb_t * first_susp; /* ** First task control block in list of tasks waiting for space to ** post messages to queue */ vxwk2pthread_cb_t * first_write_susp; /* ** Total number of messages currently sent to queue */ int msg_count; /* ** Total (max) messages per queue */ int msgs_per_queue; /* ** Maximum size of messages sent to queue */ uint msg_len; /* ** sizeof( each element in queue ) used for subscript incr/decr. */ size_t vmsg_len; /* ** Task pend order (FIFO or Priority) for queue */ int order;} vxwk_mqueue_t;/******************************************************************************* External function and data references*****************************************************************************/extern void * ts_malloc( size_t blksize );extern void ts_free( void *blkaddr );extern vxwk2pthread_cb_t * my_tcb( void );extern void taskLock( void );extern void taskUnlock( void );extern STATUS taskDelay( int interval );extern void link_susp_tcb( vxwk2pthread_cb_t **list_head, vxwk2pthread_cb_t *new_entry );extern void unlink_susp_tcb( vxwk2pthread_cb_t **list_head, vxwk2pthread_cb_t *entry );extern int signal_for_my_task( vxwk2pthread_cb_t **list_head, int pend_order );/******************************************************************************* VxWorks-to-pthread Global Data Structures*****************************************************************************//*** mqueue_list is a linked list of queue control blocks. It is used to locate** queues by their ID numbers.*/static vxwk_mqueue_t * mqueue_list;/*** mqueue_list_lock is a mutex used to serialize access to the queue list*/static pthread_mutex_t mqueue_list_lock = PTHREAD_MUTEX_INITIALIZER;/******************************************************************************* queue_valid - verifies whether the specified queue still exists, and if** so, locks exclusive access to the queue for the caller.*****************************************************************************/static int queue_valid( vxwk_mqueue_t *queue ){ vxwk_mqueue_t *current_qcb; int found_queue; found_queue = FALSE; /* ** Protect the queue list while we examine and modify it. */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&mqueue_list_lock ); pthread_mutex_lock( &mqueue_list_lock ); if ( mqueue_list != (vxwk_mqueue_t *)NULL ) { /* ** One or more queues already exist in the queue list... ** Scan the existing queues for a matching ID. */ for ( current_qcb = mqueue_list; current_qcb != (vxwk_mqueue_t *)NULL; current_qcb = current_qcb->nxt_queue ) { if ( current_qcb == queue ) { /* ** Lock mutex for queue access (it is assumed that a ** 'pthread_cleanup_push()' has already been performed ** by the caller in case of unexpected thread termination.) */ pthread_mutex_lock( &(queue->queue_lock) ); found_queue = TRUE; break; } } } /* ** Re-enable access to the queue list by other threads. */ pthread_cleanup_pop( 1 ); return( found_queue );}/******************************************************************************* link_qcb - appends a new queue control block pointer to the mqueue_list*****************************************************************************/static void link_qcb( vxwk_mqueue_t *new_mqueue ){ vxwk_mqueue_t *current_qcb; /* ** Protect the queue list while we examine and modify it. */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&mqueue_list_lock ); pthread_mutex_lock( &mqueue_list_lock ); new_mqueue->nxt_queue = (vxwk_mqueue_t *)NULL; if ( mqueue_list != (vxwk_mqueue_t *)NULL ) { /* ** One or more queues already exist in the queue list... ** Insert the new entry at the tail of the list. */ for ( current_qcb = mqueue_list; current_qcb->nxt_queue != (vxwk_mqueue_t *)NULL; current_qcb = current_qcb->nxt_queue ); current_qcb->nxt_queue = new_mqueue;#ifdef DIAG_PRINTFS printf( "\r\nadd queue cb @ %p to list @ %p", new_mqueue, current_qcb );#endif } else { /* ** this is the first queue being added to the queue list. */ mqueue_list = new_mqueue;#ifdef DIAG_PRINTFS printf( "\r\nadd queue cb @ %p to list @ %p", new_mqueue, &mqueue_list );#endif } /* ** Re-enable access to the queue list by other threads. */ pthread_mutex_unlock( &mqueue_list_lock ); pthread_cleanup_pop( 0 );}/******************************************************************************* unlink_qcb - removes a queue control block pointer from the mqueue_list*****************************************************************************/static vxwk_mqueue_t * unlink_qcb( vxwk_mqueue_t *qid ){ vxwk_mqueue_t *current_qcb; vxwk_mqueue_t *selected_qcb; selected_qcb = (vxwk_mqueue_t *)NULL; if ( mqueue_list != (vxwk_mqueue_t *)NULL ) { /* ** One or more queues exist in the queue list... ** Protect the queue list while we examine and modify it. */ pthread_cleanup_push( (void(*)(void *))pthread_mutex_unlock, (void *)&mqueue_list_lock ); pthread_mutex_lock( &mqueue_list_lock ); /* ** Scan the queue list for a qcb with a matching queue ID */ if ( mqueue_list == qid ) { /* ** The first queue in the list matches the selected queue ID */ selected_qcb = mqueue_list; mqueue_list = selected_qcb->nxt_queue;#ifdef DIAG_PRINTFS printf( "\r\ndel queue cb @ %p from list @ %p", selected_qcb, &mqueue_list );#endif } else { /* ** Scan the next qcb for a matching qid while retaining a ** pointer to the current qcb. If the next qcb matches, ** select it and then unlink it from the queue list. */ for ( current_qcb = mqueue_list; current_qcb->nxt_queue != (vxwk_mqueue_t *)NULL; current_qcb = current_qcb->nxt_queue ) { if ( current_qcb->nxt_queue == qid ) { /* ** Queue ID of next qcb matches... ** Select the qcb and then unlink it by linking ** the selected qcb's next qcb into the current qcb. */ selected_qcb = current_qcb->nxt_queue; current_qcb->nxt_queue = selected_qcb->nxt_queue;#ifdef DIAG_PRINTFS printf( "\r\ndel queue cb @ %p from list @ %p", selected_qcb, current_qcb );#endif break; } } } /* ** Re-enable access to the queue list by other threads. */ pthread_mutex_unlock( &mqueue_list_lock ); pthread_cleanup_pop( 0 ); } return( selected_qcb );}/******************************************************************************* urgent_msg_to - sends a message to the front of the specified queue*****************************************************************************/static void urgent_msg_to( vxwk_mqueue_t *queue, char *msg, uint msglen ){ uint i; char *element; /* ** It is assumed when we enter this function that the queue has space ** to accept the message about to be sent. ** Pre-decrement the queue_head (fetch) pointer, adjusting for ** possible wrap to the end of the queue; ** (Urgent messages are placed at the queue head so they will be the ** next message fetched from the queue - ahead of any ** previously-queued messages.) */ element = (char *)queue->queue_head; element -= queue->vmsg_len; queue->queue_head = (q_msg_t *)element; if ( queue->queue_head < queue->first_msg_in_queue ) { /* ** New queue_head pointer underflowed beginning of the extent... ** Wrap the queue_head pointer to the last message address ** in the extent allocated for the queue. */ queue->queue_head = queue->last_msg_in_queue; }#ifdef DIAG_PRINTFS printf( " new queue_head @ %p", queue->queue_head );#endif if ( msg != (char *)NULL ) { element = (char *)&((queue->queue_head)->msgbuf); for ( i = 0; i < msglen; i++ ) { *(element + i) = *(msg + i); } } (queue->queue_head)->msglen = msglen;#ifdef DIAG_PRINTFS printf( "\r\nsent urgent msg %p len %x to queue_head @ %p", msg, msglen, queue->queue_head );#endif /* ** Increment the message counter for the queue */ queue->msg_count++; /*
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -