📄 test.java
字号:
package org.jgroups.tests.perf;import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.Version;import org.jgroups.util.Util;import java.io.BufferedReader;import java.io.FileReader;import java.io.FileWriter;import java.io.IOException;import java.text.NumberFormat;import java.util.*;/** You start the test by running this class. * @author Bela Ban (belaban@yahoo.com) */public class Test implements Receiver { String props=null; Properties config; boolean sender=false; Transport transport=null; Object local_addr=null; /** Map<Object,MemberInfo> members. Keys=member addresses, value=MemberInfo */ Map senders=new ConcurrentReaderHashMap(10); /** Keeps track of members. ArrayList<SocketAddress> */ final ArrayList members=new ArrayList(); /** Set when first message is received */ long start=0; /** Set when last message is received */ long stop=0; int num_members=0; int num_senders=0; long num_msgs_expected=0; long num_msgs_received=0; // from everyone long num_bytes_received=0; // from everyone Log log=LogFactory.getLog(getClass()); boolean all_received=false; boolean final_results_received=false; /** Map<Object, MemberInfo>. A hashmap of senders, each value is the 'senders' hashmap */ Map results=new HashMap(); private ResultsPublisher publisher=new ResultsPublisher(); List heard_from=new ArrayList(); boolean dump_transport_stats=false; /** Log every n msgs received */ long log_interval=1000; long counter=1; long msg_size=1000; boolean jmx=false; /** Number of ms to wait at the receiver to simulate processing of the received message (0 == don't wait) */ long processing_delay=0; FileWriter output=null; QueuedExecutor response_sender=new QueuedExecutor(); static NumberFormat f; static { f=NumberFormat.getNumberInstance(); f.setGroupingUsed(false); f.setMaximumFractionDigits(2); } public void start(Properties c, boolean verbose, boolean jmx, String output) throws Exception { String config_file="config.txt"; BufferedReader fileReader; String line; String key, val; StringTokenizer st; Properties tmp=new Properties(); if(output != null) this.output=new FileWriter(output, false); response_sender.setThreadFactory(new ThreadFactory() { public Thread newThread(Runnable runnable) { return new Thread(runnable, "Test.ResponseSender"); } }); config_file=c.getProperty("config"); fileReader=new BufferedReader(new FileReader(config_file)); while((line=fileReader.readLine()) != null) { if(line.startsWith("#")) continue; line=line.trim(); if(line.length() == 0) continue; st=new StringTokenizer(line, "=", false); key=st.nextToken().toLowerCase(); val=st.nextToken(); tmp.put(key, val); } fileReader.close(); // 'tmp' now contains all properties from the file, now we need to override the ones // passed to us by 'c' tmp.putAll(c); this.config=tmp; StringBuffer sb=new StringBuffer(); sb.append("\n\n----------------------- TEST -----------------------\n"); sb.append("Date: ").append(new Date()).append('\n'); sb.append("Run by: ").append(System.getProperty("user.name")).append("\n\n"); if(verbose) sb.append("Properties: ").append(printProperties()).append("\n-------------------------\n\n"); for(Iterator it=this.config.entrySet().iterator(); it.hasNext();) { Map.Entry entry=(Map.Entry)it.next(); sb.append(entry.getKey()).append(":\t").append(entry.getValue()).append('\n'); } sb.append("JGroups version: ").append(Version.description).append('\n'); System.out.println("Configuration is: " + sb); output(sb.toString()); props=this.config.getProperty("props"); num_members=Integer.parseInt(this.config.getProperty("num_members")); num_senders=Integer.parseInt(this.config.getProperty("num_senders")); long num_msgs=Long.parseLong(this.config.getProperty("num_msgs")); this.num_msgs_expected=num_senders * num_msgs; sender=Boolean.valueOf(this.config.getProperty("sender")).booleanValue(); msg_size=Long.parseLong(this.config.getProperty("msg_size")); String tmp2=this.config.getProperty("dump_transport_stats", "false"); if(Boolean.valueOf(tmp2).booleanValue()) this.dump_transport_stats=true; tmp2=this.config.getProperty("log_interval"); if(tmp2 != null) log_interval=Long.parseLong(tmp2); sb=new StringBuffer(); sb.append("\n##### msgs_received"); sb.append(", current time (in ms)"); sb.append(", msgs/sec"); sb.append(", throughput/sec [KB]"); sb.append(", free_mem [KB] "); sb.append(", total_mem [KB] "); output(sb.toString()); if(jmx) { this.config.setProperty("jmx", "true"); } this.jmx=new Boolean(this.config.getProperty("jmx")).booleanValue(); String tmp3=this.config.getProperty("processing_delay"); if(tmp3 != null) this.processing_delay=Long.parseLong(tmp3); String transport_name=this.config.getProperty("transport"); transport=(Transport)Util.loadClass(transport_name, this.getClass()).newInstance(); transport.create(this.config); transport.setReceiver(this); transport.start(); local_addr=transport.getLocalAddress(); } private void output(String msg) { // if(log.isInfoEnabled()) // log.info(msg); if(this.output != null) { try { this.output.write(msg + "\n"); this.output.flush(); } catch(IOException e) { } } } private String printProperties() { StringBuffer sb=new StringBuffer(); Properties p=System.getProperties(); for(Iterator it=p.entrySet().iterator(); it.hasNext();) { Map.Entry entry=(Map.Entry)it.next(); sb.append(entry.getKey()).append(": ").append(entry.getValue()).append('\n'); } return sb.toString(); } public void stop() { if(transport != null) { transport.stop(); transport.destroy(); } if(response_sender != null) { response_sender.shutdownNow(); } if(this.output != null) { try { this.output.close(); } catch(IOException e) { } } } public void receive(Object sender, byte[] payload) { if(payload == null || payload.length == 0) { System.err.println("payload is incorrect (sender=" + sender + "): " + payload); return; } try { int type=payload[0]; if(type == 1) { // DATA int len=payload.length -1; handleData(sender, len); return; } byte[] tmp=new byte[payload.length-1]; System.arraycopy(payload, 1, tmp, 0, tmp.length); Data d=(Data)Util.streamableFromByteBuffer(Data.class, tmp); switch(d.getType()) { case Data.DISCOVERY_REQ: // System.out.println("-- received discovery request"); sendDiscoveryResponse(); break; case Data.DISCOVERY_RSP: // System.out.println("-- received discovery response from " + sender); synchronized(this.members) { if(!this.members.contains(sender)) { this.members.add(sender); System.out.println("-- " + sender + " joined"); if(d.sender) { synchronized(this.members) { if(!this.senders.containsKey(sender)) { this.senders.put(sender, new MemberInfo(d.num_msgs)); } } } this.members.notifyAll(); } } break; case Data.FINAL_RESULTS: publisher.stop(); if(!final_results_received) { dumpResults(d.results); final_results_received=true; } synchronized(this) { this.notifyAll(); } break; case Data.RESULTS: results.put(sender, d.result); heard_from.remove(sender); if(heard_from.size() == 0) { for(int i=0; i < 3; i++) { sendFinalResults(); Util.sleep(100); } } break; default: log.error("received invalid data type: " + payload[0]); break; } } catch(Exception e) { e.printStackTrace(); } } private void handleData(Object sender, int num_bytes) { if(all_received) return; if(start == 0) { start=System.currentTimeMillis(); } num_msgs_received++; num_bytes_received+=num_bytes; if(num_msgs_received >= num_msgs_expected) { if(stop == 0) stop=System.currentTimeMillis(); all_received=true; } if(num_msgs_received % log_interval == 0) System.out.println(new StringBuffer("-- received ").append(num_msgs_received).append(" messages")); if(counter % log_interval == 0) { output(dumpStats(counter)); } MemberInfo info=(MemberInfo)this.senders.get(sender); if(info != null) { if(info.start == 0) info.start=System.currentTimeMillis(); info.num_msgs_received++; counter++; info.total_bytes_received+=num_bytes; if(info.num_msgs_received >= info.num_msgs_expected) { info.done=true; if(info.stop == 0) info.stop=System.currentTimeMillis(); } else { if(processing_delay > 0) Util.sleep(processing_delay); } } else { log.error("-- sender " + sender + " not found in senders hashmap"); } if(all_received) { if(!this.sender) dumpSenders(); publisher.start(); } } private void sendResults() throws Exception { Data d=new Data(Data.RESULTS); byte[] buf; MemberInfo info=new MemberInfo(num_msgs_expected); info.done=true; info.num_msgs_received=num_msgs_received; info.start=start; info.stop=stop; info.total_bytes_received=this.num_bytes_received; d.result=info; buf=generatePayload(d, null); transport.send(null, buf); } private void sendFinalResults() throws Exception { Data d=new Data(Data.FINAL_RESULTS); d.results=new ConcurrentReaderHashMap(this.results); final byte[] buf=generatePayload(d, null); // transport.send(null, buf); response_sender.execute(new Runnable() { public void run() { try { transport.send(null, buf); } catch(Exception e) { log.error("failed sending discovery response", e); } } }); } boolean allReceived() {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -