⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mcast.c

📁 基于linux环境的ns2多机并行仿真补丁
💻 C
📖 第 1 页 / 共 5 页
字号:
  long rtnval;  if (TblFree == NULLINT) return (NULLINT);  rtnval = TblFree;  MCAST_Tbl[rtnval].Status = Status;  MCAST_Tbl[rtnval].NDsts = 0;  MCAST_Tbl[rtnval].DList = NULL;  MCAST_Tbl[rtnval].CList = NULL;  TblFree = MCAST_Tbl[rtnval].NextFree;  MCAST_Tbl[rtnval].NextFree = NULLINT;  return (rtnval);}/************************************************************************ * Procedure to add a processor to a group * LocalID = index into m-cast table identifying group * PE = processor added to subscription list * SubIndex = index into subscription table at destination for group */long PMCAST_AddProcessor (int LocalID, int PE, int SubIndex){  struct MCAST_PERecord *p;  /* allocate memory for a PE record and fill it in */  p = (struct MCAST_PERecord *)MB_GetBuffer(FreePERecords);  if (p == NULL) return (MCAST_MemoryError);  p->PE = PE;  p->SubIndex = SubIndex;  p->Next = MCAST_Tbl[LocalID].DList;  MCAST_Tbl[LocalID].DList = p;  (MCAST_Tbl[LocalID].NDsts)++;  return (MCAST_Success);}/************************************************************************ * Procedure to add a processor to list of PEs with cached copies * LocalID = index into m-cast table identifying group to add to * PE = processor now with a cached copy * SubIndex = LocalID in cached processor * This procedure is similar to PMCAST_AddProcessor except add to CList */long PMCAST_AddCachedProcessor (int LocalID, int PE, int SubIndex){  struct MCAST_PERecord *p;  /* allocate memory for a PE record and fill it in */  p = (struct MCAST_PERecord *)MB_GetBuffer(FreePERecords);  if (p == NULL) return (MCAST_MemoryError);  p->PE = PE;  p->SubIndex = SubIndex;  p->Next = MCAST_Tbl[LocalID].CList;  MCAST_Tbl[LocalID].CList = p;  return (MCAST_Success);}/************************************************************************ * Procedure to remove a processor from a group * LocalID = index into m-cast table identifying group * PE = processor being removed from distribution list */long PMCAST_RmProcessor (int LocalID, int PE){  struct MCAST_PERecord *p, *q;  p = MCAST_Tbl[LocalID].DList;  if (p == NULL) return (MCAST_SubError);  if (p->PE == PE)  {    /* remove first element in distribution list */    MCAST_Tbl[LocalID].DList = p->Next;    MB_FreeBuffer (FreePERecords, (void *) p);    (MCAST_Tbl[LocalID].NDsts)--;    return (MCAST_Success);  }  q = p;	/* q points to previous element */  for (p=p->Next; p !=NULL; q=p, p=p->Next)  {    /* if this is the record, remove from list */    if (p->PE == PE)    {      q->Next = p->Next;      MB_FreeBuffer (FreePERecords, p);      (MCAST_Tbl[LocalID].NDsts)--;      return (MCAST_Success);    }  }  return (MCAST_SubError);}/************************************************************************ ************************************************************************ ************************************************************************ * *  CACHE FOR MULTICAST GROUP HANDLES * ************************************************************************ ************************************************************************ ***********************************************************************//************************************************************************ * Look up in handle cache. * Search for handle in cache.  Return index into destination list table. * If handle is currently not in cache, communicate with Owner to load * it into the cache */int PMCAST_HCLookUp (MCAST_Handle Handle){  int i, hcindex;  ULONG LocalID, netLocalID, netCLocalID, rtncode;  FM_stream *strm;  /* hash into cache array */  hcindex =   ((Handle->Owner*MCAST_MAX_GROUPS_PER_PE)+Handle->LocalID)%MCAST_HCACHE_SIZE;  /* search through cache to find handle */  if (MCAST_HCache[hcindex].InUse != MCAST_UNUSED)  {    for (i=hcindex; i!=NULLINT; i=MCAST_HCache[i].Next)    {      if ((MCAST_HCache[i].Handle.Owner == Handle->Owner) &&		(MCAST_HCache[i].Handle.LocalID == Handle->LocalID))      {        MCAST_Stats.NHCHits++;        return (MCAST_HCache[i].LocalID);      }    }  }  MCAST_Stats.NHCMisses++;  /* miss in cache; request destination list from owner; wait for reply */  {    int rc;    for (rc=0;      !(strm=FM_begin_message(Handle->Owner,2*sizeof(ULONG),fmh_RequestDList));		rc++)    {      if (rc >= MCAST_MAXRETRY)        PMCAST_Quit("PMCAST_HCLookUp: Couldn't get stream for sending\n");    }  }  /* allocate spot in cache */  LocalID = PMCAST_NewGroup (MCAST_CACHED);  if (LocalID == NULLINT)    PMCAST_Quit ("PMCAST_HCLookUp: ran out of destination table space\n");   netLocalID = htonl(Handle->LocalID);  FM_send_piece(strm, (ULONG *) &netLocalID, sizeof(ULONG));  netCLocalID = htonl(LocalID);  FM_send_piece(strm, (ULONG *) &netCLocalID, sizeof(ULONG));  FM_end_message (strm);  /* wait for reply, response loaded into buffer ReplyDListBuf */  WaitForDListReply = TRUE;  while (WaitForDListReply)    FM_extract (~0);  if (ReplyDListBuf.ReturnCode != MCAST_Success)    PMCAST_Quit ("PMCAST_HCLookUp: Handle miss failed\n");  /* load destination list into cache */  for (i=0; i<ReplyDListBuf.NDsts; i++)  {    rtncode = PMCAST_AddProcessor (LocalID, ReplyDListBuf.DList[i],						ReplyDListBuf.SList[i]);    if (rtncode != MCAST_Success)      PMCAST_Quit ("PMCAST_HCLookUp: ran out of destination PE records\n");  }  /* allocate a new cache entry */  if (MCAST_HCache[hcindex].InUse == MCAST_UNUSED)  {    i = hcindex;    MCAST_HCache[i].Next = NULLINT;  }  else  {    if (HCacheFree == NULLINT)      PMCAST_Quit ("PMCAST_HCLookUp: ran out of handle cache space\n");    i = HCacheFree;    HCacheFree = MCAST_HCache[i].Next;    MCAST_HCache[i].Next = MCAST_HCache[hcindex].Next;    MCAST_HCache[hcindex].Next = i;  }  /* fill it in */  MCAST_HCache[i].InUse = 1;  MCAST_HCache[i].Handle.Owner = Handle->Owner;  MCAST_HCache[i].Handle.LocalID = Handle->LocalID;  MCAST_HCache[i].LocalID = LocalID;  return (LocalID);}/************************************************************************ * Handler for processing request for destination list for a handle */int PMCAST_RequestDList(FM_stream *strm, unsigned senderID){  ULONG LocalID, CLocalID;  FM_stream *rplystrm;  struct ReplyDList RepBuf;  struct ReplyDList netRepBuf;  int i;  struct MCAST_PERecord *p;   UTIL_EnterHandler(fmh_RequestDList);  MCAST_Stats.NRequestDList++;  /* get local ID of requested group */  FM_receive (&(LocalID), strm, sizeof (ULONG));  LocalID = ntohl (LocalID);  FM_receive (&(CLocalID), strm, sizeof (ULONG));  CLocalID = ntohl (CLocalID);  /* load reply message into a buffer */  if (MCAST_Tbl[LocalID].Status != MCAST_OWNER)  {    RepBuf.NDsts = 0;    RepBuf.ReturnCode = MCAST_NoGroup;  }  else  {    RepBuf.NDsts = MCAST_Tbl[LocalID].NDsts;    RepBuf.ReturnCode = MCAST_Success;    for (i=0, p=MCAST_Tbl[LocalID].DList; i<RepBuf.NDsts; i++, p=p->Next)    {      RepBuf.DList[i] = p->PE;      RepBuf.SList[i] = p->SubIndex;    }  }  /* send reply message */  if (MCAST_OVFirst != NULL)  {    PMCAST_AddOVQ (fmh_ReplyDList, senderID, &RepBuf, NULL);    UTIL_ExitHandler();    return(FM_CONTINUE);  }  rplystrm = FM_begin_message (senderID,	2*sizeof(ULONG)+(2*(RepBuf.NDsts*sizeof(ULONG))), fmh_ReplyDList);  if (rplystrm == NULL)  {    PMCAST_AddOVQ (fmh_ReplyDList, senderID, &RepBuf, NULL);  }  else  {    netRepBuf.NDsts = htonl(RepBuf.NDsts);    FM_send_piece (rplystrm, &(netRepBuf.NDsts), sizeof(ULONG));    netRepBuf.ReturnCode = htonl(RepBuf.ReturnCode);    FM_send_piece (rplystrm, &(netRepBuf.ReturnCode), sizeof(ULONG));    for (i=0; i<RepBuf.NDsts; i++)    {      netRepBuf.DList[i] = htonl(RepBuf.DList[i]);      FM_send_piece (rplystrm, &(netRepBuf.DList[i]), sizeof(ULONG));      netRepBuf.SList[i] = htonl(RepBuf.SList[i]);      FM_send_piece (rplystrm, &(netRepBuf.SList[i]), sizeof(ULONG));    }    FM_end_message (rplystrm);  }  /* record cached copy */  if (PMCAST_AddCachedProcessor (LocalID, senderID, CLocalID) != MCAST_Success)  {    PMCAST_Quit ("PMCAST_RequestDList: ran out of cache record space\n");  }  UTIL_ExitHandler();  return (FM_CONTINUE);}/************************************************************************ * Handler for receiving reply from handle cache misses */int PMCAST_ReplyDList(FM_stream *strm, unsigned senderID){  int i;   UTIL_EnterHandler(fmh_ReplyDList);  MCAST_Stats.NReplyDList++;  if (!WaitForDListReply)    PMCAST_Quit ("Handler: Received unexpected ReplyDList message\n");  /* get number of destinations in DList and return code */  FM_receive (&(ReplyDListBuf.NDsts), strm, sizeof (ULONG));  ReplyDListBuf.NDsts =ntohl(ReplyDListBuf.NDsts);  FM_receive (&(ReplyDListBuf.ReturnCode), strm, sizeof (ULONG));  ReplyDListBuf.ReturnCode =ntohl(ReplyDListBuf.ReturnCode);  /* get destination list */  for (i=0; i<ReplyDListBuf.NDsts; i++)  {    FM_receive (&(ReplyDListBuf.DList[i]), strm, sizeof (ULONG));    ReplyDListBuf.DList[i] =ntohl(ReplyDListBuf.DList[i]);    FM_receive (&(ReplyDListBuf.SList[i]), strm, sizeof (ULONG));    ReplyDListBuf.SList[i] =ntohl(ReplyDListBuf.SList[i]);  }  WaitForDListReply = FALSE;  UTIL_ExitHandler();  return (FM_CONTINUE);} /************************************************************************ ************************************************************************ ************************************************************************ * * SUBSCRIPTION LIST TABLE * This table indicates the destination entities within this processor * subscribed to each group to which this processor is subscribed * ************************************************************************ ************************************************************************ ***********************************************************************//************************************************************************ * LOCAL SUBSCRIPTION TABLE * one entry in table corresponds to a multicast group this processor * has subscribed to.  That entry includes a list of subscriptions to * that group by entities on this processor. * This table is used on the receiving side of the group */struct MCAST_SubListEntry {  int InUse;			/* flag, set if record is in use */  struct MCAST_HandleS Handle;	/* handle for group */  long NDsts;			/* number of subscribers on this processor */  int NextID;			/* next subscription ID to use */  int NextFree;			/* next free subscription record */  struct MCAST_SubRecord *SList; /* list of subscriptions */  MCAST_WhereProc WhereProc;	/* memory allocator for incoming messages */  void *WContext;		/* context for WhereProc() */  MCAST_EndProc EndProc;	/* procedure called after handlers */  void *EContext;		/* context for EndProc() */};struct MCAST_SubListEntry MCAST_SubList[MCAST_MAX_SUBS_PER_PE];int SubFree;			/* next free entry in SubList *//* information for an individual subscription */struct MCAST_SubRecord {  int ID;			/* ID for subscription */  MCAST_HandleProc Handler;	/* message handler procedure */  char *Context;		/* context information for handler */  struct MCAST_SubRecord *Next;	/* next one in list */};MB_BufferPool FreeSubRecords;	/* pool of memory for subscription records *//************************************************************************ * Procedure to initialize table, allocate memory * NumSubRec is the number of subscription records to be created */long PMCAST_InitSubTbl (int NumSubRec){  int i;  /* Allocate memory for subscription records */  FreeSubRecords = MB_MakePool (NumSubRec, sizeof (struct MCAST_SubRecord));  if (FreeSubRecords == NULL) return (MCAST_MemoryError);  /* initialize multcast table; string array elements into a free list */  SubFree = NULLINT;  for (i=MCAST_MAX_SUBS_PER_PE-1; i>=0; i--)  {    MCAST_SubList[i].InUse = 0;    MCAST_SubList[i].Handle.Owner = NO_NODE;    MCAST_SubList[i].Handle.LocalID = -1;    MCAST_SubList[i].NDsts = 0;    MCAST_SubList[i].NextID = 0;    MCAST_SubList[i].SList = NULL;    MCAST_SubList[i].WhereProc = NULL;    MCAST_SubList[i].WContext = NULL;    MCAST_SubList[i].EndProc = NULL;    MCAST_SubList[i].EContext = NULL;    MCAST_SubList[i].NextFree = SubFree;    SubFree = i;  }  return (MCAST_Success);}/************************************************************************ * Hash a group handle to an index into the processor subscription table * return TRUE if found, FALSE if not found. * If found, index returns index into subscription table of entry * If not found, index returns the next available element in table, or -1 * if the table is full */BOOLEAN PMCAST_Handle2SubHash (MCAST_Handle handle, long *index){  int i, j;  *index = -1;  /* default: not found, no free entries left in table */  i = (((handle->Owner)*MCAST_MAX_GROUPS_PER_PE) + handle->LocalID)						%MCAST_MAX_SUBS_PER_PE;  for (j=0; j<MCAST_MAX_SUBS_PER_PE; j++, i=(i+1)%MCAST_MAX_SUBS_PER_PE)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -