📄 distnodelib.c
字号:
DIST_TBUF_FREE (pTBufNew); return (NULL); } if (pTBuf->tBufSeq > seq) { DIST_TBUF_INSERT_AFTER (pReassHdr, pTBufPrev, pTBufNew); pReassHdr->tBufHdrNLeaks--; break; } pTBufPrev = pTBuf; pTBuf = DIST_TBUF_GET_NEXT (pTBuf); } } /* The telegram is in the reassembly list now. */ pReassHdr->tBufHdrOverall += nBytes; /* * Test if the packet is fully reassembled (there * are no leaks within the packet and the last * fragment has the 'more fragments' bit cleared). */ pTBufLast = DIST_TBUF_GET_LAST (pReassHdr); if (!pReassHdr->tBufHdrNLeaks && !(DIST_TBUF_HAS_MF (pTBufLast))) { /* * We have received a complete packet and * do not expect to receive more fragments. */#ifdef DIST_NODE_REPORT { DIST_TBUF *pFirst = DIST_TBUF_GET_NEXT (pReassHdr); /* this is a hack */ printf ("dist..Reass: packet %d complete (type %d/%d)\n", id, *((char *) (pFirst->pTBufData)), *((char *) (pFirst->pTBufData) + 1)); }#endif DIST_NODE_BSET (pCommIn->pCommCompleted, id); /* * Test if the reassembled packet is the one * we expect next. */ if (pReassHdr->tBufHdrId == pCommIn->commPktNextExpect) { /* * This is the packet, we expect next. * Return communication pointer to caller, * so that it knows about the job. */ distNodeDbUnlock (); return (pCommIn); } } distNodeDbUnlock (); return (NULL); } pReassHdrPrev = pReassHdr; pReassHdr = DIST_TBUF_HDR_GET_NEXT (pReassHdr); } /* * This is the first fragment of a new packet. There are either * more fragments, or this packet is not the one, we are waiting * for. */ #ifdef DIST_NODE_REPORT printf ("dist..Reass: first fragment of new packet\n");#endif if ((pTBufHdrNew = DIST_TBUF_HDR_ALLOC ()) == NULL) { distStat.netInDiscarded++; /* nodeInDiscarded counts pkts discarded */ distNodeDbUnlock (); DIST_TBUF_FREE (pTBufNew); return (NULL); } pTBufHdrNew->tBufHdrSrc = nodeIdSrc; pTBufHdrNew->tBufHdrId = id; pTBufHdrNew->tBufHdrPrio = prio; pTBufHdrNew->tBufHdrOverall = nBytes; pTBufHdrNew->tBufHdrNLeaks = seq; pTBufHdrNew->tBufHdrTimo = WAIT_FOREVER; /* reassembly timeout */ DIST_TBUF_SET_NEXT (pTBufHdrNew, pTBufNew); DIST_TBUF_SET_LAST (pTBufHdrNew, pTBufNew); DIST_TBUF_HDR_INSERT_AFTER (pCommIn->pCommQReass, pReassHdrPrev, pTBufHdrNew); if (pCommIn->commPktNextExpect == id) /* this is the packet we want to deliver next */ pCommIn->pCommQNextDeliver = pTBufHdrNew; if (seq == 0 && ! hasMf) { /* We have received a complete packet in a single telegram. */#ifdef DIST_NODE_REPORT /* this is a hack */ printf ("dist..Reass: packet %d complete (type %d/%d)\n", id, *((char *) (pTBufNew->pTBufData)), *((char *) (pTBufNew->pTBufData) + 1));#endif DIST_NODE_BSET (pCommIn->pCommCompleted, id); if (id == pCommIn->commPktNextExpect) { /* * This is the packet, we expect next. * Return communication pointer to caller, * so that it knows about the job. */ distNodeDbUnlock (); return (pCommIn); } } distNodeDbUnlock (); /* Check if we have to send negative acknowledges (NACKs). */ if (! DIST_TBUF_IS_BROADCAST (pTBufNew)) { DIST_TBUF_HDR *pPrev; DIST_TBUF *pLastOfPrev; /* * Check if we have received the last fragment of the previous * packet. We cannot find out how many fragments are missing at * the end of the packet. So just send a NACK for the successor * of the last telegram received. */ if ((pPrev = DIST_TBUF_HDR_GET_PREV (pTBufHdrNew)) != NULL && (pLastOfPrev = DIST_TBUF_GET_LAST (pTBufHdrNew)) != NULL && DIST_TBUF_HAS_MF (pLastOfPrev)) { distNodeSendNegAck (pNodeSrc, pPrev->tBufHdrId, pLastOfPrev->tBufSeq + 1); } /* * Check if some telegrams from the beginning of the new packet * are missing. Send a NACK for each of them. */ if (seq > 0) { INT16 i; for (i = 0; i < seq; i++) distNodeSendNegAck (pNodeSrc, id, i); } } return (NULL); }/***************************************************************************** distNodePktSend - send a packet already stored in TBufs (VxFusion option)** This routine waits for an open send window and transmits the packet,* by sending the data of each single TBuf.* Afterwards it blocks for an ACK from the remote node.** NOTE: Takes <distNodeDbLock>.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: OK, if successful.* NOMANUAL*/STATUS distNodePktSend ( DIST_TBUF_HDR * pTBufHdr /* message to send */ ) { DIST_NODE_DB_NODE * pDistNodeFound; DIST_TRANSMISSION distTm; DIST_NODE_COMM * pComm; DIST_NODE_ID nodeIdDest = pTBufHdr->tBufHdrDest; DIST_TBUF * pTBuf; short winLo, winHi; short pktId; int priority; int wait4NumAcks; distNodeDbLock (); distStat.nodeOutReceived++; if ((pDistNodeFound = distNodeFindById (nodeIdDest)) == NULL) {#ifdef DIST_DIAGNOSTIC distLog ("distNodePktSend: unknown node 0x%lx\n", nodeIdDest);#endif distStat.nodeDBNoMatch++; distNodeDbUnlock (); DIST_TBUF_FREEM (pTBufHdr); return (ERROR); } if (! DIST_NODE_IS_ALIVE (pDistNodeFound)) {#ifdef DIST_DIAGNOSTIC distLog ("distNodePktSend: destination not alive\n");#endif distStat.nodeNotAlive++; distNodeDbUnlock (); DIST_TBUF_FREEM (pTBufHdr); return (ERROR); }#ifdef DIST_DIAGNOSTIC if (DIST_TBUF_GET_NEXT (pTBufHdr) == NULL) { distLog ("distNodePktSend: TBuf chain contains no data\n"); }#endif if (nodeIdDest == DIST_IF_BROADCAST_ADDR) { pComm = &pDistNodeFound->nodeBroadcast; wait4NumAcks = distNodeNumNodesAlive - 1; } else { pComm = &pDistNodeFound->nodeUnicast; wait4NumAcks = 1; } semBInit (&distTm.tmWait4, SEM_Q_FIFO, SEM_EMPTY); distTm.tmStatus = DIST_TM_STATUS_OK; distTm.tmRetmTimo = distNodeRetryTimeout; distTm.tmNumTm = 1; distTm.tmAckCnt = wait4NumAcks; pTBufHdr->pTBufHdrTm = &distTm; /* * Each node has two queues for outgoing packets. * One is the queue for packets that should been * send, but the "window is closed" (full). * The other is the queue for packets send, and * waiting for an ACK. * In both of the two queues, packets get older. */ pTBufHdr->tBufHdrId = pktId = pComm->commPktNextSend; winLo = pComm->commAckNextExpect; winHi = (INT16) winAdd (pComm->commAckNextExpect, DIST_NODE_WIN_SZ); /* * Can we send the packet directly or do we have to wait * until the "window opens". */ if (! winWithin (winLo, pktId, winHi)) { /* Put packet to the WaitToSend queue. */ DIST_TBUF_HDR_ENQUEUE (pComm->pCommQWinOut, pTBufHdr); distNodeDbUnlock (); /* unlock before sleep */ /* Wait for a place within the window. */ semTake (&distTm.tmWait4, WAIT_FOREVER); if (distTm.tmStatus != DIST_TM_STATUS_OK) { distNodeDbUnlock (); /* this is not needed here! */ DIST_TBUF_FREEM (pTBufHdr); return (ERROR); /* e.g. timeout, node removed */ } distNodeDbLock (); /* Remove it from the WaitToSend queue. */ DIST_TBUF_HDR_UNLINK (pComm->pCommQWinOut, pTBufHdr); } pComm->commPktNextSend = (INT16) winAdd (pComm->commPktNextSend, 1); /* Put packet to the WaitForAck queue and send it. */ DIST_TBUF_HDR_ENQUEUE (pComm->pCommQAck, pTBufHdr); pTBuf = (DIST_TBUF *) pTBufHdr; priority = pTBufHdr->tBufHdrPrio; while ((pTBuf = DIST_TBUF_GET_NEXT (pTBuf)) != NULL) { pTBuf->tBufId = pktId; pTBuf->tBufAck = (UINT16) winSub (pComm->commPktNextExpect, 1);#ifdef DIST_NODE_REPORT printf ("distNodePktSend:%p: id %d, ack %d, seq %d, len %d, type %d\n", pTBuf, pTBuf->tBufId, pTBuf->tBufAck, pTBuf->tBufSeq, pTBuf->tBufNBytes, pTBuf->tBufType);#endif if (DIST_IF_SEND (nodeIdDest, pTBuf, priority) == ERROR) { distNodeDbUnlock (); DIST_TBUF_HDR_UNLINK (pComm->pCommQAck, pTBufHdr); DIST_TBUF_FREEM (pTBufHdr); return (ERROR); } pComm->commAckDelayed = FALSE; } distNodeDbUnlock (); /* unlock before sleep */ /* Wait for ACK. */ #ifdef DIST_NODE_REPORT distLog ("distNodePktSend: sleep while waiting for ACK\n");#endif semTake (&distTm.tmWait4, WAIT_FOREVER);#ifdef DIST_NODE_REPORT distLog ("distNodePktSend: awacked: status of tm is %d\n", distTm.tmStatus);#endif /* * distNodePktReass() or distNodeTimer() has already * dequeued the packet. */ DIST_TBUF_FREEM (pTBufHdr); if (distTm.tmStatus != DIST_TM_STATUS_OK) return (ERROR); /* e.g. timeout */ return (OK); }/***************************************************************************** distNodePktResend - resend a packet (VxFusion option)** Resend a packet already transmitted with distNodePktSend ().** Currently called one place in distNodeDbCommTimer() which* has <distNodeDbLock> taken before it is called.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: OK, if successful.* NOMANUAL*/LOCAL STATUS distNodePktResend ( DIST_NODE_COMM * pComm, /* communication channel */ DIST_TBUF_HDR * pTBufHdr /* packet to retransmit */ ) { DIST_TBUF * pTBuf; DIST_NODE_ID nodeIdDest = pTBufHdr->tBufHdrDest; int priority = pTBufHdr->tBufHdrPrio; distStat.nodePktResend++; pTBuf = (DIST_TBUF *) pTBufHdr; while ((pTBuf = DIST_TBUF_GET_NEXT (pTBuf)) != NULL) { pTBuf->tBufAck = (UINT16) winSub (pComm->commPktNextExpect, 1);#ifdef DIST_NODE_REPORT printf ("distNodePktResend: resend with ACK for %d (%d)\n", pTBuf->tBufAck, pComm->commPktNextExpect);#endif if (DIST_IF_SEND (nodeIdDest, pTBuf, priority) == ERROR) { DIST_TBUF_HDR_UNLINK (pComm->pCommQAck, pTBufHdr); DIST_TBUF_FREEM (pTBufHdr); return (ERROR); } } return (OK); } /***************************************************************************** distNodePktAck - ask for an ACK for a received and consumed packet (VxFusion option)** This routine acknowledeges a packet. If DIST_NODE_ACK_IMMEDIATELY is* not set in <options>, an ACK telegram is not transmitted immediately* but a flag is set. If data is sent to the ACK awaiting node, within a* certain range of time, the ACK is sent within the data telegram.* Else an ACK telegram is transmitted. This is known as `piggy-backing'.* Due to the design, piggy-backing only works for unicasts. For broadcasts,* an ACK is always sent immediately.** NOTE: Takes <distNodeDbLock>.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: OK, if successful.* NOMANUAL*/STATUS distNodePktAck ( DIST_NODE_ID nodeIdSrc, /* source node */ DIST_TBUF_HDR * pPktAck, /* ack packet */ int options /* options */ ) { DIST_NODE_DB_NODE * pDistNodeFound; DIST_NODE_COMM * pComm; short id = pPktAck->tBufHdrId; int ackBroadca
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -