欢迎来到虫虫下载站 | 资源下载 资源专辑 关于我们
虫虫下载站

state_transfer_test.java

JGRoups源码
JAVA
字号:
// $Id: STATE_TRANSFER_Test.java,v 1.8 2006/03/27 08:34:24 belaban Exp $package org.jgroups.protocols;import junit.framework.Test;import junit.framework.TestCase;import junit.framework.TestSuite;import org.jgroups.Address;import org.jgroups.Channel;import org.jgroups.ChannelClosedException;import org.jgroups.ChannelException;import org.jgroups.ChannelListener;import org.jgroups.ChannelNotConnectedException;import org.jgroups.ExitEvent;import org.jgroups.GetStateEvent;import org.jgroups.JChannel;import org.jgroups.SetStateEvent;import org.jgroups.Message;import org.jgroups.util.Util;/** * It's an attemp to setup Junit test case template for Protocol regression. <p> * Two "processes" are started, and the coord. keeps sending msg of a counter. The 2nd * process joins the grp and get the state from the coordinator. The subsequent msgs * after the setState will be validated to ensure the total ordering of msg delivery. <p> * This should cover the fix introduced by rev. 1.12 * * @author Wenbo Zhu * @version 1.0 */public class STATE_TRANSFER_Test extends TestCase {   public final static String CHANNEL_PROPS =      "UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;" +      "mcast_send_buf_size=64000;mcast_recv_buf_size=64000):" +      "PING(timeout=2000;num_initial_members=3):" +      "MERGE2(min_interval=5000;max_interval=10000):" +      "FD_SOCK:" +      "VERIFY_SUSPECT(timeout=1500):" +      "UNICAST(timeout=600,1200,2400,4800):" +      "STABLE():" +      "NAKACK(retransmit_timeout=600,1200,2400,4800):" +      "FRAG(frag_size=8096;down_thread=false;up_thread=false):" +      "FLUSH():" +      "GMS(join_timeout=5000;join_retry_timeout=2000;" +      "print_local_addr=true):" +      "VIEW_ENFORCER:" +      "TOTAL:" +      "STATE_TRANSFER:" +      "QUEUE";   public static final String GROUP_NAME = "jgroups.TEST_GROUP";   private Coordinator coord;   public STATE_TRANSFER_Test(String testName) {      super(testName);   }   protected void setUp() throws Exception {      super.setUp();      System.setProperty("org.apache.commons.logging.Log", "org.apache.commons.logging.impl.SimpleLog");      System.setProperty("org.apache.commons.logging.simplelog.defaultlog", "error");      coord = new Coordinator();      coord.recvLoop();      coord.sendLoop();   }   protected void tearDown() throws Exception {      super.tearDown();      coord.stop();      coord = null;   }   static class Coordinator implements ChannelListener {      private JChannel channel = null;      private int cnt = 0;  // the state      private volatile boolean closed = false;      protected Coordinator() throws ChannelException {         channel = new JChannel(CHANNEL_PROPS);         channel.setOpt(Channel.LOCAL, Boolean.FALSE);         channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);         channel.addChannelListener(this);         channel.connect(GROUP_NAME);      }      public void channelConnected(Channel channel) {      }      public void channelDisconnected(Channel channel) {      }      public void channelClosed(Channel channel) {      }      public void channelShunned() {      }      public void channelReconnected(Address addr) {     // n/a. now      }      public void recvLoop() throws Exception {         Thread task = new Thread(new Runnable() {            public void run() {               Object tmp;               while (! closed) {                  try {                     tmp = channel.receive(0);                     if (tmp instanceof ExitEvent) {                        System.err.println("-- received EXIT, waiting for ChannelReconnected callback");                        break;                     }                     if (tmp instanceof GetStateEvent) {                        synchronized (Coordinator.this) {                           System.err.println("--  GetStateEvent, cnt=" + cnt);                           channel.returnState(Util.objectToByteBuffer(new Integer(cnt)));                        }                     }                  } catch (ChannelNotConnectedException not) {                     break;                  } catch (ChannelClosedException closed) {                     break;                  } catch (Exception e) {                     System.err.println(e);                  }               }            }         });         task.start();      }      public void sendLoop() throws Exception {         Thread task = new Thread(new Runnable() {            public void run() {               while (! closed) {                  try {                     synchronized (Coordinator.this) {                        channel.send(null, null, new Integer(++cnt));                        System.err.println("send cnt=" + cnt);                     }                     Thread.sleep(1000);                  } catch (ChannelNotConnectedException not) {                     break;                  } catch (ChannelClosedException closed) {                     break;                  } catch (Exception e) {                     System.err.println(e);                  }               }            }         });         task.start();      }      public void stop() {         closed = true;         channel.close();      }   }   public void testBasicStateSync() throws Exception {      Channel channel = new JChannel(CHANNEL_PROPS);      channel.setOpt(Channel.LOCAL, Boolean.FALSE);      channel.connect(GROUP_NAME);      Thread.sleep(1000);      boolean join = false;      join = channel.getState(null, 100000l);      assertTrue(join);      Object tmp;      int cnt = -1;      while (true) {         try {            tmp = channel.receive(0);            if (tmp instanceof ExitEvent) {               break;            }            if (tmp instanceof SetStateEvent) {               cnt = ((Integer) Util.objectFromByteBuffer(((SetStateEvent) tmp).getArg())).intValue();               System.err.println("--  SetStateEvent, cnt=" + cnt);               continue;            }            if ( tmp instanceof Message ) {               if (cnt != -1) {                  int msg = ((Integer) ((Message) tmp).getObject()).intValue();                  assertEquals(cnt, msg - 1);                  break;  // done               }            }         } catch (ChannelNotConnectedException not) {            break;         } catch (ChannelClosedException closed) {            break;         } catch (Exception e) {            System.err.println(e);         }      }      channel.close();   }   public static Test suite() {      return new TestSuite(STATE_TRANSFER_Test.class);   }   public static void main(String[] args) {      junit.textui.TestRunner.run(suite());   }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -