rpcdispatcherstresstest.java

来自「JGRoups源码」· Java 代码 · 共 192 行

JAVA
192
字号
// $Id: RpcDispatcherStressTest.java,v 1.8 2005/05/30 16:15:12 belaban Exp $package org.jgroups.tests;import org.jgroups.*;import org.jgroups.blocks.GroupRequest;import org.jgroups.blocks.RpcDispatcher;import org.jgroups.util.RspList;import org.jgroups.util.Util;/** * Example for RpcDispatcher (see also MessageDispatcher). Multiple threads will invoke print() on * all members and wait indefinitely for all responses (excluding of course crashed members). Run this * on 2 nodes for an extended period of time to see whether GroupRequest.doExecute() hangs. * @author Bela Ban */public class RpcDispatcherStressTest implements MembershipListener {    Channel            channel;    RpcDispatcher      disp;    RspList            rsp_list;    Publisher[]        threads=null;    int[]              results;    public int print(int number) throws Exception {        return number * 2;    }    public void start(String props, int num_threads, long interval, boolean discard_local) throws Exception {        channel=new JChannel(props);        if(discard_local)            channel.setOpt(Channel.LOCAL, Boolean.FALSE);        disp=new RpcDispatcher(channel, null, this, this);        channel.connect("RpcDispatcherStressTestGroup");        threads=new Publisher[num_threads];        results=new int[num_threads];        for(int i=0; i < threads.length; i++) {            threads[i]=new Publisher(i, interval);            results[i]=0;        }        System.out.println("-- Created " + threads.length + " threads. Press enter to start them " +                           "('-' for sent message, '+' for received message)");        System.out.println("-- Press enter to stop the threads");                System.out.flush();        System.in.read();        System.in.skip(System.in.available());        for(int i=0; i < threads.length; i++)            threads[i].start();        System.out.flush();        System.in.read();        System.in.skip(System.in.available());        for(int i=0; i < threads.length; i++) {            threads[i].stopThread();            threads[i].join(2000);        }        System.out.println("\n");        for(int i=0; i < threads.length; i++) {            System.out.println("-- thread #" + i + ": called remote method " + results[i] + " times");                    }        System.out.println("Closing channel");        channel.close();        System.out.println("Closing channel: -- done");        System.out.println("Stopping dispatcher");        disp.stop();        System.out.println("Stopping dispatcher: -- done");    }    /* --------------------------------- MembershipListener interface ---------------------------------- */    public void viewAccepted(View new_view) {        System.out.println("-- new view: " + new_view);    }    public void suspect(Address suspected_mbr) {        System.out.println("-- suspected " + suspected_mbr);    }    public void block() {        ;    }    /* ------------------------------ End of MembershipListener interface -------------------------------- */    class Publisher extends Thread {        int     rank=0;        boolean running=true;        int     num_calls=0;        long    interval=1000;        Publisher(int rank, long interval) {            super();            setDaemon(true);            this.rank=rank;            this.interval=interval;        }        public void stopThread() {            running=false;        }        public void run() {            while(running) {                System.out.print(rank + "- ");                disp.callRemoteMethods(null, "print", new Object[]{new Integer(num_calls)},                        new Class[]{int.class}, GroupRequest.GET_ALL, 0);                num_calls++;                System.out.print(rank + "+ ");                Util.sleep(interval);            }            results[rank]=num_calls;        }    }    public static void main(String[] args) {        String  props;        int     num_threads=1;        long    interval=1000;        boolean discard_local=false;        props="UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;" +                "ucast_recv_buf_size=16000;ucast_send_buf_size=16000;" +                "mcast_send_buf_size=32000;mcast_recv_buf_size=64000;loopback=true):"+                "PING(timeout=2000;num_initial_members=3):"+                "MERGE2(min_interval=5000;max_interval=10000):"+                "FD_SOCK:"+                "VERIFY_SUSPECT(timeout=1500):"+                "pbcast.NAKACK(gc_lag=50;retransmit_timeout=1000,1500,2000,3000;max_xmit_size=8192):"+                "UNICAST(timeout=1000,1500,2000,3000):"+                "pbcast.STABLE(desired_avg_gossip=10000):"+                "FRAG(frag_size=8192;down_thread=false;up_thread=false):"+                "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true):"+                "pbcast.STATE_TRANSFER";                try {            for(int i=0; i < args.length; i++) {                if("-num_threads".equals(args[i])) {                    num_threads=Integer.parseInt(args[++i]);                    continue;                }                if("-interval".equals(args[i])) {                    interval=Long.parseLong(args[++i]);                    continue;                }                if("-props".equals(args[i])) {                    props=args[++i];                    continue;                }                if("-discard_local".equals(args[i])) {                    discard_local=true;                    continue;                }                help();                return;            }            new RpcDispatcherStressTest().start(props, num_threads, interval, discard_local);        }        catch(Exception e) {            System.err.println(e);        }    }    static void help() {        System.out.println("RpcDispatcherStressTest [-help] [-interval <msecs>] " +                           "[-num_threads <number>] [-props <stack properties>] [-discard_local]");    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?