📄 jobhandlerthread.java
字号:
/* * JobHandler.java * * Created on April 25, 2005, 11:49 AM */package jwsgrid.jobhost.priv;import jwsgrid.jobhost.JobHost;import jwsgrid.util.*;import jwsgrid.xsd.*;import jwsgrid.xsd.jobdescription.*;import java.lang.String;import java.lang.Thread;import java.util.Hashtable;import java.util.List;import java.util.Vector;import java.sql.Connection;import java.sql.SQLException;import java.sql.Timestamp;import java.io.File;import java.io.FileNotFoundException;import java.io.IOException;/** * * @author sean */public class JobHandlerThread implements Runnable { //////////////////////////////////////////////////////////////////////////// // public attributes // public static final String FILE_SEP = System.getProperty( "file.separator" ); public static final long JOB_START_TIMEOUT = ((1000) * 60) * 5; // 5 minutes public static final int SLEEP_INTERVAL = 2000; //////////////////////////////////////////////////////////////////////////// // private attributes / classes // private Hashtable<String,List<JobEntry>> jobEntryTable = new Hashtable(); private boolean shutdownRequest = false; private Hashtable jobTable = new Hashtable(); private JobHost jobHost = null; private class JobEntry { private Job job = null; private Thread thread = null; boolean loggedStartMsg = false; public JobEntry( Job job, Thread thread ) { this.job = job; this.thread = thread; } public void setStartMessageLogged( boolean value ) { loggedStartMsg = value; } public Job getJob() { return job; } public Thread getThread() { return thread; } public boolean isStartMessageLogged() { return loggedStartMsg; } } //////////////////////////////////////////////////////////////////////////// // public methods // /** Creates a new instance of JobHandler */ public JobHandlerThread( JobHost jobHost ) { this.jobHost = jobHost; } public static String getJobStatusString( int status ) { if ( status == JobHost.JOB_QUEUED ) { return "QUEUED"; } else if ( status == JobHost.JOB_COMPLETE ) { return "COMPLETE"; } else if ( status == JobHost.JOB_ABORTED ) { return "ABORTED"; } else if ( status == JobHost.JOB_READY ) { return "READY"; } else if ( status == JobHost.JOB_RUNNING ) { return "RUNNING"; } else if ( status == JobHost.JOB_SHUTDOWN ) { return "SHUTDOWN"; } else if ( status == JobHost.JOB_STOPPED ) { return "STOPPED"; } else if ( status == JobHost.JOB_PRE_COMPLETE ) { return "RUNNING"; } else { return "UNKNOWN"; } } public JobHost getJobHost() { return jobHost; } public void stop() { shutdownRequest = true; } public void run() { Connection conn = null; boolean success = false; List<Sql.JobResult> jobList = null; String jobId = null; String jobOwner = null; getJobHost().logMessage( Log.MSG_PRI_MIN, "job handler thread started" ); // // keep processing jobs until a shutdown is requested // while ( !shutdownRequest ) { try { success = false; jobId = null; jobOwner = null; if ( conn == null ) { conn = Sql.getConnection(); } // get a list of all jobs in database jobList = Sql.getCurrJobs( conn ); // handle each job according to its status if ( jobList.size() > 0 ) { for ( int i = 0; i < jobList.size(); i++ ) { // check for shutdown request if ( shutdownRequest ) { break; } jobId = jobList.get( i ).getJobId(); jobOwner = jobList.get( i ).getOwnerId(); // handle new job if ( jobList.get( i ).getJobStatus() == JobHost.JOB_QUEUED ) { handleNewJob( conn, jobList.get( i ) ); getJobHost().logMessage( Log.MSG_PRI_MIN, "job '" + jobId + "' from user '" + jobOwner + "' ready to run" ); } // handle ready job else if ( jobList.get( i ).getJobStatus() == JobHost.JOB_READY ) { if ( handleReadyJob( conn, jobList.get( i ) ) ) { getJobHost().logMessage( Log.MSG_PRI_MIN, "job '" + jobId + "' from user '" + jobOwner + "' running" ); } else { JobEntry entry = getJobEntry( jobOwner, jobId ); // only log message if it already hasn't been // logged; don't want to fill log with clutter if ( entry != null && !entry.isStartMessageLogged() ) { getJobHost().logMessage( Log.MSG_PRI_MIN, "job '" + jobId + "' from user '" + jobOwner + "' waiting for start signal" ); entry.setStartMessageLogged( true ); } } } // handle running job else if ( jobList.get( i ).getJobStatus() == JobHost.JOB_RUNNING ) { handleRunningJob( conn, jobList.get( i ) ); } // handle shutdown job else if ( jobList.get( i ).getJobStatus() == JobHost.JOB_SHUTDOWN ) { Sql.updateCurrJobStatus( conn, jobId, JobHost.JOB_READY ); } // handle destroyed job else if ( jobList.get( i ).getJobStatus() == JobHost.JOB_DESTROY ) { cleanupCurrJob( jobOwner, jobId, true, true, true, true ); getJobHost().logMessage( Log.MSG_PRI_MIN, "job '" + jobId + "' from user '" + jobOwner + "' destroyed; no records will remain" ); } // handle stopped job else if ( jobList.get( i ).getJobStatus() == JobHost.JOB_STOPPED ) { handleStoppedJob( conn, jobList.get( i ) ); getJobHost().logMessage( Log.MSG_PRI_MIN, "job '" + jobId + "' from user '" + jobOwner + "' stopped" ); } // handle aborted job else if ( jobList.get( i ).getJobStatus() == JobHost.JOB_ABORTED ) { handleAbortedJob( conn, jobList.get( i ) ); getJobHost().logMessage( Log.MSG_PRI_MIN, "job '" + jobId + "' from user '" + jobOwner + "' aborted" ); } // handle pre-complete job else if ( jobList.get( i ).getJobStatus() == JobHost.JOB_PRE_COMPLETE ) { handleCompleteJob( conn, jobList.get( i ) ); getJobHost().logMessage( Log.MSG_PRI_MIN, "job '" + jobId + "' from user '" + jobOwner + "' complete" ); Timestamp tsSubmit = Timestamp.valueOf( jobList.get( i ).getSubmitTs() ); Timestamp tsStart = Timestamp.valueOf( jobList.get( i ).getStartTs() ); Timestamp tsDone = Timestamp.valueOf( jobList.get( i ).getCompleteTs() ); long numSecs = (tsStart.getTime() - tsSubmit.getTime()) / 1000; System.out.println( "--> submit time: " + tsSubmit.toString() ); System.out.println( "--> start time: " + tsStart.toString() ); System.out.println( "--> queue time: " + numSecs + " (s)" ); } // handle complete job -- shouldn't appear here due to // pre-complete state, but can be possible because of an // improper service shutdown else if ( jobList.get( i ).getJobStatus() == JobHost.JOB_COMPLETE ) { Sql.moveJobToHistoryTable( conn, jobList.get( i ).getJobId() ); removeJobEntry( jobList.get( i ).getOwnerId(), jobList.get( i ).getJobId() ); cleanupCurrJob( jobList.get( i ).getOwnerId(), jobList.get( i ).getJobId(), false, true, true, false ); } // unknown job status...cleanup else { throw new Exception( "unknown job status (" + jobList.get( i ).getJobStatus() + ")" ); } // take a small break try { Thread.sleep( 100 ); } catch ( InterruptedException intEx ) {} } } success = true; } catch ( InterruptedException intEx ) { } catch ( SQLException sqlEx ) { if ( jobId != null && jobOwner != null ) { getJobHost().logMessage( Log.MSG_PRI_MIN, "job '" + jobId + "' from user '" + jobOwner + "' processing error: " + sqlEx.getMessage() ); } else { getJobHost().logMessage( Log.MSG_PRI_MAX, "SQL error: " + sqlEx.getMessage() ); } } catch ( Exception ex ) { if ( jobId != null && jobOwner != null ) { getJobHost().logMessage( Log.MSG_PRI_MIN, "job '" + jobId + "' from user '" + jobOwner + "' processing error: " + ex.getMessage() ); cleanupCurrJob( jobOwner, jobId, false, true, true, true ); try { // move job to history table Sql.moveJobToHistoryTable( conn, jobId ); } catch ( Exception e ) { cleanupCurrJob( jobOwner, jobId, true, false, false, false ); } } else { getJobHost().logMessage( Log.MSG_PRI_MAX, "error: " + ex.getMessage() ); } } finally { try { if ( conn != null && !success ) { conn.close(); conn = null; } } catch ( SQLException sqlEx ) { getJobHost().logMessage( Log.MSG_PRI_MAX, "SQL error while closing connection : " + sqlEx.getMessage() ); } } try { Thread.sleep( SLEEP_INTERVAL ); } catch ( InterruptedException ie ) {} } // // perform pre-shutdown operations // try { List<Sql.JobResult> list = null; jobId = null; jobOwner = null; if ( conn == null ) { conn = Sql.getConnection(); } // stop currently running jobs stopRunningJobs( conn ); // update status changes before exiting list = Sql.getCurrJobs( conn, JobHost.JOB_RUNNING, true ); for ( int i = 0; i < list.size(); i++ )
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -