📄 mcast.c
字号:
long rtnval; if (TblFree == NULLINT) return (NULLINT); rtnval = TblFree; MCAST_Tbl[rtnval].Status = Status; MCAST_Tbl[rtnval].NDsts = 0; MCAST_Tbl[rtnval].DList = NULL; MCAST_Tbl[rtnval].CList = NULL; TblFree = MCAST_Tbl[rtnval].NextFree; MCAST_Tbl[rtnval].NextFree = NULLINT; return (rtnval);}/************************************************************************ * Procedure to add a processor to a group * LocalID = index into m-cast table identifying group * PE = processor added to subscription list * SubIndex = index into subscription table at destination for group */long PMCAST_AddProcessor (int LocalID, int PE, int SubIndex){ struct MCAST_PERecord *p; /* allocate memory for a PE record and fill it in */ p = (struct MCAST_PERecord *)MB_GetBuffer(FreePERecords); if (p == NULL) return (MCAST_MemoryError); p->PE = PE; p->SubIndex = SubIndex; p->Next = MCAST_Tbl[LocalID].DList; MCAST_Tbl[LocalID].DList = p; (MCAST_Tbl[LocalID].NDsts)++; return (MCAST_Success);}/************************************************************************ * Procedure to add a processor to list of PEs with cached copies * LocalID = index into m-cast table identifying group to add to * PE = processor now with a cached copy * SubIndex = LocalID in cached processor * This procedure is similar to PMCAST_AddProcessor except add to CList */long PMCAST_AddCachedProcessor (int LocalID, int PE, int SubIndex){ struct MCAST_PERecord *p; /* allocate memory for a PE record and fill it in */ p = (struct MCAST_PERecord *)MB_GetBuffer(FreePERecords); if (p == NULL) return (MCAST_MemoryError); p->PE = PE; p->SubIndex = SubIndex; p->Next = MCAST_Tbl[LocalID].CList; MCAST_Tbl[LocalID].CList = p; return (MCAST_Success);}/************************************************************************ * Procedure to remove a processor from a group * LocalID = index into m-cast table identifying group * PE = processor being removed from distribution list */long PMCAST_RmProcessor (int LocalID, int PE){ struct MCAST_PERecord *p, *q; p = MCAST_Tbl[LocalID].DList; if (p == NULL) return (MCAST_SubError); if (p->PE == PE) { /* remove first element in distribution list */ MCAST_Tbl[LocalID].DList = p->Next; MB_FreeBuffer (FreePERecords, (void *) p); (MCAST_Tbl[LocalID].NDsts)--; return (MCAST_Success); } q = p; /* q points to previous element */ for (p=p->Next; p !=NULL; q=p, p=p->Next) { /* if this is the record, remove from list */ if (p->PE == PE) { q->Next = p->Next; MB_FreeBuffer (FreePERecords, p); (MCAST_Tbl[LocalID].NDsts)--; return (MCAST_Success); } } return (MCAST_SubError);}/************************************************************************ ************************************************************************ ************************************************************************ * * CACHE FOR MULTICAST GROUP HANDLES * ************************************************************************ ************************************************************************ ***********************************************************************//************************************************************************ * Look up in handle cache. * Search for handle in cache. Return index into destination list table. * If handle is currently not in cache, communicate with Owner to load * it into the cache */int PMCAST_HCLookUp (MCAST_Handle Handle){ int i, hcindex; ULONG LocalID, netLocalID, netCLocalID, rtncode; FM_stream *strm; /* hash into cache array */ hcindex = ((Handle->Owner*MCAST_MAX_GROUPS_PER_PE)+Handle->LocalID)%MCAST_HCACHE_SIZE; /* search through cache to find handle */ if (MCAST_HCache[hcindex].InUse != MCAST_UNUSED) { for (i=hcindex; i!=NULLINT; i=MCAST_HCache[i].Next) { if ((MCAST_HCache[i].Handle.Owner == Handle->Owner) && (MCAST_HCache[i].Handle.LocalID == Handle->LocalID)) { MCAST_Stats.NHCHits++; return (MCAST_HCache[i].LocalID); } } } MCAST_Stats.NHCMisses++; /* miss in cache; request destination list from owner; wait for reply */ { int rc; for (rc=0; !(strm=FM_begin_message(Handle->Owner,2*sizeof(ULONG),fmh_RequestDList)); rc++) { if (rc >= MCAST_MAXRETRY) PMCAST_Quit("PMCAST_HCLookUp: Couldn't get stream for sending\n"); } } /* allocate spot in cache */ LocalID = PMCAST_NewGroup (MCAST_CACHED); if (LocalID == NULLINT) PMCAST_Quit ("PMCAST_HCLookUp: ran out of destination table space\n"); netLocalID = htonl(Handle->LocalID); FM_send_piece(strm, (ULONG *) &netLocalID, sizeof(ULONG)); netCLocalID = htonl(LocalID); FM_send_piece(strm, (ULONG *) &netCLocalID, sizeof(ULONG)); FM_end_message (strm); /* wait for reply, response loaded into buffer ReplyDListBuf */ WaitForDListReply = TRUE; while (WaitForDListReply) FM_extract (~0); if (ReplyDListBuf.ReturnCode != MCAST_Success) PMCAST_Quit ("PMCAST_HCLookUp: Handle miss failed\n"); /* load destination list into cache */ for (i=0; i<ReplyDListBuf.NDsts; i++) { rtncode = PMCAST_AddProcessor (LocalID, ReplyDListBuf.DList[i], ReplyDListBuf.SList[i]); if (rtncode != MCAST_Success) PMCAST_Quit ("PMCAST_HCLookUp: ran out of destination PE records\n"); } /* allocate a new cache entry */ if (MCAST_HCache[hcindex].InUse == MCAST_UNUSED) { i = hcindex; MCAST_HCache[i].Next = NULLINT; } else { if (HCacheFree == NULLINT) PMCAST_Quit ("PMCAST_HCLookUp: ran out of handle cache space\n"); i = HCacheFree; HCacheFree = MCAST_HCache[i].Next; MCAST_HCache[i].Next = MCAST_HCache[hcindex].Next; MCAST_HCache[hcindex].Next = i; } /* fill it in */ MCAST_HCache[i].InUse = 1; MCAST_HCache[i].Handle.Owner = Handle->Owner; MCAST_HCache[i].Handle.LocalID = Handle->LocalID; MCAST_HCache[i].LocalID = LocalID; return (LocalID);}/************************************************************************ * Handler for processing request for destination list for a handle */int PMCAST_RequestDList(FM_stream *strm, unsigned senderID){ ULONG LocalID, CLocalID; FM_stream *rplystrm; struct ReplyDList RepBuf; struct ReplyDList netRepBuf; int i; struct MCAST_PERecord *p; UTIL_EnterHandler(fmh_RequestDList); MCAST_Stats.NRequestDList++; /* get local ID of requested group */ FM_receive (&(LocalID), strm, sizeof (ULONG)); LocalID = ntohl (LocalID); FM_receive (&(CLocalID), strm, sizeof (ULONG)); CLocalID = ntohl (CLocalID); /* load reply message into a buffer */ if (MCAST_Tbl[LocalID].Status != MCAST_OWNER) { RepBuf.NDsts = 0; RepBuf.ReturnCode = MCAST_NoGroup; } else { RepBuf.NDsts = MCAST_Tbl[LocalID].NDsts; RepBuf.ReturnCode = MCAST_Success; for (i=0, p=MCAST_Tbl[LocalID].DList; i<RepBuf.NDsts; i++, p=p->Next) { RepBuf.DList[i] = p->PE; RepBuf.SList[i] = p->SubIndex; } } /* send reply message */ if (MCAST_OVFirst != NULL) { PMCAST_AddOVQ (fmh_ReplyDList, senderID, &RepBuf, NULL); UTIL_ExitHandler(); return(FM_CONTINUE); } rplystrm = FM_begin_message (senderID, 2*sizeof(ULONG)+(2*(RepBuf.NDsts*sizeof(ULONG))), fmh_ReplyDList); if (rplystrm == NULL) { PMCAST_AddOVQ (fmh_ReplyDList, senderID, &RepBuf, NULL); } else { netRepBuf.NDsts = htonl(RepBuf.NDsts); FM_send_piece (rplystrm, &(netRepBuf.NDsts), sizeof(ULONG)); netRepBuf.ReturnCode = htonl(RepBuf.ReturnCode); FM_send_piece (rplystrm, &(netRepBuf.ReturnCode), sizeof(ULONG)); for (i=0; i<RepBuf.NDsts; i++) { netRepBuf.DList[i] = htonl(RepBuf.DList[i]); FM_send_piece (rplystrm, &(netRepBuf.DList[i]), sizeof(ULONG)); netRepBuf.SList[i] = htonl(RepBuf.SList[i]); FM_send_piece (rplystrm, &(netRepBuf.SList[i]), sizeof(ULONG)); } FM_end_message (rplystrm); } /* record cached copy */ if (PMCAST_AddCachedProcessor (LocalID, senderID, CLocalID) != MCAST_Success) { PMCAST_Quit ("PMCAST_RequestDList: ran out of cache record space\n"); } UTIL_ExitHandler(); return (FM_CONTINUE);}/************************************************************************ * Handler for receiving reply from handle cache misses */int PMCAST_ReplyDList(FM_stream *strm, unsigned senderID){ int i; UTIL_EnterHandler(fmh_ReplyDList); MCAST_Stats.NReplyDList++; if (!WaitForDListReply) PMCAST_Quit ("Handler: Received unexpected ReplyDList message\n"); /* get number of destinations in DList and return code */ FM_receive (&(ReplyDListBuf.NDsts), strm, sizeof (ULONG)); ReplyDListBuf.NDsts =ntohl(ReplyDListBuf.NDsts); FM_receive (&(ReplyDListBuf.ReturnCode), strm, sizeof (ULONG)); ReplyDListBuf.ReturnCode =ntohl(ReplyDListBuf.ReturnCode); /* get destination list */ for (i=0; i<ReplyDListBuf.NDsts; i++) { FM_receive (&(ReplyDListBuf.DList[i]), strm, sizeof (ULONG)); ReplyDListBuf.DList[i] =ntohl(ReplyDListBuf.DList[i]); FM_receive (&(ReplyDListBuf.SList[i]), strm, sizeof (ULONG)); ReplyDListBuf.SList[i] =ntohl(ReplyDListBuf.SList[i]); } WaitForDListReply = FALSE; UTIL_ExitHandler(); return (FM_CONTINUE);} /************************************************************************ ************************************************************************ ************************************************************************ * * SUBSCRIPTION LIST TABLE * This table indicates the destination entities within this processor * subscribed to each group to which this processor is subscribed * ************************************************************************ ************************************************************************ ***********************************************************************//************************************************************************ * LOCAL SUBSCRIPTION TABLE * one entry in table corresponds to a multicast group this processor * has subscribed to. That entry includes a list of subscriptions to * that group by entities on this processor. * This table is used on the receiving side of the group */struct MCAST_SubListEntry { int InUse; /* flag, set if record is in use */ struct MCAST_HandleS Handle; /* handle for group */ long NDsts; /* number of subscribers on this processor */ int NextID; /* next subscription ID to use */ int NextFree; /* next free subscription record */ struct MCAST_SubRecord *SList; /* list of subscriptions */ MCAST_WhereProc WhereProc; /* memory allocator for incoming messages */ void *WContext; /* context for WhereProc() */ MCAST_EndProc EndProc; /* procedure called after handlers */ void *EContext; /* context for EndProc() */};struct MCAST_SubListEntry MCAST_SubList[MCAST_MAX_SUBS_PER_PE];int SubFree; /* next free entry in SubList *//* information for an individual subscription */struct MCAST_SubRecord { int ID; /* ID for subscription */ MCAST_HandleProc Handler; /* message handler procedure */ char *Context; /* context information for handler */ struct MCAST_SubRecord *Next; /* next one in list */};MB_BufferPool FreeSubRecords; /* pool of memory for subscription records *//************************************************************************ * Procedure to initialize table, allocate memory * NumSubRec is the number of subscription records to be created */long PMCAST_InitSubTbl (int NumSubRec){ int i; /* Allocate memory for subscription records */ FreeSubRecords = MB_MakePool (NumSubRec, sizeof (struct MCAST_SubRecord)); if (FreeSubRecords == NULL) return (MCAST_MemoryError); /* initialize multcast table; string array elements into a free list */ SubFree = NULLINT; for (i=MCAST_MAX_SUBS_PER_PE-1; i>=0; i--) { MCAST_SubList[i].InUse = 0; MCAST_SubList[i].Handle.Owner = NO_NODE; MCAST_SubList[i].Handle.LocalID = -1; MCAST_SubList[i].NDsts = 0; MCAST_SubList[i].NextID = 0; MCAST_SubList[i].SList = NULL; MCAST_SubList[i].WhereProc = NULL; MCAST_SubList[i].WContext = NULL; MCAST_SubList[i].EndProc = NULL; MCAST_SubList[i].EContext = NULL; MCAST_SubList[i].NextFree = SubFree; SubFree = i; } return (MCAST_Success);}/************************************************************************ * Hash a group handle to an index into the processor subscription table * return TRUE if found, FALSE if not found. * If found, index returns index into subscription table of entry * If not found, index returns the next available element in table, or -1 * if the table is full */BOOLEAN PMCAST_Handle2SubHash (MCAST_Handle handle, long *index){ int i, j; *index = -1; /* default: not found, no free entries left in table */ i = (((handle->Owner)*MCAST_MAX_GROUPS_PER_PE) + handle->LocalID) %MCAST_MAX_SUBS_PER_PE; for (j=0; j<MCAST_MAX_SUBS_PER_PE; j++, i=(i+1)%MCAST_MAX_SUBS_PER_PE)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -