⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mpid_nem_ckpt.c

📁 fortran并行计算包
💻 C
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* *  (C) 2006 by Argonne National Laboratory. *      See COPYRIGHT in top-level directory. */#include "mpid_nem_impl.h"#define printf_dd(x...) /*printf (x) */int MPID_nem_ckpt_logging_messages = 0; /* are we in logging-message-mode? */int MPID_nem_ckpt_sending_markers = 0; /* are we in the process of sending markers? */struct cli_message_log_total *MPID_nem_ckpt_message_log = 0; /* are we replaying messages? */#ifdef ENABLED_CHECKPOINTING#include "cli.h"static int* log_msg;static int* sent_marker;static unsigned short current_wave;static int next_marker_dest;static void checkpoint_shutdown();static void restore_env (int rank);extern unsigned short *MPID_nem_recv_seqno;static int _rank;#undef FUNCNAME#define FUNCNAME MPID_nem_ckpt_init#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)intMPID_nem_ckpt_init (int ckpt_restart){    int mpi_errno = MPI_SUCCESS;    int num_procs;    int rank;    int i;    MPIU_CHKPMEM_DECL(3);    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_CKPT_INIT);    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_CKPT_INIT);    num_procs = MPID_nem_mem_region.num_procs;    rank = _rank = MPID_nem_mem_region.rank;    if (!ckpt_restart)    {	process_id_t *procids;        MPIU_CHKPMEM_MALLOC (procids, process_id_t *, sizeof (*procids) * num_procs, mpi_errno, "procids");	for (i = 0; i < num_procs; ++i)	    procids[i] = i;		cli_init (num_procs, procids, rank, checkpoint_shutdown);    }    MPIU_CHKPMEM_MALLOC (log_msg, int *, sizeof (*log_msg) * num_procs, mpi_errno, "log_msg");    MPIU_CHKPMEM_MALLOC (sent_marker, int *, sizeof (*sent_marker) * num_procs, mpi_errno, "sent_marker");    for (i = 0; i < num_procs; ++i)    {	log_msg[i] = 0;	sent_marker[i] = 0;    }    MPID_nem_ckpt_logging_messages = 0;    MPID_nem_ckpt_sending_markers = 0;    current_wave = 0;    MPIU_CHKPMEM_COMMIT(); fn_exit:    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_CKPT_INIT);    return mpi_errno; fn_fail:    MPIU_CHKPMEM_REAP();    goto fn_exit;}#undef FUNCNAME#define FUNCNAME MPID_nem_ckpt_finalize#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)intMPID_nem_ckpt_finalize(){    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_CKPT_FINALIZE);    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_CKPT_FINALIZE);    MPIU_Free (log_msg);    MPIU_Free (sent_marker);    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_CKPT_FINALIZE);    return MPI_SUCCESS;}#undef FUNCNAME#define FUNCNAME MPID_nem_ckpt_maybe_take_checkpoint#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPID_nem_ckpt_maybe_take_checkpoint(){    int mpi_errno = MPI_SUCCESS;    int ret;    struct cli_marker marker;    int num_procs;    int rank;    int i;    MPIU_CHKLMEM_DECL(1);    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_CKPT_MAYBE_TAKE_CHECKPOINT);    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_CKPT_MAYBE_TAKE_CHECKPOINT);    num_procs = MPID_nem_mem_region.num_procs;    rank = MPID_nem_mem_region.rank;    ret = cli_check_for_checkpoint_start (&marker);    switch (ret)    {    case CLI_CP_BEGIN:		MPIU_Assert (MPID_nem_ckpt_logging_messages == 0);	/* initialize the state */	MPID_nem_ckpt_logging_messages = num_procs;	MPID_nem_ckpt_sending_markers = num_procs;	for (i = 0; i < num_procs; ++i)	{	    log_msg[i] = 1;	    sent_marker[i] = 0;	}		/* we don't send messages to ourselves, so we pretend we sent and received a marker */	--MPID_nem_ckpt_sending_markers;	ret = cli_on_marker_receive (&marker, rank);        MPIU_ERR_CHKANDJUMP (ret != CLI_CP_MARKED, mpi_errno, MPI_ERR_OTHER, "**intern");	--MPID_nem_ckpt_logging_messages;	log_msg[rank] = 0;	sent_marker[rank] = 1;	next_marker_dest = 0;	current_wave = marker.checkpoint_wave_number;	mpi_errno = MPID_nem_ckpt_send_markers();        if (mpi_errno) MPIU_ERR_POP (mpi_errno);	break;    case CLI_RESTART:	{	    int newrank;	    int newsize;	    struct cli_emitter_based_message_log *per_rank_log;	    mpi_errno = restore_env (rank);            if (mpi_errno) MPIU_ERR_POP (mpi_errno);	    _MPID_nem_init (0, NULL, &newrank, &newsize, 1);	    MPIU_Assert (newrank == rank);	    MPIU_Assert (newsize == num_procs);            /* we don't use the per_rank_log, so we discard it */            MPIU_CHKLMEM_MALLOC (per_rank_log, struct cli_emitter_based_message_log *, sizeof (struct cli_emitter_based_message_log) * num_procs, mpi_errno, "per rank log");	    MPID_nem_ckpt_message_log = cli_get_network_state (per_rank_log);	    break;	}    case CLI_NOTHING:	break;    default:        MPIU_ERR_CHKANDJUMP (ret != CLI_CP_MARKED, mpi_errno, MPI_ERR_OTHER, "**intern");    } fn_exit:    MPIU_CHKLMEM_FREEALL();    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_CKPT_MAYBE_TAKE_CHECKPOINT);    return mpi_errno; fn_fail:    goto fn_exit;}#undef FUNCNAME#define FUNCNAME MPID_nem_ckpt_got_marker#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)voidMPID_nem_ckpt_got_marker (MPID_nem_cell_ptr_t *cell, int *in_fbox){    int mpi_errno = MPI_SUCCESS;    int ret;    int source;    struct cli_marker marker;    int num_procs;    int rank;    int i;    MPIDI_VC_t *vc;    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_CKPT_GOT_MARKER);    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_CKPT_GOT_MARKER);    num_procs = MPID_nem_mem_region.num_procs;    rank = MPID_nem_mem_region.rank;    marker.checkpoint_wave_number = (*cell)->pkt.ckpt.wave;    source = (*cell)->pkt.ckpt.source;    if (!*in_fbox)    {	MPIDI_PG_Get_vc (MPIDI_Process.my_pg, source, &vc);	MPID_nem_mpich2_release_cell (*cell, vc);    }    else    {	MPID_nem_mpich2_release_fbox (*cell);    }    *cell = NULL;    *in_fbox = 0;    ret = cli_on_marker_receive (&marker, source);    switch (ret)    {    case CLI_CP_BEGIN:	MPIU_Assert (MPID_nem_ckpt_logging_messages == 0);	/* initialize the state */	MPID_nem_ckpt_logging_messages = num_procs;	MPID_nem_ckpt_sending_markers = num_procs;	for (i = 0; i < num_procs; ++i)	{	    log_msg[i] = 1;	    sent_marker[i] = 0;	}		/* we received one from the source of this message */	--MPID_nem_ckpt_logging_messages;	log_msg[source] = 0;		/* we don't send messages to ourselves, so we pretend we sent and received a marker */	--MPID_nem_ckpt_sending_markers;	ret = cli_on_marker_receive (&marker, rank);        MPIU_ERR_CHKANDJUMP (ret != CLI_CP_MARKED, mpi_errno, MPI_ERR_OTHER, "**intern");	--MPID_nem_ckpt_logging_messages;	log_msg[rank] = 0;	sent_marker[rank] = 1;	next_marker_dest = 0;	current_wave = marker.checkpoint_wave_number;	mpi_errno = MPID_nem_ckpt_send_markers();        if (mpi_errno) MPIU_ERR_POP (mpi_errno);	break;    case CLI_CP_MARKED:	MPIU_Assert (MPID_nem_ckpt_logging_messages);	MPIU_Assert (log_msg[source]);	--MPID_nem_ckpt_logging_messages;	log_msg[source] = 0;	break;    case CLI_RESTART:	{	    int newrank;	    int newsize;	    struct cli_emitter_based_message_log *per_rank_log;	    --MPID_nem_recv_seqno[source]; /* Will thie really work????? */	    printf_dd ("%d: cli_on_marker_recv: CLI_CP_RESTART wave = %d\n", rank, marker.checkpoint_wave_number);	    restore_env (rank);	    printf_dd ("%d: before _MPID_nem_init\n", rank);	    _MPID_nem_init (0, NULL, &newrank, &newsize, 1);	    printf_dd ("%d: after _MPID_nem_init\n", rank);	    MPIU_Assert (newrank == rank);	    MPIU_Assert (newsize == num_procs);	    printf_dd ("%d: _MPID_nem_init done\n", rank);            /* we don't use the per_rank_log, so we discard it */            MPIU_CHKLMEM_MALLOC (per_rank_log, struct cli_emitter_based_message_log *, sizeof (struct cli_emitter_based_message_log) * num_procs, mpi_errno, "per rank log");	    MPID_nem_ckpt_message_log = cli_get_network_state (per_rank_log);	    break;	}    default:        MPIU_ERR_CHKANDJUMP (ret != CLI_CP_MARKED, mpi_errno, MPI_ERR_OTHER, "**intern");	return;    } fn_exit:    MPIU_CHKLMEM_FREEALL();    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_CKPT_GOT_MARKER);    return mpi_errno; fn_fail:    goto fn_exit;}#undef FUNCNAME#define FUNCNAME MPID_nem_ckpt_log_message#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)intMPID_nem_ckpt_log_message (MPID_nem_cell_ptr_t cell){    int mpi_errno = MPI_SUCCESS;    int source;    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_CKPT_LOG_MESSAGE);    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_CKPT_LOG_MESSAGE);    source = cell->pkt.mpich2.source;    if (log_msg[source])    {	cli_log_message (cell, cell->pkt.header.datalen + sizeof(MPIDI_CH3_Pkt_t) + MPID_NEM_OFFSETOF (MPID_nem_cell_t, pkt.mpich2.payload), source);    }    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_CKPT_LOG_MESSAGE);    return mpi_errno;}/* MPID_nem_ckpt_send_markers() -- keep sending markers until we're done or out of cells */#undef FUNCNAME#define FUNCNAME MPID_nem_ckpt_send_markers#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)intMPID_nem_ckpt_send_markers(){    int mpi_errno = MPI_SUCCESS;    int try_again;    int num_procs = MPID_nem_mem_region.num_procs;    MPIDI_VC_t vc;    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_CKPT_SEND_MARKERS);    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_CKPT_SEND_MARKERS);    MPIU_Assert (MPID_nem_ckpt_sending_markers);    MPIU_Assert (next_marker_dest < num_procs);    while (next_marker_dest < num_procs)    {	if (sent_marker[next_marker_dest])	{	    ++next_marker_dest;	    continue;	}		MPIDI_PG_Get_vc (MPIDI_Process.my_pg, next_marker_dest, &vc);	mpi_errno = MPID_nem_mpich2_send_ckpt_marker (current_wave, vc, &try_again);        if (mpi_errno) MPIU_ERR_POP (mpi_errno);        if (try_again)            break;	sent_marker[next_marker_dest] = 1;	++next_marker_dest;	--MPID_nem_ckpt_sending_markers;    } fn_exit:    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_CKPT_SEND_MARKERS);    return mpi_errno; fn_fail:    goto fn_exit;}#undef FUNCNAME#define FUNCNAME MPID_nem_ckpt_replay_message#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)intMPID_nem_ckpt_replay_message (MPID_nem_cell_ptr_t *cell){    int mpi_errno = MPI_SUCCESS;    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_CKPT_REPLAY_MESSAGE);    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_CKPT_REPLAY_MESSAGE);    MPIU_Assert (MPID_nem_ckpt_message_log);    *cell = (MPID_nem_cell_ptr_t)MPID_nem_ckpt_message_log->ptr;    (*cell)->pkt.header.type = MPID_NEM_PKT_CKPT_REPLAY;    MPID_nem_ckpt_message_log = MPID_nem_ckpt_message_log->next;    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_CKPT_REPLAY_MESSAGE);    return mpi_errno;}#undef FUNCNAME#define FUNCNAME MPID_nem_ckpt_free_msg_log#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)voidMPID_nem_ckpt_free_msg_log(){    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_CKPT_FREE_MSG_LOG);    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_CKPT_FREE_MSG_LOG);    MPIU_Assert (!MPID_nem_ckpt_message_log);    cli_message_log_free();    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_CKPT_FREE_MSG_LOG);}#undef FUNCNAME#define FUNCNAME checkpoint_shutdown#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static intcheckpoint_shutdown(){    int mpi_errno = MPI_SUCCESS;    mpi_errno = MPID_nem_ckpt_shutdown ();    if (mpi_errno) MPIU_ERR_POP (mpi_errno); fn_exit:    return mpi_errno; fn_fail:    goto fn_exit;}#define MAX_STR_LEN 256#undef FUNCNAME#define FUNCNAME restore_env#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)static intrestore_env (int rank){    int mpi_errno = MPI_SUCCESS;    FILE *fd;    char env_filename[MAX_STR_LEN];    char var[MAX_STR_LEN], val[MAX_STR_LEN];    int ret;    MPIU_Snprintf (env_filename, MAX_STR_LEN, "/tmp/cli-restart-env:%d", rank);    fd = fopen (env_filename, "r");    MPIU_ERR_CHKANDJUMP1 (!fd, mpi_errno, MPI_ERR_OTHER, "**open", "**open %s", strerror (errno));    /* { *//* 	int i; *//* 	for (i = 0; i < sizeof(FILE); ++i) *//* 	{ *//* 	    if (!(i%8)) *//* 		printf (" "); *//* 	    if (!(i%64)) *//* 		printf ("\n"); *//* 	    printf ("%02x", ((unsigned char *)fd)[i]); *//* 	} *//* 	printf ("\n--\n"); *//* 	fgets (var, MAX_STR_LEN, fd); *//* 	printf_dd ("var = %s\n", var); *//*     } */    ret = fscanf (fd, "%[^=]=%[^\n]\n", var, val);    while (ret != EOF)    {	if (ret == 2)	{	    ret = setenv (var, val, 1);            MPIU_ERR_CHKANDJUMP (ret != 0, mpi_errno, MPI_ERR_OTHER, "**setenv");	}	ret = fscanf (fd, "%[^=]=%[^\n]\n", var, val);    } fn_exit:    return mpi_errno; fn_fail:    goto fn_exit;}#endif

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -