📄 queueprocessor.java
字号:
/* * ScheduleProcessor.java * * Created on April 21, 2005, 10:11 AM */package jwsgrid.scheduler.priv;import jwsgrid.scheduler.Scheduler;import jwsgrid.ws.resourcemanager.ResourceManagerWSSEI;import jwsgrid.ws.resourcemanager.ConstraintMatchException;import jwsgrid.ws.resourcemanager.ConstraintDistanceException;import jwsgrid.ws.jobhost.JobHostWSSEI;import jwsgrid.util.*;import jwsgrid.xsd.*;import jwsgrid.xsd.jobdescription.*;import jwsgrid.xsd.jobconstraints.*;import jwsgrid.xsd.jobhostsearchresult.*;import jwsgrid.wsdl.Proxy;import java.io.File;import java.io.FileNotFoundException;import java.io.IOException;import java.sql.Connection;import java.sql.SQLException;import java.sql.Timestamp;import java.util.GregorianCalendar;import java.util.List;import java.util.Map;import java.util.LinkedHashMap;import java.util.Set;import java.util.Iterator;import java.util.Vector;import javax.activation.DataHandler;import javax.activation.FileDataSource;/** * * @author sean */public class QueueProcessor implements Runnable { //////////////////////////////////////////////////////////////////////////// // public classes // public class JobDeployment { private String jobHostWsAddr; private String jobHostJobId; private String jobOwner; public JobDeployment( String jobHostWsAddr, String jobHostJobId, String jobOwner ) { this.jobHostWsAddr = jobHostWsAddr; this.jobHostJobId = jobHostJobId; this.jobOwner = jobOwner; } public String getJobHostWsAddr() { return jobHostWsAddr; } public String getJobHostJobId() { return jobHostJobId; } public String getJobOwner() { return jobOwner; } } //////////////////////////////////////////////////////////////////////////// // private attributes // private static final String FILE_SEP = System.getProperty( "file.separator" ); private static final int MS_PER_SEC = 1000; private static final int MS_PER_MIN = MS_PER_SEC * 60; private static final int MS_PER_HOUR = MS_PER_MIN * 60; private Scheduler sched = null; private boolean shutdownRequest = false; //////////////////////////////////////////////////////////////////////////// // public methods // /** Creates a new instance of ScheduleProcessor */ public QueueProcessor( Scheduler sched ) { this.sched = sched; } public Scheduler getSched() { return sched; } public void stop() { shutdownRequest = true; } public void run() { Connection conn = null; boolean success = false; Sql.QueueResult queueResult = null; while ( !shutdownRequest ) { try { queueResult = null; // // handle queued jobs // try { success = false; if ( conn == null ) { conn = Sql.getConnection(); } List<Sql.QueueResult> list = Sql.getQueued( conn, null ); for ( int i = 0; i < list.size() && !shutdownRequest; i++ ) { queueResult = list.get( i ); getSched().logMessage( Log.MSG_PRI_MIN, "'" + queueResult.getOwnerId() + "' submitted job " + "'" + queueResult.getSchedId() + "'" ); GregorianCalendar cal = new GregorianCalendar(); Timestamp tsSubmit = new Timestamp( cal.getTimeInMillis() ); handleJobBundle( conn, list.get( i ) ); cal = new GregorianCalendar(); Timestamp tsDeploy = new Timestamp( cal.getTimeInMillis() ); long queueSecs = (tsDeploy.getTime() - tsSubmit.getTime()) / 1000; System.out.println( "--> submit time: " + tsSubmit.toString() ); System.out.println( "--> deploy time: " + tsDeploy.toString() ); System.out.println( "--> queue time: " + queueSecs ); // take a small break Thread.sleep( 50 ); } success = true; } catch ( ResourceManagerProxyException resMgrEx ) { getSched().logMessage( Log.MSG_PRI_MAX, "error: " + resMgrEx.getMessage() ); } catch ( SQLException sqle ) { getSched().logMessage( Log.MSG_PRI_MAX, "SQL error: " + sqle.getMessage() ); } catch ( Exception ex ) { getSched().logMessage( Log.MSG_PRI_MAX, "error: " + ex.getMessage() ); if ( queueResult != null ) { cleanup( queueResult.getOwnerId(), queueResult.getSchedId(), true, true ); try { Sql.deleteQueued( conn, queueResult.getOwnerId(), queueResult.getSchedId() ); Sql.insertDeployed( conn, queueResult.getOwnerId(), queueResult.getSchedId(), "", 0, "", "", Scheduler.SCHED_STATUS_ERROR, ex.getMessage() ); } catch ( Exception e ) {} } } finally { try { if ( conn != null && !success ) { conn.close(); conn = null; } } catch ( Exception ex ) { getSched().logMessage( Log.MSG_PRI_MAX, "SQL error while closing connection: " + ex.getMessage() ); } } // // take a break // Thread.sleep( MS_PER_SEC * 2 ); } catch ( InterruptedException intEx ) {} } } //////////////////////////////////////////////////////////////////////////// // private methods // private void handleJobBundle( Connection conn, Sql.QueueResult job ) throws SQLException, Exception { ResourceManagerWSSEI resMgrProxy = null; String xmlSearchResult = null; Sql.ConfigResult config = null; JobDescriptionBundle jobDescriptions = null; JobConstraintObject jobConstraints = null; LinkedHashMap<String,List<JobDeployment>> deploymentTable = new LinkedHashMap(); // make sure job job zipfile exists File jobBundleFile = new File( getSched().getJobBundleDir() + FILE_SEP + job.getOwnerId() + FILE_SEP + job.getSchedId() + FILE_SEP + job.getSchedId() + ".zip" ); if ( !jobBundleFile.exists() || !jobBundleFile.isFile() ) { throw new Exception( "job job file '" + job.getSchedId() + ".zip'" + " does not exist" ); } // retrieve configuration & create job job description object try { config = Sql.getConfig( conn ); jobDescriptions = JobDescr.createObj( job.getJobDescriptions() ); jobConstraints = JobConstraints.createtObj( job.getJobConstraints() ); } catch ( SQLException ex ) { throw ex; } catch ( Exception ex ) { throw ex; } // extract zipfile; should have the structure // job // \group1_id.zip // \group2_id.zip // ... // \groupN_id.zip try { File destDirFile = new File( getSched().getJobBundleDir() + FILE_SEP + job.getOwnerId() + FILE_SEP + job.getSchedId() ); if ( !destDirFile.exists() ) { destDirFile.mkdirs(); } Zip.extractZipFile( jobBundleFile, destDirFile, false ); jobBundleFile.delete(); } catch ( Exception ex ) { throw new Exception( "an error occured while trying to extract the " + "submitted job job file" ); } // verify file structure try { List<JobDescriptionType> list = jobDescriptions.getJobDescrList(); for ( int i = 0; i < list.size(); i++ ) { File file = new File( getSched().getJobBundleDir() + FILE_SEP + job.getOwnerId() + FILE_SEP + job.getSchedId() + FILE_SEP + list.get( i ).getGroupId() + ".zip" ); if ( !file.exists() ) { throw new Exception( "group '" + list.get( i ).getGroupId() + "' is missing its data file '" + list.get( i ).getGroupId() + ".zip'" ); } } } catch ( Exception ex ) { throw ex; } // create and send a job host search request try { // get a handle on the resource manager try { resMgrProxy = Proxy.getResourceManagerProxy( config.getResMgrWsAddr() ); resMgrProxy.ping(); } catch ( Exception ex ) { throw new ResourceManagerProxyException(); } // send request to resource manager -- if contraints cannot be // satisfied, use constraint fallback mechanism try { xmlSearchResult = resMgrProxy.searchJobHosts( job.getJobConstraints() ); } catch ( ConstraintMatchException ex ) { throw ex; } catch ( ConstraintDistanceException ex ) { throw ex; } catch ( Exception ex ) { throw ex; } } catch ( Exception ex ) { throw ex; } // deploy job(s) on returned job host(s) try { JobHostSearchResult searchResult = JobHost.createSearchResultObj( xmlSearchResult ); List<JobHostGroupType> resultList = searchResult.getJobHostGroupList(); for ( int i = 0; i < resultList.size(); i++ ) { List<String> wsAddrList = resultList.get( i ).getWsAddrList(); int failureCount = 0; for ( int j = 0; j < wsAddrList.size(); j++ ) { JobHostWSSEI jobHostProxy = null; boolean addedToTable = false; String jobId = null; try { getSched().logMessage( Log.MSG_PRI_MIN, "deploying group '" + resultList.get( i ).getGroupId() + "', " + "job (" + (j+1) + ") from job '" + job.getSchedId() + " to job host: " + wsAddrList.get( j ) ); // get a handle on the current job's zipfile File file = new File(
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -