📄 queueprocessor.java
字号:
// ============================================================================
// NOTE(review): this chunk was scraped with its original line breaks collapsed,
// which made each surviving "//" comment swallow the rest of its physical line.
// The text below restores line structure ONLY — every code token, including the
// misspelled runtime strings "deploment" and "occuring" and the unbalanced
// quotes in the log messages, is unchanged.  The first statements continue a
// method whose signature and opening loops (over result groups `i` and job
// hosts `j`) are OUTSIDE this view; the leading close-braces are balanced by
// code above this excerpt.
// ============================================================================
// ...continuation of a `new File(` argument started above: builds the path of
// the per-group job bundle zip, <jobBundleDir>/<ownerId>/<schedId>/<groupId>.zip
getSched().getJobBundleDir() + FILE_SEP +
    job.getOwnerId() + FILE_SEP +
    job.getSchedId() + FILE_SEP +
    resultList.get( i ).getGroupId() + ".zip" );
FileDataSource fds = new FileDataSource( file );
DataHandler dataHandler = new DataHandler( fds );
// get a handle on the job host
jobHostProxy = Proxy.getJobHostProxy( wsAddrList.get( j ) );
String addr = wsAddrList.get( j );   // NOTE(review): not used anywhere visible — possibly dead
// submit the job
jobId = jobHostProxy.submitJob( job.getOwnerId(), job.getJobDescriptions(), i, dataHandler );
// record the deployment under its group id so it can be started / cancelled later
List<JobDeployment> list = deploymentTable.get( resultList.get( i ).getGroupId() );
if ( list == null ) {
    Vector<JobDeployment> newList = new Vector();   // NOTE(review): raw Vector — unchecked assignment warning
    deploymentTable.put( resultList.get( i ).getGroupId(), newList );
    list = newList;
}
list.add( new JobDeployment( wsAddrList.get( j ), jobId, job.getOwnerId() ) );
addedToTable = true;
} catch ( Exception ex ) {
    // submission to this host failed: log it, undo the table entry if one was
    // added, and count the failure against the group's minimum-deployment check
    getSched().logMessage( Log.MSG_PRI_MAX,
        "job deploment for group '" + resultList.get( i ).getGroupId() + "', " +
        "job (" + (j+1) + ") from job '" + job.getSchedId() +
        " failed at job host: " + wsAddrList.get( j ) );
    if ( addedToTable ) {
        removeJobDeployment( deploymentTable, resultList.get( i ).getGroupId(), jobId );
    }
    failureCount++;
}
}   // end loop over job hosts (j) — loop header is above this excerpt
// enforce the group's minimum-deployment constraint; on violation, destroy
// every deployment made so far and abort
JobHostInfoType jobHostConstraint = getJobHostConstraint( resultList.get( i ).getGroupId(), jobConstraints );
if ( (wsAddrList.size() - failureCount) < jobHostConstraint.getMinDeployments() ) {
    cancelAllJobs( deploymentTable );
    throw new Exception( "failed to meet the minimum number of job host " +
        "deployments due to error(s) occuring during " +
        "job submission" );
}
}   // end loop over result groups (i) — loop header is above this excerpt
} catch ( Exception ex ) {
    throw ex;   // NOTE(review): catch-and-rethrow adds nothing; could be removed
}
// start job(s) on deployed job host(s)
try {
    Set set = deploymentTable.entrySet();   // NOTE(review): raw Set/Iterator force the casts below
    Iterator iter = set.iterator();
    int failureCount = 0;                   // NOTE(review): never reset per group, so failures in one
                                            // group count against later groups' minimums — TODO confirm intent
    while ( iter.hasNext() ) {
        Map.Entry entry = (Map.Entry) iter.next();
        String groupId = entry.getKey().toString();
        List<JobDeployment> deployList = (List<JobDeployment>) entry.getValue();
        for ( int i = 0; i < deployList.size(); i++ ) {
            try {
                JobHostWSSEI jobHostProxy = Proxy.getJobHostProxy( deployList.get( i ).getJobHostWsAddr() );
                jobHostProxy.startJob( deployList.get( i ).getJobOwner(), deployList.get( i ).getJobHostJobId() );
            } catch ( Exception ex ) {
                // startup failed on this host: log, drop the deployment record,
                // count the failure
                getSched().logMessage( Log.MSG_PRI_MIN,
                    "failed to start group '" + groupId + "' " +
                    "job (" + (i+1) + ") with id '" + deployList.get( i ).getJobHostJobId() +
                    " deployed at job host :" + deployList.get( i ).getJobHostWsAddr() );
                removeJobDeployment( deploymentTable, groupId, deployList.get( i ).getJobHostJobId() );
                failureCount++;
                // NOTE(review): removeJobDeployment() shrinks deployList while this
                // indexed loop runs, so the element following a failure is skipped
            }
        }
        JobHostInfoType jobHostConstraint = getJobHostConstraint( groupId, jobConstraints );
        if ( (deployList.size() - failureCount) < jobHostConstraint.getMinDeployments() ) {
            cancelAllJobs( deploymentTable );
            throw new Exception( "failed to meet the minimum number of job host " +
                "deployments due to error(s) occuring during " +
                "job startup" );
        }
    }
} catch ( Exception ex ) {
    throw ex;
}
// update database
try {
    Set set = deploymentTable.entrySet();
    Iterator iter = set.iterator();
    if ( conn == null ) {
        conn = Sql.getConnection();
    }
    // replace the queued row(s) with one deployed row per (group, deployment)
    Sql.deleteQueued( conn, job.getOwnerId(), job.getSchedId() );
    while ( iter.hasNext() ) {
        Map.Entry entry = (Map.Entry) iter.next();
        String groupId = entry.getKey().toString();
        List<JobDeployment> deployList = (List<JobDeployment>) entry.getValue();
        for ( int i = 0; i < deployList.size(); i++ ) {
            Sql.insertDeployed( conn, job.getOwnerId(), job.getSchedId(), groupId, (i+1),
                deployList.get( i ).getJobHostWsAddr(), deployList.get( i ).getJobHostJobId(),
                Scheduler.SCHED_STATUS_DEPLOYED, "" );
        }
    }
} catch ( Exception ex ) {
    throw ex;
} finally {
    try {
        // BUG(review): condition is inverted — this closes the connection only
        // when conn IS null (an NPE, silently swallowed below) and leaks the
        // connection whenever it is actually open.  Should be `conn != null`,
        // exactly as cleanup() below does it.
        if ( conn == null ) {
            conn.close();
            conn = null;
        }
    } catch ( Exception ex ) {}
}
}   // end of the (partially visible) deployment method

/**
 * Removes the deployment record whose job-host job id equals {@code jobId}
 * from the list registered under {@code groupId}.  A missing group or an
 * unmatched id is a no-op; only the first matching record is removed.
 *
 * @param deployMap map of group id to that group's deployment records
 * @param groupId   group whose deployment list is searched
 * @param jobId     job-host job id identifying the record to drop
 * @throws Exception only via the pass-through catch-and-rethrow below
 */
private void removeJobDeployment( Map<String,List<JobDeployment>> deployMap, String groupId, String jobId ) throws Exception {
    try {
        List<JobDeployment> list = deployMap.get( groupId );
        if ( list != null ) {
            for ( int i = 0; i < list.size(); i++ ) {
                // compareTo(...) == 0 is the equality idiom used throughout this file
                if ( list.get( i ).getJobHostJobId().compareTo( jobId ) == 0 ) {
                    list.remove( i );
                    break;
                }
            }
        }
    } catch ( Exception ex ) {
        throw ex;   // NOTE(review): no-op wrapper
    }
}

/**
 * Looks up the job-host constraint entry for {@code groupId} in the given
 * constraint object, matching the group id case-insensitively
 * (compareToIgnoreCase(...) == 0 is the equalsIgnoreCase idiom).
 *
 * @param groupId        group id to search for
 * @param jobConstraints constraint container holding the per-group list
 * @return the matching {@link JobHostInfoType}
 * @throws Exception if no constraint entry exists for the group
 */
private JobHostInfoType getJobHostConstraint( String groupId, JobConstraintObject jobConstraints ) throws Exception {
    List<JobHostInfoType> jobHostConstraintList = jobConstraints.getJobHostConstraintList();
    for ( int i = 0; i < jobHostConstraintList.size(); i++ ) {
        if ( jobHostConstraintList.get( i ).getGroupId().compareToIgnoreCase( groupId ) == 0 ) {
            return jobHostConstraintList.get( i );
        }
    }
    throw new Exception( "group '" + groupId + "' not found" );
}

/**
 * Best-effort rollback: asks every job host in {@code deployMap} to destroy
 * its deployed job.  Per-job failures are logged at minimum priority and
 * otherwise ignored so that one unreachable host does not stop the sweep.
 *
 * @param deployMap map of group id to that group's deployment records
 * @throws Exception only via the pass-through catch-and-rethrow below
 */
private void cancelAllJobs( Map<String,List<JobDeployment>> deployMap ) throws Exception {
    try {
        Set set = deployMap.entrySet();   // NOTE(review): raw types, as elsewhere in this file
        Iterator iter = set.iterator();
        while ( iter.hasNext() ) {
            Map.Entry entry = (Map.Entry) iter.next();
            String groupId = entry.getKey().toString();
            List<JobDeployment> deployList = (List<JobDeployment>) entry.getValue();
            for ( int i = 0; i < deployList.size(); i++ ) {
                try {
                    JobHostWSSEI jobHostProxy = Proxy.getJobHostProxy( deployList.get( i ).getJobHostWsAddr() );
                    jobHostProxy.destroyJob( deployList.get( i ).getJobOwner(), deployList.get( i ).getJobHostJobId() );
                } catch ( Exception ex ) {
                    getSched().logMessage( Log.MSG_PRI_MIN,
                        "failed to destroy group '" + groupId + "' " +
                        "job (" + (i+1) + ") with id '" + deployList.get( i ).getJobHostJobId() +
                        " deployed at job host :" + deployList.get( i ).getJobHostWsAddr() );
                }
            }
        }
    } catch ( Exception ex ) {
        throw ex;
    }
}

/**
 * Best-effort cleanup of a job bundle's traces: optionally deletes its
 * queued and deployed database rows, then removes its files
 * (<jobBundleDir>/<ownerId>/<jobBundleId>.zip, the unpacked
 * <jobBundleDir>/<ownerId>/<jobBundleId> directory, and finally the owner
 * directory itself if it ended up empty).  All failures are deliberately
 * swallowed; a null ownerId or jobBundleId makes the whole call a no-op.
 *
 * @param ownerId      job owner, first path component under the bundle dir
 * @param jobBundleId  bundle/sched id naming the zip and directory
 * @param cleanQueueDb when true, delete the queued DB rows
 * @param cleanSchedDb when true, delete the deployed DB rows
 */
private void cleanup( String ownerId, String jobBundleId, boolean cleanQueueDb, boolean cleanSchedDb ) {
    Connection conn = null;
    if ( ownerId == null || jobBundleId == null ) {
        return;
    }
    //
    // cleanup database queue
    //
    if ( cleanQueueDb ) {
        try {
            conn = Sql.getConnection();
            Sql.deleteQueued( conn, ownerId, jobBundleId );
        } catch ( Exception ex ) {
        } finally {
            try {
                if ( conn != null ) {
                    conn.close();
                    conn = null;
                }
            } catch ( Exception ex ) {}
        }
    }
    //
    // cleanup database deploy
    //
    if ( cleanSchedDb ) {
        try {
            conn = Sql.getConnection();
            Sql.deleteDeployed( conn, ownerId, jobBundleId );
        } catch ( Exception ex ) {
        } finally {
            try {
                if ( conn != null ) {
                    conn.close();
                    conn = null;
                }
            } catch ( Exception ex ) {}
        }
    }
    //
    // cleanup files
    //
    try {
        // delete job file
        File file = new File( getSched().getJobBundleDir() + FILE_SEP + ownerId + FILE_SEP + jobBundleId + ".zip" );
        if ( file.exists() ) {
            file.delete();   // NOTE(review): return value ignored — consistent with best-effort intent
        }
        // delete individual job files
        file = new File( getSched().getJobBundleDir() + FILE_SEP + ownerId + FILE_SEP + jobBundleId );
        if ( file.exists() && file.isDirectory() ) {
            FileUtil.removeDir( file );
        }
        // see if parent directory is empty, if so, remove it
        file = new File( getSched().getJobBundleDir() + FILE_SEP + ownerId );
        File[] fileContents = file.listFiles();   // null when not a directory or on I/O error
        if ( fileContents == null || fileContents.length == 0 ) {
            file.delete();
        }
    } catch ( Exception ex ) {}
}
}   // end of class — name not visible in this excerpt; file banner suggests QueueProcessor
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -