📄 mcast.c
字号:
/*---------------------------------------------------------------------------*//* Mcast implementation. *//* Author(s): Richard Fujimoto. *//* $Revision: 1.6 $ $Name: v26apr05 $ $Date: 2004/02/10 02:58:05 $ *//*---------------------------------------------------------------------------*/#include <stdlib.h>#include <string.h>#include <stdio.h>#include <stdarg.h>#include "mcast.h"#include "bufpool.h"/*---------------------------------------------------------------------------*/int UTIL_EnterHandler(int NewHandler);int UTIL_ExitHandler(void);int UTIL_InHandler(void);int UTIL_GetInHandlerCount(void);void UTIL_Quit (const char *s, ...);/*---------------------------------------------------------------------------*/int PMCAST_Name2PEHash (const char *);int PMCAST_Name2IndexHash (const char *);void PMCAST_Quit(char *);struct MCAST_NameEntry * PMCAST_LookUpName (const char *);long PMCAST_AddName (const char *, struct MCAST_NameEntry **);/*---------------------------------------------------------------------------*//**************************************************************************** * * CREATION OF A NEW MULTICAST GROUP * *//* * Message sent to request creation of new group * Source: Processor requesting creation of new group * Destination: Processor that is owner for new group */struct RequestNewGroup{ int dummy;};/* * Reply to request to create new group * Source: processor that created group (owner) * Destination: processor requesting creation of new group */struct ReplyNewGroup{ ULONG LocalID; /* local ID if successful */ ULONG ReturnCode; /* 0 if successful, 1 if failed */};/**************************************************************************** * Message to register group with appropriate server * Source: processor requesting creation of group * Destination: processor owning name list that includes group */struct RequestRegister{ ULONG Owner; /* processor owning group */ ULONG LocalID; /* ID within that processor */ ULONG Length; /* length of char string, including terminating \0 */ char Name[MCAST_MAX_NAME_LEN]; /* name of group */};struct ReplyRegister{ ULONG ReturnCode; /* 0 if successful, 1 if failed */};/**************************************************************************** * * SUBSCRIPTION/UNSUBSCRIPTION TO A MULTICAST GROUP * *//* * Message sent to request subscription of a processor to a group * Source: processor making request * Destination: owner of group */struct RequestSubscription{ ULONG LocalID; /* identifies group being subscribed to */ ULONG NodeID; /* processor being added to group */ ULONG SubIndex; /* index into subscriber's subnscription table */};/* * Reply to request to subscribe to a group * Source: processor owning group * Destination: processor that made request */struct ReplySubscription{ ULONG ReturnCode; /* 0 if successful, 1 if failed */};/* * Message sent to request unsubscribing of a processor to a group * Source: processor making request * Destination: owner of group */struct RequestLeave{ ULONG LocalID; /* identifies group unsubscribed */ ULONG NodeID; /* processor leaving group */};/**************************************************************************** * * GET HANDLE TO A MULTICAST GROUP * *//* message to request subscription to an existing group *//* sent by requestor to owner of the group */struct RequestHandle{ ULONG Length; /* length of char string, including terminating \0 */ char Name[MCAST_MAX_NAME_LEN];}; struct ReplyHandle{ ULONG Owner; /* Processor owning handle, if found */ ULONG LocalID; /* local ID in that processor, if found */ ULONG ReturnCode; /* 0 if successful, 1 if failed */};/**************************************************************************** * * GET DESTINATION LIST FOR A MULTICAST GROUP (HANDLE_CACHE MISS) * *//* message to request list of destinations; sent on miss in handle cache */struct RequestDList{ ULONG LocalID; /* local ID for requested group */ ULONG CachedLocalID; /* local ID in processor w/ cache */}; struct ReplyDList{ ULONG NDsts; /* number of destinations */ ULONG ReturnCode; /* 0 if successful, 1 if failed */ ULONG DList[MAX_PE]; /* list of destination PEs for group */ ULONG SList[MAX_PE]; /* list of subscription indexes for group */};struct UpdateDList{ ULONG LocalID; /* local ID in cache to be updated */ ULONG PE; /* processor to add/delete from cache */ ULONG SubIndex; /* subscription index at destination */};/**************************************************************************** * * APPLICATION MESSAGES SENT TO MULTICAST GROUP * *//* message to be forward message to an existing group *//* sent by sender of message to owner of group */struct ForwardMessage{ ULONG LocalID; /* local ID of group */ ULONG Length; /* Length of message in bytes */ ULONG MsgType; /* application defined message type */};/* message sent by owner to subscribers of group */struct MCastMessage{ ULONG SubIndex; /* index into subscription table at receiver */ ULONG Length; /* Length of message in bytes */ ULONG MsgType; /* application defined message type */};/*---------------------------------------------------------------------------*//************************************************************************ * Local definitions *//* * Status of a table entry in Destination Table (MCAST_Tbl) * OWNER means this processor owns the group (may also be in this PE's cache) * CACHED means this processor doesn't own it, but has cached copy */#define MCAST_UNUSED 0#define MCAST_OWNER 1#define MCAST_CACHED 2/* structure for a handle */struct MCAST_HandleS{ int Owner; /* Processor that owns group */ int LocalID; /* ID within that owner */};/************************************************************************ * indices for FM message handlers, one per message type */unsigned int fmh_RequestNewGroup, /* request controller to create a new group */ fmh_ReplyNewGroup, /* reply, new group created */ fmh_RequestSubscription, /* request to owner to subscribe to a group */ fmh_ReplySubscription, /* indicates subscription created or not */ fmh_RequestLeave, /* request to owner to leave a group */ fmh_RequestHandle, /* request name list processor to send handle */ fmh_ReplyHandle, /* reply to request for handle */ fmh_RequestRegister, /* request to register group with name server */ fmh_ReplyRegister, /* reply to registration request */ fmh_RequestDList, /* request destination list of group from owner */ fmh_ReplyDList, /* reply to request for destination list */ fmh_UserMessage, /* message sent to group by owner */ fmh_UserForward, /* message sent to forwarded to distr list */ fmh_AddDList, /* handler to update cached DList by adding a PE */ fmh_SBarrier, /* handler for simple barrier primitive */ fmh_Abort; /* error occurred, abort program */int PMCAST_RequestNewGroup(FM_stream *, unsigned);int PMCAST_ReplyNewGroup(FM_stream *, unsigned);int PMCAST_RequestSubscription(FM_stream *, unsigned);int PMCAST_ReplySubscription(FM_stream *, unsigned);int PMCAST_RequestLeave(FM_stream *, unsigned);int PMCAST_RequestHandle(FM_stream *, unsigned);int PMCAST_ReplyHandle(FM_stream *, unsigned);int PMCAST_RequestRegister(FM_stream *, unsigned);int PMCAST_ReplyRegister(FM_stream *, unsigned);int PMCAST_RequestDList(FM_stream *, unsigned);int PMCAST_ReplyDList(FM_stream *, unsigned);int PMCAST_UserMessage(FM_stream *, unsigned);int PMCAST_UserForward(FM_stream *, unsigned);int PMCAST_AddDList(FM_stream *, unsigned);int PMCAST_SBarrier(FM_stream *, unsigned);int PMCAST_Abort(FM_stream *, unsigned);void PMCAST_Quit (char *);long PMCAST_MCast(int, void *, ULONG, ULONG);/************************************************************************ * * ***********************************************************************//* flag to indicate blank entry in destination list */#define NO_NODE -1/************************************************************************ ************************************************************************ ************************************************************************ * * MCAST STATISTICS * ************************************************************************ ************************************************************************ ***********************************************************************/struct{ /* counts of number of time primitives invoked */ long NSends; /* number of multicast sends */ long NRemoteHandlerCalls; /* handler calls from incoming messages */ long NLocalHandlerCalls; /* handler calls from local messages */ /* counts indicating number of time diferent handlers called */ long NRequestNewGroup; long NReplyNewGroup; long NRequestSubscription; long NReplySubscription; long NRequestLeave; long NRequestHandle; long NReplyHandle; long NRequestRegister; long NReplyRegister; long NRequestDList; long NReplyDList; long NUserMessage; long NUserForward; long NSBarrier; long NHCMisses; /* number of hits in handle cache */ long NHCHits; /* number of hits in handle cache */ long NOHits; /* number of hits because sender is owner */ long NAddDList; /* number of HC updates to add to the DList */ long NFlushes; /* number of times overflow queue was flushed */ long NFSends; /* number of messages sent using flush mechanism */} MCAST_Stats;/************************************************************************ * * PMCAST_InitStats * Initialize MCAST statistics * */void PMCAST_InitStats(){ MCAST_Stats.NSends = 0; MCAST_Stats.NLocalHandlerCalls = 0; MCAST_Stats.NRemoteHandlerCalls = 0; MCAST_Stats.NReplyNewGroup = 0; MCAST_Stats.NRequestSubscription = 0; MCAST_Stats.NReplySubscription = 0; MCAST_Stats.NRequestLeave = 0; MCAST_Stats.NRequestHandle = 0; MCAST_Stats.NReplyHandle = 0; MCAST_Stats.NRequestRegister = 0; MCAST_Stats.NReplyRegister = 0; MCAST_Stats.NRequestDList = 0; MCAST_Stats.NReplyDList = 0; MCAST_Stats.NUserMessage = 0; MCAST_Stats.NUserForward = 0; MCAST_Stats.NSBarrier = 0; MCAST_Stats.NHCMisses = 0; MCAST_Stats.NHCHits = 0; MCAST_Stats.NOHits = 0; MCAST_Stats.NAddDList = 0; MCAST_Stats.NFlushes = 0; MCAST_Stats.NFSends = 0;}/************************************************************************ * * MCAST_PrintStats * Print MCAST statistics on local processor * */void MCAST_PrintLocalStats(void){ printf ("PE %d: Calls_To_MCAST_Send = %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NSends); printf ("PE %d: Remote_Handler_Calls = %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NRemoteHandlerCalls); printf ("PE %d: Local_Handler_Calls = %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NLocalHandlerCalls); printf ("PE %d: Received_Multicast Messages = %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NUserMessage); printf ("PE %d: Forwarded_Multicast_Messages = %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NUserForward); printf ("PE %d: Barrier Messages = %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NSBarrier); printf ("PE %d: Request_New_Group = %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NRequestNewGroup); printf ("PE %d: Reply_New_Group = %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NReplyNewGroup); printf ("PE %d: Request_Subscription = %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NRequestSubscription); printf ("PE %d: Reply_Subscription = %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NReplySubscription); printf ("PE %d: Request_Leave = %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NRequestLeave); printf ("PE %d: Request_Handle = %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NRequestHandle); printf ("PE %d: Reply_Handle = %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NReplyHandle); printf ("PE %d: Request_Register = %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NRequestRegister); printf ("PE %d: Reply_Register = %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NReplyRegister); printf ("PE %d: Request_DList = %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NRequestDList); printf ("PE %d: Reply_DList = %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NReplyDList); printf ("PE %d: Handle Cache Hits = %d (%.2f%%)\n", (int)MCAST_nodeid, (int) MCAST_Stats.NHCHits, 100.0*(double)MCAST_Stats.NHCHits / (double) (MCAST_Stats.NHCHits+MCAST_Stats.NHCMisses)); printf ("PE %d: Handle Cache Misses = %d (%.2f%%)\n", (int)MCAST_nodeid, (int) MCAST_Stats.NHCMisses, 100.0*(double)MCAST_Stats.NHCMisses / (double) (MCAST_Stats.NHCHits+MCAST_Stats.NHCMisses)); printf ("PE %d: Owner Hits: %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NOHits); printf ("PE %d: Handle Cache Updates (add): %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NAddDList); printf ("PE %d: Queue Flushes: %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NFlushes); printf ("PE %d: Flushed Messages: %d\n", (int)MCAST_nodeid, (int) MCAST_Stats.NFSends); fflush (stdout);}/************************************************************************ ************************************************************************ ************************************************************************ * * FLOW CONTROL: * If an application level procedure tries to send a message and FM cannot * accept it (probably because of lack of resources), retry sending it * MCAST_MAXRETRY times, and if still not successful, give up and abort * the program. * If a handler tries to send and it fails, place the message into the * overflow message queue, then try to resend later (when not in a handler) * * OVERFLOW MESSAGE QUEUE * FCFS queue holding messages that could not be accepted by FMi when * a message handler tried to send it * MCAST_Tick() will busy wait, until it has sent all messages in the * queue, before returning control back to the application. *
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -