📄 distnodelib.c
字号:
DIST_TBUF_HDR * pTBufAckNext; DIST_TBUF_HDR * pTBufHdrNew; short winLo, winHi, pktNext; short id; short ack; short seq; short nBytes; short type; int hasMf; /* Get information from telegram */ id = pTBufNew->tBufId; ack = pTBufNew->tBufAck; seq = pTBufNew->tBufSeq; type = pTBufNew->tBufType; nBytes = pTBufNew->tBufNBytes; hasMf = DIST_TBUF_HAS_MF (pTBufNew); distNodeDbLock (); /* Find node in hash table. */ pNodeSrc = distNodeFindById (nodeIdSrc); if (pNodeSrc == NULL) { if (nodeIdSrc == distNodeLocalId) distPanic ("distNodePktReass: someone has same node id we have\n"); /* * Node is unknown by now. Create new node, if this is a * XACK in bootstrapping mode or this is a BOOTSTRAP telegram. * Else ignore telegram. */ if (distNodeLocalState == DIST_NODE_STATE_BOOT && type == DIST_TBUF_TTYPE_BACK && nBytes == sizeof (DIST_XACK)) { DIST_XACK *pXAck = (DIST_XACK *) pTBufNew->pTBufData; int state; state = ntohs (pXAck->xAckPktNodeState); if ((pNodeSrc = distNodeCreate (nodeIdSrc, state)) == NULL) { /* * We had problems creating the new node. * This is a fatal error, since we will miss that one. */ distStat.nodeDBFatal++; DIST_TBUF_FREE (pTBufNew); distPanic ("distNodePktReass: error accepting node 0x%lx\n", nodeIdSrc); } } else if (type == DIST_TBUF_TTYPE_BOOTSTRAP) { int state = DIST_NODE_STATE_BOOT;#ifdef DIST_NODE_REPORT printf ("distNodeReassemble: got bootstrap pkt from node 0x%lx\n", nodeIdSrc);#endif if ((pNodeSrc = distNodeCreate (nodeIdSrc, state)) == NULL) { /* * We had problems creating the new node. * This is a fatal error, since we will miss that one. */ distStat.nodeDBFatal++; DIST_TBUF_FREE (pTBufNew); distPanic ("distNodePktReass: error accepting node 0x%lx\n", nodeIdSrc); } /* * If this is a BOOTSTRAP telegram, we have to clean up * the communication structures. */ distNodeCleanup (pNodeSrc); pNodeSrc->nodeBroadcast.commPktNextExpect = (INT16)winAdd (id, 1); } else { /* * Packets from unknown nodes are ignored, unless they * are of type XACK or BOOTSTRAP. */#ifdef DIST_DIAGNOSTIC distLog ("distNodePktReass: packet from unknown node 0x%lx \ --ignored\n", nodeIdSrc);#endif DIST_TBUF_FREE (pTBufNew); distNodeDbUnlock(); return (NULL); }#ifdef DIST_DIAGNOSTIC distLog ("distNodePktReass: new node 0x%lx created (state %s)\n", nodeIdSrc, distNodeStateToName (distNodeGetState (pNodeSrc)));#endif } if (DIST_TBUF_IS_BROADCAST (pTBufNew)) { if ((pNode = distNodeFindById (DIST_IF_BROADCAST_ADDR)) == NULL) { distStat.nodeDBNoMatch++; DIST_TBUF_FREE (pTBufNew); distNodeDbUnlock(); return (NULL); } pCommIn = &pNodeSrc->nodeBroadcast; pCommOut = &pNode->nodeBroadcast; } else pCommIn = pCommOut = &pNodeSrc->nodeUnicast;#ifdef DIST_NODE_REPORT { char * typeName; switch (type) { case DIST_TBUF_TTYPE_DTA: typeName = "DATA"; break; case DIST_TBUF_TTYPE_ACK: typeName = "ACK"; break; case DIST_TBUF_TTYPE_BDTA: typeName = "B-DATA"; break; case DIST_TBUF_TTYPE_BACK: typeName = "ACK-B"; break; case DIST_TBUF_TTYPE_BOOTSTRAP: typeName = "BOOTSTRAP"; break; case DIST_TBUF_TTYPE_NACK: typeName = "NACK"; break; default: typeName = "unknown"; } printf ("dist..Reass:0x%lx/%s: ", nodeIdSrc, typeName); printf ("id %d, ack %d, seq %d, type %d, len %d, hasMf %d\n", id, ack, seq, pTBufNew->tBufType, nBytes, hasMf); }#endif if (type == DIST_TBUF_TTYPE_BOOTSTRAP) { int bootTp; if (distNodeGetState (pNodeSrc) != DIST_NODE_STATE_BOOT) { /* * If we receive a BOOTSTARP telegram from a node, that is * not in booting mode, it must be rebooting. Cleanup * the structure. */ distNodeCleanup (pNodeSrc); } pNodeSrc->nodeBroadcast.commPktNextExpect = (INT16) winAdd (id, 1); bootTp = ((DIST_PKT_BOOT *) pTBufNew->pTBufData)->pktBootType; switch (bootTp) { case DIST_BOOTING_REQ: /* Send XACK. */ distNodeSetState (pNodeSrc, DIST_NODE_STATE_BOOT); distNodeSendAck (pNodeSrc, 1, DIST_NODE_ACK_EXTENDED); break;#ifdef DIST_DIAGNOSTIC default: distLog ("dist..Reass: unknown type of BOOTSTRAP telegram\n");#endif } distNodeDbUnlock (); DIST_TBUF_FREE (pTBufNew); return (NULL); } /* We will fall in here, when we receive a XACK. */ if (type == DIST_TBUF_TTYPE_BACK && nBytes == sizeof (DIST_XACK)) { DIST_XACK *pXAck = (DIST_XACK *) pTBufNew->pTBufData; distNodeSetState (pNodeSrc, ntohs (pXAck->xAckPktNodeState)); /* In bootstrapping mode. */ if (distNodeLocalState == DIST_NODE_STATE_BOOT) { short pktNextSend;#ifdef DIST_NODE_REPORT printf ("dist..Reass: XACK in bootstrap mode\n");#endif /* * The first node that answers to our BOOTSTRAP packet, * will be our godfather. */ if (distNodeGodfatherId == DIST_IF_BROADCAST_ADDR) distNodeGodfatherId = nodeIdSrc; pktNextSend = ntohs (pXAck->xAckPktNextSend); pCommIn->commPktNextExpect = pktNextSend; } distNodeDbUnlock (); DIST_TBUF_FREE (pTBufNew); return (NULL); }#ifdef DIST_NODE_REPORT if (! DIST_NODE_IS_ALIVE (pNodeSrc)) { printf ("dist..Reass: got a telegram from crashed node\n"); }#endif /* * Process ACK field. An ACK commits all packets with * an id lower or equal to the value of the ACK. * * Traverse the list of non-acknowledged packets and * decrease the ACK counters of all acknowledged packets * in the list. If no ACK is missing for a certain packet, * dequeue it and wakeup the sender. */ pktNext = pCommOut->commPktNextSend; if (winWithin (pCommOut->commAckNextExpect, ack, pktNext)) { short ackLo = pCommOut->commAckNextExpect; short ackHi = (INT16) winAdd (ack, 1); pTBufAck = pCommOut->pCommQAck; pCommIn->commAckLastRecvd = ack;#ifdef DIST_NODE_REPORT printf ("dist..Reass: ACK %d, AckQ %p:\n", ack, pTBufAck);#endif while (pTBufAck) { pTBufAckNext = DIST_TBUF_HDR_GET_NEXT (pTBufAck); if (winWithin (ackLo, pTBufAck->tBufHdrId, ackHi)) {#ifdef DIST_NODE_REPORT printf ("dist..Reass: within window, wait for %d more ACKs\n", pTBufAck->pTBufHdrTm->tmAckCnt - 1);#endif if (--pTBufAck->pTBufHdrTm->tmAckCnt == 0) { pCommOut->commAckNextExpect = (INT16) winAdd (pCommOut->commAckNextExpect, 1); DIST_TBUF_HDR_UNLINK (pCommOut->pCommQAck, pTBufAck); pTBufAck->pTBufHdrTm->tmStatus = DIST_TM_STATUS_OK;#ifdef DIST_NODE_REPORT printf ("dist..Reass: %d ACKed\n", pTBufAck->tBufHdrId);#endif semGive (&pTBufAck->pTBufHdrTm->tmWait4); /* * distNodePktSend()--which should awake now--frees * the packet. */ } } pTBufAck = pTBufAckNext; } } /* * If this is a NACK telegram, resend the single fragment, the * NACK asks us for. <id> holds the id of the packet, <seq> the * sequence number of the fragment within the packet. * The NACK telegram can be destroyed at this point, since * it contains no data. */ if (type == DIST_TBUF_TTYPE_NACK) { DIST_TBUF_FREE (pTBufNew);#ifdef DIST_NODE_REPORT printf ("dist..Reass: got NACK for (id %d, seq %d) from node 0x%lx\n", id, seq, nodeIdSrc);#endif pTBufAck = pCommOut->pCommQAck; while (pTBufAck) { if (pTBufAck->tBufHdrId == id) { DIST_TBUF *pFrag; for (pFrag = DIST_TBUF_GET_NEXT (pTBufAck); pFrag && pFrag->tBufSeq != seq; pFrag = DIST_TBUF_GET_NEXT (pFrag)); pFrag->tBufAck = (UINT16)winSub (pCommIn->commPktNextExpect, 1);#ifdef DIST_NODE_REPORT if (DIST_IF_SEND (nodeIdSrc, pFrag, 0) == ERROR) printf ("dist..Reass: error sending NACK to node 0x%lx\n", nodeIdSrc);#else /* If this fails, there is no need to mess around. */ DIST_IF_SEND (nodeIdSrc, pFrag, 0);#endif distStat.nodeFragResend++; distNodeDbUnlock (); return (NULL); } pTBufAck = DIST_TBUF_HDR_GET_NEXT (pTBufAck); } distNodeDbUnlock (); return (NULL); } /* If this is not a DTA (data) telegram, destroy it. */ if (type != DIST_TBUF_TTYPE_DTA && type != DIST_TBUF_TTYPE_BDTA) { distNodeDbUnlock (); DIST_TBUF_FREE (pTBufNew); return (NULL); } /* * Go on processing this telegram? * 1) The packet id must be within the window. * The window starts with the id of the packet * expected next and has a size of DIST_NODE_WIN_SZ. * 2) The packet should not be reassembled already. */ winLo = pCommIn->commPktNextExpect; winHi = (INT16) winAdd (winLo, DIST_NODE_WIN_SZ); if (! winWithin (winLo, id, winHi) || DIST_NODE_BTST (pCommIn->pCommCompleted, id)) { /* Not within the window. Destroy the telegram. */ distNodeDbUnlock ();#ifdef DIST_NODE_REPORT if (DIST_NODE_BTST (pCommIn->pCommCompleted, id)) printf ("distNodePktReass: packet already reassembled\n"); else printf ("distNodePktReass: not within window (%d..%d)\n", winLo, winSub (winHi, 1));#endif DIST_TBUF_FREE (pTBufNew); return (NULL); } /* Find queue for packet in reassembly list. */ pReassHdr = pCommIn->pCommQReass; pReassHdrPrev = NULL; while (pReassHdr) { if (pReassHdr->tBufHdrId > id) break; /* insert a new packet */ if (pReassHdr->tBufHdrId == id) { /* We have found the packet, the new telegram belongs to. */ DIST_TBUF *pTBuf, *pTBufPrev; DIST_TBUF *pTBufLast = DIST_TBUF_GET_LAST (pReassHdr); short seqLast = pTBufLast->tBufSeq;#ifdef DIST_NODE_REPORT printf ("dist..Reass: packet found, inserting telegram\n");#endif if (seqLast < seq) { /* * This telegram has the highest sequence number ever * received. Put it to the tail of the TBuf list. */ DIST_TBUF_ENQUEUE (pReassHdr, pTBufNew); pReassHdr->tBufHdrNLeaks += seq - seqLast - 1;#ifdef UNDEFINED /* unused, but be careful before excising for good */ pTBufLast = pTBufNew;#endif if (! DIST_TBUF_IS_BROADCAST (pTBufNew) && (seq - seqLast) > 1) { INT16 i; /* * unlock database--not strictly necessary, but * we are consuming time, holding the lock... */ for (i = seqLast + 1; i < seq; i++) distNodeSendNegAck (pNodeSrc, id, i); } } else { /* * This telegram must be inserted somewhere in the TBuf list. */ pTBufPrev = (DIST_TBUF *) pReassHdr; pTBuf = DIST_TBUF_GET_NEXT (pReassHdr); while (pTBuf != NULL) { if (pTBuf->tBufSeq == seq) { /* We have already received this one. */ distNodeDbUnlock ();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -