⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 queueprocessor.java

📁 这是一个基于计算网格的web service。它用java编写。一旦安装完成
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/* * ScheduleProcessor.java * * Created on April 21, 2005, 10:11 AM */package jwsgrid.scheduler.priv;import jwsgrid.scheduler.Scheduler;import jwsgrid.ws.resourcemanager.ResourceManagerWSSEI;import jwsgrid.ws.resourcemanager.ConstraintMatchException;import jwsgrid.ws.resourcemanager.ConstraintDistanceException;import jwsgrid.ws.jobhost.JobHostWSSEI;import jwsgrid.util.*;import jwsgrid.xsd.*;import jwsgrid.xsd.jobdescription.*;import jwsgrid.xsd.jobconstraints.*;import jwsgrid.xsd.jobhostsearchresult.*;import jwsgrid.wsdl.Proxy;import java.io.File;import java.io.FileNotFoundException;import java.io.IOException;import java.sql.Connection;import java.sql.SQLException;import java.sql.Timestamp;import java.util.GregorianCalendar;import java.util.List;import java.util.Map;import java.util.LinkedHashMap;import java.util.Set;import java.util.Iterator;import java.util.Vector;import javax.activation.DataHandler;import javax.activation.FileDataSource;/** * * @author sean */public class QueueProcessor implements Runnable {        ////////////////////////////////////////////////////////////////////////////    // public classes    //          public class JobDeployment    {        private String jobHostWsAddr;        private String jobHostJobId;        private String jobOwner;                public JobDeployment(                 String jobHostWsAddr,                 String jobHostJobId,                String jobOwner )        {            this.jobHostWsAddr = jobHostWsAddr;            this.jobHostJobId = jobHostJobId;            this.jobOwner = jobOwner;        }                public String getJobHostWsAddr() { return jobHostWsAddr; }        public String getJobHostJobId() { return jobHostJobId; }        public String getJobOwner() { return jobOwner; }    }        ////////////////////////////////////////////////////////////////////////////    // private attributes    //              private static final String FILE_SEP = System.getProperty( "file.separator" );     private static final int MS_PER_SEC = 1000;    private static final int MS_PER_MIN = MS_PER_SEC * 60;    private static final int MS_PER_HOUR = MS_PER_MIN * 60;           private Scheduler sched = null;    private boolean shutdownRequest = false;        ////////////////////////////////////////////////////////////////////////////    // public methods    //               /** Creates a new instance of ScheduleProcessor */    public QueueProcessor( Scheduler sched )     {        this.sched = sched;    }        public Scheduler getSched()     {        return sched;    }        public void stop()     {        shutdownRequest = true;    }        public void run()     {        Connection conn = null;        boolean success = false;        Sql.QueueResult queueResult = null;                while ( !shutdownRequest )        {            try            {                queueResult = null;                //                // handle queued jobs                //                            try                 {                    success = false;                    if ( conn == null )                    {                        conn = Sql.getConnection();                    }                    List<Sql.QueueResult> list = Sql.getQueued( conn, null );                    for ( int i = 0; i < list.size() && !shutdownRequest; i++ )                    {                        queueResult = list.get( i );                        getSched().logMessage(                                 Log.MSG_PRI_MIN,                                "'" + queueResult.getOwnerId() +                                 "' submitted job " +                                 "'" + queueResult.getSchedId() + "'" );                                                   GregorianCalendar cal = new GregorianCalendar();                        Timestamp tsSubmit = new Timestamp( cal.getTimeInMillis() );                        handleJobBundle( conn, list.get( i ) );                                                   cal = new GregorianCalendar();                        Timestamp tsDeploy = new Timestamp( cal.getTimeInMillis() );                        long queueSecs = (tsDeploy.getTime() - tsSubmit.getTime()) / 1000;                        System.out.println( "--> submit time: " + tsSubmit.toString() );                        System.out.println( "--> deploy time: " + tsDeploy.toString() );                         System.out.println( "--> queue time: " + queueSecs );                        // take a small break                        Thread.sleep( 50 );                                       }                            success = true;                } catch ( ResourceManagerProxyException resMgrEx ) {                    getSched().logMessage(                             Log.MSG_PRI_MAX,                            "error: " + resMgrEx.getMessage() );                                } catch ( SQLException sqle ) {                    getSched().logMessage(                             Log.MSG_PRI_MAX,                            "SQL error: " + sqle.getMessage() );                } catch ( Exception ex ) {                    getSched().logMessage(                             Log.MSG_PRI_MAX,                            "error: " + ex.getMessage() );                     if ( queueResult != null )                    {                        cleanup( queueResult.getOwnerId(),                                 queueResult.getSchedId(),                                 true,                                 true );                        try                         {                            Sql.deleteQueued(                                    conn,                                    queueResult.getOwnerId(),                                    queueResult.getSchedId() );                            Sql.insertDeployed(                                    conn,                                    queueResult.getOwnerId(),                                    queueResult.getSchedId(),                                    "",                                    0,                                    "",                                    "",                                    Scheduler.SCHED_STATUS_ERROR,                                    ex.getMessage() );                          } catch ( Exception e ) {}                    }                                              } finally {                    try                    {                        if ( conn != null && !success )                        {                            conn.close();                            conn = null;                        }                    } catch ( Exception ex ) {                        getSched().logMessage(                                 Log.MSG_PRI_MAX,                                 "SQL error while closing connection: "                                 + ex.getMessage() );                                     }                }                //                // take a break                //                Thread.sleep( MS_PER_SEC * 2 );                            } catch ( InterruptedException intEx ) {}                    }    }            ////////////////////////////////////////////////////////////////////////////    // private methods    //    private void handleJobBundle( Connection conn, Sql.QueueResult job )    throws SQLException, Exception    {        ResourceManagerWSSEI resMgrProxy = null;        String xmlSearchResult = null;        Sql.ConfigResult config = null;        JobDescriptionBundle jobDescriptions = null;        JobConstraintObject jobConstraints = null;        LinkedHashMap<String,List<JobDeployment>> deploymentTable =                 new LinkedHashMap();                // make sure job job zipfile exists        File jobBundleFile = new File(                getSched().getJobBundleDir() + FILE_SEP +                job.getOwnerId() + FILE_SEP +                job.getSchedId() + FILE_SEP +                job.getSchedId() + ".zip" );                if ( !jobBundleFile.exists() || !jobBundleFile.isFile() )        {            throw new Exception(                     "job job file '" + job.getSchedId() + ".zip'" +                    " does not exist" );        }               // retrieve configuration & create job job description object        try         {            config = Sql.getConfig( conn );                        jobDescriptions = JobDescr.createObj( job.getJobDescriptions() );             jobConstraints = JobConstraints.createtObj( job.getJobConstraints() );                    } catch ( SQLException ex ) {            throw ex;                    } catch ( Exception ex ) {            throw ex;        }                    // extract zipfile; should have the structure        //  job        //      \group1_id.zip        //      \group2_id.zip        //      ...        //      \groupN_id.zip        try        {            File destDirFile = new File(                    getSched().getJobBundleDir() + FILE_SEP +                    job.getOwnerId() + FILE_SEP +                    job.getSchedId() );                        if ( !destDirFile.exists() )            {                destDirFile.mkdirs();            }                        Zip.extractZipFile( jobBundleFile, destDirFile, false );            jobBundleFile.delete();                   } catch ( Exception ex ) {            throw new Exception(                     "an error occured while trying to extract the " +                    "submitted job job file" );        }                     // verify file structure        try        {            List<JobDescriptionType> list = jobDescriptions.getJobDescrList();            for ( int i = 0; i < list.size(); i++ )            {                      File file = new File(                        getSched().getJobBundleDir() + FILE_SEP +                        job.getOwnerId() + FILE_SEP +                        job.getSchedId() + FILE_SEP +                         list.get( i ).getGroupId() + ".zip" );                                if ( !file.exists() )                {                    throw new Exception(                            "group '" + list.get( i ).getGroupId() +                             "' is missing its data file '" +                             list.get( i ).getGroupId() + ".zip'" );                }            }                              } catch ( Exception ex ) {            throw ex;        }                    // create and send a job host search request        try        {            // get a handle on the resource manager            try            {                resMgrProxy = Proxy.getResourceManagerProxy(                         config.getResMgrWsAddr() );                                resMgrProxy.ping();                            } catch ( Exception ex ) {                throw new ResourceManagerProxyException();            }                        // send request to resource manager -- if contraints cannot be            // satisfied, use constraint fallback mechanism              try            {                xmlSearchResult = resMgrProxy.searchJobHosts(                         job.getJobConstraints() );                            } catch ( ConstraintMatchException ex ) {                throw ex;                            } catch ( ConstraintDistanceException ex ) {                throw ex;                            } catch ( Exception ex ) {                throw ex;            }                                    }  catch ( Exception ex ) {                          throw ex;                   }                // deploy job(s) on returned job host(s)        try        {            JobHostSearchResult searchResult = JobHost.createSearchResultObj(                    xmlSearchResult );                        List<JobHostGroupType> resultList =                     searchResult.getJobHostGroupList();                        for ( int i = 0; i < resultList.size(); i++ )            {                List<String> wsAddrList = resultList.get( i ).getWsAddrList();                int failureCount = 0;                                for ( int j = 0; j < wsAddrList.size(); j++ )                {                    JobHostWSSEI jobHostProxy = null;                    boolean addedToTable = false;                    String jobId = null;                    try                     {                                                  getSched().logMessage(                                 Log.MSG_PRI_MIN,                                 "deploying group '" +                                 resultList.get( i ).getGroupId() + "', " +                                "job (" + (j+1) + ") from job '" +                                job.getSchedId() + " to job host: " +                                wsAddrList.get( j ) );                                                // get a handle on the current job's zipfile                        File file = new File(

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -