📄 mcast.c
字号:
************************************************************************ ************************************************************************ ***********************************************************************/struct MCAST_OVEntry{ unsigned int HType; /* type of handler */ unsigned int Dst; /* destination processor */ union /* message headers */ { struct MCastMessage HUMessage; /* user message */ struct ReplyDList HRDList; /* reply for destination list req */ struct ReplyHandle HRHandle; /* reply for a handle request */ struct ReplySubscription HRSub; /* reply subscription request */ struct RequestLeave HRLeave; /* request to leave group */ struct ReplyRegister HRReg; /* reply register request */ struct ReplyNewGroup HRNewGroup; /* reply creating new group request */ struct UpdateDList HUDList; /* update DList request */ } H; char *Data; /* data portion of the message, if any */ struct MCAST_OVEntry *Next; /* next element in queue */};struct MCAST_OVEntry *MCAST_OVFirst=NULL; /* first message in overflow queue */struct MCAST_OVEntry *MCAST_OVLast=NULL; /* last message in overflow queue */struct MCAST_OVEntry *MCAST_DefFirst=NULL; /* first message in deferred queue */struct MCAST_OVEntry *MCAST_DefLast=NULL; /* last message in deferred queue *//************************************************************************ * * PMCAST_AddOVQ * Add message to the overflow queue * */void PMCAST_AddOVQ (unsigned int HType, ULONG dst, void *Header, char *Msg){ struct MCAST_OVEntry *q; struct MCastMessage *MCM; struct ReplyDList *RDL; struct UpdateDList *RUDL; struct ReplyHandle *RH; struct ReplySubscription *RS; struct RequestLeave *RL; struct ReplyRegister *RR; struct ReplyNewGroup *RNG; int i; /* allocate space for queue entry and fill it in */ q = (struct MCAST_OVEntry *) malloc (sizeof (struct MCAST_OVEntry)); if (q == NULL) { PMCAST_Quit ("PMCAST_AddOVQ: flow control ran out of memory\n"); } q->HType = HType; q->Dst = dst; q->Next = NULL; /* copy contents of message into a buffer */ if (HType == fmh_UserMessage) { MCM = (struct MCastMessage *) Header; q->H.HUMessage.SubIndex = MCM->SubIndex; q->H.HUMessage.Length = MCM->Length; q->H.HUMessage.MsgType = MCM->MsgType; q->Data = (char *) malloc (MCM->Length); /* copy contents of message */ if (q->Data == NULL) { PMCAST_Quit ("PMCAST_AddOVQ: flow control ran out of memory\n"); } for (i=0; i<MCM->Length; i++) q->Data[i] = Msg[i]; } else if (HType == fmh_ReplyDList) { RDL = (struct ReplyDList *) Header; q->H.HRDList.NDsts = RDL->NDsts; q->H.HRDList.ReturnCode = RDL->ReturnCode; for (i=0; i<RDL->NDsts; i++) { q->H.HRDList.DList[i] = RDL->DList[i]; q->H.HRDList.SList[i] = RDL->SList[i]; } } else if (HType == fmh_AddDList) { RUDL = (struct UpdateDList *) Header; q->H.HUDList.LocalID = RUDL->LocalID; q->H.HUDList.PE = RUDL->PE; q->H.HUDList.SubIndex = RUDL->SubIndex; } else if (HType == fmh_ReplyHandle) { RH = (struct ReplyHandle *) Header; q->H.HRHandle.Owner = RH->Owner; q->H.HRHandle.LocalID = RH->LocalID; q->H.HRHandle.ReturnCode = RH->ReturnCode; } else if (HType == fmh_ReplyNewGroup) { RNG = (struct ReplyNewGroup *) Header; q->H.HRNewGroup.LocalID = RNG->LocalID; q->H.HRNewGroup.ReturnCode = RNG->ReturnCode; } else if (HType == fmh_ReplyRegister) { RR = (struct ReplyRegister *) Header; q->H.HRReg.ReturnCode = RR->ReturnCode; } else if (HType == fmh_ReplySubscription) { RS = (struct ReplySubscription *) Header; q->H.HRSub.ReturnCode = RS->ReturnCode; } else if (HType == fmh_RequestLeave) { RL = (struct RequestLeave *) Header; q->H.HRLeave.LocalID = RL->LocalID; q->H.HRLeave.NodeID = RL->NodeID; } else { PMCAST_Quit ("PMCAST_AddOVQ: invalid handler type found\n"); } /* add to queue */ if (MCAST_OVFirst == NULL) /* if queue empty */ { MCAST_OVFirst = q; MCAST_OVLast = q; } else { MCAST_OVLast->Next = q; MCAST_OVLast = q; }}/************************************************************************ * * PMCAST_FlushOVQ * Send messages in overflow queue; busy wait until they're all gone * Caution: This procedure cannot be called from a message handler * */void PMCAST_FlushOVQ (void){ struct MCAST_OVEntry *p; struct MCAST_OVEntry SBuf; FM_stream *strm; int i; MCAST_Stats.NFlushes++; for (p=MCAST_OVFirst; p!=NULL; p=p->Next) { if (p->HType == fmh_UserMessage) { /* user defined message */ while (!(strm = FM_begin_message (p->Dst, 3*sizeof(ULONG)+p->H.HUMessage.Length, p->HType))) ; SBuf.H.HUMessage.SubIndex = htonl(p->H.HUMessage.SubIndex); FM_send_piece(strm, &SBuf.H.HUMessage.SubIndex, sizeof (ULONG)); SBuf.H.HUMessage.Length = htonl (p->H.HUMessage.Length); FM_send_piece(strm, &SBuf.H.HUMessage.Length, sizeof (ULONG)); SBuf.H.HUMessage.MsgType = htonl (p->H.HUMessage.MsgType); FM_send_piece(strm, &SBuf.H.HUMessage.MsgType, sizeof (ULONG)); if (p->H.HUMessage.Length) { FM_send_piece (strm, p->Data, p->H.HUMessage.Length); } FM_end_message (strm); free (p->Data); } else if (p->HType == fmh_ReplyDList) { /* reply for DList request */ while (!(strm = FM_begin_message (p->Dst, 2*sizeof(ULONG)+(2*(p->H.HRDList.NDsts*sizeof(ULONG))), p->HType))); SBuf.H.HRDList.NDsts = htonl(p->H.HRDList.NDsts); FM_send_piece(strm, &SBuf.H.HRDList.NDsts, sizeof (ULONG)); SBuf.H.HRDList.ReturnCode = htonl(p->H.HRDList.ReturnCode); FM_send_piece(strm, &SBuf.H.HRDList.ReturnCode, sizeof (ULONG)); for (i=0; i<SBuf.H.HRDList.NDsts; i++) { SBuf.H.HRDList.DList[i] = htonl(p->H.HRDList.DList[i]); FM_send_piece(strm, &(SBuf.H.HRDList.DList[i]), sizeof (ULONG)); SBuf.H.HRDList.SList[i] = htonl(p->H.HRDList.SList[i]); FM_send_piece(strm, &(SBuf.H.HRDList.SList[i]), sizeof (ULONG)); } FM_end_message (strm); } else if (p->HType == fmh_AddDList) { while (!(strm = FM_begin_message (p->Dst, 2*sizeof(ULONG), p->HType))) ; SBuf.H.HUDList.LocalID = htonl(p->H.HUDList.LocalID); FM_send_piece (strm, &(SBuf.H.HUDList.LocalID), sizeof(ULONG)); SBuf.H.HUDList.PE = htonl(p->H.HUDList.PE); FM_send_piece (strm, &(SBuf.H.HUDList.PE), sizeof(ULONG)); FM_end_message (strm); } else if (p->HType == fmh_ReplyHandle) { /* send reply to handle request */ while (!(strm = FM_begin_message (p->Dst, 3*sizeof(ULONG), p->HType))) ; SBuf.H.HRHandle.Owner = htonl(p->H.HRHandle.Owner); FM_send_piece (strm, &(SBuf.H.HRHandle.Owner), sizeof(ULONG)); SBuf.H.HRHandle.LocalID = htonl(p->H.HRHandle.LocalID); FM_send_piece (strm, &(SBuf.H.HRHandle.LocalID), sizeof(ULONG)); SBuf.H.HRHandle.ReturnCode = htonl(p->H.HRHandle.ReturnCode); FM_send_piece (strm, &(SBuf.H.HRHandle.ReturnCode), sizeof(ULONG)); FM_end_message (strm); } else if (p->HType == fmh_ReplyNewGroup) { /* send reply to handle creation of a new group */ while (!(strm = FM_begin_message (p->Dst, 2*sizeof(ULONG), p->HType))) ; SBuf.H.HRNewGroup.LocalID = htonl(p->H.HRNewGroup.LocalID); FM_send_piece (strm, &(SBuf.H.HRNewGroup.LocalID), sizeof(ULONG)); SBuf.H.HRNewGroup.ReturnCode = htonl(p->H.HRNewGroup.ReturnCode); FM_send_piece (strm, &(SBuf.H.HRNewGroup.ReturnCode), sizeof(ULONG)); FM_end_message (strm); } else if (p->HType == fmh_ReplyRegister) { /* send reply to request registration of a new group */ while (!(strm = FM_begin_message (p->Dst, sizeof(ULONG), p->HType))) ; SBuf.H.HRReg.ReturnCode = htonl(p->H.HRReg.ReturnCode); FM_send_piece (strm, &(SBuf.H.HRReg.ReturnCode), sizeof(ULONG)); FM_end_message (strm); } else if (p->HType == fmh_ReplySubscription) { /* send reply to request subscription to a group */ while (!(strm = FM_begin_message (p->Dst, sizeof(ULONG), p->HType))) ; SBuf.H.HRSub.ReturnCode = htonl(p->H.HRSub.ReturnCode); FM_send_piece (strm, &(SBuf.H.HRSub.ReturnCode), sizeof(ULONG)); FM_end_message (strm); } else if (p->HType == fmh_RequestLeave) { /* send reply to request subscription to a group */ while (!(strm = FM_begin_message (p->Dst, 2*sizeof(ULONG), p->HType))) ; SBuf.H.HRLeave.LocalID = htonl(p->H.HRLeave.LocalID); FM_send_piece (strm, &(SBuf.H.HRLeave.LocalID), sizeof(ULONG)); SBuf.H.HRLeave.NodeID = htonl(p->H.HRLeave.NodeID); FM_send_piece (strm, &(SBuf.H.HRLeave.NodeID), sizeof(ULONG)); FM_end_message (strm); } else { PMCAST_Quit ("PMCAST_FlushOVQ: invalid handler type found\n"); } free (p); } MCAST_OVFirst = NULL; MCAST_OVLast = NULL; MCAST_Stats.NFSends++;}void PMCAST_FlushDefQ (void);/************************************************************************ * * MCAST_Tick * flush overflow queue before returning * */void MCAST_Tick (void){ /* flush the overflow queue if it is not empty */ if ((MCAST_OVFirst != NULL) && (! UTIL_InHandler())) { PMCAST_FlushOVQ(); } /* if deferred messages, process them */ if ((UTIL_InHandler() == 0) && (MCAST_DefFirst != NULL)) {/*printf ("flushing incoming message queue\n");fflush(stdout);*/ PMCAST_FlushDefQ (); }}/************************************************************************ ************************************************************************ ************************************************************************ * * DESTINATION LIST TABLE * This table indicates the PROCESSORS subscribed to each group * Each processor has one such table, with an entry for each group it owns * ************************************************************************ ************************************************************************ ***********************************************************************//* equivalent of null pointer, but for integer type */#define NULLINT -12345/************************************************************************ * LOCAL MULTICAST TABLE * one entry in table for each multicast group owned by this processor * index by local ID for group (MCAST_Owner[group #].LocalID) * each entry contains a list of destinations for a single group * This table is used on the sending side of the group *//* Each entry corresponds to a single multicast group */struct MCAST_TblEntry { int Status; /* MCAST_UNUSED, MCAST_OWNER, or MCAST_CACHED */ int NDsts; /* number of destination processors */ struct MCAST_PERecord *DList; /* list of processors subscribed to group */ struct MCAST_PERecord *CList; /* processors with cached copy of DList */ int NextFree; /* index to next free entry in table */};struct MCAST_PERecord { long PE; /* destination processor */ long SubIndex; /* index into destinations subscription tbl */ struct MCAST_PERecord *Next; /* next record in destination list */};MB_BufferPool FreePERecords; /* pool of memory for PE records */struct MCAST_TblEntry MCAST_Tbl[MCAST_MAX_GROUPS_PER_PE];int TblFree; /* next free entry in Tbl *//************************************************************************ * HANDLE CACHE DATA STRUCTURES *//* * Each processor maintains a cache of handles it can send messages to * Access to the cache occurs on message sends * If hit, simply send messages to destination list * If miss, load handle and destination list into cache, then do send * * Changes to the destination list are sent to the owner, which sends * update messages to all processors with cache copies of the handle */struct MCAST_HCacheEntry{ int InUse; /* 1 if entry is in use */ struct MCAST_HandleS Handle; int LocalID; /* index into local destination list */ int Next; /* next element in overflow list */};struct MCAST_HCacheEntry MCAST_HCache[MCAST_HCACHE_SIZE+MCAST_HCACHE_OV_SIZE];int HCacheFree; /* next free entry in Cache */struct ReplyDList ReplyDListBuf;BOOLEAN WaitForDListReply;/************************************************************************ * Procedure to initialize table, allocate memory * NumDstRec is the number of destination records to be created */long PMCAST_InitMCastTbl (int NumDstRec){ int i; /* Allocate memory for PE records */ FreePERecords = MB_MakePool (NumDstRec, sizeof (struct MCAST_PERecord)); if (FreePERecords == NULL) return (MCAST_MemoryError); /* initialize multcast table; string array elements into a free list */ TblFree = NULLINT; for (i=MCAST_MAX_GROUPS_PER_PE-1; i>=0; i--) { MCAST_Tbl[i].Status = MCAST_UNUSED; /* not in use */ MCAST_Tbl[i].NDsts = 0; MCAST_Tbl[i].DList = NULL; MCAST_Tbl[i].NextFree = TblFree; TblFree = i; } /* initialize handle cache; first hash array */ for (i=0; i<MCAST_HCACHE_SIZE; i++) { MCAST_HCache[i].InUse = 0; MCAST_HCache[i].Next = NULLINT; } HCacheFree = NULLINT; for (i=MCAST_HCACHE_SIZE+MCAST_HCACHE_OV_SIZE-1; i>=MCAST_HCACHE_SIZE; i--) { MCAST_HCache[i].InUse = 0; MCAST_HCache[i].Next = HCacheFree; HCacheFree = i; } /* initialize statistics */ PMCAST_InitStats(); return (MCAST_Success);}/************************************************************************ * Procedure to allocate a table entry for a new group (group creation) * Status = MCAST_OWNER or MCAST_CACHED */long PMCAST_NewGroup (int Status){
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -