📄 msgqdistlib.c
字号:
inqId = (DIST_INQ_ID) pktStatus.statusInqId; if (! (pGenInq = distInqFind (inqId))) return (MSG_Q_DIST_STATUS_LOCAL_TIMEOUT); /* See who is addressed by the STATUS telegram. */ switch (pGenInq->inqType) { case DIST_MSG_Q_INQ_TYPE_NUM_MSGS: { DIST_MSG_Q_NUM_MSGS_INQ *pInq; pInq = (DIST_MSG_Q_NUM_MSGS_INQ *) pGenInq; /* * Possible errors are: * MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID */ switch (msgQStatus) { case MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID: errnoOfTaskSet (pInq->numMsgsInqTask, errnoRemote); break; default:#ifdef MSG_Q_DIST_REPORT printf ("msgQDistInput/STATUS/NUM_MSGS: status?\n");#endif break; } semGive (&pInq->numMsgsInqWait); return (MSG_Q_DIST_STATUS_OK); } case DIST_MSG_Q_INQ_TYPE_SEND: { DIST_MSG_Q_SEND_INQ * pInq; pInq = (DIST_MSG_Q_SEND_INQ *) pGenInq; /* * Possible errors here: * MSG_Q_DIST_STATUS_OK * MSG_Q_DIST_STATUS_ERROR * MSG_Q_DIST_STATUS_UNAVAIL * MSG_Q_DIST_STATUS_TIMEOUT * MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID * MSG_Q_DIST_STATUS_NOT_ENOUGH_MEMORY */ switch (msgQStatus) { case MSG_Q_DIST_STATUS_OK: pInq->sendInqMsgQueued = TRUE; break; case MSG_Q_DIST_STATUS_ERROR: case MSG_Q_DIST_STATUS_UNAVAIL: case MSG_Q_DIST_STATUS_TIMEOUT: pInq->remoteError = TRUE; errnoOfTaskSet (pInq->sendInqTask, errnoRemote); break; case MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID: pInq->remoteError = TRUE; errnoOfTaskSet (pInq->sendInqTask, S_distLib_OBJ_ID_ERROR); break; case MSG_Q_DIST_STATUS_NOT_ENOUGH_MEMORY: pInq->remoteError = TRUE; errnoOfTaskSet (pInq->sendInqTask, S_msgQDistLib_RMT_MEMORY_SHORTAGE); break; default:#ifdef MSG_Q_DIST_REPORT printf ("msgQDistInput/STATUS/SEND: status?\n");#endif break; } semGive (&pInq->sendInqWait); return (MSG_Q_DIST_STATUS_OK); } case DIST_MSG_Q_INQ_TYPE_RECV: { DIST_MSG_Q_RECV_INQ * pInq; pInq = (DIST_MSG_Q_RECV_INQ *) pGenInq; /* * Possible errors here: * MSG_Q_DIST_STATUS_ERROR * MSG_Q_DIST_STATUS_UNAVAIL * MSG_Q_DIST_STATUS_TIMEOUT * MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID * MSG_Q_DIST_STATUS_NOT_ENOUGH_MEMORY */ switch (msgQStatus) { case 
MSG_Q_DIST_STATUS_UNAVAIL: case MSG_Q_DIST_STATUS_TIMEOUT: case MSG_Q_DIST_STATUS_ERROR: pInq->remoteError = TRUE; errnoOfTaskSet (pInq->recvInqTask, errnoRemote); break; case MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID: pInq->remoteError = TRUE; errnoOfTaskSet (pInq->recvInqTask, S_distLib_OBJ_ID_ERROR); break; case MSG_Q_DIST_STATUS_NOT_ENOUGH_MEMORY: pInq->remoteError = TRUE; errnoOfTaskSet (pInq->recvInqTask, S_msgQDistLib_RMT_MEMORY_SHORTAGE); break; default:#ifdef MSG_Q_DIST_REPORT printf ("msgQDistInput/STATUS/RECV_REQ: status?\n");#endif break; } semGive (&pInq->recvInqWait); return (MSG_Q_DIST_STATUS_OK); } default:#ifdef MSG_Q_DIST_REPORT printf ("msgQDistInput/STATUS: unexpected inquiry (%d)\n", pGenInq->inqType);#endif return (MSG_Q_DIST_STATUS_INTERNAL_ERROR); } } default:#ifdef MSG_Q_DIST_REPORT printf ("msgQDistInput: unknown message queue subtype (%d)\n", pPkt->pktSubType);#endif return (MSG_Q_DIST_STATUS_PROTOCOL_ERROR); } }

/*******************************************************************************
*
* msgQDistSendReply - send message to message queue and respond (VxFusion option)
*
* This routine is used internally to do a msgQSend() and to report the
* outcome to the requesting node with msgQDistSendStatus().
*
* A failed msgQSend() is classified by errnoGet():
*   S_objLib_OBJ_UNAVAILABLE -> MSG_Q_DIST_STATUS_UNAVAIL
*   S_objLib_OBJ_TIMEOUT     -> MSG_Q_DIST_STATUS_TIMEOUT
*   anything else            -> MSG_Q_DIST_STATUS_ERROR
*
* When lastTry is TRUE the status is always reported and buffer is freed.
* When lastTry is FALSE the status is reported unless it is
* MSG_Q_DIST_STATUS_UNAVAIL, and buffer is left untouched.
*
* AVAILABILITY
* This routine is distributed as a component of the unbundled distributed
* message queues option, VxFusion.
*
* RETURNS: Status of message processing.
*
* NOMANUAL
*/

LOCAL DIST_MSG_Q_STATUS msgQDistSendReply
    (
    DIST_NODE_ID    nodeIdSender,   /* sending node ID */
    DIST_INQ_ID     inqIdSender,    /* sending inquiry ID */
    MSG_Q_ID        msgQId,         /* ID destination Q */
    char *          buffer,         /* start of data to send */
    UINT            nBytes,         /* number of bytes to send */
    int             timeout,        /* msgQSend() timeout */
    int             priority,       /* message priority */
    BOOL            lastTry         /* clean-up if this is last attempt */
    )
    {
    DIST_MSG_Q_STATUS   msgQStatus = MSG_Q_DIST_STATUS_OK;

    if (msgQSend (msgQId, buffer, nBytes, timeout, priority) == ERROR)
        {
        switch (errnoGet())
            {
            case S_objLib_OBJ_UNAVAILABLE:      /* timeout == NO_WAIT */
                msgQStatus = MSG_Q_DIST_STATUS_UNAVAIL;
                break;

            case S_objLib_OBJ_TIMEOUT:          /* timeout != NO_WAIT */
                msgQStatus = MSG_Q_DIST_STATUS_TIMEOUT;
                break;

            default:
                msgQStatus = MSG_Q_DIST_STATUS_ERROR;
                break;
            }
        }

    /*
     * Report before releasing the buffer: msgQDistSendStatus() transmits
     * the value of errnoGet(), so no other library call (free() is not
     * guaranteed to leave errno alone) may run between the failing
     * msgQSend() and the report.
     */

    if (lastTry || (msgQStatus != MSG_Q_DIST_STATUS_UNAVAIL))
        msgQDistSendStatus (nodeIdSender, inqIdSender, msgQStatus);

    if (lastTry)
        free (buffer);

    return (msgQStatus);
    }

/*******************************************************************************
*
* msgQDistRecvReply - receive message from message queue and respond (VxFusion option)
*
* This routine is used internally to call msgQReceive().
*
* If msgQReceive() fails, the failure is classified by errnoGet() in the
* same way as in msgQDistSendReply() and reported with msgQDistSendStatus()
* (always when lastTry is TRUE; otherwise only if the status is not
* MSG_Q_DIST_STATUS_UNAVAIL).
*
* If a message was received, it is forwarded to nodeIdRespond as a
* DIST_PKT_TYPE_MSG_Q_RECV_RPL telegram followed by the message data.  If
* that transmission fails, the message is given back to msgQId with
* MSG_PRI_URGENT and MSG_Q_DIST_STATUS_UNREACH is returned; no status
* telegram is sent in that case.
*
* buffer is freed by this routine on every path when lastTry is TRUE, and
* on no path when lastTry is FALSE.
*
* AVAILABILITY
* This routine is distributed as a component of the unbundled distributed
* message queues option, VxFusion.
*
* RETURNS: Status of operation.
*
* NOMANUAL
*/

LOCAL DIST_MSG_Q_STATUS msgQDistRecvReply
    (
    DIST_NODE_ID    nodeIdRespond,  /* node wanting the message */
    DIST_INQ_ID     inqIdRespond,   /* inquiry ID of that node */
    MSG_Q_ID        msgQId,         /* message Q to read */
    char *          buffer,         /* start of buffer for data */
    UINT            maxNBytes,      /* maximum number of bytes to read */
    int             timeout,        /* timeout for msgQReceive() */
    BOOL            lastTry         /* clean-up, if last attempt */
    )
    {
    DIST_PKT_MSG_Q_RECV_RPL pktRpl;
    DIST_MSG_Q_STATUS       msgQStatus;
    DIST_IOVEC              distIOVec[2];
    int                     nBytes;

    nBytes = msgQReceive (msgQId, buffer, maxNBytes, timeout);
    if (nBytes == ERROR)
        {
        switch (errnoGet())
            {
            case S_objLib_OBJ_UNAVAILABLE:      /* timeout == NO_WAIT */
                msgQStatus = MSG_Q_DIST_STATUS_UNAVAIL;
                break;

            case S_objLib_OBJ_TIMEOUT:          /* timeout != NO_WAIT */
                msgQStatus = MSG_Q_DIST_STATUS_TIMEOUT;
                break;

            default:
                msgQStatus = MSG_Q_DIST_STATUS_ERROR;
                break;
            }

        /*
         * Report before releasing the buffer: msgQDistSendStatus()
         * transmits the value of errnoGet(), so nothing that might
         * modify errno may run between msgQReceive() and the report.
         */

        if (lastTry || (msgQStatus != MSG_Q_DIST_STATUS_UNAVAIL))
            msgQDistSendStatus (nodeIdRespond, inqIdRespond, msgQStatus);

        if (lastTry)
            free (buffer);

        return (msgQStatus);
        }

    /* We have received a message, now respond to the request. */

    pktRpl.recvRplHdr.pktType    = DIST_PKT_TYPE_MSG_Q;
    pktRpl.recvRplHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_RECV_RPL;
    pktRpl.recvRplInqId          = (uint32_t) inqIdRespond;

    /* element 0 carries the reply header, element 1 the message data */

    distIOVec[0].pIOBuffer = &pktRpl;
    distIOVec[0].IOLen     = DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_RECV_RPL);
    distIOVec[1].pIOBuffer = buffer;
    distIOVec[1].IOLen     = nBytes;

    if (distNetIOVSend (nodeIdRespond, &distIOVec[0], 2, WAIT_FOREVER,
                        DIST_MSG_Q_RECV_PRIO) == ERROR)
        {
        /*
         * Forwarding failed: give the message back to the queue it was
         * taken from (urgent priority, without waiting).
         */

        if (msgQSend (msgQId, buffer, nBytes, NO_WAIT,
                      MSG_PRI_URGENT) == ERROR)
            {
            /* XXX TODO this one can also fail; panic for now */
            distPanic ("msgQDistRecvReply: msgQSend failed\n");
            }

#ifdef MSG_Q_DIST_REPORT
        printf ("msgQDistRecvReply: remote node is unreachable\n");
#endif
        msgQStatus = MSG_Q_DIST_STATUS_UNREACH;
        }
    else
        {
#ifdef MSG_Q_DIST_REPORT
        printf ("msgQDistRecvReply: received a message; forwarded it to remote\n");
#endif
        msgQStatus = MSG_Q_DIST_STATUS_OK;  /* packet already acknowledged */
        }

    if (lastTry)
        free (buffer);

    return (msgQStatus);
    }

/*******************************************************************************
*
* msgQDistSendStatus - send status and errno (VxFusion option)
*
* This routine sends operation status and errno information to a remote node.
* The errno value transmitted is whatever errnoGet() returns at the time of
* the call, so callers must invoke this routine before anything else can
* change errno.
*
* AVAILABILITY
* This routine is distributed as a component of the unbundled distributed
* message queues option, VxFusion.
*
* RETURNS: The STATUS returned by distNetSend().
*
* NOMANUAL
*/

LOCAL STATUS msgQDistSendStatus
    (
    DIST_NODE_ID        nodeIdDest,     /* the node to send status to */
    DIST_INQ_ID         inqId,          /* inquiry ID of that node */
    DIST_MSG_Q_STATUS   msgQStatus      /* the status to send */
    )
    {
    DIST_PKT_MSG_Q_STATUS   pktStatus;

    pktStatus.statusHdr.pktType    = DIST_PKT_TYPE_MSG_Q;
    pktStatus.statusHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_STATUS;

    /* the inquiry ID is sent without byte-order conversion, unlike the
     * errno and status fields below
     */

    pktStatus.statusInqId   = (uint32_t) inqId;
    pktStatus.statusErrno   = htonl ((uint32_t) errnoGet());
    pktStatus.statusDStatus = htons ((uint16_t) msgQStatus);

    return (distNetSend (nodeIdDest, (DIST_PKT *) &pktStatus,
                         DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_STATUS),
                         WAIT_FOREVER, DIST_MSG_Q_ERROR_PRIO));
    }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -