📄 msgqdistlib.c
字号:
printf ("msgQDistInput/SEND: out of memory\n");#endif msgQDistSendStatus (nodeIdSrc, inqIdSrc, MSG_Q_DIST_STATUS_NOT_ENOUGH_MEMORY); return (MSG_Q_DIST_STATUS_NOT_ENOUGH_MEMORY); } distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_SEND), buffer, nBytes); prio = NET_PRIO_TO_DIST_MSG_Q_PRIO (pTBufHdr->tBufHdrPrio); msgQId = msgQDistTblGet (pktSend.sendTblIx); if (msgQId == NULL) { free (buffer);#ifdef MSG_Q_DIST_REPORT printf ("msgQDistInput/SEND: unknown message queue id\n");#endif msgQDistSendStatus (nodeIdSrc, inqIdSrc, MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID); return (MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID); } /* * First try to send with NO_WAIT. We set the lastry * argument to FALSE, because we will try again. */ ret = msgQDistSendReply (nodeIdSrc, inqIdSrc, msgQId, buffer, nBytes, NO_WAIT, prio, FALSE); switch (ret) { case MSG_Q_DIST_STATUS_OK: free (buffer); return (MSG_Q_DIST_STATUS_OK); case MSG_Q_DIST_STATUS_ERROR: free (buffer); return (MSG_Q_DIST_STATUS_ERROR); case MSG_Q_DIST_STATUS_UNAVAIL: { int timeout; timeout = DIST_MSEC_TO_TICKS (ntohl (pktSend.sendTimeout)); if (timeout != NO_WAIT) {#ifdef MSG_Q_DIST_REPORT printf ("msgQDistInput/SEND: timeout = %d\n", timeout);#endif /* * Send with NO_WAIT has failed and user * supplied timeout differs from NO_WAIT. * Spawn a task and wait on message queue. */ tid = taskSpawn (NULL, DIST_MSG_Q_WAIT_TASK_PRIO, 0, DIST_MSG_Q_WAIT_TASK_STACK_SZ, (FUNCPTR) msgQDistSendReply, (int) nodeIdSrc, (int) inqIdSrc, (int) msgQId, (int) buffer, nBytes, timeout, prio, TRUE, 0, 0); if (tid != ERROR) { /* msgQDistSendReply () frees <buffer> */ return (MSG_Q_DIST_STATUS_OK); } } free (buffer); /* For this case where the user specified NO_WAIT * we must send back a status now. We didn't do this * in msgQDistSendReply() because we didn't know if * the user specified NO_WAIT or whether it was the first * try before spawning a task. */ msgQDistSendStatus (nodeIdSrc, inqIdSrc, (INT16) ret); return (MSG_Q_DIST_STATUS_UNAVAIL); } default: free (buffer);#ifdef MSG_Q_DIST_REPORT printf ("msgQDistInput/SEND: illegal status\n");#endif return (MSG_Q_DIST_STATUS_INTERNAL_ERROR); } } case DIST_PKT_TYPE_MSG_Q_RECV_REQ: { /* * A remote node requests to receive data from local queue. * Find id of local message queue, and call msgQReceive(). */ DIST_MSG_Q_STATUS ret; if (pktLen != DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_RECV_REQ)) distPanic ("msgQDistInput/RECV_REQ: packet too short\n"); distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0, (char *) &pktReq, DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_RECV_REQ)); inqIdSrc = (DIST_INQ_ID) pktReq.recvReqInqId; msgQId = msgQDistTblGet (pktReq.recvReqTblIx); if (msgQId == NULL) {#ifdef MSG_Q_DIST_REPORT printf ("msgQDistInput/RECV_REQ: unknown message queue id\n");#endif msgQDistSendStatus (nodeIdSrc, inqIdSrc, MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID); return (MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID); } maxBytes = ntohl ((uint32_t) pktReq.recvReqMaxNBytes); buffer = (char *) malloc (maxBytes); if (buffer == NULL) {#ifdef MSG_Q_DIST_REPORT printf ("msgQDistInput/RECV_REQ: out of memory\n");#endif msgQDistSendStatus (nodeIdSrc, inqIdSrc, MSG_Q_DIST_STATUS_NOT_ENOUGH_MEMORY); return (MSG_Q_DIST_STATUS_NOT_ENOUGH_MEMORY); } /* First try to receive a message with NO_WAIT. */ ret = msgQDistRecvReply (nodeIdSrc, inqIdSrc, msgQId, buffer, maxBytes, NO_WAIT, FALSE); switch (ret) { case MSG_Q_DIST_STATUS_OK: free (buffer); return (MSG_Q_DIST_STATUS_OK); case MSG_Q_DIST_STATUS_ERROR: free (buffer); return (MSG_Q_DIST_STATUS_ERROR); case MSG_Q_DIST_STATUS_UNAVAIL: { uint32_t timeout_msec; int timeout; timeout_msec = ntohl ((uint32_t) pktReq.recvReqTimeout); timeout = DIST_MSEC_TO_TICKS (timeout_msec); if (timeout != NO_WAIT) { /* * Receiving with NO_WAIT has failed and user * supplied timeout differs from NO_WAIT. * Spawn a task and wait on message queue. */ int tid;#ifdef MSG_Q_DIST_REPORT printf ("msgQDistInput/RECV_REQ: timeout = %d\n", timeout);#endif tid = taskSpawn (NULL, DIST_MSG_Q_WAIT_TASK_PRIO, 0, DIST_MSG_Q_WAIT_TASK_STACK_SZ, /* task entry point */ (FUNCPTR) msgQDistRecvReply, /* who is the receiver */ (int) nodeIdSrc, (int) inqIdSrc, /* receiving options */ (int) msgQId, (int) buffer, maxBytes, timeout, /* some options */ TRUE /* lastTry */, 0, 0, 0); if (tid != ERROR) return (MSG_Q_DIST_STATUS_OK); } free (buffer); /* For this case where the user specified NO_WAIT * we must send back a status now. We didn't do this * in msgQDistRecvReply() because we didn't know if * the user specified NO_WAIT or whether it was the first * try before spawning a task. */ msgQDistSendStatus (nodeIdSrc, inqIdSrc, ret); return (MSG_Q_DIST_STATUS_UNAVAIL); } default:#ifdef MSG_Q_DIST_REPORT printf ("msgQDistInput/SEND: illegal status\n");#endif free (buffer); return (MSG_Q_DIST_STATUS_INTERNAL_ERROR); } } case DIST_PKT_TYPE_MSG_Q_RECV_RPL: { DIST_PKT_MSG_Q_RECV_RPL pktRpl; DIST_MSG_Q_RECV_INQ * pInq; DIST_INQ_ID inqId; int nBytes; if (pktLen < DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_RECV_RPL)) distPanic ("msgQDistInput/RECV_RPL: packet too short\n"); /* First copy the reply header. */ distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0, (char *) &pktRpl, DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_RECV_RPL)); inqId = (DIST_INQ_ID) pktRpl.recvRplInqId; if (! (pInq = (DIST_MSG_Q_RECV_INQ *) distInqFind (inqId))) return (MSG_Q_DIST_STATUS_LOCAL_TIMEOUT); /* Now copy message directly to user's buffer. */ nBytes = distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_RECV_RPL), pInq->pRecvInqBuffer, pInq->recvInqMaxNBytes); pInq->recvInqMaxNBytes = nBytes; pInq->recvInqMsgArrived = TRUE; semGive (&pInq->recvInqWait); return (MSG_Q_DIST_STATUS_OK); } case DIST_PKT_TYPE_MSG_Q_NUM_MSGS_REQ: { /* * Remote note requests numMsgs service from local queue. * Find id of local message queue, and call msgQNumMsgs(). */ DIST_PKT_MSG_Q_NUM_MSGS_REQ pktReq; DIST_PKT_MSG_Q_NUM_MSGS_RPL pktRpl; DIST_INQ_ID inqIdSrc; /* remote inquiry id */ MSG_Q_ID lclMsgQId; STATUS status; if (pktLen != DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_NUM_MSGS_REQ)) distPanic ("msgQDistInput/NUM_MSGS_REQ: packet too short\n"); distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0, (char *) &pktReq, DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_NUM_MSGS_REQ)); inqIdSrc = (DIST_INQ_ID) pktReq.numMsgsReqInqId; lclMsgQId = msgQDistTblGet (pktReq.numMsgsReqTblIx); if (lclMsgQId == NULL) {#ifdef MSG_Q_DIST_REPORT printf ("msgQDistInput/RECV_REQ: unknown message queue id\n");#endif msgQDistSendStatus (nodeIdSrc, inqIdSrc, MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID); return (MSG_Q_DIST_STATUS_ILLEGAL_OBJ_ID); } pktRpl.numMsgsRplHdr.pktType = DIST_PKT_TYPE_MSG_Q; pktRpl.numMsgsRplHdr.pktSubType = DIST_PKT_TYPE_MSG_Q_NUM_MSGS_RPL; pktRpl.numMsgsRplInqId = (uint32_t) inqIdSrc; pktRpl.numMsgsRplNum = htonl (msgQNumMsgs (lclMsgQId)); status = distNetSend (nodeIdSrc, (DIST_PKT *) &pktRpl, DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_NUM_MSGS_RPL), WAIT_FOREVER, DIST_MSG_Q_NUM_MSGS_PRIO); if (status == ERROR) {#ifdef DIST_DIAGNOSTIC distLog ("msgQDistInput: reply to NumMsgsReq failed\n");#endif return (MSG_Q_DIST_STATUS_UNREACH); } return (MSG_Q_DIST_STATUS_OK); } case DIST_PKT_TYPE_MSG_Q_NUM_MSGS_RPL: { DIST_PKT_MSG_Q_NUM_MSGS_RPL pktRpl; DIST_MSG_Q_NUM_MSGS_INQ * pInq; DIST_INQ_ID inqId; if (pktLen != DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_NUM_MSGS_RPL)) distPanic ("msgQDistInput/NUM_MSGS_RPL: packet too short\n"); distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0, (char *) &pktRpl, DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_NUM_MSGS_RPL)); inqId = (DIST_INQ_ID) pktRpl.numMsgsRplInqId; if (! (pInq = (DIST_MSG_Q_NUM_MSGS_INQ *) distInqFind (inqId))) return (MSG_Q_DIST_STATUS_LOCAL_TIMEOUT); pInq->numMsgsInqNum = (int) ntohl (pktRpl.numMsgsRplNum); semGive (&(pInq->numMsgsInqWait)); return (MSG_Q_DIST_STATUS_OK); } case DIST_PKT_TYPE_MSG_Q_STATUS: { DIST_PKT_MSG_Q_STATUS pktStatus; DIST_MSG_Q_STATUS msgQStatus; DIST_INQ_ID inqId; DIST_INQ * pGenInq; int errnoRemote; if (pktLen != DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_STATUS)) distPanic ("msgQDistInput/STATUS: packet too short\n"); /* First copy the error packet form the TBuf list. */ distTBufCopy (DIST_TBUF_GET_NEXT (pTBufHdr), 0, (char *) &pktStatus, DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_STATUS)); msgQStatus = (DIST_MSG_Q_STATUS) ntohs (pktStatus.statusDStatus); errnoRemote = ntohl (pktStatus.statusErrno);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -