connectstresstest.java

来自「JGRoups源码」· Java 代码 · 共 241 行

JAVA
241
字号
// $Id: ConnectStressTest.java,v 1.16 2006/05/19 11:23:27 belaban Exp $package org.jgroups.tests;import EDU.oswego.cs.dl.util.concurrent.BrokenBarrierException;import EDU.oswego.cs.dl.util.concurrent.CyclicBarrier;import junit.framework.TestCase;import junit.framework.Test;import junit.framework.TestSuite;import org.jgroups.*;import org.jgroups.util.Util;import java.util.Vector;/** * Creates 1 channel, then creates NUM channels, all try to join the same channel concurrently. * @author Bela Ban Nov 20 2003 * @version $Id: ConnectStressTest.java,v 1.16 2006/05/19 11:23:27 belaban Exp $ */public class ConnectStressTest extends TestCase {    static CyclicBarrier  start_connecting=null;    static CyclicBarrier  connected=null;    static CyclicBarrier  received_all_views=null;    static CyclicBarrier  start_disconnecting=null;    static CyclicBarrier  disconnected=null;    static final int      NUM=30;    static final MyThread[] threads=new MyThread[NUM];    static JChannel       channel=null;    static String         groupname="ConcurrentTestDemo";    static String props="UDP(mcast_addr=228.8.8.9;mcast_port=7788;ip_ttl=1;" +            "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" +            "PING(timeout=3000;num_initial_members=10):" +            "MERGE2(min_interval=3000;max_interval=5000):" +            "FD_SOCK:" +            "VERIFY_SUSPECT(timeout=1500):" +            "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):" +            "UNICAST(timeout=300,600,1200,2400):" +            "pbcast.STABLE(desired_avg_gossip=5000):" +            "FRAG(frag_size=4096;down_thread=false;up_thread=false):" +            "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +            "shun=false;print_local_addr=false;view_ack_collection_timeout=5000;" +            "digest_timeout=0;merge_timeout=30000)";    public ConnectStressTest(String name) {        super(name);    }    static void log(String msg) {        System.out.println("-- [" + Thread.currentThread().getName() + "] " + msg);    }    public void testConcurrentJoins() throws Exception {        start_connecting=new CyclicBarrier(NUM +1);        connected=new CyclicBarrier(NUM +1);        received_all_views=new CyclicBarrier(NUM +1);        start_disconnecting=new CyclicBarrier(NUM +1);        disconnected=new CyclicBarrier(NUM +1);        long start, stop;        //  create main channel - will be coordinator for JOIN requests        channel=new JChannel(props);        channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);        start=System.currentTimeMillis();        channel.connect(groupname);        stop=System.currentTimeMillis();        log(channel.getLocalAddress() + " connected in " + (stop-start) + " msecs (" +                    channel.getView().getMembers().size() + " members). VID=" + channel.getView().getVid());        assertEquals(1, channel.getView().getMembers().size());        for(int i=0; i < threads.length; i++) {            threads[i]=new MyThread(i);            threads[i].start();        }        // signal the threads to start connecting to their channels        start_connecting.barrier();        start=System.currentTimeMillis();        try {            connected.barrier();            stop=System.currentTimeMillis();            System.out.println("-- took " + (stop-start) + " msecs for all " + NUM + " threads to connect");            received_all_views.barrier();            stop=System.currentTimeMillis();            System.out.println("-- took " + (stop-start) + " msecs for all " + NUM + " threads to see all views");            int num_members=-1;            for(int i=0; i < 10; i++) {                View v=channel.getView();                num_members=v.getMembers().size();                System.out.println("*--* number of members connected: " + num_members + ", (expected: " +(NUM+1) +                        "), v=" + v);                if(num_members == NUM+1)                    break;                Util.sleep(500);            }            assertEquals((NUM+1), num_members);        }        catch(Exception ex) {            fail(ex.toString());        }    }    public void testConcurrentLeaves() throws Exception {        start_disconnecting.barrier();        long start, stop;        start=System.currentTimeMillis();        disconnected.barrier();        stop=System.currentTimeMillis();        System.out.println("-- took " + (stop-start) + " msecs for " + NUM + " threads to disconnect");        int num_members=0;        for(int i=0; i < 10; i++) {            View v=channel.getView();            Vector mbrs=v != null? v.getMembers() : null;            if(mbrs != null) {                num_members=mbrs.size();                System.out.println("*--* number of members connected: " + num_members + ", (expected: 1), view=" + v);                if(num_members <= 1)                    break;            }            Util.sleep(3000);        }        assertEquals(1, num_members);        log("closing all channels");        for(int i=0; i < threads.length; i++) {            MyThread t=threads[i];            t.closeChannel();        }        channel.close();    }    public static class MyThread extends Thread {        int                index=-1;        long                total_connect_time=0, total_disconnect_time=0;        private JChannel    ch=null;        private Address     my_addr=null;        public MyThread(int i) {            super("thread #" + i);            index=i;        }        public void closeChannel() {            if(ch != null) {                ch.close();            }        }        public void run() {            View view;            try {                ch=new JChannel(props);                start_connecting.barrier();                long start=System.currentTimeMillis(), stop;                ch.connect(groupname);                stop=System.currentTimeMillis();                total_connect_time=stop-start;                view=ch.getView();                my_addr=ch.getLocalAddress();                log(my_addr + " connected in " + total_connect_time + " msecs (" +                    view.getMembers().size() + " members). VID=" + view.getVid());                connected.barrier();                int num_members=0;                while(true) {                    View v=ch.getView();                    Vector mbrs=v != null? v.getMembers() : null;                    if(mbrs == null) {                        System.err.println("mbrs is null, v=" + v);                    }                    else {                        num_members=mbrs.size();                        log("num_members=" + num_members);                        if(num_members == NUM+1) // all threads (NUM) plus the first channel (1)                            break;                    }                    Util.sleep(2000);                }                log("reached " + num_members + " members");                received_all_views.barrier();                start_disconnecting.barrier();                start=System.currentTimeMillis();                ch.disconnect();                stop=System.currentTimeMillis();                log(my_addr + " disconnected in " + (stop-start) + " msecs");                disconnected.barrier();            }            catch(BrokenBarrierException e) {                e.printStackTrace();            }            catch(ChannelException e) {                e.printStackTrace();            }            catch(InterruptedException e) {                e.printStackTrace();            }        }    }    public static Test suite() {        TestSuite s=new TestSuite();        // we're adding the tests manually, because they need to be run in *this exact order*        s.addTest(new ConnectStressTest("testConcurrentJoins"));        s.addTest(new ConnectStressTest("testConcurrentLeaves"));        return s;    }    public static void main(String[] args) {        String[] testCaseName={ConnectStressTest.class.getName()};        junit.textui.TestRunner.main(testCaseName);    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?