mqueue.cxx
来自「ecos实时嵌入式操作系统」· CXX 代码 · 共 983 行 · 第 1/2 页
CXX
983 行
int retval, interr; cyg_ucount32 i; struct mqtabent *qtabent=NULL; interr = pthread_mutex_lock( &mqtab_mut ); // should never fail CYG_ASSERT( interr == 0, "internal lock failed!" ); // find the entry first // FIXME: Should check for length and return ENAMETOOLONG for ( i=0; i < CYGNUM_POSIX_MQUEUE_OPEN_MAX; i++ ) { if ( 0 == strncmp(name, mqtab[i].name, PATH_MAX) ) { qtabent = &mqtab[i]; break; } // if } // for if ( NULL == qtabent ) { // not found errno = ENOENT; retval = -1; goto exit_unlock; } if ( NULL != qtabent->users ) { // still in use qtabent->unlinkme = true; // so mark it as pending deletion } else { do_mq_unlink( qtabent ); } // else retval = 0; exit_unlock: interr = pthread_mutex_unlock( &mqtab_mut ); // should never fail CYG_ASSERT( interr == 0, "internal lock failed!" ); CYG_REPORT_RETVAL( retval ); return retval;} // mq_unlink()//------------------------------------------------------------------------externC intmq_send( mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio ){ CYG_REPORT_FUNCTYPE( "returning %d" ); CYG_REPORT_FUNCARG4( "mqdes=%08x, msg_ptr=%08x, msg_len=%u, msg_prio=%u", mqdes, msg_ptr, msg_len, msg_prio ); CYG_CHECK_DATA_PTRC( msg_ptr ); struct mquser *user = (struct mquser *)mqdes; struct mqtabent *tabent = user->tabent;#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR if ( user->magic != MQ_VALID_MAGIC ) { errno = EBADF; CYG_REPORT_RETVAL( -1 ); return -1; }#endif if ( msg_len > (size_t)tabent->msgsize ) { errno = EMSGSIZE; CYG_REPORT_RETVAL( -1 ); return -1; } if ( msg_prio > MQ_PRIO_MAX ) { errno = EINVAL; CYG_REPORT_RETVAL( -1 ); return -1; } if ( (O_WRONLY != (user->flags & O_WRONLY)) && (O_RDWR != (user->flags & O_RDWR)) ) { errno = EBADF; CYG_REPORT_RETVAL( -1 ); return -1; } // go for it Cyg_Mqueue::qerr_t err; err = tabent->mq->put( msg_ptr, msg_len, msg_prio, ((user->flags & O_NONBLOCK) != O_NONBLOCK) ); switch (err) { case Cyg_Mqueue::INTR: errno = EINTR; CYG_REPORT_RETVAL( -1 ); return -1; case Cyg_Mqueue::WOULDBLOCK: CYG_ASSERT( (user->flags & O_NONBLOCK) == O_NONBLOCK, "Message queue assumed non-blocking when blocking requested" ); errno = EAGAIN; CYG_REPORT_RETVAL( -1 ); return -1; case Cyg_Mqueue::OK: CYG_REPORT_RETVAL( 0 ); return 0; default: CYG_FAIL( "unhandled message queue return code" ); return -1; // keep compiler happy } // switch} // mq_send()//------------------------------------------------------------------------externC ssize_tmq_receive( mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio ){ CYG_REPORT_FUNCTYPE( "returning %ld" ); CYG_REPORT_FUNCARG4( "mqdes=%08x, msg_ptr=%08x, msg_len=%u, msg_prio=%08x", mqdes, msg_ptr, msg_len, msg_prio ); CYG_CHECK_DATA_PTRC( msg_ptr ); CYG_CHECK_DATA_PTRC( msg_ptr+msg_len-1 ); if ( NULL != msg_prio ) CYG_CHECK_DATA_PTRC( msg_prio ); struct mquser *user = (struct mquser *)mqdes; struct mqtabent *tabent = user->tabent;#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR if ( user->magic != MQ_VALID_MAGIC ) { errno = EBADF; CYG_REPORT_RETVAL( -1 ); return (ssize_t)-1; }#endif if ( (O_RDONLY != (user->flags & O_RDONLY)) && (O_RDWR != (user->flags & O_RDWR)) ) { errno = EBADF; CYG_REPORT_RETVAL( -1 ); return (ssize_t)-1; } if ( msg_len < (size_t)tabent->msgsize ) { errno = EMSGSIZE; CYG_REPORT_RETVAL( -1 ); return (ssize_t)-1; } // go for it Cyg_Mqueue::qerr_t err; err = tabent->mq->get( msg_ptr, &msg_len, msg_prio, ((user->flags & O_NONBLOCK) != O_NONBLOCK) ); switch (err) { case Cyg_Mqueue::INTR: errno = EINTR; CYG_REPORT_RETVAL( -1 ); return (ssize_t)-1; case Cyg_Mqueue::WOULDBLOCK: CYG_ASSERT( (user->flags & O_NONBLOCK) == O_NONBLOCK, "Message queue assumed non-blocking when blocking requested" ); errno = EAGAIN; CYG_REPORT_RETVAL( -1 ); return (ssize_t)-1; case Cyg_Mqueue::OK: CYG_ASSERT( msg_len <= (size_t)tabent->msgsize, "returned message too long" ); if ( NULL != msg_prio ) CYG_ASSERT( *msg_prio <= MQ_PRIO_MAX, "returned message has invalid priority" ); CYG_REPORT_RETVAL( msg_len ); return (ssize_t)msg_len; default: CYG_FAIL( "unhandled message queue return code" ); return (ssize_t)-1; // keep compiler happy } // switch } // mq_receive()//------------------------------------------------------------------------#ifdef CYGFUN_KERNEL_THREADS_TIMERexternC intmq_timedsend( mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec *abs_timeout){ CYG_REPORT_FUNCTYPE( "returning %d" ); CYG_REPORT_FUNCARG6( "mqdes=%08x, msg_ptr=%08x, msg_len=%u, msg_prio=%u, " "abs_timeout = %lu, %ld", mqdes, msg_ptr, msg_len, msg_prio, abs_timeout->tv_sec, abs_timeout->tv_nsec); CYG_CHECK_DATA_PTRC( msg_ptr ); struct mquser *user = (struct mquser *)mqdes; struct mqtabent *tabent = user->tabent;#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR if ( user->magic != MQ_VALID_MAGIC ) { errno = EBADF; CYG_REPORT_RETVAL( -1 ); return -1; }#endif if ( msg_len > (size_t)tabent->msgsize ) { errno = EMSGSIZE; CYG_REPORT_RETVAL( -1 ); return -1; } if ( msg_prio > MQ_PRIO_MAX ) { errno = EINVAL; CYG_REPORT_RETVAL( -1 ); return -1; } if ( (O_WRONLY != (user->flags & O_WRONLY)) && (O_RDWR != (user->flags & O_RDWR)) ) { errno = EBADF; CYG_REPORT_RETVAL( -1 ); return -1; } // go for it Cyg_Mqueue::qerr_t err; bool nonblocking = ((user->flags & O_NONBLOCK) == O_NONBLOCK); bool badtimespec = (abs_timeout->tv_nsec < 0) || (abs_timeout->tv_nsec > 999999999l); err = tabent->mq->put( msg_ptr, msg_len, msg_prio, !nonblocking && !badtimespec, cyg_timespec_to_ticks(abs_timeout)); switch (err) { case Cyg_Mqueue::INTR: errno = EINTR; CYG_REPORT_RETVAL( -1 ); return -1; case Cyg_Mqueue::WOULDBLOCK: if (badtimespec) { errno = EINVAL; CYG_REPORT_RETVAL( -1 ); return -1; } CYG_ASSERT( (user->flags & O_NONBLOCK) == O_NONBLOCK, "Message queue assumed non-blocking when blocking requested" ); errno = EAGAIN; CYG_REPORT_RETVAL( -1 ); return -1; case Cyg_Mqueue::TIMEOUT: errno = ETIMEDOUT; CYG_REPORT_RETVAL( -1 ); return -1; case Cyg_Mqueue::OK: CYG_REPORT_RETVAL( 0 ); return 0; default: CYG_FAIL( "unhandled message queue return code" ); return -1; // keep compiler happy } // switch} // mq_timedsend()//------------------------------------------------------------------------externC ssize_tmq_timedreceive( mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio, const struct timespec *abs_timeout){ CYG_REPORT_FUNCTYPE( "returning %ld" ); CYG_REPORT_FUNCARG6( "mqdes=%08x, msg_ptr=%08x, msg_len=%u, msg_prio=%08x, " "abs_timeout = %lu, %ld", mqdes, msg_ptr, msg_len, msg_prio, abs_timeout->tv_sec, abs_timeout->tv_nsec ); CYG_CHECK_DATA_PTRC( msg_ptr ); CYG_CHECK_DATA_PTRC( msg_ptr+msg_len-1 ); if ( NULL != msg_prio ) CYG_CHECK_DATA_PTRC( msg_prio ); struct mquser *user = (struct mquser *)mqdes; struct mqtabent *tabent = user->tabent;#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR if ( user->magic != MQ_VALID_MAGIC ) { errno = EBADF; CYG_REPORT_RETVAL( -1 ); return (ssize_t)-1; }#endif if ( (O_RDONLY != (user->flags & O_RDONLY)) && (O_RDWR != (user->flags & O_RDWR)) ) { errno = EBADF; CYG_REPORT_RETVAL( -1 ); return (ssize_t)-1; } if ( msg_len < (size_t)tabent->msgsize ) { errno = EMSGSIZE; CYG_REPORT_RETVAL( -1 ); return (ssize_t)-1; } // go for it Cyg_Mqueue::qerr_t err; bool nonblocking = ((user->flags & O_NONBLOCK) == O_NONBLOCK); bool badtimespec = (abs_timeout->tv_nsec < 0) || (abs_timeout->tv_nsec > 999999999l); err = tabent->mq->get( msg_ptr, &msg_len, msg_prio, !nonblocking && !badtimespec, cyg_timespec_to_ticks(abs_timeout) ); switch (err) { case Cyg_Mqueue::INTR: errno = EINTR; CYG_REPORT_RETVAL( -1 ); return (ssize_t)-1; case Cyg_Mqueue::WOULDBLOCK: if (badtimespec) { errno = EINVAL; CYG_REPORT_RETVAL( -1 ); return -1; } CYG_ASSERT( (user->flags & O_NONBLOCK) == O_NONBLOCK, "Message queue assumed non-blocking when blocking requested" ); errno = EAGAIN; CYG_REPORT_RETVAL( -1 ); return (ssize_t)-1; case Cyg_Mqueue::TIMEOUT: errno = ETIMEDOUT; CYG_REPORT_RETVAL( -1 ); return -1; case Cyg_Mqueue::OK: CYG_ASSERT( msg_len <= (size_t)tabent->msgsize, "returned message too long" ); if ( NULL != msg_prio ) CYG_ASSERT( *msg_prio <= MQ_PRIO_MAX, "returned message has invalid priority" ); CYG_REPORT_RETVAL( msg_len ); return (ssize_t)msg_len; default: CYG_FAIL( "unhandled message queue return code" ); return (ssize_t)-1; // keep compiler happy } // switch } // mq_timedreceive()//------------------------------------------------------------------------#endif#ifdef CYGFUN_POSIX_MQUEUE_NOTIFYexternC intmq_notify( mqd_t mqdes, const struct sigevent *notification ){ CYG_REPORT_FUNCTYPE( "returning %d" ); CYG_REPORT_FUNCARG2( "mqdes=%08x, notification=%08x", mqdes, notification ); if ( NULL != notification ) CYG_CHECK_DATA_PTRC( notification ); struct mquser *user = (struct mquser *)mqdes; struct mqtabent *tabent = user->tabent;#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR if ( user->magic != MQ_VALID_MAGIC ) { errno = EBADF; CYG_REPORT_RETVAL( -1 ); return -1; }#endif // lock scheduler since we test and set non-atomically Cyg_Scheduler::lock(); // we are being told to clear the notification function if ( NULL == notification ) { tabent->mq->setnotify( NULL, 0 ); tabent->sigev = NULL; Cyg_Scheduler::unlock(); CYG_REPORT_RETVAL( 0 ); return 0; } // if if ( NULL != tabent->sigev ) { // already registered Cyg_Scheduler::unlock(); errno = EBUSY; CYG_REPORT_RETVAL( -1 ); return -1; } // if tabent->sigev = notification; user->notifieruser = true; // Used for deciding about whether to // deregister in mq_close() tabent->mq->setnotify( ¬ifyme, (CYG_ADDRWORD) user ); Cyg_Scheduler::unlock(); CYG_REPORT_RETVAL( 0 ); return 0;} // mq_notify()#endif // ifdef CYGFUN_POSIX_MQUEUE_NOTIFY//------------------------------------------------------------------------externC intmq_setattr( mqd_t mqdes, const struct mq_attr *mqstat, struct mq_attr *omqstat ){ CYG_REPORT_FUNCTYPE( "returning %d" ); CYG_REPORT_FUNCARG3( "mqdes=%08x, mqstat=%08x, omqstat=%08x", mqdes, mqstat, omqstat ); CYG_CHECK_DATA_PTRC( mqstat ); struct mquser *user = (struct mquser *)mqdes;#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR if ( user->magic != MQ_VALID_MAGIC ) { errno = EBADF; CYG_REPORT_RETVAL( -1 ); return -1; }#endif if ( NULL != omqstat ) { CYG_CHECK_DATA_PTRC( omqstat ); mq_getattr( mqdes, omqstat ); } // if // Two-stage update, so lock sched since it's quick Cyg_Scheduler::lock(); user->flags &= ~O_NONBLOCK; // clear if ( (mqstat->mq_flags & O_NONBLOCK) == O_NONBLOCK ) { user->flags |= O_NONBLOCK; } // if Cyg_Scheduler::unlock(); CYG_REPORT_RETVAL( 0 ); return 0;} // mq_setattr()//------------------------------------------------------------------------externC intmq_getattr( mqd_t mqdes, struct mq_attr *mqstat ){ CYG_REPORT_FUNCTYPE( "returning %d" ); CYG_REPORT_FUNCARG2( "mqdes=%08x, mqstat=%08x", mqdes, mqstat ); CYG_CHECK_DATA_PTRC( mqstat ); struct mquser *user = (struct mquser *)mqdes; struct mqtabent *tabent = user->tabent;#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR if ( user->magic != MQ_VALID_MAGIC ) { errno = EBADF; CYG_REPORT_RETVAL( -1 ); return -1; }#endif mqstat->mq_flags = user->flags; mqstat->mq_maxmsg = tabent->maxmsg; mqstat->mq_msgsize = tabent->msgsize; mqstat->mq_curmsgs = tabent->mq->count(); CYG_REPORT_RETVAL( 0 ); return 0;} // mq_getattr()//------------------------------------------------------------------------#endif // ifdef CYGPKG_POSIX_MQUEUES/* EOF mqueue.cxx */
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?