messagedispatchertestasync.java
来自「JGRoups源码」· Java 代码 · 共 184 行
JAVA
184 行
// $Id: MessageDispatcherTestAsync.java,v 1.11 2006/08/28 06:51:54 belaban Exp $package org.jgroups.tests;import org.jgroups.*;import org.jgroups.blocks.MessageDispatcher;import org.jgroups.blocks.RequestHandler;import org.jgroups.blocks.RspCollector;import org.jgroups.debug.Debugger;import org.jgroups.util.RspList;import org.jgroups.util.Util;import java.io.IOException;/** * Asynchronous example for MessageDispatcher; message is mcast to all members, responses are received * asynchronously by calling RspCollector.receiveResponse(). Message is periodically broadcast to all * members; handle() method is invoked whenever a message is received. * * @author Bela Ban */public class MessageDispatcherTestAsync implements RequestHandler { Channel channel; MessageDispatcher disp; RspList rsp_list; MyCollector coll=new MyCollector(); Debugger debugger=null; boolean debug=false; boolean cummulative=false; boolean done_submitted=true; static final int NUM=10; String props="UDP(loopback=true;mcast_addr=224.0.0.35;mcast_port=45566;ip_ttl=32;" + "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" + "PING(timeout=2000;num_initial_members=3):" + "MERGE2(min_interval=10000;max_interval=20000):" + "FD_SOCK:" + "VERIFY_SUSPECT(timeout=1500):" + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=600,1200,2400,4800):" + "UNICAST(timeout=5000):" + "pbcast.STABLE(desired_avg_gossip=20000):" + "FRAG(frag_size=8096;down_thread=false;up_thread=false):" + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" + "shun=false;print_local_addr=true)"; static class MyCollector implements RspCollector { public void receiveResponse(Object retval, Address sender) { System.out.println("** received response " + retval + " [sender=" + sender + ']'); } public void suspect(Address mbr) { System.out.println("** suspected member " + mbr); } public void viewChange(View new_view) { System.out.println("** received new view " + new_view); } } public MessageDispatcherTestAsync(boolean debug, boolean cummulative) { this.debug=debug; this.cummulative=cummulative; } public void start() throws Exception { channel=new JChannel(props); if(debug) { debugger=new Debugger((JChannel)channel, cummulative); debugger.start(); } //channel.setOpt(Channel.LOCAL, Boolean.FALSE); disp=new MessageDispatcher(channel, null, null, this); channel.connect("MessageDispatcherTestAsyncGroup"); } public void mcast(int num) throws IOException { if(!done_submitted) { System.err.println("Must submit 'done' (press 'd') before mcasting new message"); return; } for(int i=0; i < num; i++) { Util.sleep(100); System.out.println("Casting message #" + i); disp.castMessage(null, i, new Message(null, null, "Number #" + i), coll); } done_submitted=false; } public void disconnect() { System.out.println("** Disconnecting channel"); channel.disconnect(); System.out.println("** Disconnecting channel -- done"); System.out.println("** Closing channel"); channel.close(); System.out.println("** Closing channel -- done"); System.out.println("** disp.stop()"); disp.stop(); System.out.println("** disp.stop() -- done"); } public void done() { for(int i=0; i < NUM; i++) disp.done(i); done_submitted=true; } public Object handle(Message msg) { Object tmp=msg.getObject(); System.out.println("** handle(" + tmp + ')'); return tmp + ": success"; } public static void main(String[] args) { int c; MessageDispatcherTestAsync test=null; boolean debug=false, cummulative=false; for(int i=0; i < args.length; i++) { if("-help".equals(args[i])) { help(); return; } if("-debug".equals(args[i])) { debug=true; continue; } if("-cummulative".equals(args[i])) { cummulative=true; } } try { test=new MessageDispatcherTestAsync(debug, cummulative); test.start(); while(true) { System.out.println("[m=mcast " + NUM + " msgs x=exit]"); c=System.in.read(); switch(c) { case 'x': test.disconnect(); System.exit(0); return; case 'm': test.mcast(NUM); break; case 'd': test.done(); break; default: break; } } } catch(Exception e) { System.err.println(e); } } static void help() { System.out.println("MessageDispatcherTestAsync [-debug] [-cummulative]"); }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?