📄 msgqdistgrplib.c
字号:
/* msgQDistGrpLib.c - distributed message queue group library (VxFusion option) *//* Copyright 1999-2002 Wind River Systems, Inc. *//*modification history--------------------01q,23oct01,jws fix compiler warnings (SPR 71117); fix man pages (SPR 71239)01p,24jul01,r_s changed code to be ANSI compatible so that it compiles with diab. made asm macro changes for diab01o,09jun99,drm Allowing check for empty groups.01n,09jun99,drm Adding some htonl() / ntohl()01m,09jun99,drm Adding code to return S_msgQDistLib_OVERALL_TIMEOUT01l,01jun99,drm Adding check to make sure that only distributed message queues (and not groups) can be added to a group.01k,24may99,drm added vxfusion prefix to VxFusion related includes01j,23feb99,wlf doc edits01i,18feb99,wlf doc cleanup01h,29oct98,drm documentation modifications01g,09oct98,drm fixed a bug reported by Oce01f,13may98,ur cleanup when msgQDistGrpInit() fails locking on group send inquiries01e,08may98,ur removed 8 bit node id restriction01d,08apr98,ur optional enhancement CHECK_FOR_EMPTY_GROUP--untested01c,20mar98,ur set errno when database is full01b,01jul97,ur tested, ok.01a,11jun97,ur written.*//*DESCRIPTIONThis library provides the grouping facility for distributed message queues. Single distributed message queues can join one or more groups. A messagesent to a group is sent to all message queues that are members of thatgroup. A group, however, is prohibited from sending messages. Also, it isan error to call msgQDistNumMsgs() with a distributed message queue group ID.Groups are created with symbolic names and identified by a unique ID, MSG_Q_ID, as with normal message queues.If the group is new to the distributed system, the group agreement protocol (GAP) is employed to determine a globally unique identifier. As part of the protocol's negotiation, all group databases throughout the system are updated.The distributed message queue group library is initialized by callingdistInit().AVAILABILITYThis module is distributed as a component of the unbundled distributedmessage queues option, VxFusion.INCLUDE FILES: msgQDistGrpLib.hSEE ALSO: distLib, msgQDistGrpShow*/#include "vxWorks.h"#if defined (MSG_Q_DIST_GRP_REPORT) || defined (DIST_DIAGNOSTIC)#include "stdio.h"#endif#include "string.h"#include "stdlib.h"#include "taskLib.h"#include "hashLib.h"#include "sllLib.h"#include "errnoLib.h"#include "msgQLib.h"#include "netinet/in.h"#include "vxfusion/msgQDistLib.h"#include "vxfusion/msgQDistGrpLib.h"#include "vxfusion/distIfLib.h"#include "vxfusion/distStatLib.h"#include "vxfusion/private/msgQDistGrpLibP.h"#include "vxfusion/private/distNetLibP.h"#include "vxfusion/private/distNodeLibP.h"#include "vxfusion/private/distObjLibP.h"#include "vxfusion/private/distLibP.h"/* defines */#define UNUSED_ARG(x) if(sizeof(x)) {} /* to suppress compiler warnings */#define KEY_ARG_STR 13#define KEY_CMP_ARG_STR 0 /* not used */#define KEY_ARG_ID 65537#define KEY_CMP_ARG_ID 0 /* not used *//* global data */SEMAPHORE distGrpDbSemaphore;DIST_MSG_Q_GRP_ID distGrpIdNext = 0;/* local data */LOCAL HASH_ID distGrpDbNmId = NULL;LOCAL DIST_GRP_HASH_NODE * distGrpDbNm = NULL;LOCAL HASH_ID distGrpDbIdId = NULL;LOCAL DIST_GRP_HASH_NODE * distGrpDbId = NULL;LOCAL DIST_GRP_DB_NODE * distGrpDb = NULL;LOCAL SL_LIST msgQDistGrpFreeList;LOCAL BOOL msgQDistGrpLibInstalled = FALSE;/* local prototypes */LOCAL BOOL msgQDistGrpHCmpStr (DIST_GRP_HASH_NODE *pMatchNode, DIST_GRP_HASH_NODE *pHNode, int keyArg);LOCAL INT32 msgQDistGrpHFuncStr (int elements, DIST_GRP_HASH_NODE *pHNode, int keyArg);LOCAL BOOL msgQDistGrpHCmpId (DIST_GRP_HASH_NODE *pMatchNode, DIST_GRP_HASH_NODE *pHNode, int keyArg);LOCAL INT32 msgQDistGrpHFuncId (int elements, DIST_GRP_HASH_NODE *pHNode, int keyArg);LOCAL DIST_STATUS msgQDistGdbInput (DIST_NODE_ID nodeIdSrc, DIST_TBUF_HDR *pTBufHdr);LOCAL DIST_STATUS msgQDistGrpInput (DIST_NODE_ID nodeIdSrc, DIST_TBUF_HDR *pTBufHdr);LOCAL STATUS msgQDistGrpLclSend (DIST_MSG_Q_GRP_SEND_INQ *pInq, DIST_MSG_Q_GRP_ID distMsgQGrpId, char *buffer, UINT nBytes, int timeout, int priority);LOCAL void msgQDistGrpLclSendCanWait (DIST_INQ_ID inqId, MSG_Q_ID msgQId, char *buffer, UINT nBytes, int timeout, int priority);LOCAL STATUS msgQDistGrpAgent (DIST_NODE_ID nodeIdSender, DIST_INQ_ID inqIdSender, DIST_MSG_Q_GRP_ID distMsgQGrpId, char *buffer, UINT nBytes, int timeout, int priority);LOCAL STATUS msgQDistGrpSendStatus (DIST_NODE_ID nodeIdDest, DIST_INQ_ID inqId, DIST_MSG_Q_STATUS msgQStatus);LOCAL BOOL msgQDistGrpBurstOne (DIST_GRP_HASH_NODE *pNode, DIST_GRP_BURST *pBurst);/***************************************************************************** msgQDistGrpLibInit - initialize the distributed message queue group package (VxFusion option)** This routine currently does nothing.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: N/A** NOMANUAL*/void msgQDistGrpLibInit (void) { }/***************************************************************************** msgQDistGrpInit - initialize the group database (VxFusion option)** This routine initializes the group database.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: OK, if successful; ERROR, if unsucessful.** NOMANUAL*/STATUS msgQDistGrpInit ( int sizeLog2 /* init 2^^sizeLog2 entries */ ) { DIST_GRP_DB_NODE * pDbNode; int hashTblSizeLog2; int grpHNBytes; int grpDbNBytes; int grpDbSize; int ix; STATUS grpDbServAddStatus; STATUS msgQGrpServAddStatus; if (sizeLog2 < 1) return (ERROR); if (msgQDistGrpLibInstalled == TRUE) return (OK); if (hashLibInit () == ERROR) return (ERROR); /* hashLibInit() failed */ if (distInqInit (DIST_INQ_HASH_TBL_SZ_LOG2) == ERROR) return (ERROR); hashTblSizeLog2 = sizeLog2 - 1; distGrpDbNmId = hashTblCreate (hashTblSizeLog2, msgQDistGrpHCmpStr, msgQDistGrpHFuncStr, KEY_ARG_STR); distGrpDbIdId = hashTblCreate (hashTblSizeLog2, msgQDistGrpHCmpId, msgQDistGrpHFuncId, KEY_ARG_ID); if (distGrpDbNmId && distGrpDbIdId) { grpDbSize = 1 << sizeLog2; grpHNBytes = grpDbSize * sizeof (DIST_GRP_HASH_NODE); grpDbNBytes = grpDbSize * sizeof (DIST_GRP_DB_NODE); distGrpDbNm = (DIST_GRP_HASH_NODE *) malloc (grpHNBytes); distGrpDbId = (DIST_GRP_HASH_NODE *) malloc (grpHNBytes); distGrpDb = (DIST_GRP_DB_NODE *) malloc (grpDbNBytes); if (distGrpDbNm && distGrpDbId && distGrpDb) { sllInit (&msgQDistGrpFreeList); for (ix = 0; ix < grpDbSize; ix++) { pDbNode = &distGrpDb[ix]; pDbNode->ixNode = ix; sllPutAtHead (&msgQDistGrpFreeList, (SL_NODE *) pDbNode); distGrpDbNm[ix].pDbNode = pDbNode; distGrpDbId[ix].pDbNode = pDbNode; } msgQDistGrpDbLockInit(); /* Add distributed group database service to table of services. */ grpDbServAddStatus = distNetServAdd ( DIST_PKT_TYPE_DGDB, msgQDistGdbInput, DIST_DGDB_SERV_NAME, DIST_DGDB_SERV_NET_PRIO, DIST_DGDB_SERV_TASK_PRIO, DIST_DGDB_SERV_TASK_STACK_SZ); /* Add message group service to table of services. */ msgQGrpServAddStatus = distNetServAdd ( DIST_PKT_TYPE_MSG_Q_GRP, msgQDistGrpInput, DIST_MSG_Q_GRP_SERV_NAME, DIST_MSG_Q_GRP_SERV_NET_PRIO, DIST_MSG_Q_GRP_SERV_TASK_PRIO, DIST_MSG_Q_GRP_SERV_TASK_STACK_SZ); if (grpDbServAddStatus != ERROR && msgQGrpServAddStatus != ERROR) { msgQDistGrpLibInstalled = TRUE; return (OK); } } } /* cleanup, when error */ if (distGrpDbNmId) hashTblDelete (distGrpDbNmId); if (distGrpDbIdId) hashTblDelete (distGrpDbIdId); if (distGrpDbNm) free (distGrpDbNm); if (distGrpDbId) free (distGrpDbId); if (distGrpDb) free (distGrpDb); return (ERROR); }/***************************************************************************** msgQDistGrpAdd - add a distributed message queue to a group (VxFusion option)** This routine adds the queue identified by the argument <msgQId> to a group* with the ASCII name specified by the argument <distGrpName>. ** Multicasting is based on distributed message queue groups. If the group * does not exist, one is created. Any number of message queues from different * nodes can be bound to a single group. In addition, a message queue can* be added into any number of groups; msgQDistGrpAdd() must be called for each* group of which the message queue is to be a member.** The <options> parameter is presently unused and must be set to 0.** This routine returns a message queue ID, MSG_Q_ID, that can be used directly * by msgQDistSend() or by the generic msgQSend() routine. Do not call the* msgQReceive() or msgQNumMsgs() routines or their distributed counterparts,* msgQDistReceive() and msgQDistNumMsgs(), with a group message queue ID.** As with msgQDistCreate(), use distNameAdd() to add the group message * queue ID returned by this routine to the distributed name database so * that the ID can be used by tasks on other nodes.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: MSG_Q_ID, or NULL if there is an error.** ERRNO:* \is* \i S_msgQDistGrpLib_NAME_TOO_LONG* The name of the group is too long.* \i S_msgQDistGrpLib_INVALID_OPTION* The <options> parameter is invalid.* \i S_msgQDistGrpLib_DATABASE_FULL* The group database is full.* \i S_distLib_OBJ_ID_ERROR* The <msgQId> parameter is not a distributed message queue.* \ie** SEE ALSO: msgQLib, msgQDistLib, distNameLib* * INTERNAL NOTE: Takes <distGrpDbSemaphore>.*/MSG_Q_ID msgQDistGrpAdd ( char * distGrpName, /* new or existing group name */ MSG_Q_ID msgQId, /* message queue to add to the group */ DIST_GRP_OPT options /* group message queue options - UNUSED */ ) { DIST_GRP_DB_NODE * pDistGrpDbNode; DIST_MSG_Q_ID dMsgQId; /* distributed message queue ID */ DIST_OBJ_NODE * pObjNode; /* ptr to object containing real ID */ DIST_GAP_NODE dGapNode; DIST_GAP_NODE * pDGapNodeTemp; /* * Check the parameters: * - msgQId must be a distributed message queue. It cannot be a * a standard message queue or a group. * - the group name cannot exceed DIST_NAME_MAX_LENGTH * - options must be 0 * * If any of the parameters are invalid, set errno and return * NULL to indicate failure. */ if (!ID_IS_DISTRIBUTED (msgQId)) /* not a distributed message queue */ { errnoSet (S_distLib_OBJ_ID_ERROR); return NULL; } else /* is distributed msgQ */ { pObjNode = MSG_Q_ID_TO_DIST_OBJ_NODE (msgQId); if (!IS_DIST_MSG_Q_OBJ (pObjNode)) { /* legal object ID, but not a message queue */ errnoSet (S_distLib_OBJ_ID_ERROR); return NULL; } dMsgQId = (DIST_MSG_Q_ID) pObjNode->objNodeId; if (IS_DIST_MSG_Q_TYPE_GRP(dMsgQId)) { /* ID refers to a group, not a plain dist. message queue */ errnoSet (S_distLib_OBJ_ID_ERROR); return NULL; } } if (strlen (distGrpName) > DIST_NAME_MAX_LENGTH) { errnoSet (S_msgQDistGrpLib_NAME_TOO_LONG); return (NULL); /* name too long */ } if (options != 0) { errnoSet (S_msgQDistGrpLib_INVALID_OPTION); return (NULL); /* options parameter currently unused */ } /* Lock the database and try to find the group. */ msgQDistGrpDbLock(); pDistGrpDbNode = msgQDistGrpLclFindByName (distGrpName); if (pDistGrpDbNode == NULL) {#ifdef MSG_Q_DIST_GRP_REPORT DIST_MSG_Q_GRP_ID grpId;#endif /* Group is unknown by now, create it. */#ifdef MSG_Q_DIST_GRP_REPORT printf ("msgQDistGrpAdd: group is unknown by now--create it\n");#endif pDistGrpDbNode = msgQDistGrpLclCreate (distGrpName, distGrpIdNext++, DIST_GRP_STATE_LOCAL_TRY); msgQDistGrpDbUnlock(); if (pDistGrpDbNode == NULL) {#ifdef DIST_DIAGNOSTIC distLog ("msgQDistGrpAdd: failed to create new group\n");#endif errnoSet (S_msgQDistGrpLib_DATABASE_FULL); return (NULL); } /* * Agree on a global unique identifier for the group and * add the fist group member. */#ifdef MSG_Q_DIST_GRP_REPORT printf ("msgQDistGrpAdd: enter agreement phase, propose 0x%lx\n", (u_long) msgQDistGrpLclGetId (pDistGrpDbNode)); grpId =#endif msgQDistGrpAgree (pDistGrpDbNode);#ifdef MSG_Q_DIST_GRP_REPORT printf ("msgQDistGrpAdd: agreed on 0x%lx\n", (u_long) grpId);#endif msgQDistGrpLclAddMember (pDistGrpDbNode, msgQId); } else { /* Group already exists. Check the state. */#ifdef MSG_Q_DIST_GRP_REPORT printf ("msgQDistGrpAdd: group already exists--add object\n");#endif while (pDistGrpDbNode->grpDbState < DIST_GRP_STATE_GLOBAL) { /* * Group already exists, but has a local state. This means, * somebody else tries to install the group. * We unlock the group database and wait for a go-ahead.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -