⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 distnodelib.c

📁 VXWORKS源代码
💻 C
📖 第 1 页 / 共 5 页
字号:
                        DIST_TBUF_FREE (pTBufNew);                        return (NULL);                        }                        if (pTBuf->tBufSeq > seq)                        {                        DIST_TBUF_INSERT_AFTER (pReassHdr,                                                pTBufPrev,                                                pTBufNew);                        pReassHdr->tBufHdrNLeaks--;                        break;                        }                    pTBufPrev = pTBuf;                    pTBuf = DIST_TBUF_GET_NEXT (pTBuf);                    }                }            /* The telegram is in the reassembly list now. */            pReassHdr->tBufHdrOverall += nBytes;            /*             * Test if the packet is fully reassembled (there             * are no leaks within the packet and the last             * fragment has the 'more fragments' bit cleared).             */            pTBufLast = DIST_TBUF_GET_LAST (pReassHdr);            if (!pReassHdr->tBufHdrNLeaks && !(DIST_TBUF_HAS_MF (pTBufLast)))                {                /*                 * We have received a complete packet and                 * do not expect to receive more fragments.                 */#ifdef DIST_NODE_REPORT                {                DIST_TBUF    *pFirst = DIST_TBUF_GET_NEXT (pReassHdr);                                /* this is a hack */                                printf ("dist..Reass: packet %d complete (type %d/%d)\n", id,                        *((char *) (pFirst->pTBufData)),                        *((char *) (pFirst->pTBufData) + 1));                }#endif                DIST_NODE_BSET (pCommIn->pCommCompleted, id);                /*                 * Test if the reassembled packet is the one                 * we expect next.                 */                if (pReassHdr->tBufHdrId == pCommIn->commPktNextExpect)                    {                    /*                     * This is the packet, we expect next.                     * Return communication pointer to caller,                     * so that it knows about the job.                     */                    distNodeDbUnlock ();                    return (pCommIn);                    }                }            distNodeDbUnlock ();            return (NULL);            }        pReassHdrPrev = pReassHdr;        pReassHdr = DIST_TBUF_HDR_GET_NEXT (pReassHdr);        }        /*     * This is the first fragment of a new packet. There are either     * more fragments, or this packet is not the one, we are waiting     * for.     */     #ifdef DIST_NODE_REPORT    printf ("dist..Reass: first fragment of new packet\n");#endif    if ((pTBufHdrNew = DIST_TBUF_HDR_ALLOC ()) == NULL)        {        distStat.netInDiscarded++; /* nodeInDiscarded counts pkts discarded */        distNodeDbUnlock ();        DIST_TBUF_FREE (pTBufNew);        return (NULL);        }    pTBufHdrNew->tBufHdrSrc = nodeIdSrc;    pTBufHdrNew->tBufHdrId = id;    pTBufHdrNew->tBufHdrPrio = prio;    pTBufHdrNew->tBufHdrOverall = nBytes;    pTBufHdrNew->tBufHdrNLeaks = seq;    pTBufHdrNew->tBufHdrTimo = WAIT_FOREVER;    /* reassembly timeout */    DIST_TBUF_SET_NEXT (pTBufHdrNew, pTBufNew);    DIST_TBUF_SET_LAST (pTBufHdrNew, pTBufNew);    DIST_TBUF_HDR_INSERT_AFTER (pCommIn->pCommQReass, pReassHdrPrev,        pTBufHdrNew);    if (pCommIn->commPktNextExpect == id)        /* this is the packet we want to deliver next */        pCommIn->pCommQNextDeliver = pTBufHdrNew;    if (seq == 0 && ! hasMf)        {        /* We have received a complete packet in a single telegram. */#ifdef DIST_NODE_REPORT        /* this is a hack */                printf ("dist..Reass: packet %d complete (type %d/%d)\n", id,                *((char *) (pTBufNew->pTBufData)),                *((char *) (pTBufNew->pTBufData) + 1));#endif        DIST_NODE_BSET (pCommIn->pCommCompleted, id);        if (id == pCommIn->commPktNextExpect)            {            /*             * This is the packet, we expect next.             * Return communication pointer to caller,             * so that it knows about the job.             */            distNodeDbUnlock ();            return (pCommIn);            }        }    distNodeDbUnlock ();    /* Check if we have to send negative acknowledges (NACKs). */        if (! DIST_TBUF_IS_BROADCAST (pTBufNew))        {        DIST_TBUF_HDR    *pPrev;        DIST_TBUF        *pLastOfPrev;        /*         * Check if we have received the last fragment of the previous         * packet. We cannot find out how many fragments are missing at         * the end of the packet. So just send a NACK for the successor         * of the last telegram received.         */        if ((pPrev = DIST_TBUF_HDR_GET_PREV (pTBufHdrNew)) != NULL &&                (pLastOfPrev = DIST_TBUF_GET_LAST (pTBufHdrNew)) != NULL &&                DIST_TBUF_HAS_MF (pLastOfPrev))            {            distNodeSendNegAck (pNodeSrc, pPrev->tBufHdrId,                    pLastOfPrev->tBufSeq + 1);            }        /*         * Check if some telegrams from the beginning of the new packet         * are missing. Send a NACK for each of them.         */        if (seq > 0)            {            INT16 i;            for (i = 0; i < seq; i++)                distNodeSendNegAck (pNodeSrc, id, i);            }        }    return (NULL);    }/***************************************************************************** distNodePktSend - send a packet already stored in TBufs (VxFusion option)** This routine waits for an open send window and transmits the packet,* by sending the data of each single TBuf.* Afterwards it blocks for an ACK from the remote node.** NOTE: Takes <distNodeDbLock>.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: OK, if successful.* NOMANUAL*/STATUS distNodePktSend    (    DIST_TBUF_HDR * pTBufHdr    /* message to send */    )    {    DIST_NODE_DB_NODE *  pDistNodeFound;    DIST_TRANSMISSION    distTm;    DIST_NODE_COMM *     pComm;    DIST_NODE_ID         nodeIdDest = pTBufHdr->tBufHdrDest;    DIST_TBUF *          pTBuf;    short                winLo, winHi;    short                pktId;    int                  priority;    int                  wait4NumAcks;    distNodeDbLock ();    distStat.nodeOutReceived++;    if ((pDistNodeFound = distNodeFindById (nodeIdDest)) == NULL)        {#ifdef DIST_DIAGNOSTIC        distLog ("distNodePktSend: unknown node 0x%lx\n", nodeIdDest);#endif        distStat.nodeDBNoMatch++;        distNodeDbUnlock ();        DIST_TBUF_FREEM (pTBufHdr);        return (ERROR);        }    if (! DIST_NODE_IS_ALIVE (pDistNodeFound))        {#ifdef DIST_DIAGNOSTIC        distLog ("distNodePktSend: destination not alive\n");#endif        distStat.nodeNotAlive++;        distNodeDbUnlock ();        DIST_TBUF_FREEM (pTBufHdr);        return (ERROR);        }#ifdef DIST_DIAGNOSTIC    if (DIST_TBUF_GET_NEXT (pTBufHdr) == NULL)        {        distLog ("distNodePktSend: TBuf chain contains no data\n");        }#endif    if (nodeIdDest == DIST_IF_BROADCAST_ADDR)        {        pComm = &pDistNodeFound->nodeBroadcast;        wait4NumAcks = distNodeNumNodesAlive - 1;        }    else        {        pComm = &pDistNodeFound->nodeUnicast;        wait4NumAcks = 1;        }    semBInit (&distTm.tmWait4, SEM_Q_FIFO, SEM_EMPTY);    distTm.tmStatus = DIST_TM_STATUS_OK;    distTm.tmRetmTimo = distNodeRetryTimeout;    distTm.tmNumTm = 1;    distTm.tmAckCnt = wait4NumAcks;    pTBufHdr->pTBufHdrTm = &distTm;    /*     * Each node has two queues for outgoing packets.     * One is the queue for packets that should been     * send, but the "window is closed" (full).     * The other is the queue for packets send, and     * waiting for an ACK.     * In both of the two queues, packets get older.     */    pTBufHdr->tBufHdrId = pktId = pComm->commPktNextSend;    winLo = pComm->commAckNextExpect;    winHi = (INT16) winAdd (pComm->commAckNextExpect, DIST_NODE_WIN_SZ);    /*     * Can we send the packet directly or do we have to wait     * until the "window opens".     */    if (! winWithin (winLo, pktId, winHi))        {        /* Put packet to the WaitToSend queue. */                DIST_TBUF_HDR_ENQUEUE (pComm->pCommQWinOut, pTBufHdr);        distNodeDbUnlock ();    /* unlock before sleep */        /* Wait for a place within the window. */                semTake (&distTm.tmWait4, WAIT_FOREVER);        if (distTm.tmStatus != DIST_TM_STATUS_OK)            {            distNodeDbUnlock ();       /* this is not needed here! */            DIST_TBUF_FREEM (pTBufHdr);            return (ERROR);            /* e.g. timeout, node removed */            }        distNodeDbLock ();                /* Remove it from the WaitToSend queue. */                DIST_TBUF_HDR_UNLINK (pComm->pCommQWinOut, pTBufHdr);        }    pComm->commPktNextSend = (INT16) winAdd (pComm->commPktNextSend, 1);    /* Put packet to the WaitForAck queue and send it. */        DIST_TBUF_HDR_ENQUEUE (pComm->pCommQAck, pTBufHdr);        pTBuf = (DIST_TBUF *) pTBufHdr;    priority = pTBufHdr->tBufHdrPrio;    while ((pTBuf = DIST_TBUF_GET_NEXT (pTBuf)) != NULL)        {        pTBuf->tBufId = pktId;        pTBuf->tBufAck = (UINT16) winSub (pComm->commPktNextExpect, 1);#ifdef DIST_NODE_REPORT        printf ("distNodePktSend:%p: id %d, ack %d, seq %d, len %d, type %d\n",                pTBuf, pTBuf->tBufId, pTBuf->tBufAck, pTBuf->tBufSeq,                pTBuf->tBufNBytes, pTBuf->tBufType);#endif        if (DIST_IF_SEND (nodeIdDest, pTBuf, priority) == ERROR)            {            distNodeDbUnlock ();            DIST_TBUF_HDR_UNLINK (pComm->pCommQAck, pTBufHdr);            DIST_TBUF_FREEM (pTBufHdr);            return (ERROR);            }        pComm->commAckDelayed = FALSE;        }        distNodeDbUnlock ();    /* unlock before sleep */    /* Wait for ACK. */    #ifdef DIST_NODE_REPORT    distLog ("distNodePktSend: sleep while waiting for ACK\n");#endif    semTake (&distTm.tmWait4, WAIT_FOREVER);#ifdef DIST_NODE_REPORT    distLog ("distNodePktSend: awacked: status of tm is %d\n",             distTm.tmStatus);#endif    /*     * distNodePktReass() or distNodeTimer() has already     * dequeued the packet.     */    DIST_TBUF_FREEM (pTBufHdr);    if (distTm.tmStatus != DIST_TM_STATUS_OK)        return (ERROR);                    /* e.g. timeout */    return (OK);    }/***************************************************************************** distNodePktResend - resend a packet (VxFusion option)** Resend a packet already transmitted with distNodePktSend ().** Currently called one place in distNodeDbCommTimer() which* has <distNodeDbLock> taken before it is called.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: OK, if successful.* NOMANUAL*/LOCAL STATUS distNodePktResend    (    DIST_NODE_COMM * pComm,          /* communication channel */    DIST_TBUF_HDR *  pTBufHdr        /* packet to retransmit */    )    {    DIST_TBUF *     pTBuf;    DIST_NODE_ID    nodeIdDest = pTBufHdr->tBufHdrDest;    int             priority = pTBufHdr->tBufHdrPrio;    distStat.nodePktResend++;    pTBuf = (DIST_TBUF *) pTBufHdr;    while ((pTBuf = DIST_TBUF_GET_NEXT (pTBuf)) != NULL)        {        pTBuf->tBufAck = (UINT16) winSub (pComm->commPktNextExpect, 1);#ifdef DIST_NODE_REPORT        printf ("distNodePktResend: resend with ACK for %d (%d)\n",                pTBuf->tBufAck, pComm->commPktNextExpect);#endif        if (DIST_IF_SEND (nodeIdDest, pTBuf, priority) == ERROR)            {            DIST_TBUF_HDR_UNLINK (pComm->pCommQAck, pTBufHdr);            DIST_TBUF_FREEM (pTBufHdr);            return (ERROR);            }        }    return (OK);    }    /***************************************************************************** distNodePktAck - ask for an ACK for a received and consumed packet (VxFusion option)** This routine acknowledeges a packet. If DIST_NODE_ACK_IMMEDIATELY is* not set in <options>, an ACK telegram is not transmitted immediately* but a flag is set. If data is sent to the ACK awaiting node, within a* certain range of time, the ACK is sent within the data telegram.* Else an ACK telegram is transmitted. This is known as `piggy-backing'.* Due to the design, piggy-backing only works for unicasts. For broadcasts,* an ACK is always sent immediately.** NOTE: Takes <distNodeDbLock>.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: OK, if successful.* NOMANUAL*/STATUS distNodePktAck    (    DIST_NODE_ID    nodeIdSrc,  /* source node */    DIST_TBUF_HDR * pPktAck,    /* ack packet */    int             options     /* options */    )    {    DIST_NODE_DB_NODE * pDistNodeFound;    DIST_NODE_COMM *    pComm;    short               id = pPktAck->tBufHdrId;    int                 ackBroadca

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -