statetransfertest.java
来自「JGRoups源码」· Java 代码 · 共 266 行
JAVA
266 行
package org.jgroups.tests;import junit.framework.Test;import junit.framework.TestCase;import junit.framework.TestSuite;import org.jgroups.*;import org.jgroups.util.Util;import org.jgroups.util.Promise;import java.io.InputStream;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.io.OutputStream;import java.util.Collections;import java.util.HashMap;import java.util.Map;import java.util.Set;/** * Tests correct state transfer while other members continue sending messages to the group * @author Bela Ban * @version $Id: StateTransferTest.java,v 1.10 2006/09/22 12:33:12 belaban Exp $ */public class StateTransferTest extends TestCase { final int NUM=10000; final int NUM_THREADS=2; String props="udp.xml"; public StateTransferTest(String name) { super(name); } protected void setUp() throws Exception { props = System.getProperty("props",props); log("Using configuration file " + props); super.setUp(); } public void testStateTransferWhileSending() throws Exception { Worker[] workers=new Worker[NUM_THREADS]; int from=0, to=NUM; for(int i=0; i < workers.length; i++) { workers[i]=new Worker(from, to); from+=NUM; to+=NUM; } for(int i=0; i < workers.length; i++) { Worker worker=workers[i]; worker.start(); Util.sleep(50); // to have threads join the group a bit later and get the state } for(int i=0; i < workers.length; i++) { Worker worker=workers[i]; worker.waitUntilDone(); } for(int i=0; i < workers.length; i++) { Worker worker=workers[i]; worker.stop(); } log("\n\nhashmaps\n"); for(int i=0; i < workers.length; i++) { Worker w=workers[i]; Map m=w.getMap(); log("map has " + m.size() + " elements"); assertEquals(NUM * NUM_THREADS, m.size()); } Set keys=workers[0].getMap().keySet(); for(int i=0; i < workers.length; i++) { Worker w=workers[i]; Map m=w.getMap(); Set s=m.keySet(); assertEquals(keys, s); } } class Worker implements Runnable { JChannel ch; int to; int from; final Promise promise=new Promise(); Thread t; Receiver receiver; public Worker(int from, int to) { this.to=to; this.from=from; } public Map getMap() { return receiver.getMap(); } void start() throws Exception { ch=new JChannel(props); ch.connect("StateTransferTest-Group"); receiver=new Receiver(ch, promise); boolean rc=ch.getState(null, 10000); if(rc) log("state transfer: OK"); else { if(ch.getView().size() == 1) log("state transfer: OK"); else log("state transfer: FAIL"); } receiver.setName("Receiver [" + from + " - " + to + "]"); receiver.start(); if(rc) promise.getResult(); t=new Thread(this); t.setName("Worker [" + from + " - " + to + "]"); t.start(); } public void stop() { ch.close(); } void waitUntilDone() throws InterruptedException { t.join(); receiver.join(); } public void run() { Object[] data=new Object[2]; log("Worker thread started (sending msgs from " + from + " to " + to + " (excluding " + to + ")"); for(int i=from; i < to; i++) { data[0]=new Integer(i); data[1]="Value #" + i; try { ch.send(null, null, data); if(i % 1000 == 0) log("sent " + i); // log("sent " + data[0]); } catch(Exception e) { e.printStackTrace(); break; } } } } class Receiver extends Thread { JChannel ch; Promise promise; Map map; public Receiver(JChannel ch, Promise promise) { this.ch=ch; this.promise=promise; map=Collections.synchronizedMap(new HashMap(NUM * NUM_THREADS)); } public Map getMap() { return map; } public void run() { Object obj, prev_val; Object[] data; int num_received=0, to_be_received=NUM * NUM_THREADS; log("Receiver thread started"); while(ch.isConnected()) { try { obj=ch.receive(0); if(obj instanceof Message) { data=(Object[])((Message)obj).getObject(); prev_val=map.put(data[0], data[1]); if(prev_val != null) // we have a duplicate value continue; num_received=map.size(); if(num_received % 1000 == 0) log("received " + num_received); // log("received " + data[0] + " total: " + num_received + ")"); if(num_received >= to_be_received) { log("DONE: received " + num_received + " messages"); break; } } else if(obj instanceof View) { log("VIEW: " + obj); } else if(obj instanceof GetStateEvent) { byte[] state=Util.objectToByteBuffer(map); log("returning state, map has " + map.size() + " elements"); ch.returnState(state); } else if(obj instanceof SetStateEvent) { byte state[]=((SetStateEvent)obj).getArg(); if(state == null) { log("received null state"); } else { Map tmp=(Map)Util.objectFromByteBuffer(state); log("received state, map has " + tmp.size() + " elements"); map=Collections.synchronizedMap(tmp); } promise.setResult(Boolean.TRUE); } else if(obj instanceof StreamingGetStateEvent) { StreamingGetStateEvent evt=(StreamingGetStateEvent)obj; OutputStream stream = evt.getArg(); ObjectOutputStream out = new ObjectOutputStream(stream); synchronized(map){ out.writeObject(map); } out.close(); } else if(obj instanceof StreamingSetStateEvent) { StreamingSetStateEvent evt=(StreamingSetStateEvent)obj; InputStream stream = evt.getArg(); ObjectInputStream in = new ObjectInputStream(stream); map=Collections.synchronizedMap((Map) in.readObject()); in.close(); promise.setResult(Boolean.TRUE); } } catch(Exception e) { log("receiver thread terminated due to exception: " + e); break; } } log("Receiver thread terminated"); } } static void log(String msg) { System.out.println(Thread.currentThread() + " -- "+ msg); } public static Test suite() { return new TestSuite(StateTransferTest.class); } public static void main(String[] args) { junit.textui.TestRunner.run(suite()); }}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?