📄 mcast.c
字号:
{ int i; /* initialize destination and subscription tables */ if (PMCAST_InitMCastTbl (MCAST_MAX_DST_RECORDS) != MCAST_Success) { PMCAST_Quit ("Could not initialize MCastTbl in MCAST_Init.\n"); } if (PMCAST_InitSubTbl (MCAST_MAX_SUB_RECORDS) != MCAST_Success) { PMCAST_Quit ("Could not initialize SubTbl in MCAST_Init.\n"); } /* initialize name list */ for (i=0; i<MCAST_NAME_TBL_SIZE; i++) { MCAST_NameList[i] = NULL; } FML_RegisterHandler(&fmh_RequestNewGroup, PMCAST_RequestNewGroup); FML_RegisterHandler(&fmh_ReplyNewGroup, PMCAST_ReplyNewGroup); FML_RegisterHandler(&fmh_RequestSubscription, PMCAST_RequestSubscription); FML_RegisterHandler(&fmh_ReplySubscription, PMCAST_ReplySubscription); FML_RegisterHandler(&fmh_RequestLeave, PMCAST_RequestLeave); FML_RegisterHandler(&fmh_RequestHandle, PMCAST_RequestHandle); FML_RegisterHandler(&fmh_ReplyHandle, PMCAST_ReplyHandle); FML_RegisterHandler(&fmh_RequestRegister, PMCAST_RequestRegister); FML_RegisterHandler(&fmh_ReplyRegister, PMCAST_ReplyRegister); FML_RegisterHandler(&fmh_RequestDList, PMCAST_RequestDList); FML_RegisterHandler(&fmh_ReplyDList, PMCAST_ReplyDList); FML_RegisterHandler(&fmh_AddDList, PMCAST_AddDList); FML_RegisterHandler(&fmh_UserMessage, PMCAST_UserMessage); FML_RegisterHandler(&fmh_UserForward, PMCAST_UserForward); FML_RegisterHandler(&fmh_SBarrier, PMCAST_SBarrier); FML_RegisterHandler(&fmh_Abort, PMCAST_Abort);}/************************************************************************ ************************************************************************ ************************************************************************ *** *** PROCEDURES DEALING WITH SUBSCRIPTION LISTS *** ************************************************************************ ************************************************************************ ***********************************************************************//* message buffers to hold incoming messages */struct ReplyNewGroup ReplyNewGroupBuf;struct ReplyRegister ReplyRegisterBuf;struct ReplySubscription ReplySubscriptionBuf;struct ReplyHandle ReplyHandleBuf;/* flags indicating if a message being waited for has arrived */BOOLEAN WaitForNewGroupReply;BOOLEAN WaitForRegisterReply;BOOLEAN WaitForSubscriptionReply;BOOLEAN WaitForHandleReply;BOOLEAN WaitForSBarrier;/************************************************************************ * * MCAST_CreateP ((char *) Name, (struct MCAST_HandleS *) Handle) * Create a multicast group, associate ascii string "name" with the group. * Returns a handle to the new group in Handle. * ***********************************************************************//* Transport type not used in current implementation */long MCAST_CreateP (const char *Name, MCAST_Handle *Handle, ULONG Owner, long TransportType){ int pe, len, LocalID, rc; FM_stream *strm; struct MCAST_NameEntry *p; struct RequestRegister ReqRegBuf; long rtnval; /* check name is not too long */ if (strlen(Name)+1 > MCAST_MAX_NAME_LEN) { fprintf (stderr, "MCAST_Create: Name = %s too long\n", Name); return (MCAST_NameTooLong); } if (Owner>=FM_numnodes) { fprintf (stderr, "Owner = %d\n", (int) Owner); PMCAST_Quit ("MCAST_Create: invalid group owner found\n"); } /* hash name to number of procesor (name server) with handle */ /* allocate storage for the handle */ *Handle = (struct MCAST_HandleS *) malloc (sizeof(struct MCAST_HandleS)); if (!(*Handle)) return (MCAST_MemoryError); /* if local processor is owner */ if (Owner == FM_nodeid) { /* allocate table entry and fill it in */ LocalID = PMCAST_NewGroup(MCAST_OWNER); if (LocalID == NULLINT) { fprintf (stderr, "MCAST_Create: Name=%s, Table Full\n", Name); return (MCAST_TableFull); } (*Handle)->Owner = Owner; (*Handle)->LocalID = LocalID;/*XXX KALYAN fprintf(stderr,"Node[%d]: By local request, created new group \"%s\" with ID %d\n", (int)FM_nodeid, Name, LocalID); fflush(stderr);*/ } else { /* send message to planned owner to create create group */ for (rc=0; !(strm=FM_begin_message(Owner,0,fmh_RequestNewGroup)); rc++) { if (rc >= MCAST_MAXRETRY) PMCAST_Quit("MCAST_Create: Couldn't get stream for sending\n"); } FM_end_message (strm); /* wait for reply with handle if successful, or error indicator */ WaitForNewGroupReply = TRUE; while (WaitForNewGroupReply) FM_extract (~0); if (ReplyNewGroupBuf.ReturnCode != 0) PMCAST_Quit("FMVAST_Create: Group creation failed\n"); (*Handle)->Owner = Owner; (*Handle)->LocalID = ReplyNewGroupBuf.LocalID;/*XXX KALYAN fprintf(stderr,"Node[%d]: By forwarding local request to Node %d, created new group \"%s\" with ID %d\n", (int)FM_nodeid, (int)Owner, Name, (int)ReplyNewGroupBuf.LocalID); fflush(stderr);*/ } /* register multicast group with registration service */ pe = PMCAST_Name2PEHash (Name); if (pe == FM_nodeid) { /* register multicast group with registration service */ rtnval = PMCAST_AddName(Name, &p); if (rtnval == MCAST_DuplicateName) { if(0)/*KALYAN*/ fprintf (stderr, "MCAST_Create: Couldn't register %s, duplicate name\n", Name); return (MCAST_DuplicateName); } else if (rtnval != MCAST_Success) { return (rtnval); } p->Handle.Owner = (*Handle)->Owner; p->Handle.LocalID = (*Handle)->LocalID; } else { /* send message to request registration */ len = strlen(Name)+1; for (rc=0; !(strm=FM_begin_message(pe,3*sizeof(ULONG)+len,fmh_RequestRegister)); rc++) { if (rc >= MCAST_MAXRETRY) PMCAST_Quit("MCAST_Create: Couldn't get stream for sending\n"); } ReqRegBuf.Owner = htonl((*Handle)->Owner); FM_send_piece(strm, (char *) &(ReqRegBuf.Owner), sizeof(ULONG)); ReqRegBuf.LocalID = htonl((*Handle)->LocalID); FM_send_piece(strm, (char *) &(ReqRegBuf.LocalID), sizeof(ULONG)); ReqRegBuf.Length = htonl(len); FM_send_piece(strm, (char *) &(ReqRegBuf.Length), sizeof(ULONG)); FM_send_piece(strm, (char *)Name, len); FM_end_message (strm); /* wait for response; if return code is not 0, request failed */ WaitForRegisterReply = TRUE; while (WaitForRegisterReply) FM_extract (~0); if(1) { /*KALYAN*/ p=0; rtnval = PMCAST_AddName(Name, &p); if(rtnval == MCAST_DuplicateName) p=PMCAST_LookUpName(Name); if(p) { p->Handle.Owner = (*Handle)->Owner; p->Handle.LocalID = (*Handle)->LocalID; } } /*KALYAN*/ if (ReplyRegisterBuf.ReturnCode == MCAST_DuplicateName) { if(0)/*KALYAN*/ fprintf (stderr, "MCAST_Create: Couldn't remote-register %s, duplicate name\n", Name); return (MCAST_DuplicateName); } else { return (ReplyRegisterBuf.ReturnCode); } } /* load handle into handle cache for this processor */ if (pe != FM_nodeid) { int i; /* make sure handle is in handle cache */ i = PMCAST_HCLookUp (*Handle); } return (MCAST_Success);}long MCAST_Create (const char *Name, MCAST_Handle *Handle, long TransportType){ /* owner for group is creator */ return (MCAST_CreateP (Name, Handle, PMCAST_Name2PEHash (Name), TransportType));}/************************************************************************ * * MCAST_GetHandle * Obtain handle for an existing multicast group with ascii string "name" * returns TRUE if successful, FALSE indicates group does not exist. * ***********************************************************************//* message buffer holding response to query */long MCAST_GetHandle (const char *Name, MCAST_Handle *Handle){ int pe; ULONG len, netLen; struct MCAST_NameEntry *p; FM_stream *strm; len = strlen(Name)+1; if (len > MCAST_MAX_NAME_LEN) { fprintf(stderr, "MCAST_GetHandle: name too long (%s), increase MCAST_MAX_NAME_LEN\n", Name); return (MCAST_NameTooLong); } /* hash name to number of procesor (name server) with handle */ pe = PMCAST_Name2PEHash (Name); /* if remote PE, wait for response */ if (pe == FM_nodeid) { p = PMCAST_LookUpName (Name); if (p) { /* found */ ReplyHandleBuf.Owner = p->Handle.Owner; ReplyHandleBuf.LocalID = p->Handle.LocalID; ReplyHandleBuf.ReturnCode = MCAST_Success; } else { /* not found */ ReplyHandleBuf.ReturnCode = MCAST_NoGroup; } } else { /* send message to processor name maps to */ {int rc; for (rc=0; !(strm=FM_begin_message(pe,sizeof(ULONG)+len,fmh_RequestHandle)); rc++) { if (rc >= MCAST_MAXRETRY) PMCAST_Quit("MCAST_GetHandle: Couldn't get stream for sending\n"); } } netLen = htonl(len); FM_send_piece(strm, (char *) &netLen, sizeof(ULONG)); FM_send_piece(strm, (char *)Name, len); FM_end_message (strm); /* wait for reply, response loaded into message buffer */ WaitForHandleReply = TRUE; while (WaitForHandleReply) FM_extract (~0); } if (ReplyHandleBuf.ReturnCode != MCAST_Success) { return (ReplyHandleBuf.ReturnCode); } else { /* allocate storage for the handle, report handle back to caller */ *Handle = (struct MCAST_HandleS *) malloc (sizeof(struct MCAST_HandleS)); if (!(*Handle)) return (MCAST_MemoryError); (*Handle)->Owner = ReplyHandleBuf.Owner; (*Handle)->LocalID = ReplyHandleBuf.LocalID; /* load distribution list into handle cache */ /* added so no misses occur on sends, which prevents sends in handlers */ if ((*Handle)->Owner != FM_nodeid) { int i; /* make sure handle is in handle cache */ i = PMCAST_HCLookUp (*Handle); } return (MCAST_Success); }}/************************************************************************ * * Create a new entry in the name table, return a pointer to it * fill in name, but not the other fields * ***********************************************************************/long PMCAST_AddName(const char * Name, struct MCAST_NameEntry **p){ int i; struct MCAST_NameEntry *q;/* hash name into name table */ i = PMCAST_Name2IndexHash (Name);/* check if name already there */ for (q=MCAST_NameList[i]; q!=NULL; q=q->Next) { if (strcmp (q->Name, Name) == 0) { return(MCAST_DuplicateName); } }/* allocate storage for name entry and add to list */ *p = (struct MCAST_NameEntry *) malloc (sizeof (struct MCAST_NameEntry)); if (!(*p)) { return (MCAST_MemoryError); } (*p)->Name = malloc (strlen(Name)+1); if (!((*p)->Name)) { free (*p); *p = NULL; return (MCAST_MemoryError); } strcpy ((*p)->Name, Name); (*p)->Next = MCAST_NameList[i]; MCAST_NameList[i] = *p; return (MCAST_Success);}/************************************************************************ * * Look up name in name list, return pointer to record if found, * return NULL if not found * ***********************************************************************/struct MCAST_NameEntry *PMCAST_LookUpName (const char *Name){ int i; struct MCAST_NameEntry *p; /* look up handle in table */ i = PMCAST_Name2IndexHash (Name); for (p=MCAST_NameList[i]; p!=NULL; p=p->Next) { if (strcmp (Name, p->Name) == 0) { return (p); } } return (NULL);} /************************************************************************ * * unsigned int PMCAST_Name2PEHash (char * Name) * convert a name of a multicast group to the number of the processor * with the handle for that group. * ***********************************************************************/int PMCAST_Name2PEHash (const char *Name){ unsigned int sum, i; /* simple, perhaps too simple, hash function */ sum = 0; for (i=0; Name[i] != '\0'; i++) sum += (unsigned int) Name[i];if(1){if(FM_numnodes>=128){sum=((sum%6)*4);}}/*XXX KALYAN: Hack for serialization bug on large #PEs*/ return (sum%FM_numnodes);}/************************************************************************ * * ***********************************************************************/int PMCAST_Name2IndexHash (const char *Name){ unsigned int sum, i; /* simple, perhaps too simple, hash function */ sum = 0; for (i
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -