mqueue.cxx

来自「ecos实时嵌入式操作系统」· CXX 代码 · 共 983 行 · 第 1/2 页

CXX
983
字号
    int retval, interr;    cyg_ucount32 i;    struct mqtabent *qtabent=NULL;    interr = pthread_mutex_lock( &mqtab_mut );    // should never fail    CYG_ASSERT( interr == 0, "internal lock failed!" );    // find the entry first    // FIXME: Should check for length and return ENAMETOOLONG    for ( i=0; i < CYGNUM_POSIX_MQUEUE_OPEN_MAX; i++ ) {        if ( 0 == strncmp(name, mqtab[i].name, PATH_MAX) ) {            qtabent = &mqtab[i];            break;        } // if    } // for    if ( NULL == qtabent ) { // not found        errno = ENOENT;        retval = -1;        goto exit_unlock;    }    if ( NULL != qtabent->users ) {   // still in use        qtabent->unlinkme = true;     // so mark it as pending deletion    } else {        do_mq_unlink( qtabent );    } // else    retval = 0; exit_unlock:    interr = pthread_mutex_unlock( &mqtab_mut );    // should never fail    CYG_ASSERT( interr == 0, "internal lock failed!" );    CYG_REPORT_RETVAL( retval );    return retval;} // mq_unlink()//------------------------------------------------------------------------externC intmq_send( mqd_t mqdes, const char *msg_ptr, size_t msg_len,         unsigned int msg_prio ){    CYG_REPORT_FUNCTYPE( "returning %d" );    CYG_REPORT_FUNCARG4( "mqdes=%08x, msg_ptr=%08x, msg_len=%u, msg_prio=%u",                         mqdes, msg_ptr, msg_len, msg_prio );    CYG_CHECK_DATA_PTRC( msg_ptr );        struct mquser *user = (struct mquser *)mqdes;    struct mqtabent *tabent = user->tabent;#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR    if ( user->magic != MQ_VALID_MAGIC ) {        errno  = EBADF;        CYG_REPORT_RETVAL( -1 );        return -1;    }#endif        if ( msg_len > (size_t)tabent->msgsize ) {        errno = EMSGSIZE;        CYG_REPORT_RETVAL( -1 );        return -1;    }    if ( msg_prio > MQ_PRIO_MAX ) {        errno = EINVAL;        CYG_REPORT_RETVAL( -1 );        return -1;    }    if ( (O_WRONLY != (user->flags & O_WRONLY)) &&          (O_RDWR != (user->flags & O_RDWR)) ) {        errno = EBADF;        CYG_REPORT_RETVAL( -1 );        return -1;    }    // go for it    Cyg_Mqueue::qerr_t err;    err = tabent->mq->put( msg_ptr, msg_len, msg_prio,                           ((user->flags & O_NONBLOCK) != O_NONBLOCK) );    switch (err) {    case Cyg_Mqueue::INTR:        errno = EINTR;        CYG_REPORT_RETVAL( -1 );        return -1;    case Cyg_Mqueue::WOULDBLOCK:        CYG_ASSERT( (user->flags & O_NONBLOCK) == O_NONBLOCK,                    "Message queue assumed non-blocking when blocking requested"            );        errno = EAGAIN;        CYG_REPORT_RETVAL( -1 );        return -1;            case Cyg_Mqueue::OK:        CYG_REPORT_RETVAL( 0 );        return 0;    default:        CYG_FAIL( "unhandled message queue return code" );        return -1; // keep compiler happy    } // switch} // mq_send()//------------------------------------------------------------------------externC ssize_tmq_receive( mqd_t mqdes, char *msg_ptr, size_t msg_len,            unsigned int *msg_prio ){    CYG_REPORT_FUNCTYPE( "returning %ld" );    CYG_REPORT_FUNCARG4( "mqdes=%08x, msg_ptr=%08x, msg_len=%u, msg_prio=%08x",                         mqdes, msg_ptr, msg_len, msg_prio );    CYG_CHECK_DATA_PTRC( msg_ptr );    CYG_CHECK_DATA_PTRC( msg_ptr+msg_len-1 );    if ( NULL != msg_prio )        CYG_CHECK_DATA_PTRC( msg_prio );            struct mquser *user = (struct mquser *)mqdes;    struct mqtabent *tabent = user->tabent;#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR    if ( user->magic != MQ_VALID_MAGIC ) {        errno  = EBADF;        CYG_REPORT_RETVAL( -1 );        return (ssize_t)-1;    }#endif        if ( (O_RDONLY != (user->flags & O_RDONLY)) &&          (O_RDWR != (user->flags & O_RDWR)) ) {        errno = EBADF;        CYG_REPORT_RETVAL( -1 );        return (ssize_t)-1;    }    if ( msg_len < (size_t)tabent->msgsize ) {        errno = EMSGSIZE;        CYG_REPORT_RETVAL( -1 );        return (ssize_t)-1;    }    // go for it    Cyg_Mqueue::qerr_t err;    err = tabent->mq->get( msg_ptr, &msg_len, msg_prio,                           ((user->flags & O_NONBLOCK) != O_NONBLOCK) );    switch (err) {    case Cyg_Mqueue::INTR:        errno = EINTR;        CYG_REPORT_RETVAL( -1 );        return (ssize_t)-1;    case Cyg_Mqueue::WOULDBLOCK:        CYG_ASSERT( (user->flags & O_NONBLOCK) == O_NONBLOCK,                    "Message queue assumed non-blocking when blocking requested"            );        errno = EAGAIN;        CYG_REPORT_RETVAL( -1 );        return (ssize_t)-1;            case Cyg_Mqueue::OK:        CYG_ASSERT( msg_len <= (size_t)tabent->msgsize,                    "returned message too long" );        if ( NULL != msg_prio )            CYG_ASSERT( *msg_prio <= MQ_PRIO_MAX,                        "returned message has invalid priority" );        CYG_REPORT_RETVAL( msg_len );        return (ssize_t)msg_len;    default:        CYG_FAIL( "unhandled message queue return code" );        return (ssize_t)-1; // keep compiler happy    } // switch    } // mq_receive()//------------------------------------------------------------------------#ifdef CYGFUN_KERNEL_THREADS_TIMERexternC intmq_timedsend( mqd_t mqdes, const char *msg_ptr, size_t msg_len,              unsigned int msg_prio, const struct timespec *abs_timeout){    CYG_REPORT_FUNCTYPE( "returning %d" );    CYG_REPORT_FUNCARG6( "mqdes=%08x, msg_ptr=%08x, msg_len=%u, msg_prio=%u, "                         "abs_timeout = %lu, %ld",                         mqdes, msg_ptr, msg_len, msg_prio,                          abs_timeout->tv_sec, abs_timeout->tv_nsec);    CYG_CHECK_DATA_PTRC( msg_ptr );        struct mquser *user = (struct mquser *)mqdes;    struct mqtabent *tabent = user->tabent;#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR    if ( user->magic != MQ_VALID_MAGIC ) {        errno  = EBADF;        CYG_REPORT_RETVAL( -1 );        return -1;    }#endif        if ( msg_len > (size_t)tabent->msgsize ) {        errno = EMSGSIZE;        CYG_REPORT_RETVAL( -1 );        return -1;    }    if ( msg_prio > MQ_PRIO_MAX ) {        errno = EINVAL;        CYG_REPORT_RETVAL( -1 );        return -1;    }    if ( (O_WRONLY != (user->flags & O_WRONLY)) &&          (O_RDWR != (user->flags & O_RDWR)) ) {        errno = EBADF;        CYG_REPORT_RETVAL( -1 );        return -1;    }    // go for it    Cyg_Mqueue::qerr_t err;    bool nonblocking = ((user->flags & O_NONBLOCK) == O_NONBLOCK);    bool badtimespec = (abs_timeout->tv_nsec < 0) ||        (abs_timeout->tv_nsec > 999999999l);    err = tabent->mq->put( msg_ptr, msg_len, msg_prio,                           !nonblocking && !badtimespec,                           cyg_timespec_to_ticks(abs_timeout));    switch (err) {    case Cyg_Mqueue::INTR:        errno = EINTR;        CYG_REPORT_RETVAL( -1 );        return -1;    case Cyg_Mqueue::WOULDBLOCK:        if (badtimespec) {            errno = EINVAL;            CYG_REPORT_RETVAL( -1 );            return -1;        }        CYG_ASSERT( (user->flags & O_NONBLOCK) == O_NONBLOCK,                    "Message queue assumed non-blocking when blocking requested"            );        errno = EAGAIN;        CYG_REPORT_RETVAL( -1 );        return -1;    case Cyg_Mqueue::TIMEOUT:        errno = ETIMEDOUT;        CYG_REPORT_RETVAL( -1 );        return -1;            case Cyg_Mqueue::OK:        CYG_REPORT_RETVAL( 0 );        return 0;    default:        CYG_FAIL( "unhandled message queue return code" );        return -1; // keep compiler happy    } // switch} // mq_timedsend()//------------------------------------------------------------------------externC ssize_tmq_timedreceive( mqd_t mqdes, char *msg_ptr, size_t msg_len,            unsigned int *msg_prio, const struct timespec *abs_timeout){    CYG_REPORT_FUNCTYPE( "returning %ld" );    CYG_REPORT_FUNCARG6( "mqdes=%08x, msg_ptr=%08x, msg_len=%u, msg_prio=%08x, "	                 "abs_timeout = %lu, %ld",                         mqdes, msg_ptr, msg_len, msg_prio,                         abs_timeout->tv_sec, abs_timeout->tv_nsec );    CYG_CHECK_DATA_PTRC( msg_ptr );    CYG_CHECK_DATA_PTRC( msg_ptr+msg_len-1 );    if ( NULL != msg_prio )        CYG_CHECK_DATA_PTRC( msg_prio );            struct mquser *user = (struct mquser *)mqdes;    struct mqtabent *tabent = user->tabent;#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR    if ( user->magic != MQ_VALID_MAGIC ) {        errno  = EBADF;        CYG_REPORT_RETVAL( -1 );        return (ssize_t)-1;    }#endif        if ( (O_RDONLY != (user->flags & O_RDONLY)) &&          (O_RDWR != (user->flags & O_RDWR)) ) {        errno = EBADF;        CYG_REPORT_RETVAL( -1 );        return (ssize_t)-1;    }    if ( msg_len < (size_t)tabent->msgsize ) {        errno = EMSGSIZE;        CYG_REPORT_RETVAL( -1 );        return (ssize_t)-1;    }    // go for it    Cyg_Mqueue::qerr_t err;    bool nonblocking = ((user->flags & O_NONBLOCK) == O_NONBLOCK);    bool badtimespec = (abs_timeout->tv_nsec < 0) ||        (abs_timeout->tv_nsec > 999999999l);    err = tabent->mq->get( msg_ptr, &msg_len, msg_prio,                           !nonblocking && !badtimespec,                           cyg_timespec_to_ticks(abs_timeout) );    switch (err) {    case Cyg_Mqueue::INTR:        errno = EINTR;        CYG_REPORT_RETVAL( -1 );        return (ssize_t)-1;    case Cyg_Mqueue::WOULDBLOCK:        if (badtimespec) {            errno = EINVAL;            CYG_REPORT_RETVAL( -1 );            return -1;        }        CYG_ASSERT( (user->flags & O_NONBLOCK) == O_NONBLOCK,                    "Message queue assumed non-blocking when blocking requested"            );        errno = EAGAIN;        CYG_REPORT_RETVAL( -1 );        return (ssize_t)-1;    case Cyg_Mqueue::TIMEOUT:        errno = ETIMEDOUT;        CYG_REPORT_RETVAL( -1 );        return -1;            case Cyg_Mqueue::OK:        CYG_ASSERT( msg_len <= (size_t)tabent->msgsize,                    "returned message too long" );        if ( NULL != msg_prio )            CYG_ASSERT( *msg_prio <= MQ_PRIO_MAX,                        "returned message has invalid priority" );        CYG_REPORT_RETVAL( msg_len );        return (ssize_t)msg_len;    default:        CYG_FAIL( "unhandled message queue return code" );        return (ssize_t)-1; // keep compiler happy    } // switch    } // mq_timedreceive()//------------------------------------------------------------------------#endif#ifdef CYGFUN_POSIX_MQUEUE_NOTIFYexternC intmq_notify( mqd_t mqdes, const struct sigevent *notification ){    CYG_REPORT_FUNCTYPE( "returning %d" );    CYG_REPORT_FUNCARG2( "mqdes=%08x, notification=%08x", mqdes, notification );    if ( NULL != notification )        CYG_CHECK_DATA_PTRC( notification );        struct mquser *user = (struct mquser *)mqdes;    struct mqtabent *tabent = user->tabent;#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR    if ( user->magic != MQ_VALID_MAGIC ) {        errno  = EBADF;        CYG_REPORT_RETVAL( -1 );        return -1;    }#endif        // lock scheduler since we test and set non-atomically    Cyg_Scheduler::lock();        // we are being told to clear the notification function    if ( NULL == notification ) {        tabent->mq->setnotify( NULL, 0 );        tabent->sigev = NULL;        Cyg_Scheduler::unlock();        CYG_REPORT_RETVAL( 0 );        return 0;    } // if        if ( NULL != tabent->sigev ) {  // already registered        Cyg_Scheduler::unlock();        errno = EBUSY;        CYG_REPORT_RETVAL( -1 );        return -1;    } // if    tabent->sigev = notification;    user->notifieruser = true; // Used for deciding about whether to                               // deregister in mq_close()    tabent->mq->setnotify( &notifyme, (CYG_ADDRWORD) user );    Cyg_Scheduler::unlock();        CYG_REPORT_RETVAL( 0 );    return 0;} // mq_notify()#endif // ifdef CYGFUN_POSIX_MQUEUE_NOTIFY//------------------------------------------------------------------------externC intmq_setattr( mqd_t mqdes, const struct mq_attr *mqstat,            struct mq_attr *omqstat ){    CYG_REPORT_FUNCTYPE( "returning %d" );    CYG_REPORT_FUNCARG3( "mqdes=%08x, mqstat=%08x, omqstat=%08x",                         mqdes, mqstat, omqstat );    CYG_CHECK_DATA_PTRC( mqstat );    struct mquser *user = (struct mquser *)mqdes;#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR    if ( user->magic != MQ_VALID_MAGIC ) {        errno  = EBADF;        CYG_REPORT_RETVAL( -1 );        return -1;    }#endif        if ( NULL != omqstat ) {        CYG_CHECK_DATA_PTRC( omqstat );        mq_getattr( mqdes, omqstat );    } // if    // Two-stage update, so lock sched since it's quick    Cyg_Scheduler::lock();    user->flags &= ~O_NONBLOCK;  // clear    if ( (mqstat->mq_flags & O_NONBLOCK) == O_NONBLOCK ) {        user->flags |= O_NONBLOCK;    } // if    Cyg_Scheduler::unlock();    CYG_REPORT_RETVAL( 0 );    return 0;} // mq_setattr()//------------------------------------------------------------------------externC intmq_getattr( mqd_t mqdes, struct mq_attr *mqstat ){    CYG_REPORT_FUNCTYPE( "returning %d" );    CYG_REPORT_FUNCARG2( "mqdes=%08x, mqstat=%08x", mqdes, mqstat );    CYG_CHECK_DATA_PTRC( mqstat );    struct mquser *user = (struct mquser *)mqdes;    struct mqtabent *tabent = user->tabent;#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR    if ( user->magic != MQ_VALID_MAGIC ) {        errno  = EBADF;        CYG_REPORT_RETVAL( -1 );        return -1;    }#endif        mqstat->mq_flags   = user->flags;    mqstat->mq_maxmsg  = tabent->maxmsg;    mqstat->mq_msgsize = tabent->msgsize;    mqstat->mq_curmsgs = tabent->mq->count();            CYG_REPORT_RETVAL( 0 );    return 0;} // mq_getattr()//------------------------------------------------------------------------#endif // ifdef CYGPKG_POSIX_MQUEUES/* EOF mqueue.cxx */

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?