mqueue.cxx
来自「ecos实时嵌入式操作系统」· CXX 代码 · 共 983 行 · 第 1/2 页
CXX
983 行
/*========================================================================//// mqueue.cxx//// Message queues tests////========================================================================//####ECOSGPLCOPYRIGHTBEGIN####// -------------------------------------------// This file is part of eCos, the Embedded Configurable Operating System.// Copyright (C) 1998, 1999, 2000, 2001, 2002 Red Hat, Inc.//// eCos is free software; you can redistribute it and/or modify it under// the terms of the GNU General Public License as published by the Free// Software Foundation; either version 2 or (at your option) any later version.//// eCos is distributed in the hope that it will be useful, but WITHOUT ANY// WARRANTY; without even the implied warranty of MERCHANTABILITY or// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License// for more details.//// You should have received a copy of the GNU General Public License along// with eCos; if not, write to the Free Software Foundation, Inc.,// 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.//// As a special exception, if other files instantiate templates or use macros// or inline functions from this file, or you compile this file and link it// with other works to produce a work based on this file, this file does not// by itself cause the resulting work to be covered by the GNU General Public// License. However the source code for this file must still be made available// in accordance with section (3) of the GNU General Public License.//// This exception does not invalidate any other reasons why a work based on// this file might be covered by the GNU General Public License.//// Alternative licenses for eCos may be arranged by contacting Red Hat, Inc.// at http://sources.redhat.com/ecos/ecos-license/// -------------------------------------------//####ECOSGPLCOPYRIGHTEND####//========================================================================//#####DESCRIPTIONBEGIN####//// Author(s): jlarmour// Contributors: // Date: 2000-05-14// Purpose: This file provides the implementation for POSIX message// queues// Description: It uses eCos kernel mqueues as the underlying// implementation// Usage: ////####DESCRIPTIONEND####////======================================================================*//* CONFIGURATION */#include <pkgconf/posix.h>#ifdef CYGPKG_POSIX_MQUEUES#include <pkgconf/kernel.h>/* INCLUDES */#include <cyg/infra/cyg_type.h> // common types etc.#include <cyg/infra/cyg_ass.h> // Assertion support#include <cyg/infra/cyg_trac.h> // Tracing support#include <cyg/kernel/mqueue.hxx> // eCos Mqueue Header#include <cyg/kernel/sched.hxx> // Cyg_Scheduler::lock()#include <cyg/kernel/sched.inl> // inlines for above#include <mqueue.h> // Standard POSIX mqueue header#include <sys/types.h> // mode_t, ssize_t#include <limits.h> // PATH_MAX#include <stdlib.h> // malloc, etc.#include <errno.h> // errno#include <fcntl.h> // O_*#include <stdarg.h> // varargs#include <pthread.h> // mutexes#include <string.h> // strncpy#include <new> // C++ new#ifdef CYGFUN_POSIX_MQUEUE_NOTIFY# include <signal.h># include "pprivate.h" // cyg_sigqueue()#endif#ifdef CYGFUN_KERNEL_THREADS_TIMER# include <time.h># include "pprivate.h" // cyg_timespec_to_ticks()#endif/* CONSTANTS */#define MQ_VALID_MAGIC 0x6db256c1/* TYPE DEFINITIONS */struct mqtabent;// this is a queue user - each one of these corresponds to a mqd_tstruct mquser { int flags; // O_RDONLY, O_WRONLY, O_RDWR, O_NONBLOCK struct mqtabent *tabent; // back pointer to table entry struct mquser *next; bool notifieruser; // POSIX sucks so bad. It requires a mq_close // to only deregister the notification if it // was done via this descriptor. So we have to // know if it was this one#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR cyg_uint32 magic; // magic number: MQ_VALID_MAGIC if valid#endif };struct mqtabent { char name[ PATH_MAX ]; // ascii name - set to "" when unused Cyg_Mqueue *mq; // the underlying queue object long maxmsg; // as set on creation long msgsize; // as set on creation bool unlinkme; // unlink when final user closes? struct mquser *users; // each user#ifdef CYGFUN_POSIX_MQUEUE_NOTIFY const struct sigevent *sigev; // notification event#endif};/* GLOBALS */static struct mqtabent mqtab[ CYGNUM_POSIX_MQUEUE_OPEN_MAX ];static pthread_mutex_t mqtab_mut = PTHREAD_MUTEX_INITIALIZER;/* LOCAL FUNCTIONS *///------------------------------------------------------------------------// Deallocation callback from Cyg_Mqueuestatic voidmy_free( void *ptr, size_t ){ free( ptr );}//------------------------------------------------------------------------// Do the actual "unlink" of a queue, i.e. mark it invalid in the table.// The table mutex is assumed to be lockedstatic voiddo_mq_unlink( struct mqtabent *tabent ){ CYG_REPORT_FUNCTION(); CYG_CHECK_DATA_PTRC( tabent ); tabent->name[0] = '\0'; // won't match anything the user sends now tabent->mq->~Cyg_Mqueue(); free( tabent->mq ); tabent->mq=NULL; CYG_REPORT_RETURN();}//------------------------------------------------------------------------#ifdef CYGFUN_POSIX_MQUEUE_NOTIFYstatic voidnotifyme( Cyg_Mqueue &q, CYG_ADDRWORD data ){ CYG_REPORT_FUNCTION(); struct mquser *user = (struct mquser *)data; CYG_CHECK_DATA_PTRC( user ); struct mqtabent *tabent = user->tabent; CYG_CHECK_DATA_PTRC( tabent ); Cyg_Scheduler::lock(); // we may have been pre-empted before this, so check there's still a // notification to do if ( NULL == tabent->sigev ) { Cyg_Scheduler::unlock(); CYG_REPORT_RETURN(); return; } // if const struct sigevent *ev = tabent->sigev; // first deregister q.setnotify( NULL, 0 ); tabent->sigev = NULL; user->notifieruser = false; // not any more // now the rest of the world can go Cyg_Scheduler::unlock(); // queue event. If it fails... nothing we can do :-( so ignore return code cyg_sigqueue( ev, SI_MESGQ ); cyg_deliver_signals(); CYG_REPORT_RETURN();}#endif // ifdef CYGFUN_POSIX_MQUEUE_NOTIFY//------------------------------------------------------------------------/* EXPORTED FUNCTIONS */externC mqd_tmq_open( const char *name, int oflag, ... ){ CYG_REPORT_FUNCTYPE( "returning %08x" ); CYG_REPORT_FUNCARG2( "name=%08x, oflag=%d", name, oflag ); CYG_CHECK_DATA_PTRC( name ); if ( ((oflag & O_RDONLY) != O_RDONLY) && ((oflag & O_WRONLY) != O_WRONLY) && ((oflag & O_RDWR) != O_RDWR)) { // user didn't specify mode errno = EINVAL; CYG_REPORT_RETVAL( -1 ); return (mqd_t)-1; } // if mqd_t retval; cyg_ucount32 i; struct mqtabent *qtabent=NULL; int interr; interr = pthread_mutex_lock( &mqtab_mut ); // should never fail CYG_ASSERT( interr == 0, "internal lock failed!" ); // find if a matching entry exists first // FIXME: Should check for length and return ENAMETOOLONG for ( i=0; i < CYGNUM_POSIX_MQUEUE_OPEN_MAX; i++ ) { if ( 0 == strncmp(name, mqtab[i].name, PATH_MAX) ) { qtabent = &mqtab[i]; break; } // if } // for if ( (NULL != qtabent) && (O_EXCL == (oflag & O_EXCL)) ) { errno = EEXIST; retval = (mqd_t)-1; goto exit_unlock; } if ( (NULL == qtabent) && (O_CREAT != (oflag & O_CREAT)) ) { errno = ENOENT; retval = (mqd_t)-1; goto exit_unlock; } // so if we didn't find something, we must be being asked to create it if (NULL == qtabent) { mode_t mode; // FIXME: mode ignored for now const struct mq_attr *attr; const struct mq_attr default_attr = { 0, MQ_OPEN_MAX, 128 }; va_list args; va_start( args, oflag ); mode = va_arg( args, mode_t ); attr = va_arg( args, struct mq_attr * ); va_end( args ); // find an empty table entry for ( i=0; i < CYGNUM_POSIX_MQUEUE_OPEN_MAX; i++ ) { if ( NULL == mqtab[i].mq ) break; } // if not found, table is full if ( i == CYGNUM_POSIX_MQUEUE_OPEN_MAX ) { errno = ENFILE; retval = (mqd_t)-1; goto exit_unlock; } Cyg_Mqueue::qerr_t qerr; // user can specify NULL attr, which means arbitrary message queue // size! Duh. if ( NULL == attr ) attr = &default_attr; else { // if they do supply one, POSIX says we're meant to check it if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) { errno = EINVAL; retval = (mqd_t)-1; goto exit_unlock; } } // else // allocate the underlying queue Cyg_Mqueue *mqholder = (Cyg_Mqueue *)malloc( sizeof(Cyg_Mqueue) ); if ( NULL == mqholder ) { errno = ENOSPC; retval = (mqd_t)-1; goto exit_unlock; } // construct it with placement new mqtab[i].mq = new (mqholder) Cyg_Mqueue( attr->mq_maxmsg, attr->mq_msgsize, &malloc, &my_free, &qerr ); switch (qerr) { case Cyg_Mqueue::OK: break; case Cyg_Mqueue::NOMEM: free( mqholder ); errno = ENOSPC; retval = (mqd_t)-1; goto exit_unlock; default: CYG_FAIL("Unhandled Cyg_Mqueue constructor return error"); break; } // switch mqtab[i].users = (struct mquser *) malloc( sizeof(struct mquser) ); if ( NULL == mqtab[i].users ) { mqtab[i].mq->~Cyg_Mqueue(); free( mqholder ); errno = ENOSPC; retval = (mqd_t)-1; goto exit_unlock; } // initialize mqtab[i] mqtab[i].maxmsg = attr->mq_maxmsg; mqtab[i].msgsize = attr->mq_msgsize; mqtab[i].unlinkme = false;#ifdef CYGFUN_POSIX_MQUEUE_NOTIFY mqtab[i].sigev = NULL;#endif strncpy( mqtab[i].name, name, PATH_MAX ); // initialize first mqtab[i].users mqtab[i].users->next = NULL; // set the mode for later, but also note that O_NONBLOCK can // be set in oflags *or* the attr the user passed mqtab[i].users->flags = oflag | (attr->mq_flags & O_NONBLOCK); // set back pointer so that message queue handle can find actual queue mqtab[i].users->tabent = &mqtab[i]; mqtab[i].users->notifieruser = false;#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR mqtab[i].users->magic = MQ_VALID_MAGIC; // now valid#endif retval=(mqd_t)mqtab[i].users; goto exit_unlock; } // if (NULL == qtabent) // so we're not creating, and we have a valid qtabent // But this qtabent may be being unlinked. If so, we are permitted // to return an error, so we will. (see under mq_unlink() in POSIX) // Which error though? EINVAL seems best, but POSIX doesn't say :-/ if (true == qtabent->unlinkme) { errno = EINVAL; retval = (mqd_t)-1; goto exit_unlock; } // now we have a usable qtabent struct mquser *user; user = (struct mquser *) malloc( sizeof(struct mquser) ); if ( NULL == user ) { errno = ENOSPC; retval = (mqd_t)-1; goto exit_unlock; } // prepend to qtab user list user->next = qtabent->users; qtabent->users = user; // set back pointer so that message queue handle can find actual queue user->tabent = qtabent; user->flags = oflag;#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR user->magic = MQ_VALID_MAGIC; // now valid#endif retval=(mqd_t)user; exit_unlock: interr = pthread_mutex_unlock( &mqtab_mut ); // should never fail CYG_ASSERT( interr == 0, "internal lock failed!" ); CYG_REPORT_RETVAL( retval ); return retval;} // mq_open()//------------------------------------------------------------------------// NOTE: It is the *user*'s responsibility to ensure that nothing is// blocked in mq_send() or mq_receive() when closing the queue with// that descriptor. The standard does not specify the behaviour, so that's// what I am assumingexternC intmq_close( mqd_t mqdes ){ CYG_REPORT_FUNCTYPE( "returning %d" ); CYG_REPORT_FUNCARG1XV( mqdes ); struct mquser *user = (struct mquser *)mqdes;#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR if ( user->magic != MQ_VALID_MAGIC ) { errno = EBADF; CYG_REPORT_RETVAL( -1 ); return -1; }#endif int interr; interr = pthread_mutex_lock( &mqtab_mut ); // should never fail CYG_ASSERT( interr == 0, "internal lock failed!" ); struct mqtabent *tabent = user->tabent; struct mquser *usertmp; // perhaps should return EBADF instead of assert? CYG_ASSERT( tabent->users != NULL, "Null message queue user list" );#ifdef CYGFUN_POSIX_MQUEUE_NOTIFY // deregister notification iff this was the message queue descriptor // that was used to register it (POSIX says) if ( true == user->notifieruser ) { tabent->mq->setnotify( NULL, 0 ); tabent->sigev = NULL; // not worth clearing notifieruser }#endif // find in the list for this queue and remove - sucks a bit, but seems // best over all - the list shouldn't be too long if ( tabent->users == user ) { tabent->users = user->next; // remove } else { for ( usertmp=tabent->users; NULL != usertmp->next; usertmp = usertmp->next ) { if ( usertmp->next == user ) break; } // for // perhaps should return EBADF instead of assert? CYG_ASSERT( usertmp->next != NULL, "Couldn't find message queue user" ); usertmp->next = user->next; // remove } // else #ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR user->magic = 0; // invalidate#endif // free it up free( user ); if ( (true == tabent->unlinkme) && (NULL == tabent->users) ) { do_mq_unlink( tabent ); } // if interr = pthread_mutex_unlock( &mqtab_mut ); // should never fail CYG_ASSERT( interr == 0, "internal lock failed!" ); CYG_REPORT_RETVAL( 0 ); return 0;} // mq_close()//------------------------------------------------------------------------externC intmq_unlink( const char *name ){ CYG_REPORT_FUNCTYPE( "returning %d" ); CYG_REPORT_FUNCARG1( "name=%s", name );
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?