⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pls_xgrid_client.m

📁 MPI stands for the Message Passing Interface. Written by the MPI Forum (a large committee comprising
💻 M
字号:
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2005 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#import "orte_config.h"

#import <stdio.h>

#import "opal/util/path.h"
#import "orte/orte_constants.h"
#import "orte/mca/rml/rml.h"
#import "orte/mca/ns/ns.h"
#import "orte/mca/pls/base/base.h"
#import "orte/mca/pls/base/pls_private.h"
#import "orte/mca/pls/pls.h"
#import "orte/mca/errmgr/errmgr.h"
#import "orte/mca/ras/ras_types.h"
#import "orte/mca/rmaps/rmaps.h"
#import "orte/mca/smr/smr.h"

#import "pls_xgrid_client.h"

char **environ;

/*
 * PlsXGridClient
 *
 * Wraps a connection to an XGrid controller and uses it to submit one
 * orted per mapped node (launchJob:), to stop a previously submitted
 * job (terminateJob:) and, when the cleanup flag is set, to delete all
 * recorded jobs on dealloc.
 *
 * This file uses manual retain/release.  Every object stored in an
 * instance variable is owned (retained) by the client and released in
 * -dealloc.
 */
@implementation PlsXGridClient

/* init / finalize */

/*
 * Default initializer: no hostname, no password, no orted name,
 * cleanup enabled.
 */
-(id) init
{
    return [self initWithControllerHostname: NULL
                 AndControllerPassword: NULL
                 AndOrted: NULL
                 AndCleanup: 1];
}


/*
 * Designated initializer.
 *
 * hostname  - controller hostname as a C string, or NULL
 * password  - controller password as a C string, or NULL (connect
 *             then selects Kerberos authentication)
 * ortedname - name of the orted executable as a C string, or NULL
 * val       - non-zero if jobs should be deleted on dealloc
 */
-(id) initWithControllerHostname: (char*) hostname
           AndControllerPassword: (char*) password
                        AndOrted: (char*) ortedname
                      AndCleanup: (int) val
{
    self = [super init];
    if (nil != self) {
        /* class-specific initialization goes here */
        OBJ_CONSTRUCT(&state_cond, opal_condition_t);
        OBJ_CONSTRUCT(&state_mutex, opal_mutex_t);

        /* the convenience constructors return autoreleased objects;
           retain them because they are kept in instance variables */
        if (NULL != password) {
            controller_password = [[NSString stringWithCString: password] retain];
        }
        if (NULL != hostname) {
            controller_hostname = [[NSString stringWithCString: hostname] retain];
        }
        cleanup = val;
        if (NULL != ortedname) {
            orted = [[NSString stringWithCString: ortedname] retain];
        }

        active_jobs = [[NSMutableDictionary alloc] init];
    }
    return self;
}


/*
 * Deletes every recorded job when the cleanup flag is set, shuts the
 * connection down and gives up ownership of all instance variables.
 */
-(void) dealloc
{
    /* if supposed to clean up jobs, do so */
    if (cleanup) {
        NSEnumerator *enumerator = [[active_jobs allKeys] objectEnumerator];
        NSString *key;

        while (nil != (key = [enumerator nextObject])) {
            XGJob *job = [grid jobForIdentifier: [active_jobs objectForKey: key]];
            XGActionMonitor *actionMonitor = [job performDeleteAction];

            /* Without a grid or a matching job there is no monitor.
               Messaging nil yields 0, which presumably equals
               XGActionMonitorOutcomeNone (TODO confirm) and would make
               the wait loop below spin forever. */
            if (nil == actionMonitor) {
                continue;
            }

            while (XGActionMonitorOutcomeNone == [actionMonitor outcome]) {
                opal_progress();
            }

            /* we should have a result - find out if it worked */
            if (XGActionMonitorOutcomeSuccess != [actionMonitor outcome]) {
                NSError *err = [actionMonitor error];
                fprintf(stderr, "orte:pls:xgrid: cleanup failed: %s\n",
                        [[err localizedDescription] cString]);
            }
        }
    }

    /* need to shut down connection */
    [connection finalize];

    /* release everything this object owns */
    [grid release];
    [controller release];
    [connection release];
    [active_jobs release];
    [orted release];
    [controller_hostname release];
    [controller_password release];

    OBJ_DESTRUCT(&state_mutex);
    OBJ_DESTRUCT(&state_cond);

    [super dealloc];
}


/* accessors */

/* Returns the configured orted name (may be nil). */
-(NSString*) getOrted
{
    return orted;
}


/* Replaces the orted name; a NULL argument clears it. */
-(void) setOrtedAsCString: (char*) name
{
    NSString *replacement = nil;

    if (NULL != name) {
        replacement = [[NSString stringWithCString: name] retain];
    }
    [orted release];
    orted = replacement;
}


/* Replaces the controller password; a NULL argument clears it. */
-(void) setControllerPasswordAsCString: (char*) name
{
    NSString *replacement = nil;

    if (NULL != name) {
        replacement = [[NSString stringWithCString: name] retain];
    }
    [controller_password release];
    controller_password = replacement;
}


/*
 * Replaces the controller hostname; a NULL argument clears it.
 * Note: despite its name, the "password" argument carries the hostname.
 */
-(void) setControllerHostnameAsCString: (char*) password
{
    NSString *replacement = nil;

    if (NULL != password) {
        replacement = [[NSString stringWithCString: password] retain];
    }
    [controller_hostname release];
    controller_hostname = replacement;
}


/* Sets whether recorded jobs are deleted on dealloc. */
-(void) setCleanUp: (int) val
{
    cleanup = val;
}


/*
 * Returns the service principal reported by the connection, falling
 * back to "xgrid/<connection name>" when the connection has none.
 */
- (NSString *)servicePrincipal
{
    NSString *myServicePrincipal = [connection servicePrincipal];

    if (myServicePrincipal == nil) {
        myServicePrincipal = [NSString stringWithFormat:@"xgrid/%@", [connection name]];
    }

    opal_output_verbose(1, orte_pls_base.pls_output,
                        "orte:pls:xgrid: Kerberos servicePrincipal: %s",
                        [myServicePrincipal cString]);

    return myServicePrincipal;
}


/* interface for launch */

/*
 * Opens the connection to the controller and looks up the default grid.
 *
 * Uses Kerberos (XGGSSAuthenticator) when no password is configured and
 * password authentication (XGTwoWayRandomAuthenticator) otherwise.
 *
 * Returns ORTE_SUCCESS, or ORTE_ERR_NOT_AVAILABLE if the connection is
 * not in the open state once the wait finishes.
 */
-(int) connect
{
    /* drop anything left over from an earlier call so nothing leaks */
    [grid release];
    grid = nil;
    [controller release];
    controller = nil;
    [connection release];

    /* owned by this object; released in -dealloc */
    connection = [[XGConnection alloc] initWithHostname: controller_hostname
                                             portnumber: 0];

    if (nil == controller_password) {
        NSString *principal;
        XGGSSAuthenticator *authenticator;

        opal_output_verbose(1, orte_pls_base.pls_output,
                            "orte:pls:xgrid: Using Kerberos authentication");

        authenticator = [[[XGGSSAuthenticator alloc] init] autorelease];

        /* look the principal up once and reuse it for logging and setup */
        principal = [self servicePrincipal];
        opal_output_verbose(1, orte_pls_base.pls_output,
                            "orte:pls:xgrid: Kerberos principal: %s",
                            [principal cString]);

        [authenticator setServicePrincipal: principal];
        [connection setAuthenticator: authenticator];
    } else {
        XGTwoWayRandomAuthenticator *authenticator;

        opal_output_verbose(1, orte_pls_base.pls_output,
                            "orte:pls:xgrid: Using password authentication");

        authenticator = [[[XGTwoWayRandomAuthenticator alloc] init] autorelease];

        /* this seems to be hard coded */
        [authenticator setUsername:@"one-xgrid-client"];
        [authenticator setPassword:controller_password];

        [connection setAuthenticator: authenticator];
    }
    [connection setDelegate: self];

    /* get us connected; the delegate callbacks below broadcast
       state_cond when the connection state changes */
    opal_mutex_lock(&state_mutex);
    [connection open];
    while ([connection state] == XGConnectionStateOpening) {
        opal_condition_wait(&state_cond, &state_mutex);
    }
    opal_mutex_unlock(&state_mutex);

    /* if we're not connected when the condition is triggered, we
       don't have a connection and can't start.  exit. */
    if ([connection state] != XGConnectionStateOpen) {
        return ORTE_ERR_NOT_AVAILABLE;
    }

    opal_output_verbose(1, orte_pls_base.pls_output,
                        "orte:pls:xgrid: connection name: %s",
                        [[connection name] cString]);

    controller = [[XGController alloc] initWithConnection:connection];

    /* need to call progress exactly once for some reason to get the
       controller happy enough to allow us to assign the grid */
    opal_progress();

    grid = [[controller defaultGrid] retain];
    opal_output_verbose(1, orte_pls_base.pls_output,
                        "pls: xgrid: grid name: %s",
                        [[grid identifier] cString]);

    return ORTE_SUCCESS;
}


/*
 * Submits one XGrid job containing an orted task for every node in the
 * job's map, waits for the submission outcome and records the XGrid
 * job identifier under the ORTE jobid.
 *
 * Returns ORTE_SUCCESS or an ORTE error code.  Every exit goes through
 * the cleanup label so that the map, the contact strings and the daemon
 * list are always released.
 */
-(int) launchJob:(orte_jobid_t) jobid
{
    orte_job_map_t *map = NULL;
    opal_list_item_t *item;
    size_t num_nodes;
    orte_vpid_t vpid;
    int rc, i = 0;
    opal_list_t daemons;
    orte_pls_daemon_info_t *dmn;
    orte_process_name_t *name = NULL;
    char *name_string = NULL;
    char *orted_path = NULL;
    char *nsuri = NULL, *gpruri = NULL;
    NSMutableDictionary *taskSpecifications;
    NSMutableDictionary *jobSpecification;
    XGActionMonitor *actionMonitor;
    id jobIdentifier;

    /* setup a list that will contain the info for all the daemons
     * so we can store it on the registry when done.  Constructed first
     * so the cleanup code may always destruct it.
     */
    OBJ_CONSTRUCT(&daemons, opal_list_t);

    /* Query the map for this job.
     * We need the entire mapping for a couple of reasons:
     *  - need the prefix to start with.
     *  - need to know if we are launching on a subset of the allocated nodes
     */
    rc = orte_rmaps.get_job_map(&map, jobid);
    if (ORTE_SUCCESS != rc) {
        goto cleanup;
    }

    num_nodes = opal_list_get_size(&map->nodes);

    /*
     * Allocate a range of vpids for the daemons.
     */
    if (0 == num_nodes) {
        rc = ORTE_ERR_BAD_PARAM;
        goto cleanup;
    }
    rc = orte_ns.reserve_range(0, num_nodes, &vpid);
    if (ORTE_SUCCESS != rc) {
        goto cleanup;
    }

    /* setup the orted triggers for passing their launch info */
    if (ORTE_SUCCESS != (rc = orte_smr.init_orted_stage_gates(jobid, num_nodes, NULL, NULL))) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* find orted */
    if (nil == orted) {
        opal_output(orte_pls_base.pls_output,
                    "orte:pls:xgrid: no orted executable name configured");
        rc = ORTE_ERR_BAD_PARAM;
        goto cleanup;
    }
    orted_path = opal_path_findv((char*) [orted cString], 0, environ, NULL);
    if (NULL == orted_path) {
        /* a NULL path must not reach stringWithCString: below */
        opal_output(orte_pls_base.pls_output,
                    "orte:pls:xgrid: unable to locate orted executable \"%s\"",
                    [orted cString]);
        rc = ORTE_ERR_NOT_AVAILABLE;
        goto cleanup;
    }

    /* setup ns contact info */
    if (NULL != orte_process_info.ns_replica_uri) {
        nsuri = strdup(orte_process_info.ns_replica_uri);
    } else {
        nsuri = orte_rml.get_uri();
    }

    /* setup gpr contact info */
    if (NULL != orte_process_info.gpr_replica_uri) {
        gpruri = strdup(orte_process_info.gpr_replica_uri);
    } else {
        gpruri = orte_rml.get_uri();
    }

    if (NULL == nsuri || NULL == gpruri) {
        opal_output(orte_pls_base.pls_output,
                    "orte:pls:xgrid: unable to determine replica contact info");
        rc = ORTE_ERROR;
        goto cleanup;
    }

    /* build up the array of task specifications */
    taskSpecifications = [NSMutableDictionary dictionary];

    /* Iterate through each of the nodes and spin
     * up a daemon.
     */
    for (item =  opal_list_get_first(&map->nodes);
         item != opal_list_get_end(&map->nodes);
         item =  opal_list_get_next(item)) {
        orte_mapped_node_t* rmaps_node = (orte_mapped_node_t*)item;
        NSMutableDictionary *task;
        NSArray *taskArguments;

        /* new daemon - setup to record its info */
        dmn = OBJ_NEW(orte_pls_daemon_info_t);
        dmn->active_job = jobid;
        opal_list_append(&daemons, &dmn->super);

        /* record the node name in the daemon struct */
        dmn->cell = rmaps_node->cell;
        dmn->nodename = strdup(rmaps_node->nodename);

        /* initialize daemons process name */
        rc = orte_ns.create_process_name(&name, rmaps_node->cell, 0, vpid);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* save it in the daemon struct */
        if (ORTE_SUCCESS != (rc = orte_dss.copy((void**)&(dmn->name), name, ORTE_NAME))) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }

        /* setup per-node options */
        opal_output_verbose(1, orte_pls_base.pls_output,
                            "orte:pls:xgrid: launching on node %s",
                            rmaps_node->nodename);

        /* setup process name */
        rc = orte_ns.get_proc_name_string(&name_string, name);
        if (ORTE_SUCCESS != rc) {
            opal_output(orte_pls_base.pls_output,
                        "orte:pls:xgrid: unable to create process name");
            goto cleanup;
        }

        task = [NSMutableDictionary dictionary];
        [task setObject: [NSString stringWithCString: orted_path]
                 forKey: XGJobSpecificationCommandKey];

        taskArguments =
            [NSArray arrayWithObjects: @"--no-daemonize",
                     @"--bootproxy", [NSString stringWithFormat: @"%d", jobid],
                     @"--name", [NSString stringWithCString: name_string],
                     @"--num_procs", [NSString stringWithFormat: @"%d", 1],
                     @"--nodename", [NSString stringWithCString: rmaps_node->nodename],
                     @"--nsreplica", [NSString stringWithCString: nsuri],
                     @"--gprreplica", [NSString stringWithCString: gpruri],
                     nil];
        [task setObject: taskArguments forKey: XGJobSpecificationArgumentsKey];

        [taskSpecifications setObject: task
                               forKey: [NSString stringWithFormat: @"%d", i]];

        /* the NSStrings above hold their own copies, so the C buffers
           for this node are no longer needed */
        free(name_string);
        name_string = NULL;
        free(name);
        name = NULL;

        vpid++; i++;
    }

    /* job specification */
    jobSpecification = [NSMutableDictionary dictionary];
    [jobSpecification setObject:XGJobSpecificationTypeTaskListValue
                         forKey:XGJobSpecificationTypeKey];
    [jobSpecification setObject: [NSString stringWithFormat:
                                               @"org.open-mpi.pls.xgrid"]
                         forKey:XGJobSpecificationSubmissionIdentifierKey];
    [jobSpecification setObject: [NSString stringWithFormat: @"Open MPI Job %d", jobid]
                         forKey:XGJobSpecificationNameKey];
    [jobSpecification setObject:taskSpecifications
                         forKey:XGJobSpecificationTaskSpecificationsKey];

    /* Submit the request and get our monitor */
    actionMonitor =
        [controller performSubmitJobActionWithJobSpecification: jobSpecification
                    gridIdentifier: [grid identifier]];
    if (nil == actionMonitor) {
        /* no controller (connect not called or failed): polling a nil
           monitor could never produce an outcome */
        opal_output(orte_pls_base.pls_output,
                    "orte:pls:xgrid: unable to submit job: no action monitor returned");
        rc = ORTE_ERR_NOT_AVAILABLE;
        goto cleanup;
    }

    /* wait until we have some idea if job succeeded or not */
    while (XGActionMonitorOutcomeNone == [actionMonitor outcome]) {
        opal_progress();
    }

    /* we should have a result - find out if it worked */
    if (XGActionMonitorOutcomeSuccess != [actionMonitor outcome]) {
        NSError *err = [actionMonitor error];
        fprintf(stderr, "orte:pls:xgrid: launch failed: (%d) %s\n",
                [actionMonitor outcome],
                [[err localizedDescription] cString]);
        rc = ORTE_ERROR;
        goto cleanup;
    }
    rc = ORTE_SUCCESS;

    /* save the XGJob identifier somewhere we can get to it;
       setObject:forKey: raises on a nil object, so check first */
    jobIdentifier = [[actionMonitor results] objectForKey: @"jobIdentifier"];
    if (nil != jobIdentifier) {
        [active_jobs setObject: jobIdentifier
                        forKey: [NSString stringWithFormat: @"%d", jobid]];
    } else {
        opal_output(orte_pls_base.pls_output,
                    "orte:pls:xgrid: submission result carried no job identifier");
    }

    /* all done, so store the daemon info on the registry */
    if (ORTE_SUCCESS != (rc = orte_pls_base_store_active_daemons(&daemons))) {
        ORTE_ERROR_LOG(rc);
    }

cleanup:
    if (NULL != map) {
        OBJ_RELEASE(map);
    }

    if (NULL != name_string) free(name_string);
    if (NULL != name) free(name);
    if (NULL != orted_path) free(orted_path);
    if (NULL != nsuri) free(nsuri);
    if (NULL != gpruri) free(gpruri);

    /* deconstruct the daemon list */
    while (NULL != (item = opal_list_remove_first(&daemons))) {
        OBJ_RELEASE(item);
    }
    OBJ_DESTRUCT(&daemons);

    opal_output_verbose(1, orte_pls_base.pls_output,
                        "orte:pls:xgrid:launch: finished\n");

    return rc;
}


/*
 * Stops the XGrid job recorded for the given ORTE jobid.
 *
 * Returns ORTE_SUCCESS when the stop action succeeds,
 * ORTE_ERR_BAD_PARAM when no XGrid job was recorded for jobid and
 * ORTE_ERROR otherwise.
 */
-(int) terminateJob: (orte_jobid_t) jobid
{
    int ret;
    id jobIdentifier;
    XGJob *job;
    XGActionMonitor *actionMonitor;

    jobIdentifier = [active_jobs objectForKey:
                         [NSString stringWithFormat: @"%d", jobid]];
    if (nil == jobIdentifier) {
        opal_output(orte_pls_base.pls_output,
                    "orte:pls:xgrid: terminate: no XGrid job recorded for job %d",
                    (int) jobid);
        return ORTE_ERR_BAD_PARAM;
    }

    /* get our grid */
    job = [grid jobForIdentifier: jobIdentifier];

    actionMonitor = [job performStopAction];
    if (nil == actionMonitor) {
        /* no grid or no such job: there is nothing to wait for */
        opal_output(orte_pls_base.pls_output,
                    "orte:pls:xgrid: terminate: unable to start stop action for job %d",
                    (int) jobid);
        return ORTE_ERROR;
    }

    while (XGActionMonitorOutcomeNone == [actionMonitor outcome]) {
        opal_progress();
    }

    /* we should have a result - find out if it worked */
    if (XGActionMonitorOutcomeSuccess == [actionMonitor outcome]) {
        ret = ORTE_SUCCESS;
    } else {
        NSError *err = [actionMonitor error];
        fprintf(stderr, "orte:pls:xgrid: terminate failed: %s\n",
                [[err localizedDescription] cString]);
        ret = ORTE_ERROR;
    }

    return ret;
}


/* delegate for changes */

/* Connection opened: wake up the waiter in -connect. */
-(void) connectionDidOpen:(XGConnection*) myConnection
{
    /* this isn't an error condition -- we finally opened the
       connection, so trigger the condition variable we're waiting
       on */
    opal_condition_broadcast(&state_cond);
}


/* Connection failed to open: report the error and wake up the waiter. */
-(void) connectionDidNotOpen:(XGConnection*) myConnection withError: (NSError*) error
{
    opal_output(orte_pls_base.pls_output,
                "orte:pls:xgrid: Controller connection did not open: (%d) %s",
                [error code],
                [[error localizedDescription] cString]);
    opal_condition_broadcast(&state_cond);
}


/*
 * Connection closed: report why (error code 200 is treated as success,
 * 530 and 535 as authentication failures) and wake up any waiter.
 */
-(void) connectionDidClose:(XGConnection*) myConnection
{
    // check for success
    if ([myConnection error] != nil) {
        switch ([[myConnection error] code]) {
        case 200:
            /* success */
            break;
        case 530:
        case 535:
            opal_output(orte_pls_base.pls_output,
                        "orte:pls:xgrid: Connection to XGrid controller failed due to authentication error (%d):",
                        [[myConnection error] code]);
            break;
        default:
            opal_output(orte_pls_base.pls_output,
                        "orte:pls:xgrid: Connection to XGrid controller unexpectedly closed: (%d) %s",
                        [[myConnection error] code],
                        [[[myConnection error] localizedDescription] cString]);
            break;
        }
    } else {
        opal_output(orte_pls_base.pls_output,
                    "orte:pls:xgrid: Connection to XGrid controller unexpectedly closed");
    }

    opal_condition_broadcast(&state_cond);
}

@end

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -