simulator.java
来自「JGRoups源码」· Java 代码 · 共 228 行
JAVA
228 行
package org.jgroups.debug;import org.jgroups.Address;import org.jgroups.Event;import org.jgroups.Message;import org.jgroups.View;import org.jgroups.stack.Protocol;import org.jgroups.stack.ProtocolStack;import org.jgroups.util.Queue;import org.jgroups.util.QueueClosedException;import java.util.HashMap;import java.util.Iterator;/** * Tests one or more protocols independently. Look at org.jgroups.tests.FCTest for an example of how to use it. * @author Bela Ban * @version $Id: Simulator.java,v 1.6 2005/08/22 14:12:53 belaban Exp $ */public class Simulator { private Protocol[] protStack=null; private ProtocolAdapter ad=new ProtocolAdapter(); ProtocolStack prot_stack=null; private Receiver r=null; private Protocol top=null, bottom=null; private Queue send_queue=new Queue(); private Thread send_thread; private Queue recv_queue=new Queue(); private Thread recv_thread; /** HashMap from Address to Simulator. */ private final HashMap addrTable=new HashMap(); private Address local_addr=null; private View view; public interface Receiver { void receive(Event evt); } public void setProtocolStack(Protocol[] stack) { this.protStack=stack; this.protStack[0].setUpProtocol(ad); this.protStack[this.protStack.length-1].setDownProtocol(ad); top=protStack[0]; bottom=this.protStack[this.protStack.length-1]; prot_stack=new ProtocolStack(); if(protStack.length > 1) { for(int i=0; i < protStack.length; i++) { Protocol p1=protStack[i]; p1.setProtocolStack(prot_stack); Protocol p2=i+1 >= protStack.length? null : protStack[i+1]; if(p2 != null) { p1.setDownProtocol(p2); p2.setUpProtocol(p1); } } } } public String dumpStats() { StringBuffer sb=new StringBuffer(); for(int i=0; i < protStack.length; i++) { Protocol p1=protStack[i]; sb.append(p1.getName()).append(":\n").append(p1.dumpStats()).append("\n"); } return sb.toString(); } public void addMember(Address addr) { addMember(addr, this); } public void addMember(Address addr, Simulator s) { addrTable.put(addr, s); } public void setLocalAddress(Address addr) { this.local_addr=addr; } public void setView(View v) { this.view=v; } public void setReceiver(Receiver r) { this.r=r; } public void send(Event evt) { top.down(evt); } public void receive(Event evt) { try { Event copy; if(evt.getType() == Event.MSG && evt.getArg() != null) { copy=new Event(Event.MSG, ((Message)evt.getArg()).copy()); } else copy=evt; recv_queue.add(copy); } catch(QueueClosedException e) { } } public void start() throws Exception { if(local_addr == null) throw new Exception("local_addr has to be non-null"); if(protStack == null) throw new Exception("protocol stack is null"); bottom.up(new Event(Event.SET_LOCAL_ADDRESS, local_addr)); if(view != null) { Event view_evt=new Event(Event.VIEW_CHANGE, view); bottom.up(view_evt); top.down(view_evt); } for(int i=0; i < protStack.length; i++) { Protocol p=protStack[i]; p.setProtocolStack(prot_stack); } for(int i=0; i < protStack.length; i++) { Protocol p=protStack[i]; p.init(); } for(int i=0; i < protStack.length; i++) { Protocol p=protStack[i]; p.start(); } send_thread=new Thread() { public void run() { Event evt; while(send_thread != null) { try { evt=(Event)send_queue.remove(); if(evt.getType() == Event.MSG) { Message msg=(Message)evt.getArg(); Address dst=msg.getDest(); if(msg.getSrc() == null) ((Message)evt.getArg()).setSrc(local_addr); Simulator s; if(dst == null) { for(Iterator it=addrTable.values().iterator(); it.hasNext();) { s=(Simulator)it.next(); s.receive(evt); } } else { s=(Simulator)addrTable.get(dst); if(s != null) s.receive(evt); } } } catch(QueueClosedException e) { send_thread=null; break; } } } }; send_thread.start(); recv_thread=new Thread() { public void run() { Event evt; while(recv_thread != null) { try { evt=(Event)recv_queue.remove(); bottom.up(evt); } catch(QueueClosedException e) { recv_thread=null; break; } } } }; recv_thread.start(); } public void stop() { recv_thread=null; recv_queue.close(false); send_thread=null; send_queue.close(false); } class ProtocolAdapter extends Protocol { public String getName() { return "ProtocolAdapter"; } public void up(Event evt) { if(r != null) r.receive(evt); } /** send to unicast or multicast destination */ public void down(Event evt) { try { send_queue.add(evt); } catch(QueueClosedException e) { } } }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?