state_transfer_test.java
字号:
// $Id: STATE_TRANSFER_Test.java,v 1.8 2006/03/27 08:34:24 belaban Exp $package org.jgroups.protocols;import junit.framework.Test;import junit.framework.TestCase;import junit.framework.TestSuite;import org.jgroups.Address;import org.jgroups.Channel;import org.jgroups.ChannelClosedException;import org.jgroups.ChannelException;import org.jgroups.ChannelListener;import org.jgroups.ChannelNotConnectedException;import org.jgroups.ExitEvent;import org.jgroups.GetStateEvent;import org.jgroups.JChannel;import org.jgroups.SetStateEvent;import org.jgroups.Message;import org.jgroups.util.Util;/** * It's an attemp to setup Junit test case template for Protocol regression. <p> * Two "processes" are started, and the coord. keeps sending msg of a counter. The 2nd * process joins the grp and get the state from the coordinator. The subsequent msgs * after the setState will be validated to ensure the total ordering of msg delivery. <p> * This should cover the fix introduced by rev. 1.12 * * @author Wenbo Zhu * @version 1.0 */public class STATE_TRANSFER_Test extends TestCase { public final static String CHANNEL_PROPS = "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;" + "mcast_send_buf_size=64000;mcast_recv_buf_size=64000):" + "PING(timeout=2000;num_initial_members=3):" + "MERGE2(min_interval=5000;max_interval=10000):" + "FD_SOCK:" + "VERIFY_SUSPECT(timeout=1500):" + "UNICAST(timeout=600,1200,2400,4800):" + "STABLE():" + "NAKACK(retransmit_timeout=600,1200,2400,4800):" + "FRAG(frag_size=8096;down_thread=false;up_thread=false):" + "FLUSH():" + "GMS(join_timeout=5000;join_retry_timeout=2000;" + "print_local_addr=true):" + "VIEW_ENFORCER:" + "TOTAL:" + "STATE_TRANSFER:" + "QUEUE"; public static final String GROUP_NAME = "jgroups.TEST_GROUP"; private Coordinator coord; public STATE_TRANSFER_Test(String testName) { super(testName); } protected void setUp() throws Exception { super.setUp(); System.setProperty("org.apache.commons.logging.Log", "org.apache.commons.logging.impl.SimpleLog"); System.setProperty("org.apache.commons.logging.simplelog.defaultlog", "error"); coord = new Coordinator(); coord.recvLoop(); coord.sendLoop(); } protected void tearDown() throws Exception { super.tearDown(); coord.stop(); coord = null; } static class Coordinator implements ChannelListener { private JChannel channel = null; private int cnt = 0; // the state private volatile boolean closed = false; protected Coordinator() throws ChannelException { channel = new JChannel(CHANNEL_PROPS); channel.setOpt(Channel.LOCAL, Boolean.FALSE); channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE); channel.addChannelListener(this); channel.connect(GROUP_NAME); } public void channelConnected(Channel channel) { } public void channelDisconnected(Channel channel) { } public void channelClosed(Channel channel) { } public void channelShunned() { } public void channelReconnected(Address addr) { // n/a. now } public void recvLoop() throws Exception { Thread task = new Thread(new Runnable() { public void run() { Object tmp; while (! closed) { try { tmp = channel.receive(0); if (tmp instanceof ExitEvent) { System.err.println("-- received EXIT, waiting for ChannelReconnected callback"); break; } if (tmp instanceof GetStateEvent) { synchronized (Coordinator.this) { System.err.println("-- GetStateEvent, cnt=" + cnt); channel.returnState(Util.objectToByteBuffer(new Integer(cnt))); } } } catch (ChannelNotConnectedException not) { break; } catch (ChannelClosedException closed) { break; } catch (Exception e) { System.err.println(e); } } } }); task.start(); } public void sendLoop() throws Exception { Thread task = new Thread(new Runnable() { public void run() { while (! closed) { try { synchronized (Coordinator.this) { channel.send(null, null, new Integer(++cnt)); System.err.println("send cnt=" + cnt); } Thread.sleep(1000); } catch (ChannelNotConnectedException not) { break; } catch (ChannelClosedException closed) { break; } catch (Exception e) { System.err.println(e); } } } }); task.start(); } public void stop() { closed = true; channel.close(); } } public void testBasicStateSync() throws Exception { Channel channel = new JChannel(CHANNEL_PROPS); channel.setOpt(Channel.LOCAL, Boolean.FALSE); channel.connect(GROUP_NAME); Thread.sleep(1000); boolean join = false; join = channel.getState(null, 100000l); assertTrue(join); Object tmp; int cnt = -1; while (true) { try { tmp = channel.receive(0); if (tmp instanceof ExitEvent) { break; } if (tmp instanceof SetStateEvent) { cnt = ((Integer) Util.objectFromByteBuffer(((SetStateEvent) tmp).getArg())).intValue(); System.err.println("-- SetStateEvent, cnt=" + cnt); continue; } if ( tmp instanceof Message ) { if (cnt != -1) { int msg = ((Integer) ((Message) tmp).getObject()).intValue(); assertEquals(cnt, msg - 1); break; // done } } } catch (ChannelNotConnectedException not) { break; } catch (ChannelClosedException closed) { break; } catch (Exception e) { System.err.println(e); } } channel.close(); } public static Test suite() { return new TestSuite(STATE_TRANSFER_Test.class); } public static void main(String[] args) { junit.textui.TestRunner.run(suite()); }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -