📄 gramclient.java
字号:
import java.io.File;import java.io.IOException;import java.math.BigDecimal;import java.net.URL;import java.util.Date; import org.apache.axis.components.uuid.UUIDGen;import org.apache.axis.components.uuid.UUIDGenFactory;import org.apache.axis.message.addressing.EndpointReferenceType;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.globus.exec.client.GramJob;import org.globus.exec.client.GramJobListener;import org.globus.exec.generated.StateEnumeration;import org.globus.exec.utils.FaultUtils;import org.globus.exec.utils.ManagedJobFactoryConstants;import org.globus.exec.utils.client.ManagedJobFactoryClientHelper;import org.globus.exec.utils.rsl.RSLHelper;import org.globus.wsrf.impl.security.authentication.Constants;import org.globus.wsrf.impl.security.authorization.Authorization;import org.globus.wsrf.impl.security.authorization.HostAuthorization;import org.gridforum.jgss.ExtendedGSSCredential;import org.gridforum.jgss.ExtendedGSSManager;import org.ietf.jgss.GSSCredential;import org.oasis.wsrf.faults.BaseFaultType; public class GRAMClient implements GramJobListener // Listen for job status messages{ private static Log logger = LogFactory.getLog(GRAMClient.class.getName()); // Amount of time to wait for job status changes private static final long STATE_CHANGE_BASE_TIMEOUT_MILLIS = 60000; // Job submission member variables. private GramJob job; // completed if Done or Failed private boolean jobCompleted = false; // Batch runs will not wait for the job to complete private boolean batch; // Delegation private boolean limitedDelegation = true; private boolean delegationEnabled = true; // Don't print messages by default private boolean quiet = false; // proxy credential private String proxyPath = null; // Application error state. private boolean noInterruptHandling = false; private boolean isInterrupted = true; private boolean normalApplicationEnd = false; // Callback as a GramJobListener. Will not be called in batch mode. public void stateChanged(GramJob job) { StateEnumeration jobState = job.getState(); boolean holding = job.isHolding(); printMessage("========== State Notification =========="); printJobState(jobState, holding); printMessage("========================================"); synchronized (this) { if ( jobState.equals(StateEnumeration.Done) || jobState.equals(StateEnumeration.Failed)) { printMessage("Exit Code: " + Integer.toString(job.getExitCode())); this.jobCompleted = true; } notifyAll(); // if we a running an interractive job, prevent a hold from hanging the client if ( holding && !batch) { logger.debug( "Automatically releasing hold for interactive job"); try { job.release(); } catch (Exception e) { String errorMessage = "Unable to release job from hold"; logger.debug(errorMessage, e); printError(errorMessage + " - " + e.getMessage()); } } } } static private EndpointReferenceType getFactoryEPR (String contact, String factoryType) throws Exception { URL factoryUrl = ManagedJobFactoryClientHelper.getServiceURL(contact).getURL(); logger.debug("Factory Url: " + factoryUrl); return ManagedJobFactoryClientHelper.getFactoryEndpoint(factoryUrl, factoryType); } /** * Submit a WS-GRAM Job (GT4) * @param factoryEndpoint Factory endpoint reference * @param simpleJobCommandLine Executable (null to use a job file) * @param rslFileJob XML file (null to use a command line) * @param authorization Authorizarion: Host, Self, Identity * @param xmlSecurity XML Sec: Encryption or signature * @param batchMode Submission mode: batch will not wait for completion * @param dryRunMode Used to parse RSL * @param quiet Messages/NO messages * @param duration Duartion date * @param terminationDate Termination date * @param timeout Job timeout (ms) */ private void submitRSL(EndpointReferenceType factoryEndpoint, String simpleJobCommandLine, File rslFile, Authorization authorization, Integer xmlSecurity, boolean batchMode, boolean dryRunMode, boolean quiet, Date duration, Date terminationDate, int timeout) throws Exception { this.quiet = quiet; this.batch = batchMode || dryRunMode; // in single job only. // In multi-job, -batch is not allowed. Dryrun is. if (batchMode) { printMessage("Warning: Will not wait for job completion, " + "and will not destroy job service."); } if (rslFile != null) { try { this.job = new GramJob(rslFile); } catch (Exception e) { String errorMessage = "Unable to parse RSL from file " + rslFile; logger.debug(errorMessage, e); throw new IOException(errorMessage + " - " + e.getMessage()); } } else { this.job = new GramJob(RSLHelper .makeSimpleJob(simpleJobCommandLine)); } job.setTimeOut(timeout); job.setAuthorization(authorization); job.setMessageProtectionType(xmlSecurity); job.setDelegationEnabled(this.delegationEnabled); job.setDuration(duration); job.setTerminationTime(terminationDate); this.processJob(job, factoryEndpoint, batch); } /** * Submit the GRAM Job * @param job * @param factoryEndpoint * @param batch * @throws Exception */ private void processJob(GramJob job, EndpointReferenceType factoryEndpoint, boolean batch) throws Exception { // load custom proxy (if any) if (proxyPath != null) { try { ExtendedGSSManager manager = (ExtendedGSSManager) ExtendedGSSManager .getInstance(); String handle = "X509_USER_PROXY=" + proxyPath.toString(); GSSCredential proxy = manager.createCredential(handle .getBytes(), ExtendedGSSCredential.IMPEXP_MECH_SPECIFIC, GSSCredential.DEFAULT_LIFETIME, null, GSSCredential.INITIATE_AND_ACCEPT); job.setCredentials(proxy); } catch (Exception e) { logger.debug("Exception while obtaining user proxy: ", e); printError("error obtaining user proxy: " + e.getMessage()); // don't exit, but resume using default proxy instead } } // Generate a Job ID UUIDGen uuidgen = UUIDGenFactory.getUUIDGen(); String submissionID = "uuid:" + uuidgen.nextUUID(); printMessage("Submission ID: " + submissionID); if (!batch) { job.addListener(this); } boolean submitted = false; int tries = 0; while (!submitted) { tries++; try { job.submit(factoryEndpoint, batch, this.limitedDelegation, submissionID); submitted = true; } catch (Exception e) { logger.debug("Exception while submitting the job request: ", e); throw new IOException("Job request error: " + e); } } if (batch) { printMessage("CREATED MANAGED JOB SERVICE WITH HANDLE:"); printMessage(job.getHandle()); } if (logger.isDebugEnabled()) { long millis = System.currentTimeMillis(); BigDecimal seconds = new BigDecimal(((double) millis) / 1000); seconds = seconds.setScale(3, BigDecimal.ROUND_HALF_DOWN); logger.debug("Submission time (secs) after: " + seconds.toString()); logger.debug("Submission time in milliseconds: " + millis); } if (!batch) { printMessage("WAITING FOR JOB TO FINISH"); waitForJobCompletion(STATE_CHANGE_BASE_TIMEOUT_MILLIS); try { this.destroyJob(this.job); } catch (Exception e) { printError("coudl not destroy"); } if (this.job.getState().equals(StateEnumeration.Failed)) { printJobFault(this.job); } } } /** * Since messaging is assumed to be unreliable (i.e. a notification could * very well be lost), we implement policy of pulling the remote state when * a given waited-for notification has not has been received after a * timeout. Note: this could however have the side-effect of hiding bugs in * the service-side notification implementation. * * The base delay in parameter is doubled each time the wait times out * (binary exponential backoff). When a state change notification is * received, the time out delay is reset to the base value. * * @param maxWaitPerStateNotificationMillis * long base timeout for each state transition before pulling the * state from the service */ private synchronized void waitForJobCompletion( long maxWaitPerStateNotificationMillis) throws Exception { long durationToWait = maxWaitPerStateNotificationMillis; long startTime; StateEnumeration oldState = job.getState(); // prints one more state initially (Unsubmitted) // but cost extra remote call for sure. Null test below instead while (!this.jobCompleted) { if (logger.isDebugEnabled()) { logger.debug("Job not completed - waiting for state change " + "(timeout before pulling: " + durationToWait + " ms)."); } startTime = System.currentTimeMillis(); // (re)set start time try { wait(durationToWait); // wait for a state change notif } catch (InterruptedException ie) { String errorMessage = "interrupted thread waiting for job to finish"; logger.debug(errorMessage, ie); printError(errorMessage); // no exiting... } // now let's determine what stopped the wait(): StateEnumeration currentState = job.getState(); // A) New job state change notification (good!) if (currentState != null && !currentState.equals(oldState)) { oldState = currentState; // wait for next state notif durationToWait = maxWaitPerStateNotificationMillis; // reset } else { long now = System.currentTimeMillis(); long durationWaited = now - startTime; // B) Timeout when waiting for a notification (bad) if (durationWaited >= durationToWait) { if (logger.isWarnEnabled()) { logger.warn("Did not receive any new notification of " + "job state change after a delay of " + durationToWait + " ms.\nPulling job state."); } // pull state from remote job and print the // state only if it is a new state //refreshJobStatus(); job.refreshStatus(); // binary exponential backoff durationToWait = 2 * durationToWait; } // C) Some other reason else { // wait but only for remainder of timeout duration durationToWait = durationToWait - durationWaited; } } } } public void Gram(String ServerIP,String ExeFile) { // remote host String contact = ServerIP; // Factory type: Fork, Condor, PBS, LSF String factoryType = ManagedJobFactoryConstants.FACTORY_TYPE.FORK; // Job XML File rslFile = new File(ExeFile); // Deafult Security: Host authorization + XML encryption Authorization authz = HostAuthorization.getInstance(); Integer xmlSecurity = Constants.ENCRYPTION; // Submission mode: batch = will not wait boolean batchMode = false; // a Simple command executable (if no job file) String simpleJobCommandLine = null; // Job timeout values: duration, termination times Date serviceDuration = null; Date serviceTermination = null; int timeout = GramJob.DEFAULT_TIMEOUT; try { this.submitRSL(getFactoryEPR(contact,factoryType) , simpleJobCommandLine, rslFile , authz, xmlSecurity , batchMode, false, false , serviceDuration, serviceTermination, timeout ); } catch (Exception e) { e.printStackTrace(); } } private void printMessage(String message) { if (!this.quiet) { System.out.println(message); } } private void printError(String message) { System.err.println(message); } private void printJobState(StateEnumeration jobState, boolean holding) { String holdString = ""; if (holding) holdString = "HOLD "; printMessage("Job State: " + holdString + jobState.getValue()); } private void printJobFault(GramJob job) { BaseFaultType fault = job.getFault(); if (fault != null) { printMessage("Fault:\n" + FaultUtils.faultToString(fault)); } } // Precondition: job ! =null && job.isRequested() && !job.isLocallyDestroyed() private void destroyJob(GramJob job) throws Exception { printMessage("DESTROYING JOB RESOURCE"); job.destroy(); printMessage("JOB RESOURCE DESTROYED"); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -