📄 jobhandlerthread.java
字号:
{ jobId = list.get( i ).getJobId(); jobOwner = list.get( i ).getOwnerId(); handleRunningJob( conn, list.get( i ) ); } } catch ( SQLException sqlEx ) { if ( jobId != null && jobOwner != null ) { getJobHost().logMessage( Log.MSG_PRI_MIN, "job '" + jobId + "' from user '" + jobOwner + "' processing error: " + sqlEx.getMessage() ); } else { getJobHost().logMessage( Log.MSG_PRI_MAX, "SQL error: " + sqlEx.getMessage() ); } } catch ( Exception ex ) { if ( jobId != null && jobOwner != null ) { getJobHost().logMessage( Log.MSG_PRI_MIN, "job '" + jobId + "' from user '" + jobOwner + "' processing error: " + ex.getMessage() ); cleanupCurrJob( jobOwner, jobId, false, true, true, true ); try { // move job to history table Sql.moveJobToHistoryTable( conn, jobId ); } catch ( Exception e ) { cleanupCurrJob( jobOwner, jobId, true, false, false, false ); } } else { getJobHost().logMessage( Log.MSG_PRI_MAX, "error: " + ex.getMessage() ); } } finally { try { if ( conn != null ) { conn.close(); conn = null; } } catch ( SQLException sqlEx ) { getJobHost().logMessage( Log.MSG_PRI_MAX, "SQL error while closing connection : " + sqlEx.getMessage() ); } } getJobHost().logMessage( Log.MSG_PRI_MIN, "job handler thread stopped" ); } //////////////////////////////////////////////////////////////////////////// // private methods // private void stopRunningJobs( Connection conn ) throws SQLException, Exception { String jobId = null; String ownerId = null; JobEntry jobEntry = null; List<Sql.JobResult> list = null; try { conn = Sql.getConnection(); list = Sql.getCurrJobs( conn, JobHost.JOB_RUNNING, true ); for ( int i = 0; i < list.size(); i++ ) { jobId = list.get( i ).getJobId(); ownerId = list.get( i ).getOwnerId(); if ( (jobEntry = getJobEntry( ownerId, jobId )) == null ) { continue; } jobEntry.getJob().stop(); Sql.updateCurrJobStatus( conn, jobId, JobHost.JOB_SHUTDOWN ); getJobHost().logMessage( Log.MSG_PRI_MIN, "job '" + jobId + "' from user '" + ownerId + "' shut down" ); } } catch ( SQLException sqle ) { throw sqle; } catch ( Exception e ) { throw e; } } private void handleStoppedJob( Connection conn, Sql.JobResult jobResult ) throws SQLException, Exception { try { // set completion time and refresh job result Sql.setJobCompleteTs( conn, jobResult.getJobId() ); jobResult = Sql.getJob( conn, jobResult.getJobId(), false ); // send email update if requested String email = jobResult.getOwnerEmail(); if ( email != null && email.length() > 0 ) { String msgBody = "Job \"" + jobResult.getJobId() + "\" stopped by request\n\n" + "\tsubmit time : " + jobResult.getSubmitTs().toString() + "\n" + "\tstart time : " + jobResult.getStartTs().toString() + "\n" + "\tcomplete time : " + jobResult.getCompleteTs().toString(); try { Mail.sendSmtpMessage( email, jobHost.getEmail(), "JWSGrid JobHost", msgBody ); getJobHost().logMessage( Log.MSG_PRI_1, "sent email update to '" + email + "' about job '" + jobResult.getJobId() + "' from user '" + jobResult.getOwnerId() + "'" ); } catch ( Exception ex ) { getJobHost().logMessage( Log.MSG_PRI_1, "failed to send email update to '" + email + "' about job '" + jobResult.getJobId() + "' from user '" + jobResult.getOwnerId() + "' ; " + ex.getMessage() ); } } // cleanup Sql.moveJobToHistoryTable( conn, jobResult.getJobId() ); removeJobEntry( jobResult.getOwnerId(), jobResult.getJobId() ); cleanupCurrJob( jobResult.getOwnerId(), jobResult.getJobId(), false, true, true, true ); } catch ( Exception e ) { throw e; } } private void handleAbortedJob( Connection conn, Sql.JobResult jobResult ) throws SQLException, Exception { JobEntry jobEntry = null; String exitMessage = null; try { jobEntry = getJobEntry( jobResult.getOwnerId(), jobResult.getJobId() ); if ( jobEntry != null ) { Sql.setJobExitValue( conn, jobResult.getJobId(), jobEntry.getJob().getExitValue() ); if ( jobEntry.getJob().getException() != null ) { exitMessage = jobEntry.getJob().getException().getMessage(); Sql.setJobExitMessage( conn, jobResult.getJobId(), exitMessage ); } } // set completion time and refresh job result Sql.setJobCompleteTs( conn, jobResult.getJobId() ); jobResult = Sql.getJob( conn, jobResult.getJobId(), false ); // send email update if requested String email = jobResult.getOwnerEmail(); if ( email != null && email.length() > 0 ) { String msgBody = "Job \"" + jobResult.getJobId() + "\" aborted on error\n\n" + "\terror message : " + jobResult.getExitMessage() + "\n" + "\texit value : " + jobResult.getExitValue() + "\n" + "\tsubmit time : " + jobResult.getSubmitTs().toString() + "\n" + "\tstart time : " + jobResult.getStartTs().toString() + "\n" + "\tcomplete time : " + jobResult.getCompleteTs().toString(); try { Mail.sendSmtpMessage( email, jobHost.getEmail(), "JWSGrid JobHost", msgBody ); getJobHost().logMessage( Log.MSG_PRI_1, "sent email update to '" + email + "' about job '" + jobResult.getJobId() + "' from user '" + jobResult.getOwnerId() + "'" ); } catch ( Exception ex ) { getJobHost().logMessage( Log.MSG_PRI_1, "failed to send email update to '" + email + "' about job '" + jobResult.getJobId() + "' from user '" + jobResult.getOwnerId() + "' ; " + ex.getMessage() ); } } // cleanup Sql.moveJobToHistoryTable( conn, jobResult.getJobId() ); removeJobEntry( jobResult.getOwnerId(), jobResult.getJobId() ); cleanupCurrJob( jobResult.getOwnerId(), jobResult.getJobId(), false, true, true, true ); } catch ( Exception e ) { throw e; } } private void handleCompleteJob( Connection conn, Sql.JobResult jobResult ) throws SQLException, Exception { JobDescriptionType jobDescr = null; String jobId = null; try { JobEntry entry = getJobEntry( jobResult.getOwnerId(), jobResult.getJobId() ); if ( entry == null ) { // for some reason the job is no longer in memory // (service starting up?) -- add a temporary entry jobDescr = getJobDescr( jobResult.getJobDescr(), jobResult.getJobIndex() ); if ( jobDescr == null ) { throw new Exception( "could not extract job description at given index '" + jobResult.getJobIndex() + "' from job description bundle" ); } // set deployment dir File deployDir = new File( jobHost.getJobDeployDir() + FILE_SEP + jobResult.getOwnerId() + FILE_SEP + jobResult.getJobId() ); // create a job object Job job = new Job( jobResult.getJobId(), deployDir, jobDescr.getType(), jobDescr.getExecFile(), jobDescr.getInputArgs(), jobDescr.getInputFiles(), jobDescr.getEnvVars() ); if ( addJobEntry( jobResult.getOwnerId(), job ) ) { entry = getJobEntry( jobResult.getOwnerId(), jobResult.getJobId() ); } else { throw new Exception( "failed to add job to memory" ); } } jobId = jobResult.getJobId(); // get a list of job's output files and copy // them to the output directory List<String> outputFileList = entry.getJob().generatedOutputFileList(); String outputFiles = ""; String destPathName = null; String sourcePathName = null; String deployPathName = JobHost.getJobDeployDir() + FILE_SEP + jobResult.getOwnerId() + FILE_SEP + jobResult.getJobId() + FILE_SEP; for ( int i = 0; i < outputFileList.size(); i++ ) { sourcePathName = outputFileList.get( i ); destPathName = sourcePathName.substring( deployPathName.length(), sourcePathName.length() ); outputFiles += "\t" + destPathName + "\n"; // add output file to database table Sql.addJobOuputFile( conn, jobResult.getJobId(), destPathName ); destPathName = JobHost.getJobOutputDir() + FILE_SEP + jobResult.getOwnerId() + FILE_SEP + jobResult.getJobId() + FILE_SEP + destPathName; FileUtil.copyFile( sourcePathName, destPathName, true ); } // set job completion time and refresh job result Sql.setJobCompleteTs( conn, jobResult.getJobId() ); jobResult = Sql.getJob( conn, jobResult.getJobId(), false ); // send email update if requested String email = jobResult.getOwnerEmail(); if ( email != null && email.length() > 0 ) { String msgBody = "Job \"" + jobResult.getJobId() + "\" is complete\n\n" + "\texit value : " + jobResult.getExitValue() + "\n" + "\tsubmit time : " + jobResult.getSubmitTs().toString() + "\n" + "\tstart time : " + jobResult.getStartTs().toString() + "\n" + "\tcomplete time : " + jobResult.getCompleteTs().toString() + "\n" + "\n" + "The following output files were created during execution:\n\n"; msgBody += outputFiles; try { Mail.sendSmtpMessage( email, jobHost.getEmail(), "JWSGrid JobHost", msgBody ); getJobHost().logMessage( Log.MSG_PRI_1, "sent email update to '" + email + "' about job '" + jobResult.getJobId() + "' from user '" + jobResult.getOwnerId() + "'" ); } catch ( Exception ex ) { getJobHost().logMessage( Log.MSG_PRI_1, "failed to send email update to '" + email + "' about job '" + jobResult.getJobId() + "' from user '" + jobResult.getOwnerId() + "' ; " + ex.getMessage() ); } } // set status to complete now that list of // output files has been generated Sql.updateCurrJobStatus( conn, jobId, JobHost.JOB_COMPLETE ); // cleanup Sql.moveJobToHistoryTable( conn, jobResult.getJobId() ); removeJobEntry( jobResult.getOwnerId(), jobResult.getJobId() ); cleanupCurrJob( jobResult.getOwnerId(), jobResult.getJobId(), false, true, true, false ); } catch ( SQLException sqle ) { try { if ( jobId != null ) { Sql.removeJobOuputFiles( conn, jobResult.getJobId() ); } } catch ( Exception ex ) {} throw sqle; } catch ( Exception e ) { try { if ( jobId != null ) { Sql.removeJobOuputFiles( conn, jobResult.getJobId() ); } } catch ( Exception ex ) {} throw e; } } private void handleRunningJob( Connection conn, Sql.JobResult jobResult ) throws SQLException, Exception { try { String ownerId = jobResult.getOwnerId(); String jobId = jobResult.getJobId(); JobEntry jobEntry = getJobEntry( ownerId, jobId ); if ( jobEntry == null )
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -