⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 .#dispatchernodemanagement.java.1.9

📁 分布式计算平台P2HP-1的源代码;P2HP-1是基于P2P的高性能计算平台
💻 9
📖 第 1 页 / 共 3 页
字号:
                                {
                                    wn = (WorkerNode) (deadWorkerNodeList.
                                        elementAt(j));
                                    workerNodeList.addElement(wn);
                                    synchronized (idleWorkerNodeList)
                                    {
                                        idleWorkerNodeList.addElement(wn.
                                            getWorkerId()); //应该以workerid入队列
                                    }
                                    deadWorkerNodeList.removeElementAt(j);
                                    dispatcherInfo.setWorkerCount(
                                        workerNodeList.size());
                                    dispatcherInfo.setIdleWorkerCount(
                                        idleWorkerNodeList.size());
                                    wn.revive();
                                }
                            }
                            synchronized(dummy)
							{
                                try{
                                dummy.notifyAll();
                                }catch(IllegalMonitorStateException e)
								{
                                	
								}
							}
                        }
                        else
                            wn = (WorkerNode) (workerNodeList.
                                               elementAt(i));
                    }
                    //找到这个worker,接着:
                    if (lineStr.equals("<worker alive>"))
                    {
                        SubJob sj = wn.
                            alive(); //寿命恢复到3
                        if (sj != null)
                        {
                            os.println("<new job>");
                            os.flush();
                            if (is.readLine().equals("<new job accept>"))
                            {
                            	System.out.println("<new job accept>");
                            	//oob = new ObjectOutputStream(s.getOutputStream());
                                oob.writeObject(sj); //传送datapoll信息给worker
                                //oob.close();
                                oob.flush();
                                wn.
                                    setWorkerState(1); //将worker状态置为busy
                                sj.setWorkerState(1);
                                //这里有一个延时问题。
                                dispatcherInfo.setIdleWorkerCount(
                                    idleWorkerNodeList.size());
                            }
                        }
                        os.println("<dispatcher alive>");
                        os.flush();
                        //System.out.println("<dispatcher alive>");
                    }
                    else {
                    if (lineStr.equals("<subjob hava finished>"))
                    {
                    	is.readLine();//jobid, unuseless
                    	is.readLine();//subjobid, unuseless
                        SubJob sj = wn.
                            alive(); //寿命恢复到3
                        wn.
                            workFinish();
                        idleWorkerNodeList.addElement(wn.getWorkerId());
                        dispatcherInfo.setIdleWorkerCount(idleWorkerNodeList.
                            size());
                        synchronized(dummy)
						{
                        try{
                        dummy.notifyAll();
                        }
                        catch(IllegalMonitorStateException e)
						{
                        	
						}
						}
                        os.println("<dispatcher hava known>");
                        os.flush();
                        sj.finishedTime=System.currentTimeMillis();
                        sj.computingTime=sj.finishedTime-sj.distributeTime;
                        calAveTime(sj,sj.computingTime);
                        if(sj.redun_num!=0)
                        {//有冗余计算,从urgentJobList中删除
                        	LinkedList tmplist=DispatcherJobManagement.urgentJobList;
                        	synchronized(tmplist)
                			{
                        		for(int j=0;i<tmplist.size();i++)
                        		{
                        			SubJob tmpsj=(SubJob)tmplist.get(j);
                        			if(sj.getSubJobId().equals(tmpsj.getSubJobId()))
                        				{
                        				  tmplist.remove(j);
                        				  break;
                        				}
                        		}
                			}
                        }
                        //从runningJobList中删除
                        LinkedList tmplist=DispatcherJobManagement.runningJobList;
                    	synchronized(tmplist)
            			{
                    		for(int j=0;i<tmplist.size();i++)
                    		{
                    			SubJob tmpsj=(SubJob)tmplist.get(j);
                    			if(sj.getSubJobId().equals(tmpsj.getSubJobId()))
                    				{
                    				  tmplist.remove(j);
                    				  break;
                    				}
                    		}
            			}                   
                        
                    }
                    else if (lineStr.equals("<worker quit>"))
                    {
                        //交给workernode的一个函数处理
                        wn.workerQuit();
                        os.println("<worker quit agree>");
                        os.flush();
                    }
                    
                    this.s.close();
                    return;
                   }//else
                 }//else
                }//try
                catch (IOException e)
                {
                	try
                    {
                        is.close();
                        s.close();
                        os.close();
                        return;
                    }
                    catch (IOException e1)
                    {
                        log(e);
                        System.out.println(e.toString());
                        flag=false;
                        //error("Error closing socket", e);
                    }
                    //log(e);
                    System.out.println(e.toString());
                    //e.printStackTrace();
                    flag=false;
                    return;
                    //error("I/O error", e);
                }
            }//while
            	try
				{
            	if(!this.s.isClosed())
            		this.s.close();
            	flag=false;
				}catch(Exception e)
				{
					
				}
				return;
            }//run
//.....
        }

        public void terminate()
        {
        	 System.out.println("listening handler quitting...");
        	subflag=false;
            isActive = false;
            
        }
        
        public void calAveTime(SubJob sj,long t)
        {
        	String jobId=sj.getJobId();
        	Job tmpjob=null;
        	synchronized(jobList){
        		int j=0;
                for (j=0; j < jobList.size(); j++)
                {
                	tmpjob=(Job) (jobList.elementAt(j));
                	//System.out.println(jobId+"  "+tmpjob.getJobId());
                    if (jobId.equals( tmpjob.getJobId()))
                        break;
                }
                if(j>=jobList.size())
                {
                System.out.println("Don't find this job in jobList!");
                return;
                }
                tmpjob.calAveTime(t);
                //else tmpjob=(Job) (jobList.elementAt(j));
            }
        	jfth.start();
        }

    }

    /**
     *
     * <p>Title: </p>
     * <p>Description: 定期检查有没有死亡的worker,将这些workernode从workernodelist队列移入deadworkerlist队列</p>
     * <p>Copyright: Copyright (c) 2004</p>
     * <p>Company: </p>
     * @author not attributable
     * @version 1.0
     */
    public class TestDeadWorker
        extends Thread
    {
        private boolean flag = true;
        private boolean subflag=true;
        private static final int POLL_INTERVAL = 10 * 1000;
        public void run()
        {
            while (flag)
            {
                this.poll();
                try
                {
                	if(!interrupted())
                     Thread.sleep(POLL_INTERVAL);
                	else  flag=false;                		
                }
                catch (InterruptedException e)
                {
                	flag=false;
                //get back to work
                }

            }
            System.out.println("Testdeadworker handler quit!");
        }

        public void poll()
        {
            while (subflag&&deadFlag.size() != 0)
            {
            	System.out.println("deadwork: "+deadFlag.size());
                String s = null;
                WorkerNode tmpwn=null;
                synchronized (deadFlag)
                {
                	s=(String)deadFlag.removeFirst();
                }
                synchronized (workerNodeList)
                {
                    int i = 0;
                    boolean b=true;
                    for (; i < workerNodeList.size(); i++)
                    {
                    	tmpwn=(WorkerNode) (workerNodeList.elementAt(i));
                        if (s.equals( tmpwn.getWorkerId()))
                            {
                        	String deadW_ip=tmpwn.getWorkerIp();
                            workerNodeList.removeElementAt(i);
                            dispatcherInfo.setWorkerCount(workerNodeList.size());
                            b=false;
                            if(Parameter.dispatcherLogIsActive)
                            {
                            	LogFile lf=new LogFile(Parameter.logFileName);
                            	try{
                            		lf.logWorker_quit(deadW_ip,s,dispatcherInfo.getWorkerCount());
                            	}
                            	catch(Exception e)
                     		{
                            		e.printStackTrace();
                     		}
                            }
                        	break;
                            }
                    }
                    if (b)
                    {
                        System.out.println(
                            "Don't find this dead workernode in workerNodeList!");
                        continue;
                    }
                }
                synchronized (deadWorkerNodeList)
                   {
                        deadWorkerNodeList.addElement(tmpwn);
                   }
                    if(tmpwn.atIdleListFlag)
                        synchronized(idleWorkerNodeList)
						{
                        	Iterator it=idleWorkerNodeList.iterator();
                        	String tmps="";
                        	boolean b=true;
                        	while(it.hasNext())
                        	{
                        	   tmps=(String)it.next();
                        	   if(s.equals(tmps))
                        	   {
                        	   	idleWorkerNodeList.remove(tmps);
                        	   	dispatcherInfo.setIdleWorkerCount(idleWorkerNodeList.size());
                        	   	b=false;
                        	   	break;
                        	   }	
                             }
                        	if (b) System.out.println("Don't find this worker at idleWorkerNodelist(but the falg 'atIdleListFlag' is true)!");
                          }
            }//while
        } //pool

        public void terminate()
        {
        	System.out.println("Testdeadworker handler quitting...");
        	subflag=false;
            flag = false;
        }
    } //TestDead

    public void log(Exception e)
    {
        //e.printStackTrace();
    	System.out.println(e.toString());
    }

 

} //DispatcherNodeManagement

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -