⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 distnodelib.c

📁 VXWORKS源代码
💻 C
📖 第 1 页 / 共 5 页
字号:
    DIST_TBUF_HDR *       pTBufAckNext;    DIST_TBUF_HDR *       pTBufHdrNew;    short                 winLo, winHi, pktNext;    short                 id;    short                 ack;    short                 seq;    short                 nBytes;    short                 type;    int                   hasMf;    /* Get information from telegram */        id = pTBufNew->tBufId;    ack = pTBufNew->tBufAck;    seq = pTBufNew->tBufSeq;    type = pTBufNew->tBufType;    nBytes = pTBufNew->tBufNBytes;    hasMf = DIST_TBUF_HAS_MF (pTBufNew);    distNodeDbLock ();    /* Find node in hash table. */        pNodeSrc = distNodeFindById (nodeIdSrc);        if (pNodeSrc == NULL)        {        if (nodeIdSrc == distNodeLocalId)            distPanic ("distNodePktReass: someone has same node id we have\n");        /*         * Node is unknown by now. Create new node, if this is a         * XACK in bootstrapping mode or this is a BOOTSTRAP telegram.         * Else ignore telegram.         */        if (distNodeLocalState == DIST_NODE_STATE_BOOT                && type == DIST_TBUF_TTYPE_BACK                && nBytes == sizeof (DIST_XACK))            {            DIST_XACK    *pXAck = (DIST_XACK *) pTBufNew->pTBufData;            int            state;            state = ntohs (pXAck->xAckPktNodeState);            if ((pNodeSrc = distNodeCreate (nodeIdSrc, state)) == NULL)                {                /*                 * We had problems creating the new node.                 * This is a fatal error, since we will miss that one.                 */                distStat.nodeDBFatal++;                DIST_TBUF_FREE (pTBufNew);                distPanic ("distNodePktReass: error accepting node 0x%lx\n",                        nodeIdSrc);                }            }        else if (type == DIST_TBUF_TTYPE_BOOTSTRAP)            {            int state = DIST_NODE_STATE_BOOT;#ifdef DIST_NODE_REPORT            printf ("distNodeReassemble: got bootstrap pkt from node 0x%lx\n",                    nodeIdSrc);#endif            if ((pNodeSrc = distNodeCreate (nodeIdSrc, state)) == NULL)                {                /*                 * We had problems creating the new node.                 * This is a fatal error, since we will miss that one.                 */                distStat.nodeDBFatal++;                DIST_TBUF_FREE (pTBufNew);                distPanic ("distNodePktReass: error accepting node 0x%lx\n",                        nodeIdSrc);                }            /*             * If this is a BOOTSTRAP telegram, we have to clean up             * the communication structures.             */            distNodeCleanup (pNodeSrc);            pNodeSrc->nodeBroadcast.commPktNextExpect = (INT16)winAdd (id, 1);            }        else            {            /*             * Packets from unknown nodes are ignored, unless they             * are of type XACK or BOOTSTRAP.             */#ifdef DIST_DIAGNOSTIC            distLog ("distNodePktReass: packet from unknown node 0x%lx \                     --ignored\n", nodeIdSrc);#endif            DIST_TBUF_FREE (pTBufNew);            distNodeDbUnlock();            return (NULL);            }#ifdef DIST_DIAGNOSTIC        distLog ("distNodePktReass: new node 0x%lx created (state %s)\n",                 nodeIdSrc,                 distNodeStateToName (distNodeGetState (pNodeSrc)));#endif        }    if (DIST_TBUF_IS_BROADCAST (pTBufNew))        {        if ((pNode = distNodeFindById (DIST_IF_BROADCAST_ADDR)) == NULL)            {            distStat.nodeDBNoMatch++;            DIST_TBUF_FREE (pTBufNew);            distNodeDbUnlock();            return (NULL);            }        pCommIn = &pNodeSrc->nodeBroadcast;        pCommOut = &pNode->nodeBroadcast;        }    else        pCommIn = pCommOut = &pNodeSrc->nodeUnicast;#ifdef DIST_NODE_REPORT    {    char * typeName;    switch (type)        {        case DIST_TBUF_TTYPE_DTA:            typeName = "DATA";            break;        case DIST_TBUF_TTYPE_ACK:            typeName = "ACK";            break;        case DIST_TBUF_TTYPE_BDTA:            typeName = "B-DATA";            break;        case DIST_TBUF_TTYPE_BACK:            typeName = "ACK-B";            break;        case DIST_TBUF_TTYPE_BOOTSTRAP:            typeName = "BOOTSTRAP";            break;        case DIST_TBUF_TTYPE_NACK:            typeName = "NACK";            break;        default:            typeName = "unknown";        }    printf ("dist..Reass:0x%lx/%s: ", nodeIdSrc, typeName);    printf ("id %d, ack %d, seq %d, type %d, len %d, hasMf %d\n",            id, ack, seq, pTBufNew->tBufType, nBytes, hasMf);    }#endif    if (type == DIST_TBUF_TTYPE_BOOTSTRAP)        {        int    bootTp;        if (distNodeGetState (pNodeSrc) != DIST_NODE_STATE_BOOT)            {            /*             * If we receive a BOOTSTARP telegram from a node, that is             * not in booting mode, it must be rebooting. Cleanup             * the structure.             */            distNodeCleanup (pNodeSrc);            }        pNodeSrc->nodeBroadcast.commPktNextExpect = (INT16) winAdd (id, 1);        bootTp = ((DIST_PKT_BOOT *) pTBufNew->pTBufData)->pktBootType;        switch (bootTp)            {            case DIST_BOOTING_REQ:                /* Send XACK. */                                distNodeSetState (pNodeSrc, DIST_NODE_STATE_BOOT);                distNodeSendAck (pNodeSrc, 1, DIST_NODE_ACK_EXTENDED);                break;#ifdef DIST_DIAGNOSTIC            default:                distLog ("dist..Reass: unknown type of BOOTSTRAP telegram\n");#endif            }        distNodeDbUnlock ();        DIST_TBUF_FREE (pTBufNew);        return (NULL);        }    /* We will fall in here, when we receive a XACK. */        if (type == DIST_TBUF_TTYPE_BACK && nBytes == sizeof (DIST_XACK))        {        DIST_XACK    *pXAck = (DIST_XACK *) pTBufNew->pTBufData;        distNodeSetState (pNodeSrc, ntohs (pXAck->xAckPktNodeState));        /* In bootstrapping mode. */        if (distNodeLocalState == DIST_NODE_STATE_BOOT)            {            short        pktNextSend;#ifdef DIST_NODE_REPORT            printf ("dist..Reass: XACK in bootstrap mode\n");#endif            /*             * The first node that answers to our BOOTSTRAP packet,             * will be our godfather.             */            if (distNodeGodfatherId == DIST_IF_BROADCAST_ADDR)                distNodeGodfatherId = nodeIdSrc;            pktNextSend = ntohs (pXAck->xAckPktNextSend);            pCommIn->commPktNextExpect = pktNextSend;            }        distNodeDbUnlock ();        DIST_TBUF_FREE (pTBufNew);        return (NULL);        }#ifdef DIST_NODE_REPORT    if (! DIST_NODE_IS_ALIVE (pNodeSrc))        {        printf ("dist..Reass: got a telegram from crashed node\n");        }#endif    /*     * Process ACK field. An ACK commits all packets with     * an id lower or equal to the value of the ACK.     *     * Traverse the list of non-acknowledged packets and     * decrease the ACK counters of all acknowledged packets     * in the list. If no ACK is missing for a certain packet,     * dequeue it and wakeup the sender.     */    pktNext = pCommOut->commPktNextSend;    if (winWithin (pCommOut->commAckNextExpect, ack, pktNext))        {        short    ackLo = pCommOut->commAckNextExpect;        short    ackHi = (INT16) winAdd (ack, 1);        pTBufAck = pCommOut->pCommQAck;        pCommIn->commAckLastRecvd = ack;#ifdef DIST_NODE_REPORT        printf ("dist..Reass: ACK %d, AckQ %p:\n", ack, pTBufAck);#endif        while (pTBufAck)            {            pTBufAckNext = DIST_TBUF_HDR_GET_NEXT (pTBufAck);            if (winWithin (ackLo, pTBufAck->tBufHdrId, ackHi))                {#ifdef DIST_NODE_REPORT                printf ("dist..Reass: within window, wait for %d more ACKs\n",                        pTBufAck->pTBufHdrTm->tmAckCnt - 1);#endif                if (--pTBufAck->pTBufHdrTm->tmAckCnt == 0)                    {                    pCommOut->commAckNextExpect =                        (INT16) winAdd (pCommOut->commAckNextExpect, 1);                    DIST_TBUF_HDR_UNLINK (pCommOut->pCommQAck, pTBufAck);                    pTBufAck->pTBufHdrTm->tmStatus = DIST_TM_STATUS_OK;#ifdef DIST_NODE_REPORT                    printf ("dist..Reass: %d ACKed\n", pTBufAck->tBufHdrId);#endif                    semGive (&pTBufAck->pTBufHdrTm->tmWait4);                                        /*                     * distNodePktSend()--which should awake now--frees                     * the packet.                     */                    }                }            pTBufAck = pTBufAckNext;            }        }    /*     * If this is a NACK telegram, resend the single fragment, the     * NACK asks us for. <id> holds the id of the packet, <seq> the     * sequence number of the fragment within the packet.     * The NACK telegram can be destroyed at this point, since     * it contains no data.     */    if (type == DIST_TBUF_TTYPE_NACK)        {        DIST_TBUF_FREE (pTBufNew);#ifdef DIST_NODE_REPORT        printf ("dist..Reass: got NACK for (id %d, seq %d) from node 0x%lx\n",                id, seq, nodeIdSrc);#endif        pTBufAck = pCommOut->pCommQAck;        while (pTBufAck)            {            if (pTBufAck->tBufHdrId == id)                {                DIST_TBUF    *pFrag;                for (pFrag = DIST_TBUF_GET_NEXT (pTBufAck);                        pFrag && pFrag->tBufSeq != seq;                        pFrag = DIST_TBUF_GET_NEXT (pFrag));                pFrag->tBufAck =                      (UINT16)winSub (pCommIn->commPktNextExpect, 1);#ifdef DIST_NODE_REPORT                if (DIST_IF_SEND (nodeIdSrc, pFrag, 0) == ERROR)                    printf ("dist..Reass: error sending NACK to node 0x%lx\n",                            nodeIdSrc);#else                /* If this fails, there is no need to mess around. */                DIST_IF_SEND (nodeIdSrc, pFrag, 0);#endif                distStat.nodeFragResend++;                distNodeDbUnlock ();                return (NULL);                }                        pTBufAck = DIST_TBUF_HDR_GET_NEXT (pTBufAck);            }        distNodeDbUnlock ();        return (NULL);        }    /* If this is not a DTA (data) telegram, destroy it. */        if (type != DIST_TBUF_TTYPE_DTA && type != DIST_TBUF_TTYPE_BDTA)        {        distNodeDbUnlock ();        DIST_TBUF_FREE (pTBufNew);        return (NULL);        }    /*     * Go on processing this telegram?     * 1) The packet id must be within the window.     *    The window starts with the id of the packet     *    expected next and has a size of DIST_NODE_WIN_SZ.     * 2) The packet should not be reassembled already.     */    winLo = pCommIn->commPktNextExpect;    winHi = (INT16) winAdd (winLo, DIST_NODE_WIN_SZ);    if (! winWithin (winLo, id, winHi) ||            DIST_NODE_BTST (pCommIn->pCommCompleted, id))        {        /* Not within the window. Destroy the telegram. */        distNodeDbUnlock ();#ifdef DIST_NODE_REPORT        if (DIST_NODE_BTST (pCommIn->pCommCompleted, id))            printf ("distNodePktReass: packet already reassembled\n");        else            printf ("distNodePktReass: not within window (%d..%d)\n",                    winLo, winSub (winHi, 1));#endif        DIST_TBUF_FREE (pTBufNew);        return (NULL);        }    /* Find queue for packet in reassembly list. */         pReassHdr = pCommIn->pCommQReass;    pReassHdrPrev = NULL;    while (pReassHdr)        {        if (pReassHdr->tBufHdrId > id)            break;    /* insert a new packet */        if (pReassHdr->tBufHdrId == id)            {            /* We have found the packet, the new telegram belongs to. */            DIST_TBUF    *pTBuf, *pTBufPrev;            DIST_TBUF    *pTBufLast = DIST_TBUF_GET_LAST (pReassHdr);            short        seqLast = pTBufLast->tBufSeq;#ifdef DIST_NODE_REPORT            printf ("dist..Reass: packet found, inserting telegram\n");#endif            if (seqLast < seq)                {                /*                 * This telegram has the highest sequence number ever                 * received. Put it to the tail of the TBuf list.                 */                DIST_TBUF_ENQUEUE (pReassHdr, pTBufNew);                pReassHdr->tBufHdrNLeaks += seq - seqLast - 1;#ifdef UNDEFINED  /* unused, but be careful before excising for good */                pTBufLast = pTBufNew;#endif                if (! DIST_TBUF_IS_BROADCAST (pTBufNew)                    && (seq - seqLast) > 1)                    {                    INT16 i;                    /*                     * unlock database--not strictly necessary, but                     * we are consuming time, holding the lock...                     */                    for (i = seqLast + 1; i < seq; i++)                        distNodeSendNegAck (pNodeSrc, id, i);                    }                }            else                {                /*                 * This telegram must be inserted somewhere in the TBuf list.                 */                pTBufPrev = (DIST_TBUF *) pReassHdr;                pTBuf = DIST_TBUF_GET_NEXT (pReassHdr);                while (pTBuf != NULL)                    {                    if (pTBuf->tBufSeq == seq)                        {                        /* We have already received this one. */                                                distNodeDbUnlock ();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -