⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mcast.c

📁 基于linux环境的ns2多机并行仿真补丁
💻 C
📖 第 1 页 / 共 5 页
字号:
 ************************************************************************ ************************************************************************ ***********************************************************************/struct MCAST_OVEntry{  unsigned int HType;		/* type of handler */  unsigned int Dst;		/* destination processor */  union				/* message headers */  {    struct MCastMessage HUMessage;	/* user message */    struct ReplyDList HRDList;		/* reply for destination list req */    struct ReplyHandle HRHandle;	/* reply for a handle request */    struct ReplySubscription HRSub;	/* reply subscription request */    struct RequestLeave HRLeave;	/* request to leave group */    struct ReplyRegister HRReg;		/* reply register request */    struct ReplyNewGroup HRNewGroup;	/* reply creating new group request */    struct UpdateDList HUDList;		/* update DList request */  } H;  char *Data;			/* data portion of the message, if any */  struct MCAST_OVEntry *Next;	/* next element in queue */};struct MCAST_OVEntry *MCAST_OVFirst=NULL; /* first message in overflow queue */struct MCAST_OVEntry *MCAST_OVLast=NULL;  /* last message in overflow queue */struct MCAST_OVEntry *MCAST_DefFirst=NULL; /* first message in deferred queue */struct MCAST_OVEntry *MCAST_DefLast=NULL;  /* last message in deferred queue *//************************************************************************ * * PMCAST_AddOVQ * Add message to the overflow queue * */void PMCAST_AddOVQ (unsigned int HType, ULONG dst, void *Header, char *Msg){  struct MCAST_OVEntry *q;  struct MCastMessage *MCM;  struct ReplyDList *RDL;  struct UpdateDList *RUDL;  struct ReplyHandle *RH;  struct ReplySubscription *RS;  struct RequestLeave *RL;  struct ReplyRegister *RR;  struct ReplyNewGroup *RNG;  int i;  /* allocate space for queue entry and fill it in */  q = (struct MCAST_OVEntry *) malloc (sizeof (struct MCAST_OVEntry));  if (q == NULL)  {    PMCAST_Quit ("PMCAST_AddOVQ: flow control ran out of memory\n");  }  q->HType = HType;  q->Dst = dst;  q->Next = NULL;  /* copy contents of message into a buffer */  if (HType == fmh_UserMessage)  {    MCM = (struct MCastMessage *) Header;    q->H.HUMessage.SubIndex = MCM->SubIndex;    q->H.HUMessage.Length = MCM->Length;    q->H.HUMessage.MsgType = MCM->MsgType;    q->Data = (char *) malloc (MCM->Length);    /* copy contents of message */    if (q->Data == NULL)    {      PMCAST_Quit ("PMCAST_AddOVQ: flow control ran out of memory\n");    }    for (i=0; i<MCM->Length; i++) q->Data[i] = Msg[i];  }  else if (HType == fmh_ReplyDList)  {    RDL = (struct ReplyDList *) Header;    q->H.HRDList.NDsts = RDL->NDsts;    q->H.HRDList.ReturnCode = RDL->ReturnCode;    for (i=0; i<RDL->NDsts; i++)    {      q->H.HRDList.DList[i] = RDL->DList[i];      q->H.HRDList.SList[i] = RDL->SList[i];    }  }  else if (HType == fmh_AddDList)  {    RUDL = (struct UpdateDList *) Header;    q->H.HUDList.LocalID = RUDL->LocalID;    q->H.HUDList.PE = RUDL->PE;    q->H.HUDList.SubIndex = RUDL->SubIndex;  }  else if (HType == fmh_ReplyHandle)  {    RH = (struct ReplyHandle *) Header;    q->H.HRHandle.Owner = RH->Owner;    q->H.HRHandle.LocalID = RH->LocalID;    q->H.HRHandle.ReturnCode = RH->ReturnCode;  }  else if (HType == fmh_ReplyNewGroup)  {    RNG = (struct ReplyNewGroup *) Header;    q->H.HRNewGroup.LocalID = RNG->LocalID;    q->H.HRNewGroup.ReturnCode = RNG->ReturnCode;  }  else if (HType == fmh_ReplyRegister)  {    RR = (struct ReplyRegister *) Header;    q->H.HRReg.ReturnCode = RR->ReturnCode;  }  else if (HType == fmh_ReplySubscription)  {    RS = (struct ReplySubscription *) Header;    q->H.HRSub.ReturnCode = RS->ReturnCode;  }  else if (HType == fmh_RequestLeave)  {    RL = (struct RequestLeave *) Header;    q->H.HRLeave.LocalID = RL->LocalID;    q->H.HRLeave.NodeID = RL->NodeID;  }  else  {    PMCAST_Quit ("PMCAST_AddOVQ: invalid handler type found\n");  }  /* add to queue */  if (MCAST_OVFirst == NULL)	/* if queue empty */  {    MCAST_OVFirst = q;    MCAST_OVLast = q;  }  else  {    MCAST_OVLast->Next = q;    MCAST_OVLast = q;  }}/************************************************************************ * * PMCAST_FlushOVQ * Send messages in overflow queue; busy wait until they're all gone * Caution: This procedure cannot be called from a message handler * */void PMCAST_FlushOVQ (void){  struct MCAST_OVEntry *p;  struct MCAST_OVEntry SBuf;  FM_stream *strm;  int i;  MCAST_Stats.NFlushes++;  for (p=MCAST_OVFirst; p!=NULL; p=p->Next)  {    if (p->HType == fmh_UserMessage)    {      /* user defined message */      while (!(strm = FM_begin_message (p->Dst,		3*sizeof(ULONG)+p->H.HUMessage.Length, p->HType))) ;      SBuf.H.HUMessage.SubIndex = htonl(p->H.HUMessage.SubIndex);      FM_send_piece(strm, &SBuf.H.HUMessage.SubIndex, sizeof (ULONG));      SBuf.H.HUMessage.Length = htonl (p->H.HUMessage.Length);      FM_send_piece(strm, &SBuf.H.HUMessage.Length, sizeof (ULONG));      SBuf.H.HUMessage.MsgType = htonl (p->H.HUMessage.MsgType);      FM_send_piece(strm, &SBuf.H.HUMessage.MsgType, sizeof (ULONG));      if (p->H.HUMessage.Length)      {        FM_send_piece (strm, p->Data, p->H.HUMessage.Length);      }      FM_end_message (strm);      free (p->Data);    }    else if (p->HType == fmh_ReplyDList)    {      /* reply for DList request */      while (!(strm = FM_begin_message (p->Dst,        2*sizeof(ULONG)+(2*(p->H.HRDList.NDsts*sizeof(ULONG))), p->HType)));      SBuf.H.HRDList.NDsts = htonl(p->H.HRDList.NDsts);      FM_send_piece(strm, &SBuf.H.HRDList.NDsts, sizeof (ULONG));      SBuf.H.HRDList.ReturnCode = htonl(p->H.HRDList.ReturnCode);      FM_send_piece(strm, &SBuf.H.HRDList.ReturnCode, sizeof (ULONG));      for (i=0; i<SBuf.H.HRDList.NDsts; i++)      {        SBuf.H.HRDList.DList[i] = htonl(p->H.HRDList.DList[i]);        FM_send_piece(strm, &(SBuf.H.HRDList.DList[i]), sizeof (ULONG));        SBuf.H.HRDList.SList[i] = htonl(p->H.HRDList.SList[i]);        FM_send_piece(strm, &(SBuf.H.HRDList.SList[i]), sizeof (ULONG));      }      FM_end_message (strm);    }    else if (p->HType == fmh_AddDList)    {      while (!(strm = FM_begin_message (p->Dst, 2*sizeof(ULONG), p->HType))) ;      SBuf.H.HUDList.LocalID = htonl(p->H.HUDList.LocalID);      FM_send_piece (strm, &(SBuf.H.HUDList.LocalID), sizeof(ULONG));      SBuf.H.HUDList.PE = htonl(p->H.HUDList.PE);      FM_send_piece (strm, &(SBuf.H.HUDList.PE), sizeof(ULONG));      FM_end_message (strm);    }    else if (p->HType == fmh_ReplyHandle)    {      /* send reply to handle request */      while (!(strm = FM_begin_message (p->Dst, 3*sizeof(ULONG), p->HType))) ;      SBuf.H.HRHandle.Owner = htonl(p->H.HRHandle.Owner);      FM_send_piece (strm, &(SBuf.H.HRHandle.Owner), sizeof(ULONG));      SBuf.H.HRHandle.LocalID = htonl(p->H.HRHandle.LocalID);      FM_send_piece (strm, &(SBuf.H.HRHandle.LocalID), sizeof(ULONG));      SBuf.H.HRHandle.ReturnCode = htonl(p->H.HRHandle.ReturnCode);      FM_send_piece (strm, &(SBuf.H.HRHandle.ReturnCode), sizeof(ULONG));      FM_end_message (strm);    }    else if (p->HType == fmh_ReplyNewGroup)    {      /* send reply to handle creation of a new group */      while (!(strm = FM_begin_message (p->Dst, 2*sizeof(ULONG), p->HType))) ;      SBuf.H.HRNewGroup.LocalID = htonl(p->H.HRNewGroup.LocalID);      FM_send_piece (strm, &(SBuf.H.HRNewGroup.LocalID), sizeof(ULONG));      SBuf.H.HRNewGroup.ReturnCode = htonl(p->H.HRNewGroup.ReturnCode);      FM_send_piece (strm, &(SBuf.H.HRNewGroup.ReturnCode), sizeof(ULONG));      FM_end_message (strm);    }    else if (p->HType == fmh_ReplyRegister)    {      /* send reply to request registration of a new group */      while (!(strm = FM_begin_message (p->Dst, sizeof(ULONG), p->HType))) ;      SBuf.H.HRReg.ReturnCode = htonl(p->H.HRReg.ReturnCode);      FM_send_piece (strm, &(SBuf.H.HRReg.ReturnCode), sizeof(ULONG));      FM_end_message (strm);    }    else if (p->HType == fmh_ReplySubscription)    {      /* send reply to request subscription to a group */      while (!(strm = FM_begin_message (p->Dst, sizeof(ULONG), p->HType))) ;      SBuf.H.HRSub.ReturnCode = htonl(p->H.HRSub.ReturnCode);      FM_send_piece (strm, &(SBuf.H.HRSub.ReturnCode), sizeof(ULONG));      FM_end_message (strm);    }    else if (p->HType == fmh_RequestLeave)    {      /* send reply to request subscription to a group */      while (!(strm = FM_begin_message (p->Dst, 2*sizeof(ULONG), p->HType))) ;      SBuf.H.HRLeave.LocalID = htonl(p->H.HRLeave.LocalID);      FM_send_piece (strm, &(SBuf.H.HRLeave.LocalID), sizeof(ULONG));      SBuf.H.HRLeave.NodeID = htonl(p->H.HRLeave.NodeID);      FM_send_piece (strm, &(SBuf.H.HRLeave.NodeID), sizeof(ULONG));      FM_end_message (strm);    }    else    {      PMCAST_Quit ("PMCAST_FlushOVQ: invalid handler type found\n");    }    free (p);  }  MCAST_OVFirst = NULL;  MCAST_OVLast = NULL;  MCAST_Stats.NFSends++;}void PMCAST_FlushDefQ (void);/************************************************************************ * * MCAST_Tick * flush overflow queue before returning * */void MCAST_Tick (void){  /* flush the overflow queue if it is not empty */  if ((MCAST_OVFirst != NULL) && (! UTIL_InHandler()))  {    PMCAST_FlushOVQ();  }  /* if deferred messages, process them */  if ((UTIL_InHandler() == 0) && (MCAST_DefFirst != NULL))  {/*printf ("flushing incoming message queue\n");fflush(stdout);*/    PMCAST_FlushDefQ ();  }}/************************************************************************ ************************************************************************ ************************************************************************ * * DESTINATION LIST TABLE * This table indicates the PROCESSORS subscribed to each group * Each processor has one such table, with an entry for each group it owns * ************************************************************************ ************************************************************************ ***********************************************************************//* equivalent of null pointer, but for integer type */#define NULLINT   -12345/************************************************************************ * LOCAL MULTICAST TABLE * one entry in table for each multicast group owned by this processor * index by local ID for group (MCAST_Owner[group #].LocalID) * each entry contains a list of destinations for a single group * This table is used on the sending side of the group *//* Each entry corresponds to a single multicast group */struct MCAST_TblEntry {  int Status;			/* MCAST_UNUSED, MCAST_OWNER, or MCAST_CACHED */  int NDsts;			/* number of destination processors */  struct MCAST_PERecord *DList;	/* list of processors subscribed to group */  struct MCAST_PERecord *CList;	/* processors with cached copy of DList */  int NextFree;			/* index to next free entry in table */};struct MCAST_PERecord {  long PE;			/* destination processor */  long SubIndex;		/* index into destinations subscription tbl */  struct MCAST_PERecord *Next;	/* next record in destination list */};MB_BufferPool FreePERecords;	/* pool of memory for PE records */struct MCAST_TblEntry MCAST_Tbl[MCAST_MAX_GROUPS_PER_PE];int TblFree;			/* next free entry in Tbl *//************************************************************************ * HANDLE CACHE DATA STRUCTURES *//* * Each processor maintains a cache of handles it can send messages to * Access to the cache occurs on message sends * If hit, simply send messages to destination list * If miss, load handle and destination list into cache, then do send * * Changes to the destination list are sent to the owner, which sends * update messages to all processors with cache copies of the handle */struct MCAST_HCacheEntry{  int InUse;			/* 1 if entry is in use */  struct MCAST_HandleS Handle;  int LocalID;			/* index into local destination list */  int Next;			/* next element in overflow list */};struct MCAST_HCacheEntry MCAST_HCache[MCAST_HCACHE_SIZE+MCAST_HCACHE_OV_SIZE];int HCacheFree;                    /* next free entry in Cache */struct ReplyDList ReplyDListBuf;BOOLEAN WaitForDListReply;/************************************************************************ * Procedure to initialize table, allocate memory * NumDstRec is the number of destination records to be created */long PMCAST_InitMCastTbl (int NumDstRec){  int i;  /* Allocate memory for PE records */  FreePERecords = MB_MakePool (NumDstRec, sizeof (struct MCAST_PERecord));  if (FreePERecords == NULL) return (MCAST_MemoryError);  /* initialize multcast table; string array elements into a free list */  TblFree = NULLINT;  for (i=MCAST_MAX_GROUPS_PER_PE-1; i>=0; i--)  {    MCAST_Tbl[i].Status = MCAST_UNUSED;	/* not in use */    MCAST_Tbl[i].NDsts = 0;    MCAST_Tbl[i].DList = NULL;    MCAST_Tbl[i].NextFree = TblFree;    TblFree = i;  }  /* initialize handle cache; first hash array */  for (i=0; i<MCAST_HCACHE_SIZE; i++)  {    MCAST_HCache[i].InUse = 0;    MCAST_HCache[i].Next = NULLINT;  }  HCacheFree = NULLINT;  for (i=MCAST_HCACHE_SIZE+MCAST_HCACHE_OV_SIZE-1; i>=MCAST_HCACHE_SIZE; i--)  {    MCAST_HCache[i].InUse = 0;    MCAST_HCache[i].Next = HCacheFree;    HCacheFree = i;  }  /* initialize statistics */  PMCAST_InitStats();  return (MCAST_Success);}/************************************************************************ * Procedure to allocate a table entry for a new group (group creation) * Status = MCAST_OWNER or MCAST_CACHED */long PMCAST_NewGroup (int Status){

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -