causaldemo.java
来自「JGRoups源码」· Java 代码 · 共 213 行
JAVA
213 行
// $Id: CausalDemo.java,v 1.6 2005/05/30 16:14:40 belaban Exp $package org.jgroups.demos;import org.jgroups.*;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import java.io.Serializable;import java.util.Random;import java.util.Vector;/** * Simple causal demo where each member bcast a consecutive letter from the * alphabet and picks the next member to transmit the next letter. Start a * few instances of CausalDemo and pass a paramter "-start" to a CausalDemo * that initiates transmission of a letter A. All participanting members should * have correct alphabet. DISCARD layer has been added to simulate lost messages, * thus forcing delaying of delivery of a certain alphabet letter until the causally * prior one has been received. Remove CAUSAL from the stack and witness how FIFO * alone doesn't provide this guarantee. * * @author Vladimir Blagojevic */public class CausalDemo implements Runnable{ private Channel channel; private final Vector alphabet = new Vector(); private boolean starter = false; private int doneCount=0; private Log log=LogFactory.getLog(getClass()); private final String props = "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;" + "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" + "PING(timeout=2000;num_initial_members=5):" + "DISCARD(up=0.05;excludeitself=true):" + "FD_SOCK:" + "VERIFY_SUSPECT(timeout=1500):" + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800,9600):" + "UNICAST(timeout=5000):" + "pbcast.STABLE(desired_avg_gossip=2000):" + "FRAG(frag_size=4096;down_thread=false;up_thread=false):" + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" + "shun=false;print_local_addr=true):CAUSAL"; public CausalDemo(boolean start) { starter = start; } public String getNext(String c) { char letter = c.charAt(0); return new String(new char[]{++letter}); } public void listAlphabet() { System.out.println(alphabet); } public void run() { Object obj; Message msg; Random r = new Random(); try { channel = new JChannel(props); channel.connect("CausalGroup"); System.out.println("View:" + channel.getView()); if (starter) channel.send(new Message(null, null, new CausalMessage("A", (Address) channel.getView().getMembers().get(0)))); } catch (Exception e) { System.out.println("Could not conec to channel"); } try { Runtime.getRuntime().addShutdownHook( new Thread("Shutdown cleanup thread") { public void run() { listAlphabet(); channel.disconnect(); channel.close(); } } ); } catch (Exception e) { System.out.println("Exception while shutting down" + e); } while (true) { try { CausalMessage cm = null; obj = channel.receive(0); // no timeout if (obj instanceof Message) { msg = (Message) obj; cm = (CausalMessage) msg.getObject(); Vector members = channel.getView().getMembers(); String receivedLetter = cm.message; if("Z".equals(receivedLetter)) { channel.send(new Message(null, null, new CausalMessage("done", null))); } if("done".equals(receivedLetter)) { if(++doneCount >= members.size()) { System.exit(0); } continue; } alphabet.add(receivedLetter); listAlphabet(); //am I chosen to transmit next letter? if (cm.member.equals(channel.getLocalAddress())) { int nextTarget = r.nextInt(members.size()); //chose someone other than yourself while (nextTarget == members.indexOf(channel.getLocalAddress())) { nextTarget = r.nextInt(members.size()); } Address next = (Address) members.get(nextTarget); String nextChar = getNext(receivedLetter); if (nextChar.compareTo("Z") < 1) { System.out.println("Sending " + nextChar); channel.send(new Message(null, null, new CausalMessage(nextChar, next))); } } } } catch (ChannelNotConnectedException conn) { break; } catch (Exception e) { log.error(e); } } } public static void main(String args[]) { CausalDemo test = null; boolean start=false; for(int i=0; i < args.length; i++) { if("-help".equals(args[i])) { System.out.println("CausalDemo [-help] [-start]"); return; } if("-start".equals(args[i])) { start=true; continue; } } //if parameter start is passed , start the demo test = new CausalDemo(start); try { new Thread(test).start(); } catch (Exception e) { System.err.println(e); } }}class CausalMessage implements Serializable{ public final String message; public final Address member; public CausalMessage(String message, Address member) { this.message = message; this.member = member; } public String toString() { return "CausalMessage[" + message + '=' + message + "member=" + member + ']'; }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?