📄 msgqdistlib.c
字号:
/* msgQDistLib.c - distributed objects message queue library (VxFusion option) */

/* Copyright 1999 - 2002 Wind River Systems, Inc. */

/*
modification history
--------------------
01o,23oct01,jws  fix compiler warnings (SPR 71154); fix man pages (SPR 71239)
01n,24jul01,r_s  changed code to be ANSI compatible so that it compiles with
                 diab. made asm macro changes for diab
01m,09jun99,drm  Changing an "errno = " to errnoSet()
01l,01jun99,drm  Changing documentation for msgQDistReceive() to indicate that
                 the return value is the number of bytes received or ERROR
                 rather than OK or ERROR.
01k,24may99,drm  added vxfusion prefix to VxFusion related includes
01j,23feb99,drm  adding S_distLib_UNREACHABLE to documentation
01i,23feb99,drm  returning different errno when overallTimeout expires
01h,18feb99,wlf  doc cleanup
01g,28oct98,drm  documentation modifications
01f,12aug98,drm  added #include stmt for distLibP.h
01e,08may98,ur   removed 8 bit node id restriction
01d,15apr98,ur   retransmit errors, if failed to send/receive
01c,09apr98,ur   added some errno setting, for remote errors
01b,04mar98,ur   patched memory leak in msgQDistInput/RECV_REQ.
01a,06jun97,ur   written.
*/

/*
DESCRIPTION
This library provides the interface to distributed message queues.
Any task on any node in the system can send messages to or receive
from a distributed message queue. Full duplex communication between
two tasks generally requires two distributed message queues, one for
each direction.

Distributed message queues are created with msgQDistCreate(). After
creation, they can be manipulated using the generic routines for local
message queues; for more information on the use of these routines, see the
manual entry for msgQLib. The msgQDistLib library also provides the
msgQDistSend(), msgQDistReceive(), and msgQDistNumMsgs() routines which
support additional parameters that are useful for working with distributed
message queues.

The distributed objects message queue library is initialized by calling
distInit().

AVAILABILITY
This module is distributed as a component of the unbundled distributed
message queues option, VxFusion.

INCLUDE FILES: msgQDistLib.h

SEE ALSO: msgQLib, msgQDistShow, distLib
*/

#include "vxWorks.h"
#if defined (MSG_Q_DIST_REPORT) || defined (DIST_DIAGNOSTIC)
#include "stdio.h"
#endif
#include "stdlib.h"
#include "string.h"
#include "sllLib.h"
#include "errnoLib.h"
#include "msgQLib.h"
#include "semLib.h"
#include "taskLib.h"
#include "netinet/in.h"
#include "private/semLibP.h"
#include "private/msgQLibP.h"
#include "vxfusion/msgQDistLib.h"
#include "vxfusion/msgQDistGrpLib.h"
#include "vxfusion/distIfLib.h"
#include "vxfusion/distLib.h"
#include "vxfusion/distStatLib.h"
#include "vxfusion/private/msgQDistLibP.h"
#include "vxfusion/private/msgQDistGrpLibP.h"
#include "vxfusion/private/distPktLibP.h"
#include "vxfusion/private/distNetLibP.h"
#include "vxfusion/private/distLibP.h"

/* globals */

TBL_NODE *pMsgQDistTbl;    /* windSh needs this global */

/* locals */

LOCAL SL_LIST      msgQDistTblFreeList;           /* unused table nodes */
LOCAL SEMAPHORE    msgQDistTblLock;               /* guards the table */
LOCAL int          msgQDistTblSize;               /* number of table nodes */
LOCAL BOOL         msgQDistLibInstalled = FALSE;  /* set by msgQDistInit() */

/* local prototypes */

LOCAL STATUS msgQDistTblPut (MSG_Q_ID msgQId, TBL_IX *pTblIx);
#ifdef __SUPPORT_MSG_Q_DIST_DELETE
LOCAL STATUS msgQDistTblDelete (TBL_IX tblIx);
#endif
LOCAL MSG_Q_ID msgQDistTblGet (TBL_IX tblIx);

LOCAL DIST_STATUS msgQDistInput (DIST_NODE_ID nodeIdSrc,
                                 DIST_TBUF_HDR *pTBufHdr);
LOCAL STATUS msgQDistSendStatus (DIST_NODE_ID nodeIdDest, DIST_INQ_ID inqId,
                                 short error);

LOCAL DIST_MSG_Q_STATUS msgQDistRecvReply (DIST_NODE_ID nodeIdReceiver,
                                           DIST_INQ_ID inqIdReceiver,
                                           MSG_Q_ID msgQId,
                                           char *buffer,
                                           UINT maxNBytes,
                                           int timeout,
                                           BOOL lastTry);
LOCAL DIST_MSG_Q_STATUS msgQDistSendReply (DIST_NODE_ID nodeIdSender,
                                           DIST_INQ_ID inqIdSender,
                                           MSG_Q_ID msgQId,
                                           char *buffer,
                                           UINT nBytes,
                                           int timeout,
                                           int priority,
                                           BOOL lastTry);

/***************************************************************************
*
* msgQDistLibInit - initialize the distributed message queue package (VxFusion option)
*
* This routine initializes the distributed message queue package.
* It currently does nothing.
*
* AVAILABILITY
* This routine is distributed as a component of the unbundled distributed
* message queues option, VxFusion.
*
* RETURNS: N/A
*
* NOMANUAL
*/

void msgQDistLibInit (void)
    {
    }

/***************************************************************************
*
* msgQDistInit - initialize distributed message queue library (VxFusion option)
*
* This routine initializes the distributed message queue library.
* It must be called before any other routine in the library.
* The argument <sizeLog2> is the base-2 logarithm of the number of
* distributed message queues that can be created on this node; the table
* built here holds 2^^<sizeLog2> entries. The maximum number of distributed
* message queues is DIST_MSG_Q_MAX_QS .
*
* Once the library has been installed, further calls with a valid <sizeLog2>
* return OK without doing anything.
*
* RETURNS: OK, or ERROR if <sizeLog2> is out of range, the requested number
* of queues exceeds DIST_MSG_Q_MAX_QS, distInqInit() fails, the table cannot
* be allocated, or the message queue service cannot be added.
*
* NOMANUAL
*/

STATUS msgQDistInit
    (
    int sizeLog2    /* create 2^^sizeLog2 msgQ's */
    )
    {
    TBL_IX  tblIx;
    STATUS  status;
    int     size;
    int     msgQDistMax;

    /*
     * Shifting an int by a negative count, or far enough to reach the sign
     * bit, is undefined behavior; reject such counts before shifting.
     */

    if ((sizeLog2 < 0) || (sizeLog2 >= (int) (sizeof (int) * 8) - 1))
        {
#ifdef DIST_DIAGNOSTIC
        printf ("msgQDistInit: invalid sizeLog2 %d\n", sizeLog2);
#endif
        return (ERROR);
        }

    msgQDistMax = 1 << sizeLog2;

    if (msgQDistMax > DIST_MSG_Q_MAX_QS)
        {
#ifdef DIST_DIAGNOSTIC
        printf ("msgQDistInit: number of message queues is limited to %d\n",
                DIST_MSG_Q_MAX_QS);
#endif
        return (ERROR);    /* too many local msgQs for underlying layer */
        }

    if (msgQDistLibInstalled == TRUE)
        return (OK);

    if (distInqInit (DIST_INQ_HASH_TBL_SZ_LOG2) == ERROR)
        return (ERROR);

    size = MEM_ROUND_UP (sizeof (TBL_NODE));
    pMsgQDistTbl = (TBL_NODE *) malloc (msgQDistMax * size);
    if (pMsgQDistTbl == NULL)
        {
#ifdef DIST_DIAGNOSTIC
        printf ("msgQDistInit: memory allocation failed\n");
#endif
        return (ERROR);    /* out of memory */
        }

    /* the lock is created empty and is released once the table is filled */

    semBInit (&msgQDistTblLock, SEM_Q_PRIORITY, SEM_EMPTY);

    msgQDistTblSize = msgQDistMax;

    /* every table node starts out on the free list */

    sllInit (&msgQDistTblFreeList);
    for (tblIx = 0; tblIx < msgQDistMax; tblIx++)
        {
        pMsgQDistTbl[tblIx].tblIx = tblIx;
        sllPutAtHead (&msgQDistTblFreeList,
                      (SL_NODE *) &(pMsgQDistTbl[tblIx]));
        }

    msgQDistTblUnlock();

    /* Add message queue service to table of services. */

    status = distNetServAdd (DIST_PKT_TYPE_MSG_Q, msgQDistInput,
                             DIST_MSG_Q_SERV_NAME, DIST_MSG_Q_SERV_NET_PRIO,
                             DIST_MSG_Q_SERV_TASK_PRIO,
                             DIST_MSG_Q_SERV_TASK_STACK_SZ);
    if (status == ERROR)
        {
#ifdef DIST_DIAGNOSTIC
        printf ("msgQDistInit: cannot attach service\n");
#endif

        /*
         * The library stays uninstalled, so a later call would allocate a
         * new table and overwrite pMsgQDistTbl. Release the table now and
         * empty the free list, whose nodes live inside that table.
         */

        sllInit (&msgQDistTblFreeList);
        msgQDistTblSize = 0;
        free (pMsgQDistTbl);
        pMsgQDistTbl = NULL;

        return (ERROR);
        }

    /* install the distributed entry points */

    msgQDistSendRtn    = (FUNCPTR) msgQDistSend;
    msgQDistReceiveRtn = (FUNCPTR) msgQDistReceive;
    msgQDistNumMsgsRtn = (FUNCPTR) msgQDistNumMsgs;

    msgQDistLibInstalled = TRUE;
    return (OK);
    }

/***************************************************************************
*
* msgQDistCreate - create a distributed message queue (VxFusion option)
*
* This routine creates a distributed message queue capable of
* holding up to <maxMsgs> messages, each up to <maxMsgLength> bytes long.
* This routine returns a message queue ID used to identify the created
* message queue. The queue can be created with the following options:
* \is
* \i MSG_Q_FIFO (0x00)
* The queue pends tasks in FIFO order.
* \i MSG_Q_PRIORITY (0x01)
* The queue pends tasks in priority order. Remote tasks share the same
* priority level.
* \ie
*
* The global message queue identifier returned can be used directly by generic
* message queue handling routines in msgQLib, such as, msgQSend(),
* msgQReceive(), and msgQNumMsgs().
*
* AVAILABILITY
* This routine is distributed as a component of the unbundled distributed
* message queues option, VxFusion.
*
* RETURNS:
* MSG_Q_ID, or NULL if there is an error.
*
* ERRNO:
* \is
* \i S_memLib_NOT_ENOUGH_MEMORY
* If the routine is unable to allocate memory for message queues and message
* buffers.
* \i S_intLib_NOT_ISR_CALLABLE
* If the routine is called from an interrupt service routine.
* \i S_msgQLib_INVALID_QUEUE_TYPE
* If the type of queue is invalid.
* \i S_msgQDistLib_INVALID_MSG_LENGTH
* If the message is too long for the VxFusion network layer.
* \ie
*
* SEE ALSO: msgQLib
*/

MSG_Q_ID msgQDistCreate
    (
    int maxMsgs,         /* max messages that can be queued */
    int maxMsgLength,    /* max bytes in a message */
    int options          /* message queue options */
    )
    {
    DIST_OBJ_NODE * pObjNode;
    MSG_Q_ID        msgQId;
    TBL_IX          tblIx;
    int             maxMsgLen;

    if (!msgQDistLibInstalled)
        return (NULL);    /* call msgQDistInit() first */

    /* largest payload that still fits a fully fragmented send packet */

    maxMsgLen = (DIST_IF_MAX_FRAGS * (DIST_IF_MTU - DIST_IF_HDR_SZ)) -
                DIST_PKT_HDR_SIZEOF (DIST_PKT_MSG_Q_SEND);

    if (maxMsgLength > maxMsgLen)
        {
        errnoSet (S_msgQDistLib_INVALID_MSG_LENGTH);
        return (NULL);    /* msg too long for network layer */
        }

    if (options &~ MSG_Q_TYPE_MASK)
        {
        errnoSet (S_msgQLib_INVALID_QUEUE_TYPE);
        return (NULL);    /* illegal option */
        }

    if ((msgQId = msgQCreate (maxMsgs, maxMsgLength, options)) == NULL)
        return (NULL);    /* msgQCreate() failed */

    if (msgQDistTblPut (msgQId, &tblIx) == ERROR)
        {
        msgQDelete (msgQId);
        return (NULL);    /* table full */
        }

    pObjNode = distObjNodeGet();
    if (pObjNode == NULL)
        {
        /*
         * NOTE(review): distObjNodeGet() is defined outside this file; this
         * guard assumes it reports failure by returning NULL - confirm.
         * The table slot taken above is not released, because
         * msgQDistTblDelete() only exists under __SUPPORT_MSG_Q_DIST_DELETE;
         * the queue is therefore left in the table rather than deleted
         * while the table still refers to it.
         */

        return (NULL);
        }

    pObjNode->objNodeType   = DIST_OBJ_TYPE_MSG_Q;
    pObjNode->objNodeReside = distNodeLocalGetId();
    pObjNode->objNodeId     = TBL_IX_TO_DIST_OBJ_ID (tblIx);

#ifdef MSG_Q_DIST_REPORT
    /* distributed ID first, then the local queue it stands for */

    printf ("msgQDistCreate: dMsgQId 0x%lx, msgQId %p\n",
            (unsigned long) DIST_OBJ_NODE_TO_MSG_Q_ID (pObjNode),
            (void *) msgQId);
#endif

    return (DIST_OBJ_NODE_TO_MSG_Q_ID (pObjNode));
    }

/***************************************************************************
*
* msgQDistSend - send a message to a distributed message queue (VxFusion option)
*
* This routine sends the message specified by <buffer> of length <nBytes> to 
* the distributed message queue or group specified by <msgQId>.
*
* The argument <msgQTimeout> specifies the time in ticks to wait for the 
* queuing of the message. The argument <overallTimeout> specifies the time in
* ticks to wait for both the sending and queuing of the message.
* While it is an error to set <overallTimeout> to NO_WAIT (0), 
* WAIT_FOREVER (-1) is allowed for both <msgQTimeout> and <overallTimeout>.
*
* The <priority> parameter specifies the priority of the message being sent.
* It ranges between DIST_MSG_PRI_0 (highest priority) and DIST_MSG_PRI_7 
* (lowest priority). A priority of MSG_PRI_URGENT is mapped
* to DIST_MSG_PRI_0; MSG_PRI_NORMAL is mapped to DIST_MSG_PRI_4 .
* Messages sent with high priorities (DIST_MSG_PRI_0 to DIST_MSG_PRI_3)
* are put to the head of the list of queued messages.
* Lower priority messages (DIST_MSG_PRI_4 to DIST_MSG_PRI_7) are placed
* at the queue's tail.
*
* NOTE: When msgQDistSend() is called through msgQSend(), <msgQTimeout> is 
* set to <timeout> and <overallTimeout> to WAIT_FOREVER .
*
* AVAILABILITY
* This routine is distributed as a component of the unbundled distributed
* message queues option, VxFusion.
*
* RETURNS: OK, or ERROR if the operation fails.
*
* ERRNO:
* \is
* \i S_distLib_OBJ_ID_ERROR
* The argument <msgQId> is invalid.
* \i S_distLib_UNREACHABLE
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -