📄 msgqdistlib.c
字号:
lclMsgQId = msgQDistTblGet (DIST_MSG_Q_ID_TO_TBL_IX(dMsgQId)); if (lclMsgQId == NULL) {#ifdef DIST_DIAGNOSTIC printf ("msgQDistReceive: distributed message queue does not exist\n");#endif return (ERROR); /* does not exist */ } return (msgQReceive (lclMsgQId, buffer, maxNBytes, msgQTimeout)); }/***************************************************************************** msgQDistNumMsgs - get the number of messages in a distributed message queue (VxFusion option)** This routine returns the number of messages currently queued to a specified* distributed message queue.** NOTE:* When msgQDistNumMsgs() is called through msgQNumMsgs(), <overallTimeout>* is set to WAIT_FOREVER . You cannot set <overallTimeout> to NO_WAIT (0)* because the process of sending a message from the local node to the remote* node always takes a finite amount of time.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS:* The number of messages queued, or ERROR if the operation fails.** ERRNO:* \is* \i S_distLib_OBJ_ID_ERROR* The argument <msgQId> is invalid.* \i S_distLib_UNREACHABLE* Could not establish communications with the remote node.* \i S_msgQDistLib_INVALID_TIMEOUT* The argument <overallTimeout> is NO_WAIT .* \ie** SEE ALSO: msgQLib*/int msgQDistNumMsgs ( MSG_Q_ID msgQId, /* message queue to examine */ int overallTimeout /* ticks to wait overall */ ) { DIST_MSG_Q_ID dMsgQId; DIST_OBJ_NODE * pObjNode; MSG_Q_ID lclMsgQId; DIST_PKT_MSG_Q_NUM_MSGS_REQ pktReq; DIST_MSG_Q_NUM_MSGS_INQ inquiryNode; DIST_INQ_ID inquiryId; int numMsgs; STATUS status; if (DIST_OBJ_VERIFY (msgQId) == ERROR) { errnoSet (S_distLib_OBJ_ID_ERROR); return (ERROR); } if (overallTimeout == NO_WAIT) { errnoSet (S_msgQDistLib_INVALID_TIMEOUT); return (ERROR); /* makes no sense */ } pObjNode = MSG_Q_ID_TO_DIST_OBJ_NODE (msgQId); if (! IS_DIST_MSG_Q_OBJ (pObjNode)) { errnoSet (S_distLib_OBJ_ID_ERROR); return (ERROR); /* legal object id, but not a message queue */ } dMsgQId = (DIST_MSG_Q_ID) pObjNode->objNodeId;#ifdef MSG_Q_DIST_REPORT printf ("msgQDistNumMsgs: msgQId %p, dMsgQId 0x%lx\n", msgQId, dMsgQId);#endif if (IS_DIST_MSG_Q_TYPE_GRP (dMsgQId)) { errnoSet (S_msgQDistLib_NOT_GROUP_CALLABLE); return (ERROR); /* error to call msgQNumMsgs() on groups */ } if (!IS_DIST_OBJ_LOCAL (pObjNode)) /* message queue is remote */ { inquiryNode.numMsgsInq.inqType = DIST_MSG_Q_INQ_TYPE_NUM_MSGS; semBInit (&(inquiryNode.numMsgsInqWait), SEM_Q_FIFO, SEM_EMPTY); inquiryNode.numMsgsInqNum = ERROR; inquiryNode.numMsgsInqTask = taskIdSelf(); inquiryId = distInqRegister ((DIST_INQ *) &inquiryNode); pktReq.numMsgsReqHdr.pktType = DIST_PKT_TYPE_MSG_Q; pktReq.numMsgsReqHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_NUM_MSGS_REQ; pktReq.numMsgsReqTblIx = DIST_MSG_Q_ID_TO_TBL_IX (dMsgQId); pktReq.numMsgsReqInqId = (uint32_t) inquiryId; status = distNetSend (pObjNode->objNodeReside, (DIST_PKT *) &pktReq, DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_NUM_MSGS_REQ), WAIT_FOREVER, DIST_MSG_Q_NUM_MSGS_PRIO); if (status == ERROR) { distInqCancel ((DIST_INQ *) &inquiryNode); errnoSet (S_distLib_UNREACHABLE); return (ERROR); } semTake (&(inquiryNode.numMsgsInqWait), overallTimeout); numMsgs = inquiryNode.numMsgsInqNum; distInqCancel ((DIST_INQ *) &inquiryNode); return (numMsgs); } lclMsgQId = msgQDistTblGet (DIST_MSG_Q_ID_TO_TBL_IX(dMsgQId)); if (lclMsgQId == NULL) return (ERROR); /* does not exist */ return (msgQNumMsgs (lclMsgQId)); }/***************************************************************************** msgQDistGetMapped - retrieve entry from distributed msgQ table (VxFusion option)** This routine gets an entry from the distributed message queue table.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: MSG_Q_ID, or NULL.** NOMANUAL*/MSG_Q_ID msgQDistGetMapped ( MSG_Q_ID msgQId /* msgQ ID to map */ ) { DIST_MSG_Q_ID dMsgQId; DIST_OBJ_NODE * pObjNode; TBL_IX tblIx; if (DIST_OBJ_VERIFY (msgQId) == ERROR) return (NULL); pObjNode = MSG_Q_ID_TO_DIST_OBJ_NODE (msgQId); if (! IS_DIST_MSG_Q_OBJ (pObjNode)) return (NULL); /* legal object id, but not a message queue */ if (! IS_DIST_OBJ_LOCAL (pObjNode)) return (NULL); dMsgQId = (DIST_MSG_Q_ID) pObjNode->objNodeId; if (IS_DIST_MSG_Q_TYPE_GRP (dMsgQId)) return (NULL); tblIx = DIST_MSG_Q_ID_TO_TBL_IX (dMsgQId); return (msgQDistTblGet (tblIx)); }/***************************************************************************** msgQDistTblPut - put a message queue to the queue table (VxFusion option)** This routine puts a MSG_Q_ID in the queue table.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: OK, if successful** NOMANUAL*/LOCAL STATUS msgQDistTblPut ( MSG_Q_ID msgQId, /* ID to put in table */ TBL_IX * pTblIx /* where to return index in table */ ) { TBL_NODE * pNode; msgQDistTblLock(); pNode = (TBL_NODE *) sllGet (&msgQDistTblFreeList); msgQDistTblUnlock(); if (pNode == NULL) return (ERROR); /* all elements of the table are in use */ pNode->tblMsgQId = msgQId; *pTblIx = pNode->tblIx;#ifdef MSG_Q_DIST_REPORT printf ("msgQDistTblPut: pTblNode %p (tblIx 0x%x), msgQId 0x%lx\n", pNode, pNode->tblIx, (uint32_t) pNode->tblMsgQId);#endif return (OK); }#ifdef __SUPPORT_MSG_Q_DIST_DELETE/***************************************************************************** msgQDistTblDelete - delete a message queue from the table (VxFusion option)** This routine deletes in queue ID at table index <tblIx>.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: OK, if successfully deleted.** NOMANUAL*/LOCAL STATUS msgQDistTblDelete ( TBL_IX tblIx /* index in queue table */ ) { TBL_NODE * pNode; if (tblIx >= msgQDistTblSize) return(ERROR); /* invalid argument */ pNode = &(pMsgQDistTbl[tblIx]); msgQDistTblLock(); sllPutAtHead (&msgQDistTblFreeList, (SL_NODE *) pNode); msgQDistTblUnlock(); return(OK); }#endif /* __SUPPORT_MSG_Q_DIST_DELETE *//***************************************************************************** msgQDistTblGet - get message queue ID from table (VxFusion option)** This routine takes a message queue table index, <tblIx>, and returns* the corresponding MSG_Q_ID, or NULL.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: MSG_Q_ID or NULL.** NOMANUAL*/LOCAL MSG_Q_ID msgQDistTblGet ( TBL_IX tblIx /* index in queue table */ ) { if (tblIx >= msgQDistTblSize) return(NULL); /* invalid argument */#ifdef MSG_Q_DIST_REPORT printf ("msgQDistTblGet: tblIx 0x%x, msgQId 0x%lx\n", tblIx, (uint32_t) pMsgQDistTbl[tblIx].tblMsgQId);#endif return (pMsgQDistTbl[tblIx].tblMsgQId); }/***************************************************************************** msgQDistInput - called everytime a new message arrives at the system (VxFusion option)** This routine processes messages received by VxFusion.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: The status of message processing.** NOMANUAL*/LOCAL DIST_STATUS msgQDistInput ( DIST_NODE_ID nodeIdSrc, /* source node ID */ DIST_TBUF_HDR * pTBufHdr /* ptr to the message */ ) { DIST_PKT * pPkt; int pktLen; DIST_PKT_MSG_Q_SEND pktSend; DIST_INQ_ID inqIdSrc; MSG_Q_ID msgQId; char * buffer; UINT nBytes; int prio; int ret; int tid; DIST_PKT_MSG_Q_RECV_REQ pktReq; /* incoming request packet */ UINT maxBytes; pktLen = pTBufHdr->tBufHdrOverall; if (pktLen < sizeof (DIST_PKT)) distPanic ("msgQDistInput: packet too short\n"); pPkt = (DIST_PKT *) ((DIST_TBUF_GET_NEXT (pTBufHdr))->pTBufData); switch (pPkt->pktSubType) { case DIST_PKT_TYPE_MSG_Q_SEND: { /* * Received a message from a remote sender. * Find id of local message queue, and call msgQSend(). */ if (pktLen < DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_SEND)) distPanic ("msgQDistInput/SEND: packet too short\n"); distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0, (char *) &pktSend, DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_SEND)); inqIdSrc = (DIST_INQ_ID) pktSend.sendInqId; nBytes = pktLen - DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_SEND); /* * Using malloc() here is not very satisfiing. Maybe we can * extend msgQLib with a routine, that directly sends a list * of tBufs to a message queue. */ if ((buffer = (char *) malloc (nBytes)) == NULL) {#ifdef MSG_Q_DIST_REPORT
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -