📄 mcast.c
字号:
{ if (MCAST_SubList[i].InUse == 0) /* if entry not in use */ { if (*index == -1) *index = i; /* next free entry in case not found */ } else { if ((MCAST_SubList[i].Handle.Owner == handle->Owner) && (MCAST_SubList[i].Handle.LocalID == handle->LocalID)) /* if match */ { *index = i; return (TRUE); } } } /* fall out of loop if not found */ return (FALSE);}/************************************************************************ * Procedure to allocate an entry in the subscription list * SubIndex = index into subscription table to be filled in */void PMCAST_AllocSubscription (int SubIndex, MCAST_Handle Handle, MCAST_WhereProc WhereProc, void *WContext){ MCAST_SubList[SubIndex].InUse = 1; MCAST_SubList[SubIndex].Handle.Owner = Handle->Owner; MCAST_SubList[SubIndex].Handle.LocalID = Handle->LocalID; MCAST_SubList[SubIndex].NDsts = 0; MCAST_SubList[SubIndex].NextID = 0; MCAST_SubList[SubIndex].SList = NULL; MCAST_SubList[SubIndex].WhereProc = WhereProc; MCAST_SubList[SubIndex].WContext = WContext; MCAST_SubList[SubIndex].EndProc = NULL; MCAST_SubList[SubIndex].EContext = NULL; SubFree = MCAST_SubList[SubIndex].NextFree; MCAST_SubList[SubIndex].NextFree = NULLINT;}/************************************************************************ * * MCAST_SetWhereProc * Set the Where procedure for a group * */long MCAST_SetWhereProc(MCAST_Handle Handle, MCAST_WhereProc WhereProc, void *WContext){ long subindex; if (Handle == NULL) PMCAST_Quit ("MCAST_SetWhereProc passed illegal handle\n"); if (PMCAST_Handle2SubHash (Handle, &subindex)) { /* found subscription record */ MCAST_SubList[subindex].WhereProc = WhereProc; MCAST_SubList[subindex].WContext = WContext; return (MCAST_Success); } else { /* subscription record not found, create a new table entry */ if (subindex >= 0) { /* blank entry was found */ PMCAST_AllocSubscription (subindex, Handle, WhereProc, WContext); return (MCAST_Success); } else { /* subscription table full */ fprintf (stderr, "SetWhereProc: Table Overflow"); 
fprintf (stderr, "increase MCAST_MAX_SUBS_PER_PE\n"); return (MCAST_TableFull); } }}/************************************************************************ * * MCAST_SetEndProc (struct MCAST_HandleS * Handle) * Set the End procedure for a group * */long MCAST_SetEndProc(MCAST_Handle Handle, MCAST_EndProc EndProc, void *EContext){ long subindex; if (Handle == NULL) PMCAST_Quit ("MCAST_SetEndProc passed illegal handle\n"); if (PMCAST_Handle2SubHash (Handle, &subindex)) { /* found subscription record */ MCAST_SubList[subindex].EndProc = EndProc; MCAST_SubList[subindex].EContext = EContext; return (MCAST_Success); } else { /* subscription record not found, create a new table entry */ if (subindex >= 0) { /* blank entry was found */ PMCAST_AllocSubscription (subindex, Handle, NULL, NULL); MCAST_SubList[subindex].EndProc = EndProc; MCAST_SubList[subindex].EContext = EContext; return (MCAST_Success); } else { /* subscription table full */ fprintf (stderr, "SetEndProc: Table Overflow"); fprintf (stderr, "increase MCAST_MAX_SUBS_PER_PE\n"); return (MCAST_TableFull); } }}/************************************************************************ * Procedure to add a subscriber to an existing subscription list * GroupID = index into subscription table for the group */long PMCAST_AddSub (int GroupID, MCAST_HandleProc MsgHandler, char *Context, struct MCAST_SubRecord **p){ *p = MB_GetBuffer (FreeSubRecords); if ((*p) == NULL) return (MCAST_MemoryError); (*p)->Handler = MsgHandler; (*p)->Context = Context; (*p)->ID = MCAST_SubList[GroupID].NextID; (MCAST_SubList[GroupID].NextID)++; (MCAST_SubList[GroupID].NDsts)++; (*p)->Next = MCAST_SubList[GroupID].SList; MCAST_SubList[GroupID].SList = *p; return (MCAST_Success);}/************************************************************************ * Procedure to remove a subscriber from a group * GroupID = index into subscription table for group * SubscrID = ID for a particular subscription to this group */long PMCAST_RmSub (int GroupID, 
int SubscrID){ struct MCAST_SubRecord *p, *q; p = MCAST_SubList[GroupID].SList; if (p == NULL) return (MCAST_SubNotFound); if (p->ID == SubscrID) { /* remove first element in list */ MCAST_SubList[GroupID].SList = p->Next; MB_FreeBuffer (FreeSubRecords, p); (MCAST_SubList[GroupID].NDsts)--; return (MCAST_Success); } q = p; for (p=p->Next; p != NULL; q=p, p=p->Next) { /* if this is the record, remove from list */ if (p->ID == SubscrID) { q->Next = p->Next; MB_FreeBuffer (FreeSubRecords, p); (MCAST_SubList[GroupID].NDsts)--; return (MCAST_Success); } } return (MCAST_SubNotFound);}/************************************************************************ ************************************************************************ ************************************************************************ * Deferred Message Queue * Holds incoming messages that couldn't be immediately processed * because another handler is active * * Similar to overflow queue, except for incoming messages ************************************************************************ ************************************************************************ ***********************************************************************//************************************************************************ * * PMCAST_AddDefQ * Add an incoming message to the deferred queue * */void PMCAST_AddDefQ (unsigned int HType, void *Header, char *Msg){ struct MCAST_OVEntry *q; struct MCastMessage *MCM; struct ReplyDList *RDL; struct UpdateDList *RUDL; struct ReplyHandle *RH; struct ReplySubscription *RS; struct RequestLeave *RL; struct ReplyRegister *RR; struct ReplyNewGroup *RNG; int i; /* allocate space for queue entry and fill it in */ q = (struct MCAST_OVEntry *) malloc (sizeof (struct MCAST_OVEntry)); if (q == NULL) { PMCAST_Quit ("PMCAST_DefQ: ran out of memory\n"); } q->HType = HType; q->Next = NULL; /* copy contents of message into a buffer */ if (HType == fmh_UserMessage) { MCM = (struct MCastMessage *) Header; 
q->H.HUMessage.SubIndex = MCM->SubIndex; q->H.HUMessage.Length = MCM->Length; q->H.HUMessage.MsgType = MCM->MsgType; q->Data = Msg; } else if (HType == fmh_ReplyDList) { RDL = (struct ReplyDList *) Header; q->H.HRDList.NDsts = RDL->NDsts; q->H.HRDList.ReturnCode = RDL->ReturnCode; for (i=0; i<RDL->NDsts; i++) { q->H.HRDList.DList[i] = RDL->DList[i]; q->H.HRDList.SList[i] = RDL->SList[i]; } } else if (HType == fmh_AddDList) { RUDL = (struct UpdateDList *) Header; q->H.HUDList.LocalID = RUDL->LocalID; q->H.HUDList.PE = RUDL->PE; q->H.HUDList.SubIndex = RUDL->SubIndex; } else if (HType == fmh_ReplyHandle) { RH = (struct ReplyHandle *) Header; q->H.HRHandle.Owner = RH->Owner; q->H.HRHandle.LocalID = RH->LocalID; q->H.HRHandle.ReturnCode = RH->ReturnCode; } else if (HType == fmh_ReplyNewGroup) { RNG = (struct ReplyNewGroup *) Header; q->H.HRNewGroup.LocalID = RNG->LocalID; q->H.HRNewGroup.ReturnCode = RNG->ReturnCode; } else if (HType == fmh_ReplyRegister) { RR = (struct ReplyRegister *) Header; q->H.HRReg.ReturnCode = RR->ReturnCode; } else if (HType == fmh_ReplySubscription) { RS = (struct ReplySubscription *) Header; q->H.HRSub.ReturnCode = RS->ReturnCode; } else if (HType == fmh_RequestLeave) { RL = (struct RequestLeave *) Header; q->H.HRLeave.LocalID = RL->LocalID; q->H.HRLeave.NodeID = RL->NodeID; } else { PMCAST_Quit ("PMCAST_AddDefQ: invalid handler type found\n"); } /* add to queue */ if (MCAST_DefFirst == NULL) /* if queue empty */ { MCAST_DefFirst = q; MCAST_DefLast = q; } else { MCAST_DefLast->Next = q; MCAST_DefLast = q; }}/************************************************************************ * * PMCAST_FlushDefQ * Deliver messages in deferred queue. 
* */void PMCAST_FlushDefQ (void){ struct MCAST_OVEntry *p; struct MCAST_SubRecord *s; for (p=MCAST_DefFirst; p!=NULL; p=p->Next) { if (p->HType == fmh_UserMessage) { /* call handler for each subscriber */ for (s=MCAST_SubList[p->H.HUMessage.SubIndex].SList; s!=NULL; s=s->Next) { MCAST_Stats.NRemoteHandlerCalls++; s->Handler(p->Data, p->H.HUMessage.Length, s->Context, p->H.HUMessage.MsgType); if (MCAST_SubList[p->H.HUMessage.SubIndex].EndProc != NULL) { MCAST_SubList[p->H.HUMessage.SubIndex].EndProc(p->Data, p->H.HUMessage.Length, MCAST_SubList[p->H.HUMessage.SubIndex].EContext, p->H.HUMessage.MsgType); } } /* for each subscriber */ } else if (p->HType == fmh_ReplyDList) { /* reply for DList request */ PMCAST_Quit ("Deferred ReplyDList messages not implemented yet\n"); } else if (p->HType == fmh_AddDList) { /* add to DList request */ PMCAST_Quit ("Deferred AddDList messages not implemented yet\n"); } else if (p->HType == fmh_ReplyHandle) { /* add reply to handle request */ PMCAST_Quit ("Deferred ReplyHandle messages not implemented yet\n"); } else if (p->HType == fmh_ReplyNewGroup) { /* reply to handle creation of a new group */ PMCAST_Quit ("Deferred ReplyNewGroup messages not implemented yet\n"); } else if (p->HType == fmh_ReplyRegister) { /* reply to request registration of a new group */ PMCAST_Quit ("Deferred ReplyRegister messages not implemented yet\n"); } else if (p->HType == fmh_ReplySubscription) { /* send reply to request subscription to a group */ PMCAST_Quit ("Deferred ReplySubscription messages not implemented yet\n"); } else if (p->HType == fmh_RequestLeave) { /* send reply to request subscription to a group */ PMCAST_Quit ("Deferred RequestLEave messages not implemented yet\n"); } else { PMCAST_Quit ("PMCAST_FlushDefQ: invalid handler type found\n"); } free (p); } MCAST_DefFirst = NULL; MCAST_DefLast = NULL;}/************************************************************************ 
************************************************************************
 ************************************************************************
 * NAME LIST
 *   hash table containing mapping of group names to handles
 *   Each processor has one such table for names that hash to that processor
 ************************************************************************
 ************************************************************************
 ***********************************************************************/

/* One entry of the name list: a group name together with its handle. */
struct MCAST_NameEntry
{
   char *Name;                    /* group name */
   struct MCAST_HandleS Handle;   /* handle the name maps to (stored by value) */
   struct MCAST_NameEntry *Next;  /* link to a further entry; presumably chains
                                     names that fall in the same table slot --
                                     TODO confirm */
};

/* The name table: MCAST_NAME_TBL_SIZE pointers to MCAST_NameEntry. */
struct MCAST_NameEntry * MCAST_NameList[MCAST_NAME_TBL_SIZE];

/************************************************************************
 ************************************************************************
 ************************************************************************
 ***********************************************************************/

/************************************************************************
 *
 * MCAST_Init ()
 *
 ***********************************************************************/
void MCAST_Init (void)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -