⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 msgqdistgrplib.c

📁 VXWORKS源代码
💻 C
📖 第 1 页 / 共 4 页
字号:
                 */                if (! pDistGrpDbNode->pGrpDbGapNode)                    {                    /* GAP node is not initialized by now */                    distGapNodeInit (&dGapNode, pDistGrpDbNode, FALSE);                    pDistGrpDbNode->pGrpDbGapNode = &dGapNode;                    }                 msgQDistGrpDbUnlock();                     semTake (&dGapNode.gapWaitFor, WAIT_FOREVER);                 msgQDistGrpDbLock();            }        if ((pDGapNodeTemp = pDistGrpDbNode->pGrpDbGapNode) != NULL)            {            pDistGrpDbNode->pGrpDbGapNode = NULL;            distGapNodeDelete (pDGapNodeTemp);            }        msgQDistGrpDbUnlock();        /*         * Group exists and has a global state (at least now).         * Add member to group.         */#ifdef MSG_Q_DIST_GRP_REPORT        printf ("msgQDistGrpAdd: add member\n");#endif        msgQDistGrpLclAddMember (pDistGrpDbNode, msgQId);        }    return (pDistGrpDbNode->grpDbMsgQId);   /* retrun MSG_Q_ID */    }/***************************************************************************** msgQDistGrpDelete - delete a distributed message queue from a group (VxFusion option)** This routine deletes a distributed message queue from a group. ** NOTE: For this release, it is not possible to remove a distributed message* queue from a group.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: ERROR, always.** ERRNO: S_distLib_NO_OBJECT_DESTROY** INTERNAL NOTE: Takes <distGrpDbSemaphore>.*/STATUS msgQDistGrpDelete    (    char *   distGrpName,   /* group containing the queue to be deleted */    MSG_Q_ID msgQId         /* ID of the message queue to delete */    )    {#if 0    DIST_MSG_Q_ID  distMsgQId;    DIST_OBJ_ID    objId;#else        UNUSED_ARG(distGrpName);        UNUSED_ARG(msgQId);            errnoSet (S_distLib_NO_OBJECT_DESTROY);    return (ERROR);    /* BY NOW */#endif    /* the rest of this function is not compiled */#if 0    if (ID_IS_DISTRIBUTED (msgQId))        {        objId = ((DIST_OBJ_NODE *) msgQId)->objNodeId;        distMsgQId = DIST_OBJ_ID_TO_DIST_MSG_Q_ID (objId);        }    if (! (ID_IS_DISTRIBUTED (msgQId)) || IS_DIST_MSG_Q_LOCAL (distMsgQId))        {        /* msgQ is local */        SL_NODE                *pNode;        DIST_GRP_DB_NODE    *pDistGrpDbNode;        DIST_GRP_MSG_Q_NODE *pDistGrpMsgQNode;        DIST_GRP_MSG_Q_NODE    *pPrevNode = NULL;        msgQDistGrpDbLock();        pDistGrpDbNode = msgQDistGrpLclFindByName (distGrpName);        msgQDistGrpDbUnlock();        if (pDistGrpDbNode == NULL)            return (ERROR);        for (pNode = SLL_FIRST ((SL_LIST *) &pDistGrpDbNode->grpDbMsgQIdLst);             pNode != NULL;             pNode = SLL_NEXT (pNode))            {            pDistGrpMsgQNode = (DIST_GRP_MSG_Q_NODE *) pNode;            if (pDistGrpMsgQNode->msgQId == msgQId)                {                sllRemove ((SL_LIST *) &pDistGrpDbNode->grpDbMsgQIdLst,                        (SL_NODE *) pDistGrpMsgQNode, (SL_NODE *) pPrevNode);                break;                }                pPrevNode = pDistGrpMsgQNode;            }        }    else        {        /* msgQ is remote */        return (ERROR);        }    return (OK);#endif    }/***************************************************************************** msgQDistGrpAgree - agree on a group identifier (VxFusion option)** This routine determines the DIST_MSG_Q_GRP_ID associated with a* DIST_GRP_DB_NODE .** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: A DIST_MSG_Q_GRP_ID .** NOMANUAL*/DIST_MSG_Q_GRP_ID msgQDistGrpAgree    (    DIST_GRP_DB_NODE *pDistGrpDbNode  /* ptr to database node */    )    {    DIST_MSG_Q_GRP_ID    grpId;    /* Start GAP. */    grpId = distGapStart (pDistGrpDbNode);    return (grpId);    }/***************************************************************************** msgQDistGrpLclSend - send to all group members registrated locally (VxFusion option)** This routine sends a message to locally registered members of a group.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: OK, if successful; ERROR, if not.** NOMANUAL*/LOCAL STATUS msgQDistGrpLclSend    (    DIST_MSG_Q_GRP_SEND_INQ * pInq,          /* pointer to inquiry node */    DIST_MSG_Q_GRP_ID         distMsgQGrpId, /* group on which to send  */    char *                    buffer,        /* message to send         */    UINT                      nBytes,        /* length of message       */    int                       timeout,       /* ticks to wait           */    int                       priority       /* priority                */    )    {    DIST_GRP_DB_NODE * pDistGrpDbNode;    SL_NODE * pNode;    DIST_GRP_MSG_Q_NODE * pDistGrpMsgQNode;    STATUS                status;    int                   tid;    msgQDistGrpDbLock();    pDistGrpDbNode = msgQDistGrpLclFindById (distMsgQGrpId);    msgQDistGrpDbUnlock();    if (pDistGrpDbNode == NULL)        {#ifdef MSG_Q_DIST_GRP_REPORT        printf ("msgQDistGrpLclSend: group 0x%lx is unknown\n",                (u_long) distMsgQGrpId);#endif        pInq->sendInqStatus = MSG_Q_DIST_GRP_STATUS_ILLEGAL_OBJ_ID;        return (ERROR);                    /* illegal object id */        }    /*     * For all members of this group; send'em the message.     * Note: a member does not need to be a local message queue.     */    for (pNode = SLL_FIRST ((SL_LIST *) &pDistGrpDbNode->grpDbMsgQIdLst);            pNode != NULL;            pNode = SLL_NEXT (pNode))        {        pDistGrpMsgQNode = (DIST_GRP_MSG_Q_NODE *) pNode;#ifdef MSG_Q_DIST_GRP_REPORT        printf ("msgQDistGrpLclSend: send message to msgQId %p\n",                pDistGrpMsgQNode->msgQId);#endif        /*         * First try to send with NO_WAIT. If this fails due to a full         * message queue and timeout is not NO_WAIT, spawn a task         * and call msgQSend with the user specified timeout.         */        status = msgQDistSend (pDistGrpMsgQNode->msgQId, buffer, nBytes,                               NO_WAIT, WAIT_FOREVER, priority);        if (status == ERROR)            {#ifdef MSG_Q_DIST_GRP_REPORT            printf ("msgQDistGrpLclSend: msgQDistSend() returned error\n");#endif            if (timeout != NO_WAIT && errno == S_objLib_OBJ_UNAVAILABLE)                {                msgQGrpSendInqLock(pInq);    /* lock inquiry node */                /* timeout is not NO_WAIT and msgQ is full */                pInq->sendInqNumBlocked++;                msgQGrpSendInqUnlock(pInq);    /* unlock inquiry node */                    /* spawn a task that can wait */                tid = taskSpawn (NULL,                                 DIST_MSG_Q_GRP_WAIT_TASK_PRIO,                                 0,                                 DIST_MSG_Q_GRP_WAIT_TASK_STACK_SZ,                                 (FUNCPTR) msgQDistGrpLclSendCanWait,                                 (int) distInqGetId ((DIST_INQ *) pInq),                                 (int) pDistGrpMsgQNode->msgQId,                                 (int) buffer,                                 (int) nBytes,                                 (int) timeout,                                 (int) priority,                                 0, 0, 0, 0);                if (tid == ERROR)                    pInq->sendInqStatus = MSG_Q_DIST_GRP_STATUS_ERROR;                }            else                {                pInq->sendInqStatus = MSG_Q_DIST_GRP_STATUS_ERROR;                }            }        }    if (pInq->sendInqNumBlocked == 0)        {        /* If we leave nobody blocked, decrease the outstanding counter. */        msgQGrpSendInqLock(pInq);    /* lock inquiry node */        if (--pInq->sendInqNumOutstanding == 0)            {            msgQGrpSendInqUnlock(pInq);    /* unlock inquiry node */            semGive (&pInq->sendInqWait);            }        else            msgQGrpSendInqUnlock(pInq);    /* unlock inquiry node */        }    return (OK);    }/***************************************************************************** msgQDistGrpLclSendCanWait - send buffer to local members of a group (VxFusion option)** This routine sends a message to local group members.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: N/A** NOMANUAL*/LOCAL void msgQDistGrpLclSendCanWait    (    DIST_INQ_ID    inqId,         /* inquiry id             */    MSG_Q_ID       msgQId,        /* queue on which to send */    char *         buffer,        /* message to send        */    UINT           nBytes,        /* length of message      */    int            timeout,       /* ticks to wait          */    int            priority       /* priority               */    )    {    DIST_MSG_Q_GRP_SEND_INQ * pInq;    STATUS                    status;    status = msgQDistSend (msgQId, buffer, nBytes, timeout,            WAIT_FOREVER, priority);    if (status == OK)        {        if ((pInq = (DIST_MSG_Q_GRP_SEND_INQ *) distInqFind (inqId)) == NULL)            return;        msgQGrpSendInqLock(pInq);            /* lock inquiry node */        if (--pInq->sendInqNumBlocked == 0)            if (--pInq->sendInqNumOutstanding == 0)                {                msgQGrpSendInqUnlock(pInq);    /* unlock inquiry node */                semGive (&pInq->sendInqWait);                return;                }        msgQGrpSendInqUnlock(pInq);        /* unlock inquiry node */        }    }/***************************************************************************** msgQDistGrpSend - send buffer to local and remote members of a group (VxFusion option)** This routine sends a message to local and remote members of a group.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: OK, if successful; ERROR, is unsuccessful.** NOMANUAL*/STATUS msgQDistGrpSend    (    DIST_MSG_Q_GRP_ID   distMsgQGrpId,  /* group on which to send          */    char *              buffer,         /* message to send                 */    UINT                nBytes,         /* length of message               */    int                 msgQTimeout,    /* ticks to wait at message queues */    int                 overallTimeout, /* ticks to wait overall           */    int                 priority        /* priority                        */    )    {    DIST_PKT_MSG_Q_GRP_SEND    pktSend;    DIST_MSG_Q_GRP_SEND_INQ    inquiryNode;    DIST_INQ_ID                inquiryId;    DIST_IOVEC                 distIOVec[2];    STATUS                     status;    inquiryNode.sendInq.inqType = DIST_MSG_Q_GRP_INQ_TYPE_SEND;    msgQGrpSendInqLockInit (&inquiryNode);    semBInit (&(inquiryNode.sendInqWait), SEM_Q_FIFO, SEM_EMPTY);    inquiryNode.sendInqTask = taskIdSelf();    inquiryNode.sendInqNumBlocked = 0;    inquiryNode.sendInqNumOutstanding =            distNodeGetNumNodes(DIST_NODE_NUM_NODES_ALIVE);    inquiryNode.sendInqStatus = MSG_Q_DIST_GRP_STATUS_OK;    inquiryId = distInqRegister ((DIST_INQ *) &inquiryNode);    status = msgQDistGrpLclSend (&inquiryNode, distMsgQGrpId, buffer, nBytes,                                 msgQTimeout, priority);    if (status == ERROR)        {#ifdef MSG_Q_DIST_GRP_REPORT        printf ("msgQDistGrpSend: group is unknown\n");#endif        distInqCancel ((DIST_INQ *) &inquiryNode);        return (ERROR);        }#ifdef MSG_Q_DIST_GRP_REPORT    if (inquiryNode.sendInqNumBlocked)        printf ("msgQDistGrpSend: %d local agents blocked on queues\n",                inquiryNode.sendInqNumBlocked);#endif    /*     * Broadcast message to the net. Every message addressed to a group     * is broadcasted to the net.     */    pktSend.pktMsgQGrpSendHdr.pktType = DIST_PKT_TYPE_MSG_Q_GRP;    pktSend.pktMsgQGrpSendHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_GRP_SEND;    pktSend.pktMsgQGrpSendInqId = htonl((uint32_t) inquiryId);    pktSend.pktMsgQGrpSendTimeout =            htonl ((uint32_t) DIST_TICKS_TO_MSEC (msgQTimeout));    pktSend.pktMsgQGrpSendId = htons (distMsgQGrpId);    /* use IOV stuff here, since we do not want to copy data */    distIOVec[0].pIOBuffer = &pktSend;    distIOVec[0].IOLen = DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_SEND);    distIOVec[1].pIOBuffer = buffer;    distIOVec[1].IOLen = nBytes;#ifdef MSG_Q_DIST_GRP_REPORT    printf ("msgQDistGrpSend: broadcast message\n");#endif    status = distNetIOVSend (DIST_IF_BROADCAST_ADDR, &distIOVec[0], 2,            WAIT_FOREVER, DIST_MSG_Q_PRIO_TO_NET_PRIO (priority));    if (status == ERROR)        {#ifdef MSG_Q_DIST_GRP_REPORT        printf ("msgQDistGrpSend: broadcast failed\n");#endif        distInqCancel ((DIST_INQ *) &inquiryNode);        errnoSet (S_distLib_UNREACHABLE);        return (ERROR);        }    /*     * semTake() blocks the requesting task until the service     * task gives the semaphore, because the request has     * been processed.     */    semTake (&(inquiryNode.sendInqWait), overallTimeout);    distInqCancel ((DIST_INQ *) &inquiryNode);    if (inquiryNode.sendInqNumOutstanding > 0)        {        /* overall timeout */        errnoSet (S_msgQDistLib_OVERALL_TIMEOUT);        return (ERROR);        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -