📄 test.java
字号:
return all_received; } // tail of a method whose beginning lies above this excerpt

    /** Returns the current value of the {@code final_results_received} flag. */
    boolean receivedFinalResults() {
        return final_results_received;
    }

    /**
     * Sends the number of messages given by config property "num_msgs", each with a body of
     * "msg_size" bytes filled with '.', through the transport.
     *
     * @param interval   milliseconds to sleep after each send
     * @param nanos      additional nanoseconds to sleep after each send (used only if busy_sleep is false)
     * @param busy_sleep if true, {@code Util.sleep(interval, true)} is used and nanos is not passed on
     * @throws Exception if a config property cannot be parsed, or payload generation or sending fails
     */
    void sendMessages(long interval, int nanos, boolean busy_sleep) throws Exception {
        long total_msgs=0;
        int msgSize=Integer.parseInt(config.getProperty("msg_size"));
        int num_msgs=Integer.parseInt(config.getProperty("num_msgs"));
        byte[] buf=new byte[msgSize];
        for(int k=0; k < msgSize; k++)
            buf[k]='.';
        Data d=new Data(Data.DATA);
        // The payload is built once and the same array is sent for every message
        byte[] payload=generatePayload(d, buf);
        System.out.println("-- sending " + num_msgs + " " + Util.printBytes(msgSize) + " messages");
        for(int i=0; i < num_msgs; i++) {
            transport.send(null, payload);
            total_msgs++;
            // log_interval is a field declared outside this excerpt
            if(total_msgs % log_interval == 0) {
                System.out.println("++ sent " + total_msgs);
            }
            // No pause at all unless at least one of the two delays was requested
            if(interval > 0 || nanos > 0) {
                if(busy_sleep)
                    Util.sleep(interval, busy_sleep);
                else
                    Util.sleep(interval, nanos);
            }
        }
    }

    /**
     * Builds a wire payload: one type byte (see {@link #intToByte(int)}) followed by the body.
     *
     * @param d   the data object; its type determines the first byte, and it is serialized via
     *            {@code Util.streamableToByteBuffer} when buf is null
     * @param buf the body to use as is, or null to serialize {@code d} instead
     * @return a new array of length body + 1
     * @throws Exception if serializing {@code d} fails
     */
    byte[] generatePayload(Data d, byte[] buf) throws Exception {
        byte[] tmp=buf != null? buf : Util.streamableToByteBuffer(d);
        byte[] payload=new byte[tmp.length +1];
        payload[0]=intToByte(d.getType());
        System.arraycopy(tmp, 0, payload, 1, tmp.length);
        return payload;
    }

    /** Maps a {@code Data} type constant to its one-byte code (1-5); unknown types map to 0. */
    private byte intToByte(int type) {
        switch(type) {
            case Data.DATA: return 1;
            case Data.DISCOVERY_REQ: return 2;
            case Data.DISCOVERY_RSP: return 3;
            case Data.RESULTS: return 4;
            case Data.FINAL_RESULTS: return 5;
            default: return 0;
        }
    }

    /**
     * Prints (and passes to {@code output}) every member's results, followed by the messages/sec
     * averaged over all entries and the throughput derived from that average.
     *
     * @param final_results map of member to {@code MemberInfo}
     */
    private void dumpResults(Map final_results) {
        Object member;
        Map.Entry entry;
        MemberInfo val;
        double combined_msgs_sec, tmp=0;
        long combined_tp;
        StringBuffer sb=new StringBuffer();
        sb.append("\n-- results:\n");

        for(Iterator it=final_results.entrySet().iterator(); it.hasNext();) {
            entry=(Map.Entry)it.next();
            member=entry.getKey();
            val=(MemberInfo)entry.getValue();
            tmp+=val.getMessageSec();
            sb.append("\n").append(member);
            if(member.equals(local_addr))
                sb.append(" (myself)");
            sb.append(":\n");
            sb.append(val);
            sb.append('\n');
        }
        combined_msgs_sec=tmp / final_results.size();
        // Multiply first, then truncate: a cast binds tighter than '*', so casting the rate alone
        // would drop its fractional part before scaling and under-report the throughput
        combined_tp=(long)(combined_msgs_sec * msg_size);

        sb.append("\ncombined: ").append(f.format(combined_msgs_sec)).
                append(" msgs/sec averaged over all receivers (throughput=" + Util.printBytes(combined_tp) + "/sec)\n");
        System.out.println(sb.toString());
        output(sb.toString());
    }

    /** Prints the per-sender results held in {@code senders} to stdout. */
    private void dumpSenders() {
        StringBuffer sb=new StringBuffer();
        dump(this.senders, sb);
        System.out.println(sb.toString());
    }

    /**
     * Appends a "-- local results:" header and one "sender: ..." line per map entry to sb.
     *
     * @param map map of sender to {@code MemberInfo}
     * @param sb  buffer to append to
     */
    private void dump(Map map, StringBuffer sb) {
        Map.Entry entry;
        Object mySender;
        MemberInfo mi;
        // NOTE(review): 'combined' is accumulated below but never appended to sb in this method;
        // possibly the totals were meant to be printed - confirm before changing the output.
        MemberInfo combined=new MemberInfo(0);
        combined.start = Long.MAX_VALUE;
        combined.stop = Long.MIN_VALUE;

        sb.append("\n-- local results:\n");
        for(Iterator it2=map.entrySet().iterator(); it2.hasNext();) {
            entry=(Map.Entry)it2.next();
            mySender=entry.getKey();
            mi=(MemberInfo)entry.getValue();
            combined.start=Math.min(combined.start, mi.start);
            combined.stop=Math.max(combined.stop, mi.stop);
            combined.num_msgs_expected+=mi.num_msgs_expected;
            combined.num_msgs_received+=mi.num_msgs_received;
            combined.total_bytes_received+=mi.total_bytes_received;
            sb.append("sender: ").append(mySender).append(": ").append(mi).append('\n');
        }
    }

    /**
     * Builds one space-separated statistics line: received message count, current time in ms,
     * messages/sec since {@code start}, throughput (messages/sec * msg_size), free memory and
     * total memory (both divided by 1000.0). Transport stats are appended when
     * {@code dump_transport_stats} is set and the transport returns a non-null map.
     *
     * @param received_msgs number of messages received so far
     * @return the formatted line
     */
    private String dumpStats(long received_msgs) {
        double msgs_sec, throughput_sec;
        long current;

        StringBuffer sb=new StringBuffer();
        sb.append(received_msgs).append(' ');

        current=System.currentTimeMillis();
        sb.append(current).append(' ');

        msgs_sec=received_msgs / ((current - start) / 1000.0);
        throughput_sec=msgs_sec * msg_size;

        sb.append(f.format(msgs_sec)).append(' ').append(f.format(throughput_sec)).append(' ');

        sb.append(Runtime.getRuntime().freeMemory() / 1000.0).append(' ');

        sb.append(Runtime.getRuntime().totalMemory() / 1000.0);

        if(dump_transport_stats) {
            Map stats=transport.dumpStats();
            if(stats != null) {
                print(stats, sb);
            }
        }
        return sb.toString();
    }

    /**
     * Formats the map returned by {@code transport.dumpStats()}: each key on its own line followed
     * by the entries of its value, which is cast to a {@code Map}.
     *
     * @return the formatted statistics, or an empty string if the transport returned null
     */
    public String dumpTransportStats() {
        Map stats=transport.dumpStats();
        StringBuffer sb=new StringBuffer(128);
        if(stats != null) {
            Map.Entry entry;
            String key;
            Map value;
            for(Iterator it=stats.entrySet().iterator(); it.hasNext();) {
                entry=(Map.Entry)it.next();
                key=(String)entry.getKey();
                value=(Map)entry.getValue();
                sb.append("\n").append(key).append(":\n");
                for(Iterator it2=value.entrySet().iterator(); it2.hasNext();) {
                    sb.append(it2.next()).append("\n");
                }
            }
        }
        return sb.toString();
    }

    /** Appends a "Transport stats:" header and one "key: value" line per entry of stats to sb. */
    private void print(Map stats, StringBuffer sb) {
        sb.append("\nTransport stats:\n\n");
        Map.Entry entry;
        Object key, val;
        for(Iterator it=stats.entrySet().iterator(); it.hasNext();) {
            entry=(Map.Entry)it.next();
            key=entry.getKey();
            val=entry.getValue();
            sb.append(key).append(": ").append(val).append("\n");
        }
    }

    /**
     * Sends a discovery request and response, then blocks until {@code members} holds at least
     * {@code num_members} entries, re-sending both messages after every wait of up to 2 seconds.
     * Afterwards all members are added to {@code heard_from}.
     *
     * @throws Exception if sending fails or the wait is interrupted
     */
    void runDiscoveryPhase() throws Exception {
        sendDiscoveryRequest();
        sendDiscoveryResponse();

        synchronized(this.members) {
            System.out.println("-- waiting for " + num_members + " members to join");
            while(this.members.size() < num_members) {
                this.members.wait(2000);
                sendDiscoveryRequest();
                sendDiscoveryResponse();
            }
            heard_from.addAll(members);
            System.out.println("-- members: " + this.members.size());
        }
    }

    /** Sends a {@code Data.DISCOVERY_REQ} message through the transport. */
    void sendDiscoveryRequest() throws Exception {
        Data d=new Data(Data.DISCOVERY_REQ);
        transport.send(null, generatePayload(d, null));
    }

    /**
     * Hands a {@code Data.DISCOVERY_RSP} message to {@code response_sender} for sending. If this
     * instance is a sender, the response carries the sender flag and the configured "num_msgs".
     * Failures while sending are logged, not propagated.
     *
     * @throws Exception if "num_msgs" cannot be parsed or the task cannot be handed off
     */
    void sendDiscoveryResponse() throws Exception {
        final Data d2=new Data(Data.DISCOVERY_RSP);
        if(sender) {
            d2.sender=true;
            d2.num_msgs=Long.parseLong(config.getProperty("num_msgs"));
        }

        response_sender.execute(new Runnable() {
            public void run() {
                try {
                    transport.send(null, generatePayload(d2, null));
                }
                catch(Exception e) {
                    log.error("failed sending discovery response", e);
                }
            }
        });
    }

    /**
     * Command-line entry point: parses the options, starts a {@code Test}, runs the discovery
     * phase, sends messages when started with -sender, then waits for the final results.
     * An unknown option, or an option that is missing its value, prints the usage text and returns.
     *
     * @param args command-line options, see {@link #help()}
     */
    public static void main(String[] args) {
        Properties config=new Properties();
        boolean sender=false, verbose=false, jmx=false, dump_stats=false; // dump_stats: dumps at end of run
        Test t=null;
        String output=null;
        long interval=0;
        int interval_nanos=0;
        boolean busy_sleep=false;

        for(int i=0; i < args.length; i++) {
            String arg=args[i];

            // Options below consume the following argument; without this guard a trailing
            // "-config" (etc.) would fail with ArrayIndexOutOfBoundsException instead of usage help
            boolean takes_value="-config".equals(arg) || "-props".equals(arg) || "-interval".equals(arg)
                    || "-nanos".equals(arg) || "-f".equals(arg);
            if(takes_value && i + 1 >= args.length) {
                help();
                return;
            }

            if("-sender".equals(arg)) {
                config.put("sender", "true");
                sender=true;
                continue;
            }
            if("-receiver".equals(arg)) {
                config.put("sender", "false");
                sender=false;
                continue;
            }
            if("-config".equals(arg)) {
                String config_file=args[++i];
                config.put("config", config_file);
                continue;
            }
            if("-props".equals(arg)) {
                String props=args[++i];
                config.put("props", props);
                continue;
            }
            if("-verbose".equals(arg)) {
                verbose=true;
                continue;
            }
            if("-jmx".equals(arg)) {
                jmx=true;
                continue;
            }
            if("-dump_stats".equals(arg)) {
                dump_stats=true;
                continue;
            }
            if("-interval".equals(arg)) {
                interval=Long.parseLong(args[++i]);
                continue;
            }
            if("-nanos".equals(arg)) {
                interval_nanos=Integer.parseInt(args[++i]);
                continue;
            }
            if("-busy_sleep".equals(arg)) {
                busy_sleep=true;
                continue;
            }
            if("-f".equals(arg)) {
                output=args[++i];
                continue;
            }
            help();
            return;
        }

        try {
            t=new Test();
            t.start(config, verbose, jmx, output);
            t.runDiscoveryPhase();
            if(sender) {
                t.sendMessages(interval, interval_nanos, busy_sleep);
            }
            synchronized(t) {
                while(t.receivedFinalResults() == false) {
                    t.wait(2000);
                }
            }
            if(dump_stats) {
                String stats=t.dumpTransportStats();
                System.out.println("\nTransport statistics:\n" + stats);
            }

            if(t.jmx) {
                System.out.println("jmx=true: not terminating");
                // Stop the test but keep the JVM alive; t is cleared so the finally block
                // would not stop it a second time
                t.stop();
                t=null;
                while(true) {
                    Util.sleep(60000);
                }
            }
        }
        catch(Exception e) {
            e.printStackTrace();
        }
        finally {
            if(t != null) {
                t.stop();
            }
        }
    }

    /** Prints the command-line usage to stdout. */
    static void help() {
        System.out.println("Test [-help] ([-sender] | [-receiver]) " +
                "[-config <config file>] " +
                "[-props <stack config>] [-verbose] [-jmx] " +
                "[-dump_stats] [-f <filename>] [-interval <ms between sends>] " +
                "[-nanos <additional nanos to sleep in interval>] [-busy_sleep (cancels out -nanos)]");
    }

    /**
     * Background task that calls {@code sendResults()} once per {@code interval} milliseconds on a
     * daemon thread until {@link #stop()} clears the thread reference.
     */
    private class ResultsPublisher implements Runnable {
        final long interval=1000;
        boolean running=true; // not read anywhere in the visible code
        // volatile: cleared by stop() on the caller's thread and polled by run() on the
        // publisher thread, so the write has to be visible across threads
        volatile Thread t;

        /** Creates and starts the daemon thread unless one was already created. */
        void start() {
            if(t == null) {
                t=new Thread(this, "ResultsPublisher");
                t.setDaemon(true);
                t.start();
            }
        }

        /** Clears the thread reference, which ends the loop in run(), and interrupts a live thread. */
        void stop() {
            Thread tmp=t; // read the volatile field once
            if(tmp != null && tmp.isAlive()) {
                t=null;
                tmp.interrupt();
            }
        }

        /** Publishes results in a loop; any exception ends the loop and is printed. */
        public void run() {
            try {
                while(t != null) {
                    sendResults();
                    Util.sleep(interval);
                }
            }
            catch(Exception e) {
                e.printStackTrace();
            }
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -