📄 proc.c
字号:
/*
 * Copyright (c) 2004-2006 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation. All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation. All rights
 *                         reserved.
 * Copyright (c) 2004-2006 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart. All rights reserved.
 * Copyright (c) 2004-2006 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2006      Cisco Systems, Inc. All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include <stdlib.h>
#include <string.h>

#include "opal/threads/mutex.h"
#include "opal/util/output.h"
#include "opal/util/show_help.h"
#include "orte/util/sys_info.h"
#include "orte/dss/dss.h"
#include "orte/mca/oob/oob.h"
#include "orte/mca/ns/ns.h"
#include "orte/mca/gpr/gpr.h"
#include "orte/mca/errmgr/errmgr.h"
#include "orte/util/proc_info.h"

#include "ompi/proc/proc.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/datatype/dt_arch.h"
#include "ompi/datatype/convertor.h"
#include "ompi/runtime/params.h"
#include "ompi/runtime/mpiruntime.h"

/* Every live ompi_proc_t, linked in by its constructor and unlinked by its
   destructor.  All traversals and modifications hold ompi_proc_lock. */
static opal_list_t ompi_proc_list;
static opal_mutex_t ompi_proc_lock;
ompi_proc_t* ompi_proc_local_proc = NULL;

static void ompi_proc_construct(ompi_proc_t* proc);
static void ompi_proc_destruct(ompi_proc_t* proc);
static int setup_registry_callback(void);
static void callback(orte_gpr_notify_data_t *data, void *cbdata);

OBJ_CLASS_INSTANCE(
    ompi_proc_t,
    opal_list_item_t,
    ompi_proc_construct,
    ompi_proc_destruct
);


/*
 * Constructor for ompi_proc_t: give every field its default value and
 * append the new proc to ompi_proc_list (under ompi_proc_lock).
 */
void ompi_proc_construct(ompi_proc_t* proc)
{
    proc->proc_bml = NULL;
    proc->proc_pml = NULL;
    proc->proc_modex = NULL;
    OBJ_CONSTRUCT(&proc->proc_lock, opal_mutex_t);

    /* By default all processes are supposed to have the same architecture
     * as me, i.e. by default we run in a homogeneous environment.  Later,
     * when the registry callback gets fired, we will have to set the
     * convertors to the correct architecture.  The reference taken here is
     * dropped by the OBJ_RELEASE of proc_convertor in the destructor. */
    proc->proc_convertor = ompi_mpi_local_convertor;
    OBJ_RETAIN( ompi_mpi_local_convertor );
    proc->proc_arch = ompi_mpi_local_arch;

    proc->proc_flags = 0;

    /* By default, put NULL in the hostname.  It may or may not get
       filled in later -- consumer of this field beware! */
    proc->proc_hostname = NULL;

    OPAL_THREAD_LOCK(&ompi_proc_lock);
    opal_list_append(&ompi_proc_list, (opal_list_item_t*)proc);
    OPAL_THREAD_UNLOCK(&ompi_proc_lock);
}


/*
 * Destructor for ompi_proc_t: release the modex and convertor references,
 * free the hostname, and unlink the proc from ompi_proc_list.
 */
void ompi_proc_destruct(ompi_proc_t* proc)
{
    if (proc->proc_modex != NULL) {
        OBJ_RELEASE(proc->proc_modex);
    }

    /* As all the convertors are created with OBJ_NEW we can just call
     * OBJ_RELEASE.  All, except the local convertor, will get destroyed at
     * some point here.  If the reference count is correct the local
     * convertor (whose reference count is increased in the datatype engine)
     * will not get destroyed here; it is destroyed later, when
     * ompi_ddt_finalize is called. */
    OBJ_RELEASE( proc->proc_convertor );

    if (NULL != proc->proc_hostname) {
        free(proc->proc_hostname);
    }

    OPAL_THREAD_LOCK(&ompi_proc_lock);
    opal_list_remove_item(&ompi_proc_list, (opal_list_item_t*)proc);
    OPAL_THREAD_UNLOCK(&ompi_proc_lock);

    OBJ_DESTRUCT(&proc->proc_lock);
}


/*
 * Build the proc list for this job and publish the local architecture.
 *
 * Constructs ompi_proc_list / ompi_proc_lock, creates one ompi_proc_t per
 * peer returned by orte_ns.get_peers() (the one whose vpid matches ours is
 * flagged OMPI_PROC_FLAG_LOCAL and stored in ompi_proc_local_proc),
 * registers the GPR subscription, and puts this process' architecture id
 * under OMPI_PROC_ARCH in the job segment on the registry.
 *
 * Returns OMPI_SUCCESS, or the error code of the first failing step.
 */
int ompi_proc_init(void)
{
    orte_process_name_t *peers;
    orte_std_cntr_t i, npeers, num_tokens;
    orte_jobid_t jobid;
    char *segment, **tokens;
    orte_data_value_t value = { {OBJ_CLASS(orte_data_value_t),0}, ORTE_NULL, NULL};
    uint32_t ui32;
    int rc;

    OBJ_CONSTRUCT(&ompi_proc_list, opal_list_t);
    OBJ_CONSTRUCT(&ompi_proc_lock, opal_mutex_t);

    /* get all peers in this job */
    if (ORTE_SUCCESS != (rc = orte_ns.get_peers(&peers, &npeers, NULL))) {
        opal_output(0, "ompi_proc_init: get_peers failed with errno=%d", rc);
        return rc;
    }

    /* create a proc for every peer (the constructor links it onto
       ompi_proc_list) and find self among them */
    for (i = 0; i < npeers; i++) {
        ompi_proc_t *proc = OBJ_NEW(ompi_proc_t);
        proc->proc_name = peers[i];

        if (peers[i].vpid == ORTE_PROC_MY_NAME->vpid) {
            ompi_proc_local_proc = proc;
            proc->proc_flags |= OMPI_PROC_FLAG_LOCAL;
        }
    }
    free(peers);

    /* setup registry callback to find everyone on my local node.
       Can't do a GPR get because we're in the middle of MPI_INIT,
       and we're setup for the GPR compound command -- so create a
       subscription which will be serviced later, at the end of the
       compound command. */
    if (ORTE_SUCCESS != (rc = setup_registry_callback())) {
        return rc;
    }

    /* Here we have to add to the GPR the information about the current
       architecture. */
    if (OMPI_SUCCESS != (rc = ompi_arch_compute_local_id(&ui32))) {
        return rc;
    }
    if (ORTE_SUCCESS != (rc = orte_dss.set(&value, &ui32, ORTE_UINT32))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    jobid = ORTE_PROC_MY_NAME->jobid;

    /* find the job segment on the registry */
    if (ORTE_SUCCESS != (rc = orte_schema.get_job_segment_name(&segment, jobid))) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* get the registry tokens for this node */
    if (ORTE_SUCCESS != (rc = orte_schema.get_proc_tokens(&tokens, &num_tokens,
                                                          orte_process_info.my_name))) {
        ORTE_ERROR_LOG(rc);
        free(segment);
        return rc;
    }

    /* put the arch info on the registry */
    rc = orte_gpr.put_1(ORTE_GPR_TOKENS_OR | ORTE_GPR_KEYS_OR,
                        segment, tokens, OMPI_PROC_ARCH, &value);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
    }

    /* release the registry scratch data whether or not the put worked */
    free(segment);
    for (i = 0; i < num_tokens; i++) {
        free(tokens[i]);
        tokens[i] = NULL;
    }
    if (NULL != tokens) {
        free(tokens);
    }

    /* A failed put leaves the peers without our architecture id, so report
       it to the caller rather than claiming success. */
    return (ORTE_SUCCESS == rc) ? OMPI_SUCCESS : rc;
}


/*
 * Drop the list's reference on every proc and destroy ompi_proc_list.
 * Safe on an empty list.
 */
int ompi_proc_finalize (void)
{
    ompi_proc_t *proc, *nextproc, *endproc;

    endproc = (ompi_proc_t*)opal_list_get_end(&ompi_proc_list);
    proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);

    /* Compare against the end sentinel before releasing anything: on an
       empty list get_first() returns the sentinel itself, which must never
       be released. */
    while (proc != endproc) {
        /* fetch the successor first -- the release may run the destructor,
           which unlinks proc from the list */
        nextproc = (ompi_proc_t*)opal_list_get_next(proc);
        OBJ_RELEASE(proc);
        proc = nextproc;
    }
    OBJ_DESTRUCT(&ompi_proc_list);

    return OMPI_SUCCESS;
}


/*
 * Return a malloc'ed array of every proc sharing the local process' jobid
 * and store its length in *size.  The caller must free() the array.
 * Unlike ompi_proc_all() and ompi_proc_self(), the procs are NOT retained.
 * Returns NULL (leaving *size untouched) if the local proc is unknown or
 * the allocation fails.
 */
ompi_proc_t** ompi_proc_world(size_t *size)
{
    ompi_proc_t **procs;
    ompi_proc_t *proc;
    size_t count = 0;
    orte_ns_cmp_bitmask_t mask;
    orte_process_name_t my_name;

    /* check bozo case */
    if (NULL == ompi_proc_local_proc) {
        return NULL;
    }
    mask = ORTE_NS_CMP_JOBID;
    my_name = ompi_proc_local_proc->proc_name;

    /* First count how many match this jobid */
    OPAL_THREAD_LOCK(&ompi_proc_lock);
    for (proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);
         proc != (ompi_proc_t*)opal_list_get_end(&ompi_proc_list);
         proc = (ompi_proc_t*)opal_list_get_next(proc)) {
        if (ORTE_EQUAL == orte_ns.compare_fields(mask, &proc->proc_name, &my_name)) {
            ++count;
        }
    }

    /* allocate an array */
    procs = (ompi_proc_t**) malloc(count * sizeof(ompi_proc_t*));
    if (NULL == procs) {
        /* never leave with the list lock still held */
        OPAL_THREAD_UNLOCK(&ompi_proc_lock);
        return NULL;
    }

    /* now save only the procs that match this jobid */
    count = 0;
    for (proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);
         proc != (ompi_proc_t*)opal_list_get_end(&ompi_proc_list);
         proc = (ompi_proc_t*)opal_list_get_next(proc)) {
        if (ORTE_EQUAL == orte_ns.compare_fields(mask, &proc->proc_name, &my_name)) {
            procs[count++] = proc;
        }
    }
    OPAL_THREAD_UNLOCK(&ompi_proc_lock);

    *size = count;
    return procs;
}


/*
 * Return a malloc'ed array of every proc in ompi_proc_list, each one
 * OBJ_RETAIN'ed, and store its length in *size.  The caller owns the array
 * and the references.  Returns NULL if the allocation fails.
 */
ompi_proc_t** ompi_proc_all(size_t* size)
{
    ompi_proc_t **procs;
    ompi_proc_t *proc;
    size_t count = 0;

    /* Size the array while holding the lock, so a proc appended by another
       thread between sizing and filling cannot overflow it. */
    OPAL_THREAD_LOCK(&ompi_proc_lock);
    procs = (ompi_proc_t**) malloc(opal_list_get_size(&ompi_proc_list) * sizeof(ompi_proc_t*));
    if (NULL == procs) {
        OPAL_THREAD_UNLOCK(&ompi_proc_lock);
        return NULL;
    }

    for (proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);
         proc != (ompi_proc_t*)opal_list_get_end(&ompi_proc_list);
         proc = (ompi_proc_t*)opal_list_get_next(proc)) {
        OBJ_RETAIN(proc);
        procs[count++] = proc;
    }
    OPAL_THREAD_UNLOCK(&ompi_proc_lock);

    *size = count;
    return procs;
}


/*
 * Return a malloc'ed one-element array holding the local proc (retained)
 * and set *size to 1.  Returns NULL if the local proc is unknown or the
 * allocation fails.
 */
ompi_proc_t** ompi_proc_self(size_t* size)
{
    ompi_proc_t **procs;

    /* check bozo case, as ompi_proc_world() does */
    if (NULL == ompi_proc_local_proc) {
        return NULL;
    }

    procs = (ompi_proc_t**) malloc(sizeof(ompi_proc_t*));
    if (NULL == procs) {
        return NULL;
    }

    OBJ_RETAIN(ompi_proc_local_proc);
    *procs = ompi_proc_local_proc;
    *size = 1;
    return procs;
}


/*
 * Look up the proc whose cellid, jobid and vpid all match *name.
 * Returns it without taking a reference, or NULL if none matches.
 */
ompi_proc_t * ompi_proc_find ( const orte_process_name_t * name )
{
    ompi_proc_t *proc, *rproc=NULL;
    orte_ns_cmp_bitmask_t mask;

    /* return the proc-struct which matches this jobid+process id */
    mask = ORTE_NS_CMP_CELLID | ORTE_NS_CMP_JOBID | ORTE_NS_CMP_VPID;
    OPAL_THREAD_LOCK(&ompi_proc_lock);
    for (proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);
         proc != (ompi_proc_t*)opal_list_get_end(&ompi_proc_list);
         proc = (ompi_proc_t*)opal_list_get_next(proc)) {
        if (ORTE_EQUAL == orte_ns.compare_fields(mask, &proc->proc_name, name)) {
            rproc = proc;
            break;
        }
    }
    OPAL_THREAD_UNLOCK(&ompi_proc_lock);

    return rproc;
}


static ompi_proc_t *
ompi_proc_find_and_add(const orte_process_name_t * name, bool* isnew)
{
    ompi_proc_t *proc, *rproc = NULL;
    orte_ns_cmp_bitmask_t mask;

    /* return the proc-struct which matches this jobid+process id */
    mask = ORTE_NS_CMP_CELLID | ORTE_NS_CMP_JOBID | ORTE_NS_CMP_VPID;
    OPAL_THREAD_LOCK(&ompi_proc_lock);
    for(proc = (ompi_proc_t*)opal_list_get_first(&ompi_proc_list);
        proc != (ompi_proc_t*)opal_list_get_end(&ompi_proc_list);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -