MultiplexerTest.java

来自「JGroups源码」· Java 代码 · 共 1,085 行 · 第 1/3 页

JAVA
1,085
字号
package org.jgroups.tests;import junit.framework.Test;import junit.framework.TestCase;import junit.framework.TestSuite;import org.jgroups.*;import org.jgroups.mux.MuxChannel;import org.jgroups.stack.IpAddress;import org.jgroups.stack.ProtocolStack;import org.jgroups.stack.Protocol;import org.jgroups.util.Util;import java.util.*;import java.io.*;import java.lang.management.ThreadMXBean;import java.lang.management.ManagementFactory;import java.lang.management.ThreadInfo;/** * Test the multiplexer functionality provided by JChannelFactory * @author Bela Ban * @version $Id: MultiplexerTest.java,v 1.31 2006/10/27 08:16:05 belaban Exp $ */public class MultiplexerTest extends TestCase {    private Cache c1, c2, c1_repl, c2_repl;    private Channel ch1, ch2, ch1_repl, ch2_repl;    static String CFG="stacks.xml";    static String STACK_NAME="udp";    JChannelFactory factory, factory2;    int active_threads=0;    String thread_dump=null;    public MultiplexerTest(String name) {        super(name);    }    public void setUp() throws Exception {        super.setUp();        active_threads=Thread.activeCount();        thread_dump="active threads before (" + active_threads + "):\n" + Util.activeThreads();        CFG = System.getProperty("cfg",CFG);        STACK_NAME = System.getProperty("stack",STACK_NAME);        log("Using stack configuration file " + CFG + " and stack name " + STACK_NAME);        factory=new JChannelFactory();        factory.setMultiplexerConfig(CFG);        factory2=new JChannelFactory();        factory2.setMultiplexerConfig(CFG);    }    protected void tearDown() throws Exception {        super.tearDown();        if(ch1_repl != null)            ch1_repl.close();        if(ch2_repl != null)            ch2_repl.close();        if(ch1 != null)            ch1.close();        if(ch2 != null)            ch2.close();        if(ch1 != null) {            assertFalse(((MuxChannel)ch1).getChannel().isOpen());            
assertFalse(((MuxChannel)ch1).getChannel().isConnected());        }        if(ch2 != null) {            assertFalse(((MuxChannel)ch2).getChannel().isOpen());            assertFalse(((MuxChannel)ch2).getChannel().isConnected());        }        if(ch1_repl != null) {            assertFalse(((MuxChannel)ch1_repl).getChannel().isOpen());            assertFalse(((MuxChannel)ch1_repl).getChannel().isConnected());        }        if(ch2_repl != null) {            assertFalse(((MuxChannel)ch2_repl).getChannel().isOpen());            assertFalse(((MuxChannel)ch2_repl).getChannel().isConnected());        }        if(c1 != null) c1.clear();        if(c2 != null) c2.clear();        if(c1_repl != null) c1_repl.clear();        if(c2_repl != null) c2_repl.clear();        ch1_repl=ch2_repl=ch1=ch2=null;        c1=c2=c1_repl=c2_repl=null;        Util.sleep(500); // remove this in 2.5 !        int current_active_threads=Thread.activeCount();        String msg="";        if(active_threads != current_active_threads) {            System.out.println(thread_dump);            System.out.println("active threads after (" + current_active_threads + "):\n" + Util.activeThreads());            msg="active threads:\n" + dumpThreads();        }        assertEquals(msg, active_threads, current_active_threads);    }    public void testReplicationWithOneChannel() throws Exception {        ch1=factory.createMultiplexerChannel(STACK_NAME, "c1");        ch1.connect("bla");        c1=new Cache(ch1, "cache-1");        assertEquals("cache has to be empty initially", 0, c1.size());        c1.put("name", "Bela");        Util.sleep(300); // we need to wait because replication is asynchronous here        assertEquals(1, c1.size());        assertEquals("Bela", c1.get("name"));    }    public void testLifecycle() throws Exception {        ch1=factory.createMultiplexerChannel(STACK_NAME, "c1");        assertTrue(ch1.isOpen());        assertFalse(ch1.isConnected());        ch1.connect("bla");        
assertTrue(ch1.isOpen());        assertTrue(ch1.isConnected());        ch2=factory.createMultiplexerChannel(STACK_NAME, "c2");        assertTrue(ch2.isOpen());        assertFalse(ch2.isConnected());        ch2.connect("bla");        assertTrue(ch2.isOpen());        assertTrue(ch2.isConnected());        ch2.disconnect();        assertTrue(ch2.isOpen());        assertFalse(ch2.isConnected());        ch2.connect("bla");        assertTrue(ch2.isOpen());        assertTrue(ch2.isConnected());        ch2.disconnect();        assertTrue(ch2.isOpen());        assertFalse(ch2.isConnected());        ch2.close();        assertFalse(ch2.isOpen());        assertFalse(ch2.isConnected());        ch2=factory.createMultiplexerChannel(STACK_NAME, "c2");        ch2.connect("bla");        assertTrue(ch2.isOpen());        assertTrue(ch2.isConnected());        ch2.close();        assertFalse(ch2.isOpen());        assertFalse(ch2.isConnected());    }    public void testDisconnect() throws Exception {        ch1=factory.createMultiplexerChannel(STACK_NAME, "c1");        assertTrue(ch1.isOpen());        assertFalse(ch1.isConnected());        assertTrue(((MuxChannel)ch1).getChannel().isOpen());        assertFalse(((MuxChannel)ch1).getChannel().isConnected());        ch1.connect("bla");        assertTrue(ch1.isOpen());        assertTrue(ch1.isConnected());        assertTrue(((MuxChannel)ch1).getChannel().isOpen());        assertTrue(((MuxChannel)ch1).getChannel().isConnected());        ch2=factory.createMultiplexerChannel(STACK_NAME, "c2");        assertTrue(ch2.isOpen());        assertFalse(ch2.isConnected());        ch1.disconnect();        assertTrue(ch1.isOpen());        assertFalse(ch1.isConnected());        ch1.connect("bla");        assertTrue(ch1.isOpen());        assertTrue(ch1.isConnected());        ch1.close();        assertFalse(ch1.isOpen());        assertFalse(ch1.isConnected());        assertTrue(((MuxChannel)ch1).getChannel().isOpen());        
assertTrue(((MuxChannel)ch1).getChannel().isConnected());        ch2.close();        assertFalse(ch2.isOpen());        assertFalse(ch2.isConnected());    }    public void testDisconnect2() throws Exception {        ch1=factory.createMultiplexerChannel(STACK_NAME, "c1");        assertTrue(ch1.isOpen());        assertFalse(ch1.isConnected());        ch1.connect("bla");        assertTrue(ch1.isOpen());        assertTrue(ch1.isConnected());        ch2=factory.createMultiplexerChannel(STACK_NAME, "c2");        assertTrue(ch2.isOpen());        assertFalse(ch2.isConnected());        ch1.disconnect();        assertTrue(ch1.isOpen());        assertFalse(ch1.isConnected());        assertTrue(ch2.isOpen());        assertFalse(ch2.isConnected());        ch1.connect("bla");        assertTrue(ch1.isOpen());        assertTrue(ch1.isConnected());        assertTrue(ch2.isOpen());        assertFalse(ch2.isConnected());    }    public void testClose() throws Exception {        ch1=factory.createMultiplexerChannel(STACK_NAME, "c1");        ch1.connect("bla");        ch2=factory.createMultiplexerChannel(STACK_NAME, "c2");        ch2.connect("bla");        ch1.close();        ch2.close();    }    public void testReplicationWithTwoChannels() throws Exception {        ch1=factory.createMultiplexerChannel(STACK_NAME, "c1");        c1=new Cache(ch1, "cache-1");        assertEquals("cache has to be empty initially", 0, c1.size());        ch1.connect("bla");        ch1_repl=factory2.createMultiplexerChannel(STACK_NAME, "c1");        c1_repl=new Cache(ch1_repl, "cache-1-repl");        assertEquals("cache has to be empty initially", 0, c1_repl.size());        ch1_repl.connect("bla");        View v=ch1_repl.getView();        assertNotNull(v);        assertEquals(2, v.size());        // System.out.println("****** [c1] PUT(name, Bela) *******");        c1.put("name", "Bela");        if(ch1.flushSupported())            ch1.startFlush(5000, true);        else            Util.sleep(10000);        
System.out.println("c1: " + c1 + ", c1_repl: " + c1_repl);        assertEquals(1, c1.size());        assertEquals("Bela", c1.get("name"));        assertEquals(1, c1_repl.size());        assertEquals("Bela", c1_repl.get("name"));        c1.put("id", new Long(322649));        c1_repl.put("hobbies", "biking");        c1_repl.put("bike", "Centurion");         if(ch1.flushSupported())            ch1.startFlush(5000, true);        else            Util.sleep(10000);        System.out.println("c1: " + c1 + ", c1_repl: " + c1_repl);        assertEquals(4, c1.size());        assertEquals(4, c1_repl.size());        assertEquals(new Long(322649), c1.get("id"));        assertEquals(new Long(322649), c1_repl.get("id"));        assertEquals("biking", c1.get("hobbies"));        assertEquals("biking", c1_repl.get("hobbies"));        assertEquals("Centurion", c1.get("bike"));        assertEquals("Centurion", c1_repl.get("bike"));    }    public void testReplicationWithReconnect() throws Exception {        ch1=factory.createMultiplexerChannel(STACK_NAME, "c1");        ch1.connect("bla");        c1=new Cache(ch1, "cache-1");        assertEquals("cache has to be empty initially", 0, c1.size());        c1.put("name", "Bela");        Util.sleep(300); // we need to wait because replication is asynchronous here        assertEquals(1, c1.size());        assertEquals("Bela", c1.get("name"));        ch1.disconnect();        ch1.connect("bla");        c2=new Cache(ch1, "cache-1");        assertEquals("cache has to be empty initially", 0, c2.size());        c2.put("name", "Bela");        Util.sleep(300); // we need to wait because replication is asynchronous here        assertEquals(1, c2.size());        assertEquals("Bela", c2.get("name"));    }    public void testStateTransfer() throws Exception {        ch1=factory.createMultiplexerChannel(STACK_NAME, "c1");        ch1.connect("bla");        c1=new Cache(ch1, "cache-1");        assertEquals("cache has to be empty initially", 0, c1.size());   
     ch1_repl=factory2.createMultiplexerChannel(STACK_NAME, "c1");        c1.put("name", "Bela");        c1.put("id", new Long(322649));        c1.put("hobbies", "biking");        c1.put("bike", "Centurion");        ch1_repl.connect("bla");        c1_repl=new Cache(ch1_repl, "cache-1-repl");        boolean rc=ch1_repl.getState(null, 5000);        System.out.println("state transfer: " + rc);        Util.sleep(500);        System.out.println("c1_repl: " + c1_repl);        assertEquals("initial state should have been transferred", 4, c1_repl.size());        assertEquals(new Long(322649), c1.get("id"));        assertEquals(new Long(322649), c1_repl.get("id"));        assertEquals("biking", c1.get("hobbies"));        assertEquals("biking", c1_repl.get("hobbies"));        assertEquals("Centurion", c1.get("bike"));        assertEquals("Centurion", c1_repl.get("bike"));    }    public void testStateTransferWithTwoApplications() throws Exception {        ch1=factory.createMultiplexerChannel(STACK_NAME, "c1");        ch1.connect("bla");        c1=new Cache(ch1, "cache-1");        assertEquals("cache has to be empty initially", 0, c1.size());        ch2=factory.createMultiplexerChannel(STACK_NAME, "c2");        ch2.connect("bla");        c2=new Cache(ch2, "cache-2");        assertEquals("cache has to be empty initially", 0, c2.size());        ch1_repl=factory2.createMultiplexerChannel(STACK_NAME, "c1");        ch2_repl=factory2.createMultiplexerChannel(STACK_NAME, "c2");        c1.put("name", "cache-1");        c2.put("name", "cache-2");        ch1_repl.connect("bla");

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?