📄 dispatcherjob.java
字号:
package primecruncher;import net.jxta.endpoint.*;import net.jxta.pipe.*;import net.jxta.peergroup.*;import net.jxta.protocol.*;import net.jxta.document.*;import net.jxta.id.*;import java.util.*;import java.io.*;class DispatcherJob implements Runnable,PipeMsgListener,OutputPipeListener,Comparable{ // 线程类 private int low=0; private int high=0; private String jobID; private PeerInfoBundle peerInfoBundle; private PendingJob pendingJob; private int count=0; private PeerGroup group; private static PipeService pService=null; private InputPipe inPipe=null; private PipeAdvertisement inPipeAdv=null;/*****************************************************************/ public DispatcherJob(String jobID,int count,int low,int high,PeerInfoBundle peerInfoBundle,PendingJob job,PeerGroup group){ this.high=high; this.low=low; this.jobID=jobID; this.count=count; this.group=group; this.peerInfoBundle=peerInfoBundle; this.pendingJob=job; pService=group.getPipeService(); //创建管道服务 }/*****************************************************************/ public void run(){ //启动线程,分发任务 System.out.println("Starting dispatcher job execution with low="+low+"and high="+high); inPipeAdv=(PipeAdvertisement)AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType());//创建输入管道公告信息 final PipeService pService=group.getPipeService(); //创建管道服务 PipeID pid=IDFactory.newPipeID(group.getPeerGroupID()); //创建一个新的PipeID inPipeAdv.setPipeID(pid); inPipeAdv.setName("name"); try { group.getDiscoveryService().publish(inPipeAdv); //发布输入管道公告信息 inPipe=pService.createInputPipe(inPipeAdv,this); //创建输入管道 ModuleSpecAdvertisement adv=peerInfoBundle.getModuleSpecAdvertisement(); //取出要分发任务的对等节点的模块类公告信息 PipeAdvertisement otherPipe=adv.getPipeAdvertisement(); // 取出要分发任务的对等节点的管道公告信息 pService.createOutputPipe(otherPipe,this); //用要分发任务的对等节点的管道公告信息创建输出管道并和对等节点连接 } catch(Exception e){ System.out.println("An IO Exception occured:"+e.getMessage()); e.printStackTrace(); } }/*****************************************************************/ public void create(){ System.out.println("Starting dispatcher job execution with low="+low+"and high="+high); inPipeAdv=(PipeAdvertisement)AdvertisementFactory.newAdvertisement(PipeAdvertisement.getAdvertisementType()); final PipeService pService=group.getPipeService(); PipeID pid=IDFactory.newPipeID(group.getPeerGroupID()); inPipeAdv.setPipeID(pid); inPipeAdv.setName("name"); try { group.getDiscoveryService().publish(inPipeAdv); inPipe=pService.createInputPipe(inPipeAdv,this); ModuleSpecAdvertisement adv=peerInfoBundle.getModuleSpecAdvertisement(); PipeAdvertisement otherPipe=adv.getPipeAdvertisement(); pService.createOutputPipe(otherPipe,this); } catch(Exception e){ System.out.println("An IO Exception occured:"+e.getMessage()); e.printStackTrace(); } } public void outputPipeEvent(OutputPipeEvent event){ System.out.println("Connected to other peer's pipe"); try { Message message=pService.createMessage(); StructuredTextDocument doc=(StructuredTextDocument)inPipeAdv.getDocument(new MimeMediaType("text/xml")); InputStream is=doc.getStream(); /*&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&1*/ MessageElement advElement=message.newMessageElement(ServiceConstants.PIPEADV,new MimeMediaType("text/xml"),is); /*&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&1*/ StructuredTextDocument hiNumDoc=(StructuredTextDocument)StructuredDocumentFactory.newStructuredDocument( new MimeMediaType("text/xml"),ServiceConstants.HIGH_INT, new Integer(high).toString()); MessageElement highNum=message.newMessageElement(ServiceConstants.HIGH_INT, new MimeMediaType("text/xml"),hiNumDoc.getStream()); /*&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&*/ StructuredTextDocument lowNumDoc=(StructuredTextDocument)StructuredDocumentFactory.newStructuredDocument( new MimeMediaType("text/xml"),ServiceConstants.LOW_INT, new Integer(low).toString()); MessageElement lowNum=message.newMessageElement(ServiceConstants.LOW_INT, new MimeMediaType("text/xml"),lowNumDoc.getStream()); /*&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&*/ StructuredTextDocument //利用结构化的文档创建消息元素 idDoc=(StructuredTextDocument)StructuredDocumentFactory.newStructuredDocument( new MimeMediaType("text/xml"),ServiceConstants.JOBID,jobID); MessageElement //转化为消息类型,形成消息元素 idEl=message.newMessageElement(ServiceConstants.JOBID, new MimeMediaType("text/xml"),idDoc.getStream()); message.addElement(advElement); message.addElement(highNum); message.addElement(lowNum); message.addElement(idEl); event.getOutputPipe().send(message); System.out.println("Sent the message"); } catch (IOException e) { e.printStackTrace(); } }/*****************************************************************/ public int compareTo(Object o) { if(o instanceof DispatcherJob){ DispatcherJob other=(DispatcherJob)o; if (count<other.count) return -1; else if(count>other.count) return 1; } return 0; }/*****************************************************************/ public void pipeMsgEvent(PipeMsgEvent event) { System.out.println("Received a message!"); Message mes=null; MessageElement resEl=null; //Message mes=event.getMessage(); //MessageElement resEl=mes.getElement(ServiceConstants.RESULT); try { mes=event.getMessage(); resEl=mes.getElement(ServiceConstants.RESULT); StructuredTextDocument resDoc=(StructuredTextDocument)StructuredDocumentFactory.newStructuredDocument( new MimeMediaType("text/xml"),resEl.getStream()); //将服务端返回的result结果以结构化文档的形式读取出来 HashMap resmp=new HashMap(); //创建HashMap列表 Enumeration en=resDoc.getChildren(); //读取结果集中的元素id,type while(en.hasMoreElements()){ TextElement el=(TextElement)en.nextElement();//id,type分别取出 resmp.put(el.getName(),el.getTextValue()); } pendingJob.getResult(this,resmp); }catch(IOException e){ e.printStackTrace(); } }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -