📄 jobhandlerthread.java
字号:
{ // job was still running when service was shutdown // try to reset job's state to ready and have the // ready handler re-start the job Sql.updateCurrJobStatus( conn, jobId, JobHost.JOB_READY ); return; } if ( jobEntry.getJob().getStatus() == Job.STATUS_ABORTED ) { Sql.updateCurrJobStatus( conn, jobId, JobHost.JOB_ABORTED ); } else if ( jobEntry.getJob().getStatus() == Job.STATUS_STOPPED ) { Sql.updateCurrJobStatus( conn, jobId, JobHost.JOB_STOPPED ); } else if ( jobEntry.getJob().getStatus() == Job.STATUS_COMPLETE ) { Sql.updateCurrJobStatus( conn, jobId, JobHost.JOB_PRE_COMPLETE ); } } catch ( SQLException sqle ) { throw sqle; } catch ( Exception e ) { throw e; } } private boolean handleReadyJob( Connection conn, Sql.JobResult jobResult ) throws SQLException, Exception { JobDescriptionType jobDescr = null; boolean running = false; // check if job has received start signal if ( !jobResult.startJob() ) { // check if timeout has occured Timestamp currTs = new Timestamp( System.currentTimeMillis() ); Timestamp submitTs = Timestamp.valueOf( jobResult.getSubmitTs() ); Timestamp expireTs = new Timestamp( submitTs.getTime() + JOB_START_TIMEOUT ); if ( currTs.compareTo( expireTs ) > 0 ) { throw new Exception( "job timed out while waiting for start signal" ); } // else, leave job in current state return false; } try { jobDescr = getJobDescr( jobResult.getJobDescr(), jobResult.getJobIndex() ); if ( jobDescr == null ) { throw new Exception( "could not extract job description at given index '" + jobResult.getJobIndex() + "' from job description bundle" ); } File deployDir = new File( jobHost.getJobDeployDir() + FILE_SEP + jobResult.getOwnerId() + FILE_SEP + jobResult.getJobId() ); // create a job object Job job = new Job( jobResult.getJobId(), deployDir, jobDescr.getType(), jobDescr.getExecFile(), jobDescr.getInputArgs(), jobDescr.getInputFiles(), jobDescr.getEnvVars() ); // store in memory & start the job if ( addJobEntry( jobResult.getOwnerId(), job ) ) { if ( startJob( jobResult.getOwnerId(), jobResult.getJobId() ) ) { running = true; Sql.updateCurrJobStatus( conn, jobResult.getJobId(), JobHost.JOB_RUNNING ); } else { throw new Exception ( "failed to start job" ); } } else { throw new Exception ( "failed to create a job entry in memory" ); } Sql.setJobStartTs( conn, jobResult.getJobId() ); } catch ( SQLException sqlEx ) { throw sqlEx; } catch ( Exception ex ) { if ( running ) { stopJob( jobResult.getOwnerId(), jobResult.getJobId() ); } throw ex; } return true; } private void handleNewJob( Connection conn, Sql.JobResult jobResult ) throws SQLException, Exception { File jobFile = new File( JobHost.getJobDeployDir() + FILE_SEP + jobResult.getOwnerId() + FILE_SEP + jobResult.getJobId() + FILE_SEP + jobResult.getJobId() + ".zip" ); File destDir = new File( JobHost.getJobDeployDir() + FILE_SEP + jobResult.getOwnerId() + FILE_SEP + jobResult.getJobId() ); try { // extract job files & delete zipfile once complete Zip.extractZipFile( jobFile, destDir, false ); jobFile.delete(); // update job status to READY Sql.updateCurrJobStatus( conn, jobResult.getJobId(), JobHost.JOB_READY ); } catch ( SQLException sqlEx ) { throw sqlEx; } catch ( Exception ex ) { throw ex; } } private JobDescriptionType getJobDescr( String xmlJobDescrBundle, int index ) throws Exception { JobDescriptionType jobDescr = null; try { JobDescriptionBundle bundle = JobDescr.createObj( xmlJobDescrBundle ); if ( index >= bundle.getJobDescrList().size() ) { throw new Exception( "job index exceeds number of job descriptions " + "specified in bundle" ); } jobDescr = (JobDescriptionType) bundle.getJobDescrList().get( index ); } catch ( Exception jaxbEx ) { throw new Exception( "job description bundle is not a valid xml document " + "or is not constructed appropriately" ); } return jobDescr; } private synchronized boolean removeJobEntry( String ownerId, String jobId ) { if ( ownerId == null || jobId == null ) { return false; } Vector jobList = (Vector) jobEntryTable.get( ownerId ); if ( jobList == null ) { return false; } JobEntry entry = null; for ( int i = 0; i < jobList.size(); i++ ) { entry = (JobEntry) jobList.elementAt( i ); if ( entry.getJob().getId().compareTo( jobId ) == 0 ) { entry = (JobEntry) jobList.remove( i ); return true; } } return false; } private synchronized JobEntry getJobEntry( String ownerId, String jobId ) { if ( ownerId == null || jobId == null ) { return null; } Vector jobList = (Vector) jobEntryTable.get( ownerId ); if ( jobList == null ) { return null; } JobEntry entry = null; for ( int i = 0; i < jobList.size(); i++ ) { entry = (JobEntry) jobList.elementAt( i ); if ( entry.getJob().getId().compareTo( jobId ) == 0 ) { return entry; } } return null; } private synchronized boolean addJobEntry( String ownerId, Job job ) { JobEntry entry = null; if ( ownerId == null || job == null ) { return false; } Vector jobList = (Vector) jobEntryTable.get( ownerId ); if ( jobList == null ) { jobList = new Vector(); } if ( getJobEntry( ownerId, job.getId() ) == null ) { entry = new JobEntry( job, new Thread( job ) ); jobList.add( entry ); jobEntryTable.put( ownerId, jobList ); return true; } return false; } private synchronized boolean startJob( String ownerId, String jobId ) { JobEntry entry = getJobEntry( ownerId, jobId ); if ( entry != null ) { entry.getThread().start(); return true; } return false; } private synchronized boolean stopJob( String ownerId, String jobId ) { JobEntry entry = getJobEntry( ownerId, jobId ); if ( entry != null ) { entry.getJob().stop(); return true; } return false; } private void cleanupCurrJob( String ownerId, String jobId, boolean cleanDatabase, boolean cleanupMem, boolean cleanDeployDir, boolean cleanOutputDir ) { if ( jobId == null || jobId.length() == 0 ) { return; } if ( cleanupMem || cleanDeployDir ) { // if job is running, stop it -- we won't be able // to delete its files if they're in use JobEntry entry = getJobEntry( ownerId, jobId ); int count = 0; if ( entry != null ) { while ( entry.getThread().isAlive() && count++ < 10 ) { entry.getJob().stop(); try { Thread.sleep( 1000 ); } catch ( InterruptedException ie ) {} } if ( count >= 10 ) { entry.getThread().destroy(); } } } if ( cleanDatabase ) { Connection conn = null; try { conn = Sql.getConnection(); Sql.removeCurrJob( conn, jobId ); } catch ( SQLException sqle ) { getJobHost().logMessage( Log.MSG_PRI_MIN, "SQL error during cleanup: " + sqle.getMessage() ); } catch ( Exception e ) { getJobHost().logMessage( Log.MSG_PRI_MIN, "error during job cleanup: " + e.getMessage() ); } finally { try { if ( conn != null ) { conn.close(); conn = null; } } catch ( Exception ex ) {} } } if ( cleanupMem ) { removeJobEntry( ownerId, jobId ); } if ( cleanDeployDir ) { try { File file = new File( JobHost.getJobDeployDir() + FILE_SEP + ownerId + FILE_SEP + jobId ); if ( file.exists() && file.isDirectory() ) { FileUtil.removeDir( file ); } // see if parent directory is empty, if so, remove it file = new File( JobHost.getJobDeployDir() + FILE_SEP + ownerId ); File[] fileContents = file.listFiles(); if ( fileContents == null || fileContents.length == 0 ) { file.delete(); } } catch ( Exception ex ) { getJobHost().logMessage( Log.MSG_PRI_MIN, "error during job cleanup: " + ex.getMessage() ); } } if ( cleanOutputDir ) { // delete output file directory try { File file = new File( JobHost.getJobOutputDir() + FILE_SEP + ownerId + FILE_SEP + jobId ); if ( file.exists() && file.isDirectory() ) { FileUtil.removeDir( file ); // see if parent directory is empty, if so, remove it file = new File( JobHost.getJobOutputDir() + FILE_SEP + ownerId ); File[] fileContents = file.listFiles(); if ( fileContents == null || fileContents.length == 0 ) { file.delete(); } } } catch ( Exception ex ) { getJobHost().logMessage( Log.MSG_PRI_MIN, "error during job cleanup: " + ex.getMessage() ); } } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -