📄 msgqdistgrplib.c
字号:
if (inquiryNode.sendInqStatus != MSG_Q_DIST_GRP_STATUS_OK) return (ERROR); return (OK); }/***************************************************************************** msgQDistGrpAgent - send an inquiry to a group (VxFusion option)** This routine sends an inquiry message to a message group.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: OK, if successfully sent; ERROR, if not sent.** NOMANUAL*/LOCAL STATUS msgQDistGrpAgent ( DIST_NODE_ID nodeIdSender, /* node id of the sender */ DIST_INQ_ID inqIdSender, /* inquiry id at sender */ DIST_MSG_Q_GRP_ID distMsgQGrpId, /* group on which to send */ char * buffer, /* message to send */ UINT nBytes, /* length of message */ int timeout, /* ticks to wait */ int priority /* priority */ ) { DIST_MSG_Q_GRP_SEND_INQ inquiryNode; /* * Our caller cannot wait, so we have to create * the inquiry id. */ inquiryNode.sendInq.inqType = DIST_MSG_Q_GRP_INQ_TYPE_SEND; semBInit (&inquiryNode.sendInqWait, SEM_Q_FIFO, SEM_EMPTY); /* inquiryNode.sendInqTask = taskIdSelf(); */ inquiryNode.sendInqNumBlocked = 0; inquiryNode.sendInqNumOutstanding = 1; inquiryNode.sendInqStatus = MSG_Q_DIST_GRP_STATUS_OK; distInqRegister ((DIST_INQ *) &inquiryNode); if (msgQDistGrpLclSend (&inquiryNode, distMsgQGrpId, buffer, nBytes, timeout, priority) == ERROR) { free (buffer); msgQDistGrpSendStatus (nodeIdSender, inqIdSender, MSG_Q_DIST_GRP_STATUS_ILLEGAL_OBJ_ID); return (ERROR); } semTake (&inquiryNode.sendInqWait, WAIT_FOREVER); distInqCancel ((DIST_INQ *) &inquiryNode); free (buffer); if (inquiryNode.sendInqStatus != MSG_Q_DIST_GRP_STATUS_OK) {#ifdef MSG_Q_DIST_GRP_REPORT printf ("msgQDistGrpAgent: something failed\n");#endif msgQDistGrpSendStatus (nodeIdSender, inqIdSender, (INT16) inquiryNode.sendInqStatus); return (ERROR); }#ifdef MSG_Q_DIST_GRP_REPORT printf ("msgQDistGrpAgent: respond with OK\n");#endif msgQDistGrpSendStatus (nodeIdSender, inqIdSender, MSG_Q_DIST_GRP_STATUS_OK); return (OK); }/***************************************************************************** msgQDistGrpSendStatus - send status and errno (VxFusion option)** This routine sends local status to an inquirying node.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: OK, if status sent; ERROR, if not sent.** NOMANUAL*/LOCAL STATUS msgQDistGrpSendStatus ( DIST_NODE_ID nodeIdDest, /* node ID of destination */ DIST_INQ_ID inqId, /* the inquiry ID */ DIST_MSG_Q_STATUS dStatus /* status to send */ ) { DIST_PKT_MSG_Q_GRP_STATUS pktStatus; STATUS status; pktStatus.pktMsgQGrpStatusHdr.pktType = DIST_PKT_TYPE_MSG_Q_GRP; pktStatus.pktMsgQGrpStatusHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_GRP_STATUS; pktStatus.pktMsgQGrpStatusInqId = htonl ((uint32_t) inqId); pktStatus.pktMsgQGrpStatusErrno = htonl ((uint32_t) errnoGet()); pktStatus.pktMsgQGrpStatusDStatus = htons ((uint16_t) dStatus); status = distNetSend (nodeIdDest, (DIST_PKT *) &pktStatus, DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_STATUS), WAIT_FOREVER, DIST_MSG_Q_GRP_PRIO); return (status); }/***************************************************************************** msgQDistGdbInput - called for distributed group database updates (VxFusion option)** This routine updates the local group database.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: Status of update.** NOMANUAL*/LOCAL DIST_STATUS msgQDistGdbInput ( DIST_NODE_ID nodeIdSrc, /* unused */ DIST_TBUF_HDR * pTBufHdr /* ptr to received TBUF header */ ) { DIST_PKT * pPkt; int pktLen; DIST_PKT_DGDB_ADD pktAdd; DIST_MSG_Q_GRP_ID grpId; DIST_GRP_DB_NODE * pDbNodeNew; DIST_NODE_ID grpCreator; char grpName[DIST_NAME_MAX_LENGTH + 1]; int sz; UNUSED_ARG(nodeIdSrc); pktLen = pTBufHdr->tBufHdrOverall; if (pktLen < sizeof (DIST_PKT)) distPanic ("msgQDistGdbInput: packet too short\n"); pPkt = (DIST_PKT *) (DIST_TBUF_GET_NEXT (pTBufHdr))->pTBufData; switch (pPkt->pktSubType) { case DIST_PKT_TYPE_DGDB_ADD: { if (pktLen < DIST_PKT_HDR_SIZEOF (DIST_PKT_DGDB_ADD)) distPanic ("msgQDistGdbInput: packet too short\n"); distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0, (char *) &pktAdd, DIST_PKT_HDR_SIZEOF (DIST_PKT_DGDB_ADD)); if ((sz = pktLen - DIST_PKT_HDR_SIZEOF (DIST_PKT_DGDB_ADD)) > DIST_NAME_MAX_LENGTH + 1) distPanic ("msgQDistGdbInput: name too long\n"); distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), DIST_PKT_HDR_SIZEOF (DIST_PKT_DGDB_ADD), (char *) &grpName, sz); grpId = ntohs (pktAdd.pktDgdbAddId); grpCreator = ntohl (pktAdd.pktDgdbAddCreator); if (grpId >= distGrpIdNext) distGrpIdNext = grpId + 1;#ifdef MSG_Q_DIST_GRP_REPORT printf ("msgQDistGrpInput: group database add from node %ld\n", nodeIdSrc); printf ("msgQDistGrpInput: bind `%s' to group id %d\n", (char *) &grpName, grpId);#endif msgQDistGrpDbLock(); pDbNodeNew = msgQDistGrpLclCreate ((char *) &grpName, grpId, DIST_GRP_STATE_GLOBAL); if (pDbNodeNew == NULL) { msgQDistGrpDbUnlock(); distPanic ("msgQDistGdbInput: group creation failed\n"); } msgQDistGrpLclSetCreator (pDbNodeNew, grpCreator); msgQDistGrpDbUnlock(); return (DIST_GDB_STATUS_OK); } default: return (DIST_GDB_STATUS_PROTOCOL_ERROR); } }/***************************************************************************** msgQDistGrpInput - called when a new group message arrives at the system (VxFusion option)** This routine is called whenever a group message is received.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: Status of message processing.** NOMANUAL*/LOCAL DIST_STATUS msgQDistGrpInput ( DIST_NODE_ID nodeIdSrc, /* source node ID */ DIST_TBUF_HDR * pTBufHdr /* ptr to message */ ) { DIST_PKT * pPkt; int pktLen; DIST_PKT_MSG_Q_GRP_SEND pktSend; DIST_MSG_Q_GRP_ID dMsgQGrpId; DIST_GRP_DB_NODE * pDistGrpDbNode; DIST_INQ_ID inqIdSrc; uint32_t timeout_msec; UINT nBytes; char * buffer; int tid; int timeout; int prio; DIST_PKT_MSG_Q_GRP_STATUS pktStatus; DIST_STATUS dStatus; DIST_INQ_ID inqId; DIST_INQ * pGenInq; int errnoRemote; DIST_MSG_Q_GRP_SEND_INQ * pInq; pktLen = pTBufHdr->tBufHdrOverall; if (pktLen < sizeof (DIST_PKT)) distPanic ("msgQDistGrpInput: packet too short\n"); pPkt = (DIST_PKT *) ((DIST_TBUF_GET_NEXT (pTBufHdr))->pTBufData); switch (pPkt->pktSubType) { case DIST_PKT_TYPE_MSG_Q_GRP_SEND: { if (pktLen < DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_SEND)) distPanic ("msgQDistGrpInput/SEND: packet too short\n"); distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0, (char *)&pktSend, DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_SEND)); inqIdSrc = (DIST_INQ_ID) htonl(pktSend.pktMsgQGrpSendInqId); timeout_msec = ntohl (pktSend.pktMsgQGrpSendTimeout); timeout = DIST_MSEC_TO_TICKS (timeout_msec); nBytes = pktLen - DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_SEND); dMsgQGrpId = ntohs (pktSend.pktMsgQGrpSendId); /* Check for an empty group */ msgQDistGrpDbLock(); pDistGrpDbNode = msgQDistGrpLclFindById (dMsgQGrpId); msgQDistGrpDbUnlock(); if (pDistGrpDbNode == NULL) { msgQDistGrpSendStatus (nodeIdSrc, inqIdSrc, MSG_Q_DIST_GRP_STATUS_UNAVAIL); return (MSG_Q_DIST_GRP_STATUS_UNAVAIL); } if (SLL_FIRST ((SL_LIST *)&pDistGrpDbNode->grpDbMsgQIdLst) == NULL) { msgQDistGrpSendStatus (nodeIdSrc, inqIdSrc, MSG_Q_DIST_GRP_STATUS_OK);#ifdef MSG_Q_DIST_GRP_REPORT printf ("msgQDistGrpInput/SEND: group has no local members\n");#endif return (MSG_Q_DIST_GRP_STATUS_OK); } /* Using malloc() here is not very satisfiing. */ if ((buffer = (char *) malloc (nBytes)) == NULL) {#ifdef MSG_Q_DIST_GRP_REPORT printf ("msgQDistGrpInput/SEND: out of memory\n");#endif msgQDistGrpSendStatus (nodeIdSrc, inqIdSrc, MSG_Q_DIST_GRP_STATUS_NOT_ENOUGH_MEMORY); distStat.memShortage++; distStat.msgQGrpInDiscarded++; return (MSG_Q_DIST_GRP_STATUS_NOT_ENOUGH_MEMORY); } distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_SEND), buffer, nBytes); prio = NET_PRIO_TO_DIST_MSG_Q_PRIO (pTBufHdr->tBufHdrPrio);#ifdef MSG_Q_DIST_GRP_REPORT printf ("msgQDistGrpInput: message from node %ld to group 0x%x\n", nodeIdSrc, dMsgQGrpId);#endif /* * Send message to all members, registrated on the local node for * this group. */ tid = taskSpawn (NULL, DIST_MSG_Q_GRP_WAIT_TASK_PRIO, 0, DIST_MSG_Q_GRP_WAIT_TASK_STACK_SZ, (FUNCPTR) msgQDistGrpAgent, (int) nodeIdSrc, (int) inqIdSrc, (int) dMsgQGrpId, (int) buffer, (int) nBytes, (int) timeout, (int) prio, 0, 0, 0); if (tid == ERROR) { free (buffer); msgQDistGrpSendStatus (nodeIdSrc, inqIdSrc, MSG_Q_DIST_GRP_STATUS_UNAVAIL); return (MSG_Q_DIST_GRP_STATUS_UNAVAIL); } return (MSG_Q_DIST_GRP_STATUS_OK); } case DIST_PKT_TYPE_MSG_Q_GRP_STATUS: { if (pktLen != DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_STATUS)) distPanic ("msgQDistGrpInput/STATUS: packet too short\n"); /* First copy the error packet form the TBuf list. */ distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0, (char *) &pktStatus, DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_GRP_STATUS)); dStatus = (DIST_STATUS) ntohs (pktStatus.pktMsgQGrpStatusDStatus); errnoRemote = ntohl (pktStatus.pktMsgQGrpStatusErrno); inqId = (DIST_INQ_ID) ntohl (pktStatus.pktMsgQGrpStatusInqId); if (! (pGenInq = distInqFind (inqId))) return (MSG_Q_DIST_GRP_STATUS_LOCAL_TIMEOUT); /* See who is addressed by the STATUS telegram. */ switch (pGenInq->inqType) { case DIST_MSG_Q_GRP_INQ_TYPE_SEND: { pInq = (DIST_MSG_Q_GRP_SEND_INQ *) pGenInq; /* * Possible errors here: * MSG_Q_DIST_GRP_STATUS_OK * MSG_Q_DIST_GRP_STATUS_ERROR * MSG_Q_DIST_GRP_STATUS_UNAVAIL * MSG_Q_DIST_GRP_STATUS_ILLEGAL_OBJ_ID * MSG_Q_DIST_GRP_STATUS_NOT_ENOUGH_MEMORY */ switch (dStatus) { case MSG_Q_DIST_GRP_STATUS_OK: if (--pInq->sendInqNumOutstanding == 0) semGive (&pInq->sendInqWait); break; case MSG_Q_DIST_GRP_STATUS_ERROR: case MSG_Q_DIST_GRP_STATUS_UNAVAIL: case MSG_Q_DIST_GRP_STATUS_ILLEGAL_OBJ_ID: case MSG_Q_DIST_GRP_STATUS_NOT_ENOUGH_MEMORY: errnoOfTaskSet (pInq->sendInqTask, errnoRemote); break; default:#ifdef MSG_Q_DIST_GRP_REPORT printf ("msgQDistGrpInput/STATUS/SEND: status?\n");#endif break; } return (MSG_Q_DIST_GRP_STATUS_OK); } default:#ifdef MSG_Q_DIST_GRP_REPORT printf ("msgQDistGrpInput/STATUS: unexpected inq (%d)\n", pGenInq->inqType);#endif return (MSG_Q_DIST_GRP_STATUS_INTERNAL_ERROR); } } default:#ifdef MSG_Q_DIST_GRP_REPORT printf ("msgQDistGrpInput/STATUS: unknown group subtype (%d)\n", pPkt->pktSubType);#endif return (MSG_Q_DIST_GRP_STATUS_PROTOCOL_ERROR); }#ifdef UNDEFINED /* supposedly unreached */ if (status == ERROR) return (MSG_Q_DIST_GRP_STATUS_ERROR); return (MSG_Q_DIST_GRP_STATUS_OK);#endif }/***************************************************************************** msgQDistGrpLclSetId - change id of a group in local database (VxFusion option)** This routine changes the group id of a node in the local database.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -