continousthroughputtest.java
来自「JGRoups源码」· Java 代码 · 共 462 行 · 第 1/2 页
JAVA
462 行
package org.jgroups.tests;import org.jgroups.*;import org.jgroups.stack.IpAddress;import org.jgroups.stack.Protocol;import org.jgroups.stack.ProtocolObserver;import org.jgroups.util.Util;import java.io.BufferedReader;import java.io.File;import java.io.FileWriter;import java.io.InputStreamReader;import java.util.Date;/** * <h1>ContinousThroughputTest.java</h1> * <p/> * This is a program to make Throughput tests. * <p/> * The program assumes to run on a reliable network where no partitioning or failures happen (Apart for cping test). * Once you run the program it connects the channel and gives you a prompt. * Every time a new view is received you will see it printed. * Once you have launched the program on all the machine you use for the test just digit * on one machine the command for the test you desire to make, you will be asked for the necessary parameters, * then the test starts. * Depending on the chosen test you will see its results on the monitor and them ar logged * on a file on the working dir called <code>"ContinousThroughputTest<hostname><systemTimeInSeconds>.log"</code> . * * @author Gianluca Collot * @version 1.0 */public class ContinousThroughputTest { String props="UDP:" + "PING(up_thread=false;down_thread=false):" + "FD(timeout=1000;shun=false):" + "STABLE(up_thread=false;down_thread=false):" + "MERGE(up_thread=false;down_thread=false):" + "NAKACK:" + "FLUSH:" + "GMS:" + "VIEW_ENFORCER(up_thread=false;down_thread=false):" +// "TSTAU:" + "QUEUE(up_thread=false;down_thread=false)";// String props= "TCP:TCPPING(initial_hosts=manolete2[8880]):FD(timeout=10000):" +// "STABLE:MERGE:NAKACK:FRAG:FLUSH:GMS:VIEW_ENFORCER:QUEUE"; JChannel channel=null; Thread sendThread, receiveThread; boolean coordinator=false; IpAddress my_addr=null; View view; BufferedReader reader; float troughputSum=0, meanTroughput=0, minTroughput=10000, maxTroughput=0; int numTests=0; FileWriter logWriter; Protocol prot=null; /** * Creates threads, creates and connects channel opens log file */ public ContinousThroughputTest() { sendThread=new Thread("sendThread") { public void run() { parser(); } }; receiveThread=new Thread("receiveThread") { public void run() { checkChannel(); } }; reader=new BufferedReader(new InputStreamReader(System.in)); try { channel=new JChannel(props);// prot = (Protocol) channel.getProtocolStack().getProtocols().lastElement();// prot.setObserver(new ContinousThroughputTest.MessageLenghtObserver()); channel.setOpt(Channel.BLOCK, Boolean.FALSE); channel.connect("Janus"); } catch(Exception ex) { System.out.println("Connection Failed!" + ex); System.exit(1); } my_addr=(IpAddress)channel.getLocalAddress(); try { File log=new File("ContinousThroughputTest" + my_addr.getIpAddress().getHostName() + (System.currentTimeMillis() / 10000) + ".log"); if(!log.exists()) { log.createNewFile(); } logWriter=new FileWriter(log); logWriter.write("ContinousThroughputTest.java log\r\n"); logWriter.write("Date:" + new Date(System.currentTimeMillis()) + "\r\n"); log("Protocol Stack is " + props); System.out.println("Protocol Stack is " + props); } catch(Exception ex) { System.out.println("File problems " + ex); System.exit(5); } } public static void main(String[] args) { ContinousThroughputTest perfTest=new ContinousThroughputTest(); perfTest.go(); } void go() {// Starts Receiving receiveThread.start();// Starts input Parser sendThread.start(); } /** * This function should be called in its own thread. * It recives messages and calculates the troughput */ public void checkChannel() { String payload=null; Object received=null; Message msg=null; boolean done=false; long n; int i=1; System.out.println("Started receiving"); try { while(!done) { received=channel.receive(0); if(received instanceof Message) { msg=(Message)received; payload=(String)msg.getObject(); System.out.println(payload); if("stop".equalsIgnoreCase(payload)) { done=true; } if("pingpong".equalsIgnoreCase(payload)) { n=((Long)((Message)channel.receive(0)).getObject()).longValue(); i=((Integer)((Message)channel.receive(0)).getObject()).intValue(); log("Starting pingpong test. Rounds: " + n + " Bursts: " + i); pingpongTest(n, i, false); } if("cping".equalsIgnoreCase(payload)) {// i = ((Integer) ((Message) channel.receive(0)).getObject()).intValue(); log("Starting cping test. Bursts: " + 1); cpingTest(1, true); } if("sweep".equalsIgnoreCase(payload)) { n=((Long)((Message)channel.receive(0)).getObject()).longValue(); i=((Integer)((Message)channel.receive(0)).getObject()).intValue(); log("Starting sweep test. Rounds: " + n + " initial burst: " + i); sweep(n, i); } } if(received instanceof View) { view=(View)received; System.out.println(view); if(view.getMembers().elementAt(0).equals(my_addr)) { System.out.println("I'm the new Coordinator"); coordinator=true; } resetData(); } } } catch(Exception ex) { System.out.println("checkChannel() :" + ex); try { logWriter.write("Stopped cause " + ex + "\r\n"); } catch(Exception e) { } System.exit(2); } System.out.println("Stopped Receiving"); channel.disconnect(); System.out.println("Disconnected from \"Janus\""); channel.close(); System.out.println("Channel Closed"); System.exit(0); } /** * This function should be run in its own thread and sends messages on an already connected channel */ public void parser() { boolean done=false; String input; int number=0; int burstlength=1; System.out.println("Ready."); try { while(!done) { input=reader.readLine(); if("stop".equalsIgnoreCase(input)) { done=true; } if("pingpong".equalsIgnoreCase(input)) { number=askNumber(reader, "How many rounds?"); burstlength=askNumber(reader, "Length of bursts?"); channel.send(new Message(null, null, input)); channel.send(new Message(null, null, new Long(number))); channel.send(new Message(null, null, new Integer(burstlength))); continue; } if("cping".equalsIgnoreCase(input)) {// burstlength = askNumber( reader,"Length of bursts?"); channel.send(new Message(null, null, input));// channel.send(new Message(null,null,new Integer(burstlength))); continue; } if("sweep".equalsIgnoreCase(input)) { number=askNumber(reader, "Number of tests"); burstlength=askNumber(reader, "Initial length of bursts?"); channel.send(new Message(null, null, input)); channel.send(new Message(null, null, new Long(number))); channel.send(new Message(null, null, new Integer(burstlength))); continue; } channel.send(new Message(null, null, input)); } } catch(Exception ex) {
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?