📄 distnetlib.c
字号:
* the window rotating. */ if (pServNode->servUp == FALSE) return (OK); semTake (&pServNode->servQLock, WAIT_FOREVER); DIST_TBUF_HDR_ENQUEUE (pServNode->pServQ, pDeliver); semGive (&pServNode->servQLock); semGive (&pServNode->servWait4Jobs); return (OK); } return (ERROR); }/***************************************************************************** distNetCtl - control function for network layer (VxFusion option)** This routine performs control functions on the network layer.* The following functions are accepted:* \is* \i DIST_CTL_SERVICE_HOOK* Set a function to be called, each time a service called by a* remote node fails.* \i DIST_CTL_SERVICE_CONF* Unsed to configure a certain service. The <argument> parameter is* a pointer to DIST_SERV_CONF, which holds the service id and its* configuration to set.* \ie** RETURNS: Return value of function called, or ERROR.** ERRNO:* S_distLib_UNKNOWN_REQUEST** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** NOMANUAL*/int distNetCtl ( int function, /* function code */ int argument /* arbitrary argument */ ) { DIST_SERV_CONF *pServConfig; switch (function) { case DIST_CTL_SERVICE_HOOK: distNetServiceHook = (FUNCPTR) argument; return (OK); case DIST_CTL_SERVICE_CONF: { if ((pServConfig = (DIST_SERV_CONF *) argument) == NULL) return (ERROR); return (distNetServConfig (pServConfig->servId, pServConfig->taskPrio, pServConfig->netPrio)); } default: errnoSet (S_distLib_UNKNOWN_REQUEST); return (ERROR); } }/***************************************************************************** distNetSend - send message from a continuous buffer to the network (VxFusion option)** This routine sends a message of length <nBytes> starting at <buffer>* to node <distNodeDest>. If necessary, the message is fragmented.** The routine blocks until the message is acknowledged or timeout* has expired.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: OK, if successful.* NOMANUAL*/STATUS distNetSend ( DIST_NODE_ID nodeIdDest, /* node to which message is destinated */ DIST_PKT * pPkt, /* packet to send */ UINT nBytes, /* length of message */ int timo, /* ticks to wait */ int prio /* priority level to send message on */ ) { char * pFrag; char * pBufEnd; DIST_TBUF * pTBuf; DIST_TBUF_HDR * pTBufHdr; int mtu; int tBufNBytes; short seq = 0; distStat.netOutReceived++; if (nodeIdDest == DIST_IF_BROADCAST_ADDR && distNodeGetNumNodes (DIST_NODE_NUM_NODES_ALIVE) <= 1) return (OK); pPkt->pktLen = htons ((UINT16)nBytes); /* Get first TBuf for header. */ if ((pTBufHdr = DIST_TBUF_HDR_ALLOC ()) == NULL) { distStat.netOutDiscarded++; return (ERROR); /* out of TBufs */ } pTBufHdr->tBufHdrDest = nodeIdDest; pTBufHdr->tBufHdrOverall = (INT16) nBytes; pTBufHdr->tBufHdrTimo = timo; pTBufHdr->tBufHdrPrio = prio; /* Split the packet to n telegrams and copy them to TBufs. */ pBufEnd = (pFrag = (char *) pPkt) + nBytes; mtu = DIST_IF_MTU - DIST_IF_HDR_SZ; while ((pTBuf = DIST_TBUF_ALLOC ()) != NULL) { DIST_TBUF_ENQUEUE (pTBufHdr, pTBuf); tBufNBytes = MIN (pBufEnd - pFrag, mtu); pTBuf->tBufFlags = DIST_TBUF_FLAG_MF; if (seq == 0) pTBuf->tBufFlags |= DIST_TBUF_FLAG_HDR; if (nodeIdDest == DIST_IF_BROADCAST_ADDR) { pTBuf->tBufType = DIST_TBUF_TTYPE_BDTA; pTBuf->tBufFlags |= DIST_TBUF_FLAG_BROADCAST; } else pTBuf->tBufType = DIST_TBUF_TTYPE_DTA; pTBuf->tBufSeq = seq++; pTBuf->tBufNBytes = (UINT16) tBufNBytes; bcopy (pFrag, pTBuf->pTBufData, tBufNBytes); pFrag += tBufNBytes; if (pFrag == pBufEnd) /* done? */ { pTBuf->tBufFlags &= ~DIST_TBUF_FLAG_MF;#ifdef DIST_NET_REPORT printf ("distNetSend:%p: node %lu, numSeq %d, lenOverall %d\n", pTBufHdr, nodeIdDest, seq, pTBufHdr->tBufHdrOverall);#endif return (distNodePktSend (pTBufHdr)); } } /* TBuf shortage */ DIST_TBUF_FREEM (pTBufHdr); distStat.netOutDiscarded++; return (ERROR); }/***************************************************************************** distNetIOVSend - send message from discontinuous buffer to the network (VxFusion option)** This routine sends a message from a scatter/gather buffer (IOV)* to node <nodeIdDest>. If necessary, the message is fragmented.** This routine blocks until the message is acknowledged or timeout* has exceeded.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: OK, if successful.* NOMANUAL*/STATUS distNetIOVSend ( DIST_NODE_ID nodeIdDest, /* node to which message is destinated */ DIST_IOVEC * pIOV, /* list of buffers to gather */ int numIOV, /* number of buffers */ int timo, /* ticks to wait */ int prio /* priority level to send message on */ ) { DIST_IOVEC * p; DIST_TBUF_HDR * pTBufHdr; DIST_TBUF * pTBuf = NULL; DIST_PKT * pPkt; int szSrc, szDest, sz; int seq; char * pSrc; char * pDest = NULL; distStat.netOutReceived++; if (numIOV <= 0) return (ERROR); if (nodeIdDest == DIST_IF_BROADCAST_ADDR && distNodeGetNumNodes (DIST_NODE_NUM_NODES_ALIVE) <= 1) { return (OK); } /* Get first TBuf for header. */ if ((pTBufHdr = DIST_TBUF_HDR_ALLOC ()) == NULL) { distStat.netOutDiscarded++; return (ERROR); } pTBufHdr->tBufHdrDest = nodeIdDest; pTBufHdr->tBufHdrTimo = timo; pTBufHdr->tBufHdrPrio = prio; pTBufHdr->tBufHdrOverall = 0; /* later */ /* Copy the discontinious buffer to a list of tBufs. */ p = pIOV; szDest = 0; seq = 0; while (numIOV--) { pTBufHdr->tBufHdrOverall += (szSrc = p->IOLen); pSrc = p->pIOBuffer; do { if (szDest == 0) { if ((pTBuf = DIST_TBUF_ALLOC ()) == NULL) { distStat.netOutDiscarded++; DIST_TBUF_FREEM (pTBufHdr); return (ERROR); } pTBuf->tBufFlags = 0; if (seq == 0) pTBuf->tBufFlags |= DIST_TBUF_FLAG_HDR; pTBuf->tBufFlags |= DIST_TBUF_FLAG_MF; if (nodeIdDest == DIST_IF_BROADCAST_ADDR) { pTBuf->tBufType = DIST_TBUF_TTYPE_BDTA; pTBuf->tBufFlags |= DIST_TBUF_FLAG_BROADCAST; } else pTBuf->tBufType = DIST_TBUF_TTYPE_DTA; pTBuf->tBufSeq = (UINT16) seq++; pTBuf->tBufNBytes = 0; DIST_TBUF_ENQUEUE (pTBufHdr, pTBuf); szDest = DIST_IF_MTU - DIST_IF_HDR_SZ; pDest = pTBuf->pTBufData; } sz = MIN (szSrc, szDest); bcopy (pSrc, pDest, sz); pTBuf->tBufNBytes += sz; szSrc -= sz; szDest -= sz; pSrc += sz; pDest += sz; } while (szSrc > 0); p++; } pTBuf->tBufFlags &= ~DIST_TBUF_FLAG_MF;#ifdef DIST_NET_REPORT printf ("distNetSend:%p: node %lu, numSeq %d, lenOverall %d\n", pTBufHdr, nodeIdDest, seq, pTBufHdr->tBufHdrOverall);#endif pPkt = (DIST_PKT *) ((DIST_TBUF_GET_NEXT (pTBufHdr))->pTBufData); pPkt->pktLen = htons (pTBufHdr->tBufHdrOverall); return (distNodePktSend (pTBufHdr)); }/***************************************************************************** distNetInput - accept a telegram buffer and send it to upper VxFusion layers (VxFusion option)** This routine accepts a telegram buffer from an adapter and tries to * reassemble a message from it. If the telegram buffer contains an entire * message, distNetInput() simply forwards the message to the upper VxFusion * layers and sends an acknowledgment for the message. If the telegram buffer * contains a message fragment, the message fragment is added to existing * message fragments, if any. If the message fragment completes a message, the * message is forwarded to the upper VxFusion layers and distNetInput() * sends an acknowledgment for the message.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: N/A* NOMANUAL*/void distNetInput ( DIST_NODE_ID nodeIdIn, /* source node of the telegram buffer */ int prioIn, /* priority the telegram arrived with */ DIST_TBUF * pTBufIn /* the telegram buffer itself */ ) { void * pComm; /* from distNodeReassemble(), to distNodeGetReassembled() */ DIST_TBUF_HDR * pReassembled; DIST_PKT * pPkt; distStat.netInReceived++; /* Try to reassemble. */ if ((pComm = distNodeReassemble (nodeIdIn, prioIn, pTBufIn)) == NULL) return; /* * Now that we are here, a packet has been reassembled. * <pComm> points to the communication, that has one or * more packets. */ while ((pReassembled = distNodeGetReassembled (pComm)) != NULL) {#ifdef DIST_NET_REPORT { char * buffer; int len; printf ("distNetInput: packet %p reassembled\n", pReassembled); buffer = malloc ((len = pReassembled->tBufHdrOverall)); if (buffer) { distTBufCopy (DIST_TBUF_GET_NEXT (pReassembled), 0, buffer, len); distDump (buffer, len); free (buffer); } }#endif distStat.netReassembled++; pPkt = (DIST_PKT *) ((DIST_TBUF_GET_NEXT (pReassembled))->pTBufData); if ((ntohs (pPkt->pktLen) != pReassembled->tBufHdrOverall) || (distNetServCall (pPkt->pktType, pReassembled) == ERROR)) {#ifdef DIST_DIAGNOSTIC if (ntohs (pPkt->pktLen) != pReassembled->tBufHdrOverall) { distLog("distNetInput: wrong length (has %d, expected %d)\n", pReassembled->tBufHdrOverall, ntohs (pPkt->pktLen)); } else distLog("distNetInput: unknown type (%d) or service down\n", pPkt->pktType);#endif distNodePktDiscard (pReassembled->tBufHdrSrc, pReassembled); } else distNodePktAck (pReassembled->tBufHdrSrc, pReassembled, 0); /* loop back for the next packet. */ } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -