⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 msgqdistgrplib.c

📁 VXWORKS源代码
💻 C
📖 第 1 页 / 共 4 页
字号:
     if (inquiryNode.sendInqStatus != MSG_Q_DIST_GRP_STATUS_OK)        return (ERROR);    return (OK);    }/***************************************************************************** msgQDistGrpAgent - send an inquiry to a group (VxFusion option)** This routine sends an inquiry message to a message group.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: OK, if successfully sent; ERROR, if not sent.** NOMANUAL*/LOCAL STATUS msgQDistGrpAgent    (    DIST_NODE_ID            nodeIdSender,   /* node id of the sender  */    DIST_INQ_ID             inqIdSender,    /* inquiry id at sender   */    DIST_MSG_Q_GRP_ID       distMsgQGrpId,  /* group on which to send */    char *                  buffer,         /* message to send        */    UINT                    nBytes,         /* length of message      */    int                     timeout,        /* ticks to wait          */    int                     priority        /* priority               */    )    {    DIST_MSG_Q_GRP_SEND_INQ    inquiryNode;    /*     * Our caller cannot wait, so we have to create     * the inquiry id.     */    inquiryNode.sendInq.inqType = DIST_MSG_Q_GRP_INQ_TYPE_SEND;    semBInit (&inquiryNode.sendInqWait, SEM_Q_FIFO, SEM_EMPTY);    /* inquiryNode.sendInqTask = taskIdSelf(); */    inquiryNode.sendInqNumBlocked = 0;    inquiryNode.sendInqNumOutstanding = 1;    inquiryNode.sendInqStatus = MSG_Q_DIST_GRP_STATUS_OK;    distInqRegister ((DIST_INQ *) &inquiryNode);    if (msgQDistGrpLclSend (&inquiryNode, distMsgQGrpId, buffer, nBytes,            timeout, priority) == ERROR)        {        free (buffer);        msgQDistGrpSendStatus (nodeIdSender, inqIdSender,                MSG_Q_DIST_GRP_STATUS_ILLEGAL_OBJ_ID);        return (ERROR);        }    semTake (&inquiryNode.sendInqWait, WAIT_FOREVER);        distInqCancel ((DIST_INQ *) &inquiryNode);        free (buffer);    if (inquiryNode.sendInqStatus != MSG_Q_DIST_GRP_STATUS_OK)        {#ifdef MSG_Q_DIST_GRP_REPORT        printf ("msgQDistGrpAgent: something failed\n");#endif        msgQDistGrpSendStatus (nodeIdSender, inqIdSender,                (INT16) inquiryNode.sendInqStatus);        return (ERROR);        }#ifdef MSG_Q_DIST_GRP_REPORT    printf ("msgQDistGrpAgent: respond with OK\n");#endif    msgQDistGrpSendStatus (nodeIdSender, inqIdSender,            MSG_Q_DIST_GRP_STATUS_OK);    return (OK);    }/***************************************************************************** msgQDistGrpSendStatus - send status and errno (VxFusion option)** This routine sends local status to an inquirying node.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: OK, if status sent; ERROR, if not sent.** NOMANUAL*/LOCAL STATUS msgQDistGrpSendStatus    (    DIST_NODE_ID        nodeIdDest,   /* node ID of destination */    DIST_INQ_ID         inqId,        /* the inquiry ID */    DIST_MSG_Q_STATUS   dStatus       /* status to send */    )    {    DIST_PKT_MSG_Q_GRP_STATUS    pktStatus;    STATUS                        status;    pktStatus.pktMsgQGrpStatusHdr.pktType = DIST_PKT_TYPE_MSG_Q_GRP;    pktStatus.pktMsgQGrpStatusHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_GRP_STATUS;    pktStatus.pktMsgQGrpStatusInqId = htonl ((uint32_t) inqId);    pktStatus.pktMsgQGrpStatusErrno = htonl ((uint32_t) errnoGet());    pktStatus.pktMsgQGrpStatusDStatus = htons ((uint16_t) dStatus);    status = distNetSend (nodeIdDest, (DIST_PKT *) &pktStatus,                          DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_STATUS),                          WAIT_FOREVER, DIST_MSG_Q_GRP_PRIO);    return (status);    }/***************************************************************************** msgQDistGdbInput - called for distributed group database updates (VxFusion option)** This routine updates the local group database.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: Status of update.** NOMANUAL*/LOCAL DIST_STATUS msgQDistGdbInput    (    DIST_NODE_ID    nodeIdSrc,   /* unused */    DIST_TBUF_HDR * pTBufHdr     /* ptr to received TBUF header */    )    {    DIST_PKT * pPkt;    int        pktLen;    DIST_PKT_DGDB_ADD    pktAdd;    DIST_MSG_Q_GRP_ID    grpId;    DIST_GRP_DB_NODE *   pDbNodeNew;    DIST_NODE_ID         grpCreator;    char                 grpName[DIST_NAME_MAX_LENGTH + 1];    int                  sz;        UNUSED_ARG(nodeIdSrc);        pktLen = pTBufHdr->tBufHdrOverall;    if (pktLen < sizeof (DIST_PKT))        distPanic ("msgQDistGdbInput: packet too short\n");    pPkt = (DIST_PKT *) (DIST_TBUF_GET_NEXT (pTBufHdr))->pTBufData;    switch (pPkt->pktSubType)        {        case DIST_PKT_TYPE_DGDB_ADD:            {            if (pktLen < DIST_PKT_HDR_SIZEOF (DIST_PKT_DGDB_ADD))                distPanic ("msgQDistGdbInput: packet too short\n");            distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0, (char *) &pktAdd,                          DIST_PKT_HDR_SIZEOF (DIST_PKT_DGDB_ADD));                        if ((sz = pktLen - DIST_PKT_HDR_SIZEOF (DIST_PKT_DGDB_ADD)) >                    DIST_NAME_MAX_LENGTH + 1)                distPanic ("msgQDistGdbInput: name too long\n");            distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr),                          DIST_PKT_HDR_SIZEOF (DIST_PKT_DGDB_ADD),                          (char *) &grpName, sz);            grpId = ntohs (pktAdd.pktDgdbAddId);            grpCreator = ntohl (pktAdd.pktDgdbAddCreator);            if (grpId >= distGrpIdNext)                distGrpIdNext = grpId + 1;#ifdef MSG_Q_DIST_GRP_REPORT            printf ("msgQDistGrpInput: group database add from node %ld\n",                    nodeIdSrc);            printf ("msgQDistGrpInput: bind `%s' to group id %d\n",                    (char *) &grpName, grpId);#endif            msgQDistGrpDbLock();            pDbNodeNew = msgQDistGrpLclCreate ((char *) &grpName, grpId,                                               DIST_GRP_STATE_GLOBAL);            if (pDbNodeNew == NULL)                {                msgQDistGrpDbUnlock();                distPanic ("msgQDistGdbInput: group creation failed\n");                }            msgQDistGrpLclSetCreator (pDbNodeNew, grpCreator);            msgQDistGrpDbUnlock();            return (DIST_GDB_STATUS_OK);            }        default:            return (DIST_GDB_STATUS_PROTOCOL_ERROR);        }    }/***************************************************************************** msgQDistGrpInput - called when a new group message arrives at the system (VxFusion option)** This routine is called whenever a group message is received.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: Status of message processing.** NOMANUAL*/LOCAL DIST_STATUS msgQDistGrpInput    (    DIST_NODE_ID    nodeIdSrc,        /* source node ID */    DIST_TBUF_HDR * pTBufHdr          /* ptr to message */    )    {    DIST_PKT * pPkt;    int        pktLen;    DIST_PKT_MSG_Q_GRP_SEND  pktSend;    DIST_MSG_Q_GRP_ID        dMsgQGrpId;    DIST_GRP_DB_NODE *       pDistGrpDbNode;    DIST_INQ_ID              inqIdSrc;    uint32_t                 timeout_msec;    UINT                     nBytes;    char *                   buffer;    int                      tid;    int                      timeout;    int                      prio;    DIST_PKT_MSG_Q_GRP_STATUS    pktStatus;    DIST_STATUS                  dStatus;    DIST_INQ_ID                  inqId;    DIST_INQ *                   pGenInq;    int                          errnoRemote;    DIST_MSG_Q_GRP_SEND_INQ *    pInq;    pktLen = pTBufHdr->tBufHdrOverall;    if (pktLen < sizeof (DIST_PKT))        distPanic ("msgQDistGrpInput: packet too short\n");    pPkt = (DIST_PKT *) ((DIST_TBUF_GET_NEXT (pTBufHdr))->pTBufData);    switch (pPkt->pktSubType)        {        case DIST_PKT_TYPE_MSG_Q_GRP_SEND:            {            if (pktLen < DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_SEND))                 distPanic ("msgQDistGrpInput/SEND: packet too short\n");            distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0, (char *)&pktSend,                          DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_SEND));            inqIdSrc = (DIST_INQ_ID)                        htonl(pktSend.pktMsgQGrpSendInqId);            timeout_msec = ntohl (pktSend.pktMsgQGrpSendTimeout);            timeout = DIST_MSEC_TO_TICKS (timeout_msec);            nBytes = pktLen - DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_SEND);            dMsgQGrpId = ntohs (pktSend.pktMsgQGrpSendId);            /* Check for an empty group */            msgQDistGrpDbLock();            pDistGrpDbNode = msgQDistGrpLclFindById (dMsgQGrpId);            msgQDistGrpDbUnlock();            if (pDistGrpDbNode == NULL)                {                msgQDistGrpSendStatus (nodeIdSrc, inqIdSrc,                                       MSG_Q_DIST_GRP_STATUS_UNAVAIL);                return (MSG_Q_DIST_GRP_STATUS_UNAVAIL);                }            if (SLL_FIRST ((SL_LIST *)&pDistGrpDbNode->grpDbMsgQIdLst)                        == NULL)                {                msgQDistGrpSendStatus (nodeIdSrc, inqIdSrc,                                       MSG_Q_DIST_GRP_STATUS_OK);#ifdef MSG_Q_DIST_GRP_REPORT                printf ("msgQDistGrpInput/SEND: group has no local members\n");#endif                return (MSG_Q_DIST_GRP_STATUS_OK);                }            /* Using malloc() here is not very satisfiing. */            if ((buffer = (char *) malloc (nBytes)) == NULL)                {#ifdef MSG_Q_DIST_GRP_REPORT                printf ("msgQDistGrpInput/SEND: out of memory\n");#endif                msgQDistGrpSendStatus (nodeIdSrc,                                    inqIdSrc,                                    MSG_Q_DIST_GRP_STATUS_NOT_ENOUGH_MEMORY);                distStat.memShortage++;                distStat.msgQGrpInDiscarded++;                return (MSG_Q_DIST_GRP_STATUS_NOT_ENOUGH_MEMORY);                }                distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr),                          DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_SEND),                          buffer, nBytes);            prio = NET_PRIO_TO_DIST_MSG_Q_PRIO (pTBufHdr->tBufHdrPrio);#ifdef MSG_Q_DIST_GRP_REPORT            printf ("msgQDistGrpInput: message from node %ld to group 0x%x\n",                    nodeIdSrc, dMsgQGrpId);#endif            /*             * Send message to all members, registrated on the local node for             * this group.             */            tid = taskSpawn (NULL,                            DIST_MSG_Q_GRP_WAIT_TASK_PRIO,                            0,                            DIST_MSG_Q_GRP_WAIT_TASK_STACK_SZ,                            (FUNCPTR) msgQDistGrpAgent,                            (int) nodeIdSrc,                            (int) inqIdSrc,                            (int) dMsgQGrpId,                            (int) buffer,                            (int) nBytes,                            (int) timeout,                            (int) prio,                            0, 0, 0);            if (tid == ERROR)                {                free (buffer);                msgQDistGrpSendStatus (nodeIdSrc, inqIdSrc,                                       MSG_Q_DIST_GRP_STATUS_UNAVAIL);                return (MSG_Q_DIST_GRP_STATUS_UNAVAIL);                }            return (MSG_Q_DIST_GRP_STATUS_OK);            }        case DIST_PKT_TYPE_MSG_Q_GRP_STATUS:            {            if (pktLen != DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_STATUS))                distPanic ("msgQDistGrpInput/STATUS: packet too short\n");            /* First copy the error packet form the TBuf list. */                        distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0,                          (char *) &pktStatus,                          DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_STATUS));            dStatus = (DIST_STATUS) ntohs (pktStatus.pktMsgQGrpStatusDStatus);            errnoRemote = ntohl (pktStatus.pktMsgQGrpStatusErrno);            inqId = (DIST_INQ_ID) ntohl (pktStatus.pktMsgQGrpStatusInqId);            if (! (pGenInq = distInqFind (inqId)))                return (MSG_Q_DIST_GRP_STATUS_LOCAL_TIMEOUT);            /* See who is addressed by the STATUS telegram. */            switch (pGenInq->inqType)                {                case DIST_MSG_Q_GRP_INQ_TYPE_SEND:                    {                    pInq = (DIST_MSG_Q_GRP_SEND_INQ *) pGenInq;                    /*                     * Possible errors here:                     *    MSG_Q_DIST_GRP_STATUS_OK                     *    MSG_Q_DIST_GRP_STATUS_ERROR                     *    MSG_Q_DIST_GRP_STATUS_UNAVAIL                     *    MSG_Q_DIST_GRP_STATUS_ILLEGAL_OBJ_ID                     *    MSG_Q_DIST_GRP_STATUS_NOT_ENOUGH_MEMORY                     */                    switch (dStatus)                        {                        case MSG_Q_DIST_GRP_STATUS_OK:                            if (--pInq->sendInqNumOutstanding == 0)                                semGive (&pInq->sendInqWait);                            break;                        case MSG_Q_DIST_GRP_STATUS_ERROR:                        case MSG_Q_DIST_GRP_STATUS_UNAVAIL:                        case MSG_Q_DIST_GRP_STATUS_ILLEGAL_OBJ_ID:                        case MSG_Q_DIST_GRP_STATUS_NOT_ENOUGH_MEMORY:                            errnoOfTaskSet (pInq->sendInqTask, errnoRemote);                            break;                        default:#ifdef MSG_Q_DIST_GRP_REPORT                            printf ("msgQDistGrpInput/STATUS/SEND: status?\n");#endif                            break;                        }                    return (MSG_Q_DIST_GRP_STATUS_OK);                    }                default:#ifdef MSG_Q_DIST_GRP_REPORT                    printf ("msgQDistGrpInput/STATUS: unexpected inq (%d)\n",                            pGenInq->inqType);#endif                    return (MSG_Q_DIST_GRP_STATUS_INTERNAL_ERROR);                }            }        default:#ifdef MSG_Q_DIST_GRP_REPORT            printf ("msgQDistGrpInput/STATUS: unknown group subtype (%d)\n",                    pPkt->pktSubType);#endif            return (MSG_Q_DIST_GRP_STATUS_PROTOCOL_ERROR);        }#ifdef UNDEFINED   /* supposedly unreached */    if (status == ERROR)        return (MSG_Q_DIST_GRP_STATUS_ERROR);    return (MSG_Q_DIST_GRP_STATUS_OK);#endif    }/***************************************************************************** msgQDistGrpLclSetId - change id of a group in local database (VxFusion option)** This routine changes the group id of a node in the local database.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -