📄 pingpongbench.java
字号:
/* * Copyright (c) 2003, The Regents of the University of California, through * Lawrence Berkeley National Laboratory (subject to receipt of any required * approvals from the U.S. Dept. of Energy). All rights reserved. */package gov.lbl.dsd.sea.nio.demo;import gov.lbl.dsd.sea.ExecutorFactory;import gov.lbl.dsd.sea.Stage;import gov.lbl.dsd.sea.StageManager;import gov.lbl.dsd.sea.nio.AgentEventHandler;import gov.lbl.dsd.sea.nio.NetAgent;import gov.lbl.dsd.sea.nio.event.AdminRequest;import gov.lbl.dsd.sea.nio.event.ChannelRequest;import gov.lbl.dsd.sea.nio.event.ChannelResponse;import gov.lbl.dsd.sea.nio.util.ByteBufferPool;import gov.lbl.dsd.sea.nio.util.SocketOpts;import java.io.IOException;import java.net.InetSocketAddress;import java.net.SocketException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.SocketChannel;/** * Asynchronous non-blocking ping pong benchmark; Sends messages back and forth * between client and server and measures throughput. * <p> * Example server usage (large messages): fire-java gov.lbl.dsd.sea.nio.demo.PingPongBench * server 9000 2000 2000 * <p> * Example client usage (large messages): fire-java gov.lbl.dsd.sea.nio.demo.PingPongBench * client localhost 9000 2000 1000000 2000 2000 * <p> * Example server usage (micro messages): fire-java gov.lbl.dsd.sea.nio.demo.PingPongBench * server 9000 1 1 * <p> * Example client usage (micro messages): fire-java gov.lbl.dsd.sea.nio.demo.PingPongBench * client localhost 9000 0.003 600 1 1 * <p> * Using localhost with large messages should report on the order of 190 MB/s throughput * (appropriately large packet sizes and TCP buffer sizes are critical). * <p> * Using localhost with micro messages should report on the order of 30000 messages/s throughput. * <p> * Set log level to ERROR to avoid logging becoming the bottleneck! * * @author whoschek@lbl.gov * @author $Author: gegles $ * @version $Revision: 1.5 $, $Date: 2004/09/16 16:57:15 $ */public class PingPongBench extends AgentEventHandler { private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(PingPongBench.class); protected int chunkSize = 8192; protected long todo = 10; protected long done = 0; protected boolean isClient = true; protected long startTime; protected long endTime; protected long reads = 0; static public void main(String args[]) throws IOException { new PingPongBench(args); } public PingPongBench(String args[]) throws IOException { NetAgent agent = new NetAgent(); String hostName = "localhost"; int port = 9000; int k=-1; if (args.length > ++k) { isClient = args[k].equals("client"); } if (isClient && args.length > ++k) { hostName = args[k]; } if (args.length > ++k) { port = Integer.parseInt(args[k]); } if (isClient && args.length > ++k) { chunkSize = Math.round(1024 * Float.parseFloat(args[k])); } if (isClient && args.length > ++k) { todo = Math.round(1024 * Float.parseFloat(args[k])); } if (args.length > ++k) { // setting a buffer pool is a purely optional performance optimization int n = Math.round(1024 * Float.parseFloat(args[k])); agent.setReadBufferPool(new ByteBufferPool(5 * n, n, true, null)); } SocketOpts options = new SocketOpts(); if (args.length > ++k) { int n = Math.round(1024 * Float.parseFloat(args[k])); options.setOption(SocketOpts.SO_RCVBUF, new Integer(n)); options.setOption(SocketOpts.SO_SNDBUF, new Integer(n)); } if (args.length > ++k) { options.setOption(SocketOpts.TCP_NODELAY, new Boolean(args[k])); } agent.setSocketOptions(options); ExecutorFactory execFactory = StageManager.QUEUED; if (args.length > ++k) { if (args[k].equals("single")) execFactory = StageManager.DIRECT; } Stage myStage = new StageManager(execFactory).createStage(this).start(); if (isClient) { agent.addConnectAddress(myStage, new InetSocketAddress(hostName, port)); } else { agent.addListenPort(myStage, port); } agent.start(); } protected void onAccepted(ChannelResponse.Accepted rsp) { // server has accepted new connection from remote client // register read interest with new socket channel // this triggers future ChannelResponse.Read events to be delivered to us log.error("*************got accepted=" + rsp); try { log.error("socketOpts=" + new SocketOpts(((SocketChannel) rsp.getKey().channel()).socket())); } catch (SocketException e) { throw new RuntimeException(e); } this.startTime = System.currentTimeMillis(); rsp.getAgent().enqueue(new ChannelRequest.Register(this.getStage(), rsp.getKey().channel(), SelectionKey.OP_READ)); log.error("*************enqueued register"); } protected void onClosed(ChannelResponse.Closed rsp) { log.error("*************got channel closed=" + rsp); if (rsp.getException() != null) { ; // nothing special to be done } if (isClient) { this.shutDown(rsp); if (this.endTime > 0) { this.printStats(rsp, endTime - startTime); try { Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } this.printStats(rsp, endTime - startTime); } } else { this.endTime = System.currentTimeMillis(); this.printStats(rsp, endTime - startTime); // no problem; continueing try { Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } this.printStats(rsp, endTime - startTime); // no problem; continueing this.endTime = 0; } } protected void onConnected(ChannelResponse.Connected rsp) { log.error("*************got connected=" + rsp); ByteBuffer buf = ByteBuffer.allocateDirect(chunkSize); //ByteBuffer buf = NioUtil.toAsciiByteBuffer("hello"); try { log.error("socketOpts=" + new SocketOpts(((SocketChannel) rsp.getKey().channel()).socket())); } catch (SocketException e) { throw new RuntimeException(e); } this.startTime = System.currentTimeMillis(); rsp.getAgent().enqueue(new ChannelRequest.Register(this.getStage(), rsp.getKey().channel(), SelectionKey.OP_READ)); rsp.getAgent().enqueue(new ChannelRequest.WriteData(rsp.getKey().channel(), buf)); log.info("*************writeData1 enqueued"); } protected void onRegistered(ChannelResponse.Registered rsp) { log.error("*************got registered=" + rsp); } protected void onRead(ChannelResponse.Read rsp) { // we have read some data this.reads++; //byte[] bytes = NioUtil.toByteArray(rsp.getBuffer()); //String str = new String(bytes); //log.error("******************** read ***********\n" + str); done += rsp.getBuffer().remaining(); log.info("########### done reads so far = " + done); if (isClient && done > todo) { if (this.endTime > 0) { done -= rsp.getBuffer().remaining(); } else { this.endTime = System.currentTimeMillis(); rsp.getAgent().enqueue(new ChannelRequest.Close(rsp.getKey().channel())); } } else { // echo the same data back to client ByteBuffer writeBuf = rsp.getBuffer(); rsp.getAgent().enqueue(new ChannelRequest.WriteData(rsp.getKey().channel(), writeBuf)); log.info("*************writeData2 enqueued"); } } protected void onWrite(ChannelResponse.Write rsp) { // we have written some data log.info("******************** wrote all " + rsp.getBuffer().limit() + " bytes"); // recycling to buffer pool is a purely optional performance optimization rsp.getAgent().getReadBufferPool().put(rsp.getBuffer()); } private void shutDown(ChannelResponse rsp) { log.error("now shutting down demo"); this.getStage().stop(); rsp.getAgent().enqueue(new AdminRequest.Stop()); } private void printStats(ChannelResponse rsp, long time) { long realdone = this.done * 2; // we have written and read those bytes long realreads = this.reads * 2; // we have written and read those bytes System.out.println("Summary statistics:"); System.out.println("*******************"); System.out.println(realdone / (1024.0f * 1024) + " MB echoed back and forth"); System.out.println(realdone + " bytes echoed back and forth"); System.out.println(time / 1000.0f + " seconds"); System.out.println(1.0f * realdone / ( 1024 * time) + " MB/s throughput"); System.out.println(); System.out.println(realreads + " messages"); System.out.println(realreads / (time / 1000.0f) + " messages/s"); System.out.println("\nagent=" + rsp.getAgent().toDebugString()); } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -