continousthroughputtest.java

来自「JGRoups源码」· Java 代码 · 共 462 行 · 第 1/2 页

JAVA
462
字号
package org.jgroups.tests;import org.jgroups.*;import org.jgroups.stack.IpAddress;import org.jgroups.stack.Protocol;import org.jgroups.stack.ProtocolObserver;import org.jgroups.util.Util;import java.io.BufferedReader;import java.io.File;import java.io.FileWriter;import java.io.InputStreamReader;import java.util.Date;/** * <h1>ContinousThroughputTest.java</h1> * <p/> * This is a program for running throughput tests. * <p/> * The program assumes it runs on a reliable network where no partitioning or failures happen (apart from the cping test). * Once you run the program it connects the channel and gives you a prompt. * Every time a new view is received you will see it printed. * Once you have launched the program on all the machines you use for the test, just type, * on one machine, the command for the test you want to run; you will be asked for the necessary parameters, * then the test starts. * Depending on the chosen test you will see its results on the monitor, and they are logged * to a file in the working directory called <code>"ContinousThroughputTest<hostname><systemTimeInSeconds>.log"</code> . 
* * @author Gianluca Collot * @version 1.0 */public class ContinousThroughputTest {    String props="UDP:" +            "PING(up_thread=false;down_thread=false):" +            "FD(timeout=1000;shun=false):" +            "STABLE(up_thread=false;down_thread=false):" +            "MERGE(up_thread=false;down_thread=false):" +            "NAKACK:" +            "FLUSH:" +            "GMS:" +            "VIEW_ENFORCER(up_thread=false;down_thread=false):" +//		"TSTAU:" +            "QUEUE(up_thread=false;down_thread=false)";//  String props= "TCP:TCPPING(initial_hosts=manolete2[8880]):FD(timeout=10000):" +//		"STABLE:MERGE:NAKACK:FRAG:FLUSH:GMS:VIEW_ENFORCER:QUEUE";    JChannel channel=null;    Thread sendThread, receiveThread;    boolean coordinator=false;    IpAddress my_addr=null;    View view;    BufferedReader reader;    float troughputSum=0, meanTroughput=0, minTroughput=10000, maxTroughput=0;    int numTests=0;    FileWriter logWriter;    Protocol prot=null;    /**     * Creates threads, creates and connects channel opens log file     */    public ContinousThroughputTest() {        sendThread=new Thread("sendThread") {            public void run() {                parser();            }        };        receiveThread=new Thread("receiveThread") {            public void run() {                checkChannel();            }        };        reader=new BufferedReader(new InputStreamReader(System.in));        try {            channel=new JChannel(props);//      prot = (Protocol) channel.getProtocolStack().getProtocols().lastElement();//      prot.setObserver(new ContinousThroughputTest.MessageLenghtObserver());            channel.setOpt(Channel.BLOCK, Boolean.FALSE);            channel.connect("Janus");        }        catch(Exception ex) {            System.out.println("Connection Failed!" 
+ ex);            System.exit(1);        }        my_addr=(IpAddress)channel.getLocalAddress();        try {            File log=new File("ContinousThroughputTest" + my_addr.getIpAddress().getHostName()                    + (System.currentTimeMillis() / 10000) + ".log");            if(!log.exists()) {                log.createNewFile();            }            logWriter=new FileWriter(log);            logWriter.write("ContinousThroughputTest.java log\r\n");            logWriter.write("Date:" + new Date(System.currentTimeMillis()) + "\r\n");            log("Protocol Stack is " + props);            System.out.println("Protocol Stack is " + props);        }        catch(Exception ex) {            System.out.println("File problems " + ex);            System.exit(5);        }    }    public static void main(String[] args) {        ContinousThroughputTest perfTest=new ContinousThroughputTest();        perfTest.go();    }    void go() {//    Starts Receiving        receiveThread.start();//    Starts input Parser        sendThread.start();    }    /**     * This function should be called in its own thread.     
* It recives messages and calculates the troughput     */    public void checkChannel() {        String payload=null;        Object received=null;        Message msg=null;        boolean done=false;        long n;        int i=1;        System.out.println("Started receiving");        try {            while(!done) {                received=channel.receive(0);                if(received instanceof Message) {                    msg=(Message)received;                    payload=(String)msg.getObject();                    System.out.println(payload);                    if("stop".equalsIgnoreCase(payload)) {                        done=true;                    }                    if("pingpong".equalsIgnoreCase(payload)) {                        n=((Long)((Message)channel.receive(0)).getObject()).longValue();                        i=((Integer)((Message)channel.receive(0)).getObject()).intValue();                        log("Starting pingpong test. Rounds: " + n + " Bursts: " + i);                        pingpongTest(n, i, false);                    }                    if("cping".equalsIgnoreCase(payload)) {//	    i = ((Integer) ((Message) channel.receive(0)).getObject()).intValue();                        log("Starting cping test. Bursts: " + 1);                        cpingTest(1, true);                    }                    if("sweep".equalsIgnoreCase(payload)) {                        n=((Long)((Message)channel.receive(0)).getObject()).longValue();                        i=((Integer)((Message)channel.receive(0)).getObject()).intValue();                        log("Starting sweep test. 
Rounds: " + n + " initial burst: " + i);                        sweep(n, i);                    }                }                if(received instanceof View) {                    view=(View)received;                    System.out.println(view);                    if(view.getMembers().elementAt(0).equals(my_addr)) {                        System.out.println("I'm the new Coordinator");                        coordinator=true;                    }                    resetData();                }            }        }        catch(Exception ex) {            System.out.println("checkChannel() :" + ex);            try {                logWriter.write("Stopped cause " + ex + "\r\n");            }            catch(Exception e) {            }            System.exit(2);        }        System.out.println("Stopped Receiving");        channel.disconnect();        System.out.println("Disconnected from \"Janus\"");        channel.close();        System.out.println("Channel Closed");        System.exit(0);    }    /**     * This function should be run in its own thread and sends messages on an already connected channel     */    public void parser() {        boolean done=false;        String input;        int number=0;        int burstlength=1;        System.out.println("Ready.");        try {            while(!done) {                input=reader.readLine();                if("stop".equalsIgnoreCase(input)) {                    done=true;                }                if("pingpong".equalsIgnoreCase(input)) {                    number=askNumber(reader, "How many rounds?");                    burstlength=askNumber(reader, "Length of bursts?");                    channel.send(new Message(null, null, input));                    channel.send(new Message(null, null, new Long(number)));                    channel.send(new Message(null, null, new Integer(burstlength)));                    continue;                }                if("cping".equalsIgnoreCase(input)) {//	       
burstlength = askNumber( reader,"Length of bursts?");                    channel.send(new Message(null, null, input));//	       channel.send(new Message(null,null,new Integer(burstlength)));                    continue;                }                if("sweep".equalsIgnoreCase(input)) {                    number=askNumber(reader, "Number of tests");                    burstlength=askNumber(reader, "Initial length of bursts?");                    channel.send(new Message(null, null, input));                    channel.send(new Message(null, null, new Long(number)));                    channel.send(new Message(null, null, new Integer(burstlength)));                    continue;                }                channel.send(new Message(null, null, input));            }        }        catch(Exception ex) {

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?