⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 pingpongbench.java

📁 sea是一个基于seda模式的实现。这个设计模式将系统分为很多stage。每个stage分布不同的任务(基于线程池)。通过任务流的方式提高系统的效率。
💻 JAVA
字号:
/* * Copyright (c) 2003, The Regents of the University of California, through * Lawrence Berkeley National Laboratory (subject to receipt of any required * approvals from the U.S. Dept. of Energy). All rights reserved. */package gov.lbl.dsd.sea.nio.demo;import gov.lbl.dsd.sea.ExecutorFactory;import gov.lbl.dsd.sea.Stage;import gov.lbl.dsd.sea.StageManager;import gov.lbl.dsd.sea.nio.AgentEventHandler;import gov.lbl.dsd.sea.nio.NetAgent;import gov.lbl.dsd.sea.nio.event.AdminRequest;import gov.lbl.dsd.sea.nio.event.ChannelRequest;import gov.lbl.dsd.sea.nio.event.ChannelResponse;import gov.lbl.dsd.sea.nio.util.ByteBufferPool;import gov.lbl.dsd.sea.nio.util.SocketOpts;import java.io.IOException;import java.net.InetSocketAddress;import java.net.SocketException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.SocketChannel;/** * Asynchronous non-blocking ping pong benchmark; Sends messages back and forth * between client and server and measures throughput. * <p> * Example server usage (large messages): fire-java gov.lbl.dsd.sea.nio.demo.PingPongBench * server 9000 2000 2000 * <p> * Example client usage (large messages): fire-java gov.lbl.dsd.sea.nio.demo.PingPongBench * client localhost 9000 2000 1000000 2000 2000  * <p> * Example server usage (micro messages): fire-java gov.lbl.dsd.sea.nio.demo.PingPongBench * server 9000 1 1 * <p> * Example client usage (micro messages): fire-java gov.lbl.dsd.sea.nio.demo.PingPongBench * client localhost 9000 0.003 600 1 1  * <p> * Using localhost with large messages should report on the order of 190 MB/s throughput  * (appropriately large packet sizes and TCP buffer sizes are critical). * <p> * Using localhost with micro messages should report on the order of 30000 messages/s throughput. * <p> * Set log level to ERROR to avoid logging becoming the bottleneck! *  * @author whoschek@lbl.gov * @author $Author: gegles $ * @version $Revision: 1.5 $, $Date: 2004/09/16 16:57:15 $ */public class PingPongBench extends AgentEventHandler {	private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(PingPongBench.class);	protected int chunkSize = 8192;	protected long todo = 10;	protected long done = 0;	protected boolean isClient = true;	protected long startTime;	protected long endTime;		protected long reads = 0;		static public void main(String args[]) throws IOException {		new PingPongBench(args);	}		public PingPongBench(String args[]) throws IOException {		NetAgent agent = new NetAgent();				String hostName = "localhost";		int port = 9000;				int k=-1;		if (args.length > ++k) {			isClient = args[k].equals("client");		}		if (isClient && args.length > ++k) {					hostName = args[k]; 		}		if (args.length > ++k) {			port = Integer.parseInt(args[k]);					}		if (isClient && args.length > ++k) {			chunkSize = Math.round(1024 * Float.parseFloat(args[k]));					}		if (isClient && args.length > ++k) {			todo = Math.round(1024 * Float.parseFloat(args[k]));				}		if (args.length > ++k) {			// setting a buffer pool is a purely optional performance optimization			int n = Math.round(1024 * Float.parseFloat(args[k]));			agent.setReadBufferPool(new ByteBufferPool(5 * n, n, true, null));					}				SocketOpts options = new SocketOpts();		if (args.length > ++k) {			int n = Math.round(1024 * Float.parseFloat(args[k]));			options.setOption(SocketOpts.SO_RCVBUF, new Integer(n));						options.setOption(SocketOpts.SO_SNDBUF, new Integer(n));					}				if (args.length > ++k) {			options.setOption(SocketOpts.TCP_NODELAY, new Boolean(args[k]));					}		agent.setSocketOptions(options);		ExecutorFactory execFactory = StageManager.QUEUED;		if (args.length > ++k) {			if (args[k].equals("single")) execFactory = StageManager.DIRECT;		}		Stage myStage = new StageManager(execFactory).createStage(this).start();			if (isClient) {			agent.addConnectAddress(myStage, new InetSocketAddress(hostName, port));		}		else {			agent.addListenPort(myStage, port);		}		agent.start();	}	protected void onAccepted(ChannelResponse.Accepted rsp) {		// server has accepted new connection from remote client		// register read interest with new socket channel		// this triggers future ChannelResponse.Read events to be delivered to us		log.error("*************got accepted=" + rsp);		try {			log.error("socketOpts=" + new SocketOpts(((SocketChannel) rsp.getKey().channel()).socket()));		} catch (SocketException e) {			throw new RuntimeException(e);		}		this.startTime = System.currentTimeMillis();		rsp.getAgent().enqueue(new ChannelRequest.Register(this.getStage(), rsp.getKey().channel(), SelectionKey.OP_READ));		log.error("*************enqueued register");	}	protected void onClosed(ChannelResponse.Closed rsp) {		log.error("*************got channel closed=" + rsp);		if (rsp.getException() != null) { 			; // nothing special to be done		}		if (isClient) {			this.shutDown(rsp);			if (this.endTime > 0) {				this.printStats(rsp, endTime - startTime);				try {					Thread.sleep(100);				} catch (InterruptedException e) {					// TODO Auto-generated catch block					e.printStackTrace();				}				this.printStats(rsp, endTime - startTime);			}		}		else {			this.endTime = System.currentTimeMillis();			this.printStats(rsp, endTime - startTime); // no problem; continueing			try {				Thread.sleep(100);			} catch (InterruptedException e) {				// TODO Auto-generated catch block				e.printStackTrace();			}			this.printStats(rsp, endTime - startTime); // no problem; continueing			this.endTime = 0;		}			}	protected void onConnected(ChannelResponse.Connected rsp) {		log.error("*************got connected=" + rsp);				ByteBuffer buf = ByteBuffer.allocateDirect(chunkSize);		//ByteBuffer buf = NioUtil.toAsciiByteBuffer("hello");				try {			log.error("socketOpts=" + new SocketOpts(((SocketChannel) rsp.getKey().channel()).socket()));		} catch (SocketException e) {			throw new RuntimeException(e);		}		this.startTime = System.currentTimeMillis();				rsp.getAgent().enqueue(new ChannelRequest.Register(this.getStage(), rsp.getKey().channel(), SelectionKey.OP_READ));		rsp.getAgent().enqueue(new ChannelRequest.WriteData(rsp.getKey().channel(), buf));		log.info("*************writeData1 enqueued");					}	protected void onRegistered(ChannelResponse.Registered rsp) {		log.error("*************got registered=" + rsp);	}	protected void onRead(ChannelResponse.Read rsp) {		// we have read some data		this.reads++;		//byte[] bytes = NioUtil.toByteArray(rsp.getBuffer());		//String str = new String(bytes);		//log.error("******************** read ***********\n" + str);		done += rsp.getBuffer().remaining();		log.info("########### done reads so far = " + done);		if (isClient && done > todo) {			if (this.endTime > 0) {				done -= rsp.getBuffer().remaining();			}			else {				this.endTime = System.currentTimeMillis();				rsp.getAgent().enqueue(new ChannelRequest.Close(rsp.getKey().channel()));							}		}		else {			// echo the same data back to client								ByteBuffer writeBuf = rsp.getBuffer();			rsp.getAgent().enqueue(new ChannelRequest.WriteData(rsp.getKey().channel(), writeBuf));			log.info("*************writeData2 enqueued");			}			}	protected void onWrite(ChannelResponse.Write rsp) {		// we have written some data		log.info("******************** wrote all " + rsp.getBuffer().limit() + " bytes");		// recycling to buffer pool is a purely optional performance optimization		rsp.getAgent().getReadBufferPool().put(rsp.getBuffer());	}	private void shutDown(ChannelResponse rsp) {		log.error("now shutting down demo");		this.getStage().stop();				rsp.getAgent().enqueue(new AdminRequest.Stop());	}	private void printStats(ChannelResponse rsp, long time) {		long realdone = this.done * 2; // we have written and read those bytes		long realreads = this.reads * 2; // we have written and read those bytes		System.out.println("Summary statistics:");		System.out.println("*******************");		System.out.println(realdone / (1024.0f * 1024) + " MB echoed back and forth");		System.out.println(realdone + " bytes echoed back and forth");		System.out.println(time / 1000.0f + " seconds");		System.out.println(1.0f * realdone / ( 1024 * time) + " MB/s throughput");		System.out.println();		System.out.println(realreads + " messages");		System.out.println(realreads / (time / 1000.0f) + " messages/s");		System.out.println("\nagent=" + rsp.getAgent().toDebugString());	}	}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -