📄 dispatchergroup.java
字号:
package cn.edu.hust.cgcl.biogrid.monitor;
import java.io.BufferedReader;
import java.io.PrintWriter;
import java.text.ParseException;
import java.util.LinkedList;
import java.util.Vector;
import cn.edu.hust.cgcl.biogrid.dispatcher.DispatcherInfo;
/**
 * A group of dispatcher nodes.
 *
 * <p>The group keeps the registered {@link DispatcherNode}s, remembers the ids
 * of its primary and secondary monitor, splits the sub-tasks of an incoming
 * job between the dispatchers according to their idle worker counts, tells a
 * joining worker which dispatcher to connect to, and forwards job
 * cancellation to the dispatcher that lists the job.</p>
 *
 * <p>{@link #addNode} and {@link #removeNode} synchronize on the node vector;
 * the other methods rely only on the per-call synchronization of
 * {@link java.util.Vector}.</p>
 *
 * <p>Copyright: Copyright (c) 2004</p>
 *
 * @author not attributable
 * @version 1.0
 */
public class DispatcherGroup
{
    /** Registered {@link DispatcherNode}s; the same vector is handed to the CountDispatcher. */
    private final Vector dispatcherArray;

    /** Ids of the primary and the secondary monitor of this group. */
    private final String firstMonitorId, secondMonitorId;

    /** Group id. Note: this should really be supplied at construction time. */
    private final String id = "G0101";

    /** Helper started by the constructor and stopped by {@link #quit()}. */
    private final CountDispatcher cd;

    /**
     * Creates a group whose primary and secondary monitor ids are both the
     * empty string.
     */
    public DispatcherGroup()
    {
        this("", "");
    }

    /**
     * Creates an empty group and starts its {@code CountDispatcher}.
     *
     * @param fmid id of the primary monitor
     * @param smid id of the secondary monitor
     */
    public DispatcherGroup(String fmid, String smid)
    {
        this.dispatcherArray = new Vector();
        this.firstMonitorId = fmid;
        this.secondMonitorId = smid;
        this.cd = new CountDispatcher(1, dispatcherArray);
        cd.start();
    }

    /**
     * @return the id of this group
     */
    public String getId()
    {
        return this.id;
    }

    /**
     * @return the number of dispatcher nodes currently registered
     */
    public int getNodeCount()
    {
        return this.dispatcherArray.size();
    }

    /**
     * Returns the node at the given position.
     *
     * @param index zero-based position in registration order
     * @return the node, or {@code null} when {@code index} is negative or not
     *         smaller than the number of registered nodes
     */
    public DispatcherNode getNode(int index)
    {
        // A negative index used to throw while a too-large one returned null;
        // both out-of-range cases now return null.
        if (index < 0 || index >= dispatcherArray.size())
        {
            return null;
        }
        return (DispatcherNode) this.dispatcherArray.get(index);
    }

    /**
     * Returns the statistics of this group; currently this is simply the
     * number of registered dispatchers.
     *
     * @return the number of dispatcher nodes in the group
     */
    public int getInfo()
    {
        // size(), not capacity(): capacity() is the length of the Vector's
        // internal array and is usually larger than the number of elements.
        return dispatcherArray.size();
    }

    /**
     * @return the id of the primary monitor
     */
    public String getFirstMonitor()
    {
        return this.firstMonitorId;
    }

    /**
     * @return the id of the secondary monitor
     */
    public String getSecondaryMonitor()
    {
        return this.secondMonitorId;
    }

    /**
     * Returns the info object of the dispatcher node at the given position.
     *
     * @param index zero-based position in registration order
     * @return the result of {@code getNodeInfo()} on that node
     * @throws ArrayIndexOutOfBoundsException if {@code index} is out of range
     */
    public DispatcherInfo getDispatcherNode(int index)
    {
        DispatcherNode node = (DispatcherNode) dispatcherArray.get(index);
        return node.getNodeInfo();
    }

    /**
     * Accepts a new computing job: decides how many of its sub-tasks each
     * dispatcher receives, dispatches the job to every dispatcher, and reports
     * the allocation on {@code os}.
     *
     * <p>Allocation works on the dispatchers sorted by idle worker count,
     * largest first. Each dispatcher with idle workers first receives at most
     * as many sub-tasks as it has idle workers. Sub-tasks still left over are
     * then spread over all dispatchers whose idle count is not negative (all
     * of them go to the first dispatcher when fewer sub-tasks than such
     * dispatchers remain). The allocated counts always add up to
     * {@code jobcount}.</p>
     *
     * <p>For every dispatcher one line
     * {@code count#ip#taskPort#backupIp#backupPort} is written, followed by
     * the line {@code <dispatcher finish>}. The backup of the dispatcher at
     * sorted position {@code i} is the one at position {@code size-1-i}.
     * NOTE(review): for the middle entry of an odd-sized group the backup is
     * therefore the dispatcher itself -- confirm this is intended.</p>
     *
     * @param is       reader of the socket to the user's main program; not
     *                 read by this method
     * @param os       writer of the socket to the user's main program
     * @param job      the job handed to {@code dispatchJob} of every node
     * @param jobcount number of sub-tasks to allocate
     * @return always {@code true}
     * @throws IllegalStateException if sub-tasks remain to be allocated but no
     *         dispatcher can take them (empty group, or the first dispatcher
     *         reports a negative idle count); this used to surface as an
     *         {@code ArithmeticException} from a division by zero
     * @throws Exception whatever the dispatcher nodes throw
     */
    public boolean getJob(BufferedReader is,
                          PrintWriter os,
                          MonitorJob job,
                          int jobcount)
        throws Exception
    {
        LinkedList sortList = SortIdleWorker();
        int nodeCount = sortList.size();
        int[] countArr = new int[nodeCount]; // elements start at 0
        int remaining = jobcount;
        int i;

        // Pass 1: give sub-tasks to dispatchers that have idle workers.
        for (i = 0; i < nodeCount && remaining > 0; i++)
        {
            DispatcherNode node = (DispatcherNode) sortList.get(i);
            int idle = node.getNodeInfo().getIdleWorkerCount();
            if (idle <= 0)
            {
                // Sorted descending, so no later node has idle workers either.
                break;
            }
            int share = (remaining < idle) ? remaining : idle;
            countArr[i] = share;
            remaining -= share;
        }

        // Pass 2: the job is not fully allocated yet.
        if (remaining > 0)
        {
            // Extend i over every node whose idle worker count is not negative.
            while (i < nodeCount)
            {
                DispatcherNode node = (DispatcherNode) sortList.get(i);
                if (node.getNodeInfo().getIdleWorkerCount() < 0)
                {
                    break;
                }
                i++;
            }

            if (i == 0)
            {
                // Fail with a clear message instead of dividing by zero below.
                throw new IllegalStateException(
                    "no dispatcher available for the remaining "
                    + remaining + " sub-task(s)");
            }

            if (remaining < i)
            {
                countArr[0] += remaining;
                remaining = 0;
            }
            else
            {
                // Rounded-up even share per eligible node.
                int share = remaining / i;
                if (share * i < remaining)
                {
                    share++;
                }
                for (int j = 0; j < i && remaining > 0; j++)
                {
                    // Cap at what is left so the total never exceeds jobcount.
                    int give = (remaining < share) ? remaining : share;
                    countArr[j] += give;
                    remaining -= give;
                }
            }
        }

        // Dispatch and report the allocation, one line per dispatcher.
        StringBuffer line = new StringBuffer();
        for (i = 0; i < nodeCount; i++)
        {
            // Primary dispatcher.
            DispatcherNode primary = (DispatcherNode) sortList.get(i);
            primary.dispatchJob(job, countArr[i]);
            line.append(countArr[i]);
            line.append('#');
            line.append(primary.GetIp());
            line.append('#');
            line.append(primary.getTaskPort());
            line.append('#');
            // Backup dispatcher.
            DispatcherNode backup =
                (DispatcherNode) sortList.get(nodeCount - i - 1);
            line.append(backup.GetIp());
            line.append('#');
            line.append(backup.GetPort());
            os.println(line);
            line.setLength(0);
        }
        os.println("<dispatcher finish>");
        // Flush like workerJoin does, so the reply is sent even when the
        // writer was created without auto-flush.
        os.flush();
        return true;
    }

    /**
     * Returns the registered dispatchers ordered by idle worker count, largest
     * first. Nodes with equal counts keep their registration order.
     *
     * <p>The previous implementation never terminated for two or more nodes
     * (the insert position was never advanced), cast list elements to the
     * wrong type, and compared job counts although {@link #getJob} depends on
     * descending idle-worker order.</p>
     *
     * @return a new list of {@link DispatcherNode}s; empty if the group is empty
     */
    private LinkedList SortIdleWorker()
    {
        LinkedList sorted = new LinkedList();
        // Vector.toArray() is synchronized, so this is a consistent snapshot
        // even if nodes are added or removed while sorting.
        Object[] nodes = dispatcherArray.toArray();
        for (int i = 0; i < nodes.length; i++)
        {
            DispatcherNode node = (DispatcherNode) nodes[i];
            int idle = node.getNodeInfo().getIdleWorkerCount();

            // Insert before the first entry that has strictly fewer idle workers.
            int position = 0;
            while (position < sorted.size())
            {
                DispatcherNode other = (DispatcherNode) sorted.get(position);
                if (idle > other.getNodeInfo().getIdleWorkerCount())
                {
                    break;
                }
                position++;
            }
            sorted.add(position, node);
        }
        return sorted;
    }

    /**
     * Cancels a job on the dispatcher that lists it.
     *
     * <p>The job vector of every dispatcher is searched for {@code jobId},
     * ignoring case. When several dispatchers list the id, the last one in
     * registration order is used, as before. NOTE(review): {@link #getJob}
     * dispatches a job to every node, so cancelling on a single node may not
     * be sufficient -- confirm against {@code DispatcherNode}.</p>
     *
     * @param jobId id of the job to cancel
     * @return {@code false} if {@code jobId} is {@code null} or no dispatcher
     *         lists it; otherwise the result of {@code cancelJob} on the
     *         owning dispatcher
     */
    public boolean cancelJob(String jobId)
    {
        if (jobId == null)
        {
            return false;
        }

        // Find the dispatcher node the job id belongs to.
        DispatcherNode owner = null;
        int nodeCount = dispatcherArray.size();
        for (int i = 0; i < nodeCount; i++)
        {
            DispatcherNode node = (DispatcherNode) dispatcherArray.get(i);
            // Job queue of this dispatcher.
            Vector jobs = node.getNodeInfo().getDispacherJob();
            // size(), not capacity(): slots beyond size() do not exist.
            for (int j = 0; j < jobs.size(); j++)
            {
                if (jobId.equalsIgnoreCase((String) jobs.get(j)))
                {
                    owner = node;
                    break;
                }
            }
        }

        if (owner == null)
        {
            return false;
        }
        return owner.cancelJob(jobId);
    }

    /**
     * Handles a joining worker: writes the IP and the worker port of the
     * dispatcher chosen by {@link #getMaxTaskQueueNode()} to {@code os}, one
     * per line, and flushes.
     *
     * @param is reader of the worker connection; not read by this method
     * @param os writer of the worker connection
     * @throws Exception if the group contains no dispatcher
     */
    public void workerJoin(BufferedReader is, PrintWriter os)
        throws Exception
    {
        System.out.println("Dispatcher Group Worker Join In");
        DispatcherNode target = getMaxTaskQueueNode();
        os.println(target.GetIp());
        os.println(target.getWorkerPort());
        os.flush();
    }

    /**
     * Returns the dispatcher with the fewest idle workers; on a tie the one
     * registered first wins.
     *
     * @return the selected dispatcher node
     * @throws java.text.ParseException if the group contains no dispatcher
     *         (exception type and message kept for existing callers)
     */
    public DispatcherNode getMaxTaskQueueNode()
        throws Exception
    {
        if (this.dispatcherArray.size() <= 0)
        {
            throw new ParseException("protocol parse error!", 0);
        }

        DispatcherNode selected = (DispatcherNode) dispatcherArray.get(0);
        int fewestIdle = selected.getNodeInfo().getIdleWorkerCount();
        // Node 0 is the initial candidate, so the scan starts at 1.
        for (int i = 1; i < this.dispatcherArray.size(); i++)
        {
            DispatcherNode candidate = (DispatcherNode) dispatcherArray.get(i);
            int idle = candidate.getNodeInfo().getIdleWorkerCount();
            if (idle < fewestIdle)
            {
                fewestIdle = idle;
                selected = candidate;
            }
        }
        return selected;
    }

    /**
     * Registers a dispatcher node with this group.
     *
     * @param node the node to append
     */
    public void addNode(DispatcherNode node)
    {
        synchronized (dispatcherArray)
        {
            this.dispatcherArray.add(node);
        }
    }

    /**
     * Shuts the group down: interrupts and quits every registered dispatcher
     * node, then quits the {@code CountDispatcher}.
     */
    public void quit()
    {
        System.out.println("DispatcherGroup start to quitting...");
        // Work on a snapshot so that no node is skipped if the vector
        // shrinks while the nodes are being stopped.
        Object[] nodes = dispatcherArray.toArray();
        for (int i = 0; i < nodes.length; i++)
        {
            DispatcherNode dn = (DispatcherNode) nodes[i];
            dn.interrupt();
            dn.quit();
        }
        cd.quit();
        System.out.println("DispatcherGroup's quitting finished!");
    }

    /**
     * Removes a dispatcher node from this group. Nodes are compared by
     * identity ({@code ==}), not by {@code equals}.
     *
     * @param node the node to remove
     * @return {@code true} if the node was registered and has been removed
     */
    public boolean removeNode(DispatcherNode node)
    {
        synchronized (dispatcherArray)
        {
            for (int i = 0; i < dispatcherArray.size(); i++)
            {
                if (dispatcherArray.get(i) == node)
                {
                    dispatcherArray.remove(i);
                    return true;
                }
            }
        }
        return false;
    }
} // DispatcherGroup
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -