⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 msgqdistgrplib.c

📁 VXWORKS源代码
💻 C
📖 第 1 页 / 共 4 页
字号:
/* msgQDistGrpLib.c - distributed message queue group library (VxFusion option) */

/* Copyright 1999-2002 Wind River Systems, Inc. */

/*
modification history
--------------------
01q,23oct01,jws  fix compiler warnings (SPR 71117); fix man pages (SPR 71239)
01p,24jul01,r_s  changed code to be ANSI compatible so that it compiles with
                 diab. made asm macro changes for diab
01o,09jun99,drm  Allowing check for empty groups.
01n,09jun99,drm  Adding some htonl() / ntohl()
01m,09jun99,drm  Adding code to return S_msgQDistLib_OVERALL_TIMEOUT
01l,01jun99,drm  Adding check to make sure that only distributed message
                 queues (and not groups) can be added to a group.
01k,24may99,drm  added vxfusion prefix to VxFusion related includes
01j,23feb99,wlf  doc edits
01i,18feb99,wlf  doc cleanup
01h,29oct98,drm  documentation modifications
01g,09oct98,drm  fixed a bug reported by Oce
01f,13may98,ur   cleanup when msgQDistGrpInit() fails
                 locking on group send inquiries
01e,08may98,ur   removed 8 bit node id restriction
01d,08apr98,ur   optional enhancement CHECK_FOR_EMPTY_GROUP--untested
01c,20mar98,ur   set errno when database is full
01b,01jul97,ur   tested, ok.
01a,11jun97,ur   written.
*/

/*
DESCRIPTION
This library provides the grouping facility for distributed message queues.
Single distributed message queues can join one or more groups.  A message
sent to a group is sent to all message queues that are members of that
group.  A group, however, is prohibited from sending messages.  Also, it is
an error to call msgQDistNumMsgs() with a distributed message queue group ID.

Groups are created with symbolic names and identified by a unique ID,
MSG_Q_ID, as with normal message queues.

If the group is new to the distributed system, the group agreement protocol
(GAP) is employed to determine a globally unique identifier.  As part of the
protocol's negotiation, all group databases throughout the system are updated.

The distributed message queue group library is initialized by calling
distInit().

AVAILABILITY
This module is distributed as a component of the unbundled distributed
message queues option, VxFusion.

INCLUDE FILES: msgQDistGrpLib.h

SEE ALSO: distLib, msgQDistGrpShow
*/

#include "vxWorks.h"
/* stdio.h is pulled in only when one of the diagnostic/report options is on */
#if defined (MSG_Q_DIST_GRP_REPORT) || defined (DIST_DIAGNOSTIC)
#include "stdio.h"
#endif
#include "string.h"
#include "stdlib.h"
#include "taskLib.h"
#include "hashLib.h"
#include "sllLib.h"
#include "errnoLib.h"
#include "msgQLib.h"
#include "netinet/in.h"
#include "vxfusion/msgQDistLib.h"
#include "vxfusion/msgQDistGrpLib.h"
#include "vxfusion/distIfLib.h"
#include "vxfusion/distStatLib.h"
#include "vxfusion/private/msgQDistGrpLibP.h"
#include "vxfusion/private/distNetLibP.h"
#include "vxfusion/private/distNodeLibP.h"
#include "vxfusion/private/distObjLibP.h"
#include "vxfusion/private/distLibP.h"

/* defines */

/*
 * UNUSED_ARG expands to an empty if-statement that mentions <x> only inside
 * sizeof, so <x> counts as referenced without being evaluated.
 */
#define UNUSED_ARG(x)  if(sizeof(x)) {} /* to suppress compiler warnings */

/*
 * KEY_ARG_STR and KEY_ARG_ID are the key arguments handed to hashTblCreate()
 * by msgQDistGrpInit() for the name-keyed and the ID-keyed hash table
 * respectively.
 */
#define KEY_ARG_STR        13
#define KEY_CMP_ARG_STR    0       /* not used */
#define KEY_ARG_ID        65537
#define KEY_CMP_ARG_ID    0        /* not used */

/* global data */

/*
 * NOTE(review): msgQDistGrpAdd() documents that it takes this semaphore and
 * calls msgQDistGrpDbLock(); presumably that lock routine/macro operates on
 * it -- its definition is not in this file section, confirm there.
 */
SEMAPHORE                    distGrpDbSemaphore;

/* msgQDistGrpAdd() passes this value, post-incremented, when creating a group */
DIST_MSG_Q_GRP_ID            distGrpIdNext = 0;

/* local data */

/* hash table created with the string compare/hash routines */
LOCAL HASH_ID                distGrpDbNmId = NULL;
/* array of hash nodes, one per database node (allocated in msgQDistGrpInit) */
LOCAL DIST_GRP_HASH_NODE *   distGrpDbNm = NULL;
/* hash table created with the ID compare/hash routines */
LOCAL HASH_ID                distGrpDbIdId = NULL;
/* array of hash nodes, one per database node (allocated in msgQDistGrpInit) */
LOCAL DIST_GRP_HASH_NODE *   distGrpDbId = NULL;
/* array of database nodes (allocated in msgQDistGrpInit) */
LOCAL DIST_GRP_DB_NODE *     distGrpDb = NULL;
/* singly linked list that msgQDistGrpInit() fills with every database node */
LOCAL SL_LIST                msgQDistGrpFreeList;
/* set to TRUE once msgQDistGrpInit() has completed successfully */
LOCAL BOOL                   msgQDistGrpLibInstalled = FALSE;

/* local prototypes */

/* compare and hash routines registered for the name-keyed hash table */
LOCAL BOOL          msgQDistGrpHCmpStr (DIST_GRP_HASH_NODE *pMatchNode,
                                        DIST_GRP_HASH_NODE *pHNode,
                                        int keyArg);
LOCAL INT32         msgQDistGrpHFuncStr (int elements,
                                         DIST_GRP_HASH_NODE *pHNode,
                                         int keyArg);

/* compare and hash routines registered for the ID-keyed hash table */
LOCAL BOOL          msgQDistGrpHCmpId (DIST_GRP_HASH_NODE *pMatchNode,
                                       DIST_GRP_HASH_NODE *pHNode,
                                       int keyArg);
LOCAL INT32         msgQDistGrpHFuncId (int elements,
                                        DIST_GRP_HASH_NODE *pHNode,
                                        int keyArg);

/* input routine registered with distNetServAdd() for DIST_PKT_TYPE_DGDB */
LOCAL DIST_STATUS    msgQDistGdbInput (DIST_NODE_ID nodeIdSrc,
                                       DIST_TBUF_HDR *pTBufHdr);

/* input routine registered with distNetServAdd() for DIST_PKT_TYPE_MSG_Q_GRP */
LOCAL DIST_STATUS    msgQDistGrpInput (DIST_NODE_ID nodeIdSrc,
                                       DIST_TBUF_HDR *pTBufHdr);

/* the remaining routines are defined further down in this file */
LOCAL STATUS         msgQDistGrpLclSend (DIST_MSG_Q_GRP_SEND_INQ *pInq,
                                         DIST_MSG_Q_GRP_ID distMsgQGrpId,
                                         char *buffer,
                                         UINT nBytes, int timeout,
                                         int priority);
LOCAL void           msgQDistGrpLclSendCanWait (DIST_INQ_ID inqId,
                                                MSG_Q_ID msgQId,
                                                char *buffer, UINT nBytes,
                                                int timeout, int priority);
LOCAL STATUS         msgQDistGrpAgent (DIST_NODE_ID nodeIdSender,
                                       DIST_INQ_ID inqIdSender,
                                       DIST_MSG_Q_GRP_ID distMsgQGrpId,
                                       char *buffer,
                                       UINT nBytes, int timeout,
                                       int priority);
LOCAL STATUS         msgQDistGrpSendStatus (DIST_NODE_ID nodeIdDest,
                                            DIST_INQ_ID inqId,
                                            DIST_MSG_Q_STATUS msgQStatus);
LOCAL BOOL           msgQDistGrpBurstOne (DIST_GRP_HASH_NODE *pNode,
                                          DIST_GRP_BURST
*pBurst);/***************************************************************************** msgQDistGrpLibInit - initialize the distributed message queue group package (VxFusion option)** This routine currently does nothing.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: N/A** NOMANUAL*/void msgQDistGrpLibInit (void)    {    }/***************************************************************************** msgQDistGrpInit - initialize the group database (VxFusion option)** This routine initializes the group database.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: OK, if successful; ERROR, if unsucessful.** NOMANUAL*/STATUS msgQDistGrpInit    (    int               sizeLog2   /* init 2^^sizeLog2 entries */    )    {    DIST_GRP_DB_NODE * pDbNode;    int                hashTblSizeLog2;    int                grpHNBytes;    int                grpDbNBytes;    int                grpDbSize;    int                ix;    STATUS             grpDbServAddStatus;    STATUS             msgQGrpServAddStatus;    if (sizeLog2 < 1)        return (ERROR);    if (msgQDistGrpLibInstalled == TRUE)        return (OK);    if (hashLibInit () == ERROR)        return (ERROR); /* hashLibInit() failed */    if (distInqInit (DIST_INQ_HASH_TBL_SZ_LOG2) == ERROR)        return (ERROR);    hashTblSizeLog2 = sizeLog2 - 1;    distGrpDbNmId = hashTblCreate (hashTblSizeLog2, msgQDistGrpHCmpStr,                                   msgQDistGrpHFuncStr, KEY_ARG_STR);    distGrpDbIdId = hashTblCreate (hashTblSizeLog2, msgQDistGrpHCmpId,                                   msgQDistGrpHFuncId, KEY_ARG_ID);    if (distGrpDbNmId && distGrpDbIdId)        {        grpDbSize = 1 << sizeLog2;        grpHNBytes = grpDbSize * sizeof (DIST_GRP_HASH_NODE);        grpDbNBytes = grpDbSize * sizeof (DIST_GRP_DB_NODE);        distGrpDbNm = (DIST_GRP_HASH_NODE *) 
malloc (grpHNBytes);        distGrpDbId = (DIST_GRP_HASH_NODE *) malloc (grpHNBytes);        distGrpDb = (DIST_GRP_DB_NODE *) malloc (grpDbNBytes);        if (distGrpDbNm && distGrpDbId && distGrpDb)            {            sllInit (&msgQDistGrpFreeList);            for (ix = 0; ix < grpDbSize; ix++)                {                pDbNode = &distGrpDb[ix];                pDbNode->ixNode = ix;                sllPutAtHead (&msgQDistGrpFreeList, (SL_NODE *) pDbNode);                distGrpDbNm[ix].pDbNode = pDbNode;                distGrpDbId[ix].pDbNode = pDbNode;                }            msgQDistGrpDbLockInit();            /* Add distributed group database service to table of services. */            grpDbServAddStatus = distNetServAdd (                    DIST_PKT_TYPE_DGDB,                    msgQDistGdbInput,                    DIST_DGDB_SERV_NAME,                    DIST_DGDB_SERV_NET_PRIO,                    DIST_DGDB_SERV_TASK_PRIO,                    DIST_DGDB_SERV_TASK_STACK_SZ);            /* Add message group service to table of services. 
*/                        msgQGrpServAddStatus =                distNetServAdd ( DIST_PKT_TYPE_MSG_Q_GRP,                                 msgQDistGrpInput,                                 DIST_MSG_Q_GRP_SERV_NAME,                                 DIST_MSG_Q_GRP_SERV_NET_PRIO,                                 DIST_MSG_Q_GRP_SERV_TASK_PRIO,                                 DIST_MSG_Q_GRP_SERV_TASK_STACK_SZ);            if (grpDbServAddStatus != ERROR && msgQGrpServAddStatus != ERROR)                {                msgQDistGrpLibInstalled = TRUE;                return (OK);                }            }        }        /* cleanup, when error */        if (distGrpDbNmId)    hashTblDelete (distGrpDbNmId);        if (distGrpDbIdId)    hashTblDelete (distGrpDbIdId);        if (distGrpDbNm)      free (distGrpDbNm);        if (distGrpDbId)      free (distGrpDbId);        if (distGrpDb)        free (distGrpDb);        return (ERROR);    }/***************************************************************************** msgQDistGrpAdd - add a distributed message queue to a group (VxFusion option)** This routine adds the queue identified by the argument <msgQId> to a group* with the ASCII name specified by the argument <distGrpName>. ** Multicasting is based on distributed message queue groups.  If the group * does not exist, one is created.  Any number of message queues from different * nodes can be bound to a single group. In addition, a message queue can* be added into any number of groups; msgQDistGrpAdd() must be called for each* group of which the message queue is to be a member.** The <options> parameter is presently unused and must be set to 0.** This routine returns a message queue ID, MSG_Q_ID, that can be used directly * by msgQDistSend() or by the generic msgQSend() routine.  
Do not call the* msgQReceive() or msgQNumMsgs() routines or their distributed counterparts,* msgQDistReceive() and msgQDistNumMsgs(), with a group message queue ID.** As with msgQDistCreate(), use distNameAdd() to add the group message * queue ID returned by this routine to the distributed name database so * that the ID can be used by tasks on other nodes.** AVAILABILITY* This routine is distributed as a component of the unbundled distributed* message queues option, VxFusion.** RETURNS: MSG_Q_ID, or NULL if there is an error.** ERRNO:* \is* \i S_msgQDistGrpLib_NAME_TOO_LONG* The name of the group is too long.* \i S_msgQDistGrpLib_INVALID_OPTION* The <options> parameter is invalid.* \i S_msgQDistGrpLib_DATABASE_FULL* The group database is full.* \i S_distLib_OBJ_ID_ERROR* The <msgQId> parameter is not a distributed message queue.* \ie** SEE ALSO: msgQLib, msgQDistLib, distNameLib* * INTERNAL NOTE: Takes <distGrpDbSemaphore>.*/MSG_Q_ID msgQDistGrpAdd    (    char *            distGrpName,  /* new or existing group name           */    MSG_Q_ID          msgQId,       /* message queue to add to the group    */    DIST_GRP_OPT      options       /* group message queue options - UNUSED */    )    {    DIST_GRP_DB_NODE * pDistGrpDbNode;    DIST_MSG_Q_ID      dMsgQId;      /* distributed message queue ID */    DIST_OBJ_NODE * pObjNode;     /* ptr to object containing real ID */     DIST_GAP_NODE   dGapNode;    DIST_GAP_NODE * pDGapNodeTemp;    /*     * Check the parameters:       * - msgQId must be a distributed message queue. It cannot be a     *   a standard message queue or a group.     * - the group name cannot exceed DIST_NAME_MAX_LENGTH     * - options must be 0     *     * If any of the parameters are invalid, set errno and return     * NULL to indicate failure.     
*/    if (!ID_IS_DISTRIBUTED (msgQId)) /* not a distributed message queue */        {        errnoSet (S_distLib_OBJ_ID_ERROR);        return NULL;        }    else                             /* is distributed msgQ */        {        pObjNode = MSG_Q_ID_TO_DIST_OBJ_NODE (msgQId);        if (!IS_DIST_MSG_Q_OBJ (pObjNode))            {            /* legal object ID, but not a message queue */            errnoSet (S_distLib_OBJ_ID_ERROR);            return NULL;                }               dMsgQId = (DIST_MSG_Q_ID) pObjNode->objNodeId;        if (IS_DIST_MSG_Q_TYPE_GRP(dMsgQId))            {            /* ID refers to a group, not a plain dist. message queue */            errnoSet (S_distLib_OBJ_ID_ERROR);            return NULL;            }        }    if (strlen (distGrpName) > DIST_NAME_MAX_LENGTH)        {        errnoSet (S_msgQDistGrpLib_NAME_TOO_LONG);        return (NULL);    /* name too long */        }    if (options != 0)        {        errnoSet (S_msgQDistGrpLib_INVALID_OPTION);        return (NULL);    /* options parameter currently unused */        }    /* Lock the database and try to find the group. */    msgQDistGrpDbLock();    pDistGrpDbNode = msgQDistGrpLclFindByName (distGrpName);    if (pDistGrpDbNode == NULL)        {#ifdef MSG_Q_DIST_GRP_REPORT        DIST_MSG_Q_GRP_ID    grpId;#endif        /* Group is unknown by now, create it. 
*/#ifdef MSG_Q_DIST_GRP_REPORT        printf ("msgQDistGrpAdd: group is unknown by now--create it\n");#endif        pDistGrpDbNode = msgQDistGrpLclCreate (distGrpName, distGrpIdNext++,                                               DIST_GRP_STATE_LOCAL_TRY);        msgQDistGrpDbUnlock();        if (pDistGrpDbNode == NULL)            {#ifdef DIST_DIAGNOSTIC            distLog ("msgQDistGrpAdd: failed to create new group\n");#endif            errnoSet (S_msgQDistGrpLib_DATABASE_FULL);            return (NULL);            }        /*         * Agree on a global unique identifier for the group and         * add the fist group member.         */#ifdef MSG_Q_DIST_GRP_REPORT        printf ("msgQDistGrpAdd: enter agreement phase, propose 0x%lx\n",                (u_long) msgQDistGrpLclGetId (pDistGrpDbNode));        grpId =#endif            msgQDistGrpAgree (pDistGrpDbNode);#ifdef MSG_Q_DIST_GRP_REPORT        printf ("msgQDistGrpAdd: agreed on 0x%lx\n", (u_long) grpId);#endif        msgQDistGrpLclAddMember (pDistGrpDbNode, msgQId);        }    else        {        /* Group already exists. Check the state. */#ifdef MSG_Q_DIST_GRP_REPORT        printf ("msgQDistGrpAdd: group already exists--add object\n");#endif        while (pDistGrpDbNode->grpDbState < DIST_GRP_STATE_GLOBAL)            {                /*                 * Group already exists, but has a local state. This means,                 * somebody else tries to install the group.                 * We unlock the group database and wait for a go-ahead.

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -