⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 msgqdistlib.c

📁 VXWORKS源代码
💻 C
📖 第 1 页 / 共 5 页
字号:
* Could not establish communications with the remote node.* \i S_msgQDistLib_INVALID_PRIORITY* The argument <priority> is invalid.* \i S_msgQDistLib_INVALID_TIMEOUT* The argument <overallTimeout> is NO_WAIT .* \i S_msgQDistLib_RMT_MEMORY_SHORTAGE* There is not enough memory on the remote node.* \i S_objLib_OBJ_UNAVAILABLE* The argument <msgQTimeout> is set to NO_WAIT, and the queue is full.* \i S_objLib_OBJ_TIMEOUT* The queue is full for <msgQTimeout> ticks.* \i S_msgQLib_INVALID_MSG_LENGTH* The argument <nBytes> is larger than the <maxMsgLength> set for the * message queue.* \i S_msgQDistLib_OVERALL_TIMEOUT* There was no response from the remote side in <overallTimeout> ticks.* \ie** SEE ALSO: msgQLib*/STATUS msgQDistSend    (    MSG_Q_ID    msgQId,         /* message queue on which to send */    char *      buffer,         /* message to send                */    UINT        nBytes,         /* length of message              */    int         msgQTimeout,    /* ticks to wait at message queue */    int         overallTimeout, /* ticks to wait overall          */    int         priority        /* priority                       */    )    {    DIST_MSG_Q_ID     dMsgQId;    DIST_OBJ_NODE *   pObjNode;    MSG_Q_ID          lclMsgQId;    int               lclPriority;    /* MSG_PRI_URGENT or MSG_PRI_NORMAL */    DIST_MSG_Q_GRP_ID    distMsgQGrpId;    STATUS               status;    DIST_PKT_MSG_Q_SEND    pktSendHdr;    DIST_MSG_Q_SEND_INQ    inquiryNode;    DIST_INQ_ID            inquiryId;    DIST_IOVEC             distIOVec[2];    if (DIST_OBJ_VERIFY (msgQId) == ERROR)        {        errnoSet (S_distLib_OBJ_ID_ERROR);        return (ERROR);        }    if (overallTimeout == NO_WAIT)        {        errnoSet (S_msgQDistLib_INVALID_TIMEOUT);        return (ERROR);    /* makes no sense */        }    if (! DIST_MSG_Q_PRIO_VERIFY (priority))        {        errnoSet (S_msgQDistLib_INVALID_PRIORITY);        return (ERROR);    /* invalid priority */        }    pObjNode = MSG_Q_ID_TO_DIST_OBJ_NODE (msgQId);    if (! IS_DIST_MSG_Q_OBJ (pObjNode))        {        errnoSet (S_distLib_OBJ_ID_ERROR);        return (ERROR);    /* legal object id, but not a message queue */        }    dMsgQId = (DIST_MSG_Q_ID) pObjNode->objNodeId;#ifdef MSG_Q_DIST_REPORT    printf ("msgQDistSend: msgQId %p, dMsgQId 0x%lx\n", msgQId, dMsgQId);#endif    if (IS_DIST_MSG_Q_TYPE_GRP (dMsgQId))        {        /* Message queue id points to a group. */        distMsgQGrpId = DIST_MSG_Q_ID_TO_DIST_MSG_Q_GRP_ID (dMsgQId);        status = msgQDistGrpSend (distMsgQGrpId, buffer, nBytes,                                  msgQTimeout, overallTimeout, priority);#ifdef MSG_Q_DIST_REPORT        printf ("msgQDistSend: msgQDistGrpSend returned = %d\n", status);#endif        return (status);        }    if (!IS_DIST_OBJ_LOCAL (pObjNode))        {        /* Message queue id points to a remote queue. */        inquiryNode.sendInq.inqType = DIST_MSG_Q_INQ_TYPE_SEND;        semBInit (&(inquiryNode.sendInqWait), SEM_Q_FIFO, SEM_EMPTY);        inquiryNode.remoteError = FALSE;        inquiryNode.sendInqMsgQueued = FALSE;        inquiryNode.sendInqTask = taskIdSelf();        inquiryId = distInqRegister ((DIST_INQ *) &inquiryNode);        pktSendHdr.sendHdr.pktType = DIST_PKT_TYPE_MSG_Q;        pktSendHdr.sendHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_SEND;        pktSendHdr.sendTblIx = DIST_MSG_Q_ID_TO_TBL_IX (dMsgQId);        pktSendHdr.sendInqId = (uint32_t) inquiryId;        pktSendHdr.sendTimeout =                htonl ((uint32_t) DIST_TICKS_TO_MSEC (msgQTimeout));        /* use IOV stuff here, since we do not want to copy data */        distIOVec[0].pIOBuffer = &pktSendHdr;        distIOVec[0].IOLen = DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_SEND);        distIOVec[1].pIOBuffer = buffer;        distIOVec[1].IOLen = nBytes;        status = distNetIOVSend (pObjNode->objNodeReside, &distIOVec[0], 2,                                 WAIT_FOREVER,                                 DIST_MSG_Q_PRIO_TO_NET_PRIO (priority));        if (status == ERROR)            {            distInqCancel ((DIST_INQ *) &inquiryNode);            errnoSet (S_distLib_UNREACHABLE);            return (ERROR);            }        /*         * semTake() blocks the requesting task until the service         * task gives the semaphore, because the request has         * been processed.         */        semTake (&(inquiryNode.sendInqWait), overallTimeout);        distInqCancel ((DIST_INQ *) &inquiryNode);        if (inquiryNode.sendInqMsgQueued)            return (OK);        /* If errno = S_objLib_OBJ_TIMEOUT, it could either be a result         * of the timeout from the semaphore or the remote errno.  We must         * check the remoteError flag of inquiryNode to determine what the         * source of the error was.  If it is a result of the semaphore, we         * will set errno to S_msgQDistLib_OVERALLTIMEOUT.  Otherwise, we'll         * leave the errno as it is.         */        if (inquiryNode.remoteError == FALSE)            errno = S_msgQDistLib_OVERALL_TIMEOUT;        return (ERROR);        }    /* Message queue id points to a local queue. */    lclPriority = DIST_MSG_Q_PRIO_TO_MSG_Q_PRIO (priority);    lclMsgQId = msgQDistTblGet (DIST_MSG_Q_ID_TO_TBL_IX (dMsgQId));    if (lclMsgQId == NULL)        return (ERROR);    /* does not exist */    if (msgQSend (lclMsgQId, buffer, nBytes, msgQTimeout, lclPriority)          == ERROR)        {        return (ERROR);    /* error in msgQSend() */        }    return (OK);    }/***************************************************************************** msgQDistReceive - receive a message from a distributed message queue (VxFusion option)** This routine receives a message from the distributed message queue specified * by <msgQId>.  The received message is copied into the specified buffer, * <buffer>, which is <maxNBytes> in length.  If the message is longer than * <maxNBytes>, the remainder of the message is discarded (no error indication* is returned).** The argument <msgQTimeout> specifies the time in ticks to wait for the * queuing of the message. The argument <overallTimeout> specifies the time* in ticks to wait for both the sending and queuing of the message.* While it is an error to set <overallTimeout> to NO_WAIT (0), * WAIT_FOREVER (-1) is allowed for both <msgQTimeout> and <overallTimeout>.** Calling msgQDistReceive() on a distributed message group returns an* error.** NOTE: When msgQDistReceive() is called through msgQReceive(), * <msgQTimeout> is set to <timeout> and <overallTimeout> to WAIT_FOREVER .** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: The number of bytes copied to <buffer>, or ERROR. ** ERRNO:* \is* \i S_distLib_OBJ_ID_ERROR* The argument <msgQId> is invalid.* \i S_distLib_UNREACHABLE* Could not establish communications with the remote node.* \i S_msgQLib_INVALID_MSG_LENGTH* The argument <maxNBytes> is less than 0.* \i S_msgQDistLib_INVALID_TIMEOUT* The argument <overallTimeout> is NO_WAIT .* \i S_msgQDistLib_RMT_MEMORY_SHORTAGE* There is not enough memory on the remote node.* \i S_objLib_OBJ_UNAVAILABLE* The argument <msgQTimeout> is set to NO_WAIT, and no messages are available.* \i S_objLib_OBJ_TIMEOUT* No messages were received in <msgQTimeout> ticks.* \i S_msgQDistLib_OVERALL_TIMEOUT* There was no response from the remote side in <overallTimeout> ticks.* \ie*** SEE ALSO: msgQLib*/int msgQDistReceive    (    MSG_Q_ID    msgQId,         /* message queue from which to receive */    char *      buffer,         /* buffer to receive message           */    UINT        maxNBytes,      /* length of buffer                    */    int         msgQTimeout,    /* ticks to wait at the message queue  */    int         overallTimeout  /* ticks to wait overall               */    )    {    DIST_MSG_Q_ID    dMsgQId;    DIST_OBJ_NODE *  pObjNode;    MSG_Q_ID         lclMsgQId;    DIST_PKT_MSG_Q_RECV_REQ    pktReq;    DIST_MSG_Q_RECV_INQ        inquiryNode;    DIST_INQ_ID                inquiryId;    STATUS                     status;    int                        nBytes;    if (DIST_OBJ_VERIFY (msgQId) == ERROR)        {        errnoSet (S_distLib_OBJ_ID_ERROR);        return (ERROR);        }    if (overallTimeout == NO_WAIT)        {        errnoSet (S_msgQDistLib_INVALID_TIMEOUT);        return (ERROR);    /* makes no sense */        }    /*     * Even though <maxNBytes> is unsigned, check for < 0 to catch     * possible caller errors.     */    if ((int) maxNBytes < 0)        {        errnoSet (S_msgQLib_INVALID_MSG_LENGTH);        return (ERROR);        }    pObjNode = MSG_Q_ID_TO_DIST_OBJ_NODE (msgQId);    if (! IS_DIST_MSG_Q_OBJ (pObjNode))        {        errnoSet (S_distLib_OBJ_ID_ERROR);        return (ERROR); /* legal object id, but not a message queue */        }    dMsgQId = (DIST_MSG_Q_ID) pObjNode->objNodeId;#ifdef MSG_Q_DIST_REPORT    printf ("msgQDistReceive: msgQId %p, dMsgQId 0x%lx\n", msgQId, dMsgQId);#endif    if (IS_DIST_MSG_Q_TYPE_GRP (dMsgQId))        {        /* MSG_Q_ID is a group id. */        errnoSet (S_msgQDistLib_NOT_GROUP_CALLABLE);        return (ERROR);    /* error to call msgQReceive() on groups */        }    if (!IS_DIST_OBJ_LOCAL (pObjNode))        {        /*         * Queue is remote.         *         * Create a inquiry node and send a request to the remote         * node. Block until timeout exceeds or the request is         * answered.         */        inquiryNode.recvInq.inqType = DIST_MSG_Q_INQ_TYPE_RECV;        semBInit (&(inquiryNode.recvInqWait), SEM_Q_FIFO, SEM_EMPTY);        inquiryNode.recvInqTask = taskIdSelf();        inquiryNode.pRecvInqBuffer = buffer;        inquiryNode.recvInqMaxNBytes = maxNBytes;        inquiryNode.recvInqMsgArrived = FALSE;        inquiryNode.remoteError = FALSE;        inquiryId = distInqRegister ((DIST_INQ *) &inquiryNode);        pktReq.recvReqHdr.pktType = DIST_PKT_TYPE_MSG_Q;        pktReq.recvReqHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_RECV_REQ;        pktReq.recvReqTblIx = DIST_MSG_Q_ID_TO_TBL_IX (dMsgQId);        pktReq.recvReqInqId = (uint32_t) inquiryId;        pktReq.recvReqMaxNBytes = htonl ((uint32_t) maxNBytes);        pktReq.recvReqTimeout =                 htonl ((uint32_t) DIST_TICKS_TO_MSEC (msgQTimeout));        status = distNetSend (pObjNode->objNodeReside, (DIST_PKT *) &pktReq,                              DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_RECV_REQ),                              WAIT_FOREVER,                              DIST_MSG_Q_RECV_PRIO);        if (status == ERROR)            {            distInqCancel ((DIST_INQ *) &inquiryNode);            errnoSet (S_distLib_UNREACHABLE);            return (ERROR);            }        /*         * semTake() blocks the requesting task until         * the service task gives the semaphore, because         * the request has been processed.         */        semTake (&(inquiryNode.recvInqWait), overallTimeout);        if (inquiryNode.recvInqMsgArrived)            {            /*             * If <recvInqMsgArrived> is true, <recvInqMaxNBytes> has             * the number of bytes received.             */            nBytes = inquiryNode.recvInqMaxNBytes;            distInqCancel ((DIST_INQ *) &inquiryNode);            return (nBytes);            }        distInqCancel ((DIST_INQ *) &inquiryNode);        /* If errno = S_objLib_OBJ_TIMEOUT, it could either be a result         * of the timeout from the semaphore or the remote errno.  We must         * check the remoteError flag of inquiryNode to determine what the         * source of the error was.  If it is a result of the semaphore, we         * will set errno to S_msgQDistLib_OVERALLTIMEOUT.  Otherwise, we'll         * leave the errno as it is.         */        if (inquiryNode.remoteError == FALSE)            errnoSet (S_msgQDistLib_OVERALL_TIMEOUT);        return (ERROR);        }    /* The message queue is local to this node. This will be simple. */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -