concurrentstartuptest.java
来自「JGRoups源码」· Java 代码 · 共 370 行
JAVA
370 行
package org.jgroups.tests;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.jgroups.*;
import org.jgroups.util.Util;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.*;
/**
* Tests concurrent startup and message sending directly after joining
* See doc/design/ConcurrentStartupTest.txt for details. This will only work 100% correctly once we have
* FLUSH support (JGroups 2.4)
*
* NOTE: This test is not guaranteed to pass at 100% rate until combined join
* and state transfer using one FLUSH phase is introduced (Jgroups 2.5)[1].
*
* [1] http://jira.jboss.com/jira/browse/JGRP-236
*
* @author bela
* @version $Id: ConcurrentStartupTest.java,v 1.16 2006/10/27 15:12:28 vlada Exp $
*/
public class ConcurrentStartupTest extends TestCase implements ExtendedReceiver {
final List list=Collections.synchronizedList(new LinkedList());
JChannel channel;
final static String GROUP="demo";
static String PROPS="flush-udp.xml"; // use flush properties
final int NUM=5;
int mod=1;
final Map modifications=new TreeMap();
protected void setUp() throws Exception {
super.setUp();
PROPS = System.getProperty("props",PROPS);
}
protected void tearDown() throws Exception {
super.tearDown();
}
int getMod() {
synchronized(this) {
int retval=mod;
mod++;
return retval;
}
}
public void testMessageSendingAfterConnect() throws Exception {
channel=new JChannel(PROPS);
channel.setOpt(Channel.BLOCK, Boolean.TRUE);
channel.setReceiver(this);
channel.connect(GROUP);
channel.getState(null, 10000);
// channel.send(null, null, channel.getLocalAddress());
Util.sleep(2000);
MyThread[] threads=new MyThread[NUM];
for(int i=0; i < threads.length; i++) {
threads[i]=new MyThread(String.valueOf(i));
Util.sleepRandom(1000);
threads[i].start();
}
for(int i=0; i < threads.length; i++) {
MyThread thread=threads[i];
thread.join(15000);
if(thread.isAlive())
System.err.println("thread " + i + " is still alive");
}
List[] lists=new List[NUM];
for(int i=0; i < threads.length; i++) {
MyThread thread=threads[i];
lists[i]=new LinkedList(thread.getList());
}
printLists(list, lists);
Map[] mods=new Map[NUM];
for(int i=0; i < threads.length; i++) {
MyThread thread=threads[i];
mods[i]=thread.getModifications();
}
printModifications(modifications, mods);
int len=list.size();
for(int i=0; i < lists.length; i++) {
List l=lists[i];
assertEquals("list #" + i + " should have " + len + " elements", len, l.size());
}
}
private void printModifications(Map mod, Map[] modifications) {
System.out.println("\nmodifications: " + mod);
for(int i=0; i < modifications.length; i++) {
Map modification=modifications[i];
System.out.println("modifications for #" + i + ": " + modification);
}
}
private void printLists(List list, List[] lists) {
System.out.println("\nlist=" + list);
for(int i=0; i < lists.length; i++) {
List l=lists[i];
System.out.println(i + ": " + l);
}
}
public void receive(Message msg) {
if(msg.getBuffer() == null)
return;
Object obj=msg.getObject();
synchronized(this) {
list.add(obj);
Integer key=new Integer(getMod());
modifications.put(key, obj);
}
}
public byte[] getState() {
synchronized(this) {
List tmp=new LinkedList(list);
try {
return Util.objectToByteBuffer(tmp);
}
catch(Exception e) {
e.printStackTrace();
return null;
}
}
}
public void setState(byte[] state) {
try {
List tmp=(List)Util.objectFromByteBuffer(state);
synchronized(this) {
list.clear();
list.addAll(tmp);
}
}
catch(Exception e) {
e.printStackTrace();
}
}
public byte[] getState(String state_id) {
//not needed
return null;
}
public void getState(OutputStream ostream) {
ObjectOutputStream oos = null;
try{
oos = new ObjectOutputStream(ostream);
List tmp = null;
synchronized (this){
tmp = new LinkedList(list);
}
oos.writeObject(tmp);
oos.flush();
}
catch (IOException e){
e.printStackTrace();
}
finally{
Util.close(oos);
}
}
public void getState(String state_id, OutputStream ostream) {
//not used
}
public void setState(String state_id, byte[] state) {
// not used
}
public void setState(InputStream istream) {
ObjectInputStream ois = null;
try{
ois = new ObjectInputStream(istream);
List tmp = (List) ois.readObject();
synchronized (this){
list.clear();
list.addAll(tmp);
}
}
catch (Exception e){
e.printStackTrace();
}
finally{
Util.close(ois);
}
}
public void setState(String state_id, InputStream istream) {
// not used
}
public void viewAccepted(View new_view) {
System.out.println("-- view: " + new_view);
synchronized(this) {
Integer key=new Integer(getMod());
modifications.put(key, new_view.getVid());
}
}
public void suspect(Address suspected_mbr) {
}
public void block() {
}
public void unblock() {
}
private static class MyThread extends Thread {
final List list=new LinkedList();
Channel ch;
int mod=1;
final Map modifications=new TreeMap();
int getMod() {
int retval = mod;
mod++;
return retval;
}
MyThread(String name) {
super(name);
}
public void run() {
try {
ch=new JChannel(PROPS);
ch.setOpt(Channel.BLOCK, Boolean.TRUE);
ch.setReceiver(new ExtendedReceiverAdapter() {
public void receive(Message msg) {
if(msg.getBuffer() == null)
return;
Object obj=msg.getObject();
synchronized (this) {
list.add(obj);
Integer key=new Integer(getMod());
modifications.put(key, obj);
}
}
public void viewAccepted(View new_view) {
synchronized(this) {
Integer key=new Integer(getMod());
modifications.put(key, new_view.getVid());
}
}
public void setState(byte[] state) {
try {
List tmp=(List)Util.objectFromByteBuffer(state);
synchronized(this) {
list.clear();
list.addAll(tmp);
System.out.println("-- [#" + getName() + " (" +ch.getLocalAddress()+")]: state is " + list);
Integer key=new Integer(getMod());
modifications.put(key, tmp);
}
}
catch(Exception e) {
e.printStackTrace();
}
}
public byte[] getState() {
List tmp = null;
synchronized (this) {
tmp = new LinkedList(list);
try {
return Util.objectToByteBuffer(tmp);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
public void getState(OutputStream ostream){
ObjectOutputStream oos = null;
try{
oos = new ObjectOutputStream(ostream);
List tmp = null;
synchronized (this){
tmp = new LinkedList(list);
}
oos.writeObject(tmp);
oos.flush();
}
catch (IOException e){
e.printStackTrace();
}
finally{
Util.close(oos);
}
}
public void setState(InputStream istream) {
ObjectInputStream ois = null;
try{
ois = new ObjectInputStream(istream);
List tmp = (List) ois.readObject();
synchronized (this){
list.clear();
list.addAll(tmp);
System.out.println("-- [#" + getName() + " (" +ch.getLocalAddress()+")]: state is " + list);
Integer key=new Integer(getMod());
modifications.put(key, tmp);
}
}
catch (Exception e){
e.printStackTrace();
}
finally{
Util.close(ois);
}
}
});
ch.connect(GROUP);
ch.getState(null, 10000);
// Util.sleep(1000);
ch.send(null, null, ch.getLocalAddress());
Util.sleep(10000); // potential retransmissions
}
catch(ChannelException e) {
e.printStackTrace();
}
}
List getList() {return list;}
Map getModifications() {return modifications;}
}
public static Test suite() {
return new TestSuite(ConcurrentStartupTest.class);
}
public static void main(String[] args) {
String[] testCaseName={ConcurrentStartupTest.class.getName()};
junit.textui.TestRunner.main(testCaseName);
}
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?