📄 msgqdistlib.c
字号:
* Could not establish communications with the remote node.* \i S_msgQDistLib_INVALID_PRIORITY* The argument <priority> is invalid.* \i S_msgQDistLib_INVALID_TIMEOUT* The argument <overallTimeout> is NO_WAIT .* \i S_msgQDistLib_RMT_MEMORY_SHORTAGE* There is not enough memory on the remote node.* \i S_objLib_OBJ_UNAVAILABLE* The argument <msgQTimeout> is set to NO_WAIT, and the queue is full.* \i S_objLib_OBJ_TIMEOUT* The queue is full for <msgQTimeout> ticks.* \i S_msgQLib_INVALID_MSG_LENGTH* The argument <nBytes> is larger than the <maxMsgLength> set for the * message queue.* \i S_msgQDistLib_OVERALL_TIMEOUT* There was no response from the remote side in <overallTimeout> ticks.* \ie** SEE ALSO: msgQLib*/STATUS msgQDistSend ( MSG_Q_ID msgQId, /* message queue on which to send */ char * buffer, /* message to send */ UINT nBytes, /* length of message */ int msgQTimeout, /* ticks to wait at message queue */ int overallTimeout, /* ticks to wait overall */ int priority /* priority */ ) { DIST_MSG_Q_ID dMsgQId; DIST_OBJ_NODE * pObjNode; MSG_Q_ID lclMsgQId; int lclPriority; /* MSG_PRI_URGENT or MSG_PRI_NORMAL */ DIST_MSG_Q_GRP_ID distMsgQGrpId; STATUS status; DIST_PKT_MSG_Q_SEND pktSendHdr; DIST_MSG_Q_SEND_INQ inquiryNode; DIST_INQ_ID inquiryId; DIST_IOVEC distIOVec[2]; if (DIST_OBJ_VERIFY (msgQId) == ERROR) { errnoSet (S_distLib_OBJ_ID_ERROR); return (ERROR); } if (overallTimeout == NO_WAIT) { errnoSet (S_msgQDistLib_INVALID_TIMEOUT); return (ERROR); /* makes no sense */ } if (! DIST_MSG_Q_PRIO_VERIFY (priority)) { errnoSet (S_msgQDistLib_INVALID_PRIORITY); return (ERROR); /* invalid priority */ } pObjNode = MSG_Q_ID_TO_DIST_OBJ_NODE (msgQId); if (! IS_DIST_MSG_Q_OBJ (pObjNode)) { errnoSet (S_distLib_OBJ_ID_ERROR); return (ERROR); /* legal object id, but not a message queue */ } dMsgQId = (DIST_MSG_Q_ID) pObjNode->objNodeId;#ifdef MSG_Q_DIST_REPORT printf ("msgQDistSend: msgQId %p, dMsgQId 0x%lx\n", msgQId, dMsgQId);#endif if (IS_DIST_MSG_Q_TYPE_GRP (dMsgQId)) { /* Message queue id points to a group. */ distMsgQGrpId = DIST_MSG_Q_ID_TO_DIST_MSG_Q_GRP_ID (dMsgQId); status = msgQDistGrpSend (distMsgQGrpId, buffer, nBytes, msgQTimeout, overallTimeout, priority);#ifdef MSG_Q_DIST_REPORT printf ("msgQDistSend: msgQDistGrpSend returned = %d\n", status);#endif return (status); } if (!IS_DIST_OBJ_LOCAL (pObjNode)) { /* Message queue id points to a remote queue. */ inquiryNode.sendInq.inqType = DIST_MSG_Q_INQ_TYPE_SEND; semBInit (&(inquiryNode.sendInqWait), SEM_Q_FIFO, SEM_EMPTY); inquiryNode.remoteError = FALSE; inquiryNode.sendInqMsgQueued = FALSE; inquiryNode.sendInqTask = taskIdSelf(); inquiryId = distInqRegister ((DIST_INQ *) &inquiryNode); pktSendHdr.sendHdr.pktType = DIST_PKT_TYPE_MSG_Q; pktSendHdr.sendHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_SEND; pktSendHdr.sendTblIx = DIST_MSG_Q_ID_TO_TBL_IX (dMsgQId); pktSendHdr.sendInqId = (uint32_t) inquiryId; pktSendHdr.sendTimeout = htonl ((uint32_t) DIST_TICKS_TO_MSEC (msgQTimeout)); /* use IOV stuff here, since we do not want to copy data */ distIOVec[0].pIOBuffer = &pktSendHdr; distIOVec[0].IOLen = DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_SEND); distIOVec[1].pIOBuffer = buffer; distIOVec[1].IOLen = nBytes; status = distNetIOVSend (pObjNode->objNodeReside, &distIOVec[0], 2, WAIT_FOREVER, DIST_MSG_Q_PRIO_TO_NET_PRIO (priority)); if (status == ERROR) { distInqCancel ((DIST_INQ *) &inquiryNode); errnoSet (S_distLib_UNREACHABLE); return (ERROR); } /* * semTake() blocks the requesting task until the service * task gives the semaphore, because the request has * been processed. */ semTake (&(inquiryNode.sendInqWait), overallTimeout); distInqCancel ((DIST_INQ *) &inquiryNode); if (inquiryNode.sendInqMsgQueued) return (OK); /* If errno = S_objLib_OBJ_TIMEOUT, it could either be a result * of the timeout from the semaphore or the remote errno. We must * check the remoteError flag of inquiryNode to determine what the * source of the error was. If it is a result of the semaphore, we * will set errno to S_msgQDistLib_OVERALLTIMEOUT. Otherwise, we'll * leave the errno as it is. */ if (inquiryNode.remoteError == FALSE) errno = S_msgQDistLib_OVERALL_TIMEOUT; return (ERROR); } /* Message queue id points to a local queue. */ lclPriority = DIST_MSG_Q_PRIO_TO_MSG_Q_PRIO (priority); lclMsgQId = msgQDistTblGet (DIST_MSG_Q_ID_TO_TBL_IX (dMsgQId)); if (lclMsgQId == NULL) return (ERROR); /* does not exist */ if (msgQSend (lclMsgQId, buffer, nBytes, msgQTimeout, lclPriority) == ERROR) { return (ERROR); /* error in msgQSend() */ } return (OK); }/***************************************************************************** msgQDistReceive - receive a message from a distributed message queue (VxFusion option)** This routine receives a message from the distributed message queue specified * by <msgQId>. The received message is copied into the specified buffer, * <buffer>, which is <maxNBytes> in length. If the message is longer than * <maxNBytes>, the remainder of the message is discarded (no error indication* is returned).** The argument <msgQTimeout> specifies the time in ticks to wait for the * queuing of the message. The argument <overallTimeout> specifies the time* in ticks to wait for both the sending and queuing of the message.* While it is an error to set <overallTimeout> to NO_WAIT (0), * WAIT_FOREVER (-1) is allowed for both <msgQTimeout> and <overallTimeout>.** Calling msgQDistReceive() on a distributed message group returns an* error.** NOTE: When msgQDistReceive() is called through msgQReceive(), * <msgQTimeout> is set to <timeout> and <overallTimeout> to WAIT_FOREVER .** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: The number of bytes copied to <buffer>, or ERROR. ** ERRNO:* \is* \i S_distLib_OBJ_ID_ERROR* The argument <msgQId> is invalid.* \i S_distLib_UNREACHABLE* Could not establish communications with the remote node.* \i S_msgQLib_INVALID_MSG_LENGTH* The argument <maxNBytes> is less than 0.* \i S_msgQDistLib_INVALID_TIMEOUT* The argument <overallTimeout> is NO_WAIT .* \i S_msgQDistLib_RMT_MEMORY_SHORTAGE* There is not enough memory on the remote node.* \i S_objLib_OBJ_UNAVAILABLE* The argument <msgQTimeout> is set to NO_WAIT, and no messages are available.* \i S_objLib_OBJ_TIMEOUT* No messages were received in <msgQTimeout> ticks.* \i S_msgQDistLib_OVERALL_TIMEOUT* There was no response from the remote side in <overallTimeout> ticks.* \ie*** SEE ALSO: msgQLib*/int msgQDistReceive ( MSG_Q_ID msgQId, /* message queue from which to receive */ char * buffer, /* buffer to receive message */ UINT maxNBytes, /* length of buffer */ int msgQTimeout, /* ticks to wait at the message queue */ int overallTimeout /* ticks to wait overall */ ) { DIST_MSG_Q_ID dMsgQId; DIST_OBJ_NODE * pObjNode; MSG_Q_ID lclMsgQId; DIST_PKT_MSG_Q_RECV_REQ pktReq; DIST_MSG_Q_RECV_INQ inquiryNode; DIST_INQ_ID inquiryId; STATUS status; int nBytes; if (DIST_OBJ_VERIFY (msgQId) == ERROR) { errnoSet (S_distLib_OBJ_ID_ERROR); return (ERROR); } if (overallTimeout == NO_WAIT) { errnoSet (S_msgQDistLib_INVALID_TIMEOUT); return (ERROR); /* makes no sense */ } /* * Even though <maxNBytes> is unsigned, check for < 0 to catch * possible caller errors. */ if ((int) maxNBytes < 0) { errnoSet (S_msgQLib_INVALID_MSG_LENGTH); return (ERROR); } pObjNode = MSG_Q_ID_TO_DIST_OBJ_NODE (msgQId); if (! IS_DIST_MSG_Q_OBJ (pObjNode)) { errnoSet (S_distLib_OBJ_ID_ERROR); return (ERROR); /* legal object id, but not a message queue */ } dMsgQId = (DIST_MSG_Q_ID) pObjNode->objNodeId;#ifdef MSG_Q_DIST_REPORT printf ("msgQDistReceive: msgQId %p, dMsgQId 0x%lx\n", msgQId, dMsgQId);#endif if (IS_DIST_MSG_Q_TYPE_GRP (dMsgQId)) { /* MSG_Q_ID is a group id. */ errnoSet (S_msgQDistLib_NOT_GROUP_CALLABLE); return (ERROR); /* error to call msgQReceive() on groups */ } if (!IS_DIST_OBJ_LOCAL (pObjNode)) { /* * Queue is remote. * * Create a inquiry node and send a request to the remote * node. Block until timeout exceeds or the request is * answered. */ inquiryNode.recvInq.inqType = DIST_MSG_Q_INQ_TYPE_RECV; semBInit (&(inquiryNode.recvInqWait), SEM_Q_FIFO, SEM_EMPTY); inquiryNode.recvInqTask = taskIdSelf(); inquiryNode.pRecvInqBuffer = buffer; inquiryNode.recvInqMaxNBytes = maxNBytes; inquiryNode.recvInqMsgArrived = FALSE; inquiryNode.remoteError = FALSE; inquiryId = distInqRegister ((DIST_INQ *) &inquiryNode); pktReq.recvReqHdr.pktType = DIST_PKT_TYPE_MSG_Q; pktReq.recvReqHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_RECV_REQ; pktReq.recvReqTblIx = DIST_MSG_Q_ID_TO_TBL_IX (dMsgQId); pktReq.recvReqInqId = (uint32_t) inquiryId; pktReq.recvReqMaxNBytes = htonl ((uint32_t) maxNBytes); pktReq.recvReqTimeout = htonl ((uint32_t) DIST_TICKS_TO_MSEC (msgQTimeout)); status = distNetSend (pObjNode->objNodeReside, (DIST_PKT *) &pktReq, DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_RECV_REQ), WAIT_FOREVER, DIST_MSG_Q_RECV_PRIO); if (status == ERROR) { distInqCancel ((DIST_INQ *) &inquiryNode); errnoSet (S_distLib_UNREACHABLE); return (ERROR); } /* * semTake() blocks the requesting task until * the service task gives the semaphore, because * the request has been processed. */ semTake (&(inquiryNode.recvInqWait), overallTimeout); if (inquiryNode.recvInqMsgArrived) { /* * If <recvInqMsgArrived> is true, <recvInqMaxNBytes> has * the number of bytes received. */ nBytes = inquiryNode.recvInqMaxNBytes; distInqCancel ((DIST_INQ *) &inquiryNode); return (nBytes); } distInqCancel ((DIST_INQ *) &inquiryNode); /* If errno = S_objLib_OBJ_TIMEOUT, it could either be a result * of the timeout from the semaphore or the remote errno. We must * check the remoteError flag of inquiryNode to determine what the * source of the error was. If it is a result of the semaphore, we * will set errno to S_msgQDistLib_OVERALLTIMEOUT. Otherwise, we'll * leave the errno as it is. */ if (inquiryNode.remoteError == FALSE) errnoSet (S_msgQDistLib_OVERALL_TIMEOUT); return (ERROR); } /* The message queue is local to this node. This will be simple. */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -