⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dispatchergroup.java

📁 分布式计算平台P2HP-1的源代码;P2HP-1是基于P2P的高性能计算平台
💻 JAVA
字号:
package cn.edu.hust.cgcl.biogrid.monitor;

import java.io.BufferedReader;
import java.io.PrintWriter;
import java.text.ParseException;
import java.util.LinkedList;
import java.util.Vector;

import cn.edu.hust.cgcl.biogrid.dispatcher.DispatcherInfo;

/**
 * <p>Title: </p>
 * <p>Description: dispatcher组类</p>
 * <p>Copyright: Copyright (c) 2004</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */

public class DispatcherGroup
{
    private Vector dispatcherArray;
    private String firstMonitorId, secondMonitorId;
    private String Id="G0101";//注:初始化时应初始化id。
    private CountDispatcher cd;

    public DispatcherGroup()
    {
        this("", "");
    }

    public DispatcherGroup(String fmid, String smid)
    {
        dispatcherArray = new Vector();
        this.firstMonitorId = fmid;
        this.secondMonitorId = smid;
        this.cd=new CountDispatcher(1,dispatcherArray);
        cd.start();
    } // DispatcherGroup

    public String getId()
    {
        return this.Id;
    }

    public int getNodeCount()
    {
        return this.dispatcherArray.size();
    }

    public DispatcherNode getNode(int index)
    {
        if (index>=dispatcherArray.size())
        {
            return null;
        }
        return (DispatcherNode)this.dispatcherArray.get(index);
    }

    // 获取dispatcher组的统计信息
    public int getInfo()
    {
        return dispatcherArray.capacity(); //这里只是简单地输出dispatcher组中的dispatcher个数
    } // GetInfo

    // 获取主monitor
    public String getFirstMonitor()
    {
        return this.firstMonitorId;
    } // GetFirstMonitor

    // 获取从monitor
    public String getSecondaryMonitor()
    {
        return this.secondMonitorId;
    } // GetFirstMonitor

    // 获取dispatcher节点
    public DispatcherInfo getDispatcherNode(int index)
    {
        return ( (DispatcherNode) (dispatcherArray.get(index))).getNodeInfo();
    } // getDispatcherNode

    // 接收新的计算任务
    // is: 与用户主程序通信socket的接收BufferedReader
    // os: 与用户主程序通信socket的接收PrintWriter
    public boolean getJob(BufferedReader is,
                          PrintWriter os,
                          MonitorJob job,
                          int jobcount)
    throws Exception
    {
        LinkedList sortList=SortIdleWorker();
        DispatcherNode tmpNode=null;
        DispatcherInfo tmpInfo=null;
        int countArr[]=new int[sortList.size()];
        int tmpCount=jobcount;
        int tmpIdleCount;
        int i;
        for (i=0; i<sortList.size(); i++)
        {
            countArr[i]=0;
        } // for

        // 调度dispatcher执行子任务
        for (i=0; i<sortList.size() && tmpCount>0; i++)
        {
            // 把子任务分派给有空闲worker的dispatcher
            tmpNode=(DispatcherNode)sortList.get(i);
            tmpInfo=tmpNode.getNodeInfo();
            if ((tmpIdleCount=tmpInfo.getIdleWorkerCount())<=0)
            {
                break;
            } // if
            if (tmpCount<tmpIdleCount)
            {
                countArr[i]=tmpCount;
            } // if
            else
            {
                countArr[i] = tmpIdleCount;
            } // else
            tmpCount-=tmpIdleCount;
            //tmpNode.dispatchJob(job, tmpIdleCount);
        } // for
        if (tmpCount > 0)
        {
            // 寻找所有空闲worker数不小于0的节点
            while (i < sortList.size())
            {
                tmpNode = (DispatcherNode) sortList.get(i);
                tmpInfo = tmpNode.getNodeInfo();
                if (tmpInfo.getIdleWorkerCount() < 0)
                {
                    break;
                } // if
                i++;
            } // while

            // 如果job还没分配完
            if (tmpCount<i)
            {
                countArr[0]+=tmpCount;
                tmpCount=0;
            } // if
            else
            {
                tmpIdleCount = tmpCount / i;
                if (tmpIdleCount*i<tmpCount)
                {
                    tmpIdleCount++;
                } // if
                for (int j = 0; j < i && tmpCount > 0; j++)
                {
                    countArr[j] += tmpIdleCount;
                    tmpCount -= tmpIdleCount;
                } // for
            } // esle
        } // if
        // 分配
        StringBuffer tmpStr = new StringBuffer();
        for (i = 0; i < sortList.size(); i++)
        {
            // 主dispatcher
            tmpNode = (DispatcherNode) sortList.get(i);
            tmpNode.dispatchJob(job, countArr[i]);
            tmpStr.append(countArr[i]);
            tmpStr.append('#');
            tmpStr.append(tmpNode.GetIp());
            tmpStr.append('#');
            tmpStr.append(tmpNode.getTaskPort());
            tmpStr.append('#');

            // 后备dispatcher
            tmpNode=(DispatcherNode) sortList.get(sortList.size()-i-1);
            tmpStr.append(tmpNode.GetIp());
            tmpStr.append('#');
            tmpStr.append(tmpNode.GetPort());

            //tmpStr.append();
            os.println(tmpStr);

            tmpStr.delete(0, tmpStr.length());
        } // for
        os.println("<dispatcher finish>");

        sortList=null;
        return true;
    } // getProject

    // 对dispatcher队列,根据空闲worker数进行排队,从大到小
    private LinkedList SortIdleWorker()
    {
        LinkedList linkList=new LinkedList();
        int count=this.dispatcherArray.size();
        for (int i=0; i<count; i++)
        {
            DispatcherNode tmpNode=(DispatcherNode)dispatcherArray.elementAt(i);
            int position=0;
            while (position<linkList.size())
            {
                DispatcherInfo tmpInfo=tmpNode.getNodeInfo();
                int tmpCount=tmpInfo.getJobCount();
                if (tmpCount>((DispatcherInfo)linkList.get(position)).getJobCount())
                {
                    linkList.add(position, tmpNode);
                } // if
            } // while
            if (position>=linkList.size())
            {
                linkList.add(position, tmpNode);
            } // if
        } // for
        return linkList;
    } // SortIdleWorker

    // 结束计算项目任务

    public boolean cancelJob(String jobId)
    {
        //要查找jobid属于哪个dispatcher节点
        int k = -1;
        int n = this.getInfo();
        for (int i = 0; i < n; i++)
        {
            //取得一个dispatcher的job队列
            Vector m = ( (DispatcherNode) (this.dispatcherArray.get(i))).
                getNodeInfo().getDispacherJob();
            //再比较jobid
            for (int j = 0; j < m.capacity(); j++)
            {
                if (jobId.compareToIgnoreCase( (String) (m.get(j))) == 0)
                {
                    k = i;
                    break;
                } // if
            } // for
        }  // for
        if (k < 0)
        {
            return false;
        }
        else
        {
            return ( (DispatcherNode) (this.dispatcherArray.get(k))).cancelJob(
                jobId);
        }
    } // cancilProject

    public void workerJoin(BufferedReader is, PrintWriter os)
    throws Exception
    {
        System.out.println("Dispatcher Group Worker Join In");
        DispatcherNode tmpNode=getMaxTaskQueueNode();
        DispatcherInfo tmpinfo=tmpNode.getNodeInfo();
        os.println(tmpNode.GetIp());
        os.println(tmpNode.getWorkerPort());
        os.flush();
        return;
    } // workerJoin

    public DispatcherNode getMaxTaskQueueNode()
    throws Exception
    {
        if (this.dispatcherArray.size()<=0)
        {
            throw new ParseException("protocol parse error!", 0);
        } // if
        DispatcherNode tmpNode=(DispatcherNode)dispatcherArray.get(0);
        DispatcherInfo tmpinfo=tmpNode.getNodeInfo();
        int tmpcount=tmpinfo.getIdleWorkerCount();
        for (int i=0; i<this.dispatcherArray.size(); i++)
        {
            tmpinfo=
                ((DispatcherNode)dispatcherArray.get(i)).getNodeInfo();
            if (tmpinfo.getIdleWorkerCount()<tmpcount)
            {
                tmpcount=tmpinfo.getIdleWorkerCount();
                tmpNode=(DispatcherNode)(dispatcherArray.get(i));
            } // if
        } // for
        return tmpNode;
    } // getMaxTaskQueueNode

    public void addNode(DispatcherNode node)
    {
        synchronized(dispatcherArray)
        {
            this.dispatcherArray.add(node);
        }
    } // addNode
    
    public void quit()
    {
    	System.out.println("DispatcherGroup start to quitting...");
    	for(int i=0;i<dispatcherArray.size();i++)
    	{
    		DispatcherNode dn=(DispatcherNode)dispatcherArray.elementAt(i);
    		dn.interrupt();
    		dn.quit();
    	}
    	cd.quit();
//    	synchronized(dispatcherArray)
//		{
//    		Iterator it=dispatcherArray.iterator();
//    		System.out.println(dispatcherArray.size());
//    		while(it.hasNext())
//    		{
//    			it.next();
//    			DispatcherNode dn=(DispatcherNode)it;
//    			System.out.println(dn.isActive);
//    			dn.isActive=false;
//    			System.out.println(dn.isActive);
////    			dn.interrupt();
////    			dn.quit();
////    			dn.interrupt();
//    		}
//		}
    	System.out.println("DispatcherGroup's quitting finished!");
    }

    public boolean removeNode(DispatcherNode node)
    {
        synchronized(dispatcherArray)
        {
            for (int i = 0; i < dispatcherArray.size(); i++)
            {
                DispatcherNode tmpNode=(DispatcherNode)dispatcherArray.get(i);
                if (tmpNode==node)
                {
                    dispatcherArray.remove(i);
                    return true;
                } // if
            } // for
        } // synchronized
        return false;
    }

} // DispatcherGroup

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -