📄 loadtest.java
字号:
/* * Copyright 1999,2004-2006 The Apache Software Foundation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.catalina.tribes.demos;import java.io.Serializable;import java.util.Random;import org.apache.catalina.tribes.ByteMessage;import org.apache.catalina.tribes.ChannelException;import org.apache.catalina.tribes.ChannelListener;import org.apache.catalina.tribes.ManagedChannel;import org.apache.catalina.tribes.Member;import org.apache.catalina.tribes.MembershipListener;import org.apache.catalina.tribes.io.XByteBuffer;import org.apache.catalina.tribes.Channel;import java.io.Externalizable;/** * <p>Title: </p> * * <p>Description: </p> * * <p>Copyright: Copyright (c) 2005</p> * * <p>Company: </p> * * @author not attributable * @version 1.0 */public class LoadTest implements MembershipListener,ChannelListener, Runnable { protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(LoadTest.class); public static int size = 1020; public static Object mutex = new Object(); public boolean doRun = true; public long bytesReceived = 0; public int messagesReceived = 0; public boolean send = true; public boolean debug = false; public int msgCount = 100; ManagedChannel channel=null; public int statsInterval = 10000; public long pause = 0; public boolean breakonChannelException = false; public boolean async = false; public long receiveStart = 0; public int channelOptions = Channel.SEND_OPTIONS_DEFAULT; static int messageSize = 0; public static long messagesSent = 0; public static long messageStartSendTime = 0; public static long messageEndSendTime = 0; public static int threadCount = 0; public static synchronized void startTest() { threadCount++; if ( messageStartSendTime == 0 ) messageStartSendTime = System.currentTimeMillis(); } public static synchronized void endTest() { threadCount--; if ( messageEndSendTime == 0 && threadCount==0 ) messageEndSendTime = System.currentTimeMillis(); } public static synchronized long addSendStats(long count) { messagesSent+=count; return 0l; } private static void printSendStats(long counter, int messageSize) { float cnt = (float)counter; float size = (float)messageSize; float time = (float)(System.currentTimeMillis()-messageStartSendTime) / 1000f; log.info("****SEND STATS-"+Thread.currentThread().getName()+"*****"+ "\n\tMessage count:"+counter+ "\n\tTotal bytes :"+(long)(size*cnt)+ "\n\tTotal seconds:"+(time)+ "\n\tBytes/second :"+(size*cnt/time)+ "\n\tMBytes/second:"+(size*cnt/time/1024f/1024f)); } public LoadTest(ManagedChannel channel, boolean send, int msgCount, boolean debug, long pause, int stats, boolean breakOnEx) { this.channel = channel; this.send = send; this.msgCount = msgCount; this.debug = debug; this.pause = pause; this.statsInterval = stats; this.breakonChannelException = breakOnEx; } public void run() { long counter = 0; long total = 0; LoadMessage msg = new LoadMessage(); int messageSize = LoadTest.messageSize; try { startTest(); while (total < msgCount) { if (channel.getMembers().length == 0 || (!send)) { synchronized (mutex) { try { mutex.wait(); } catch (InterruptedException x) { log.info("Thread interrupted from wait"); } } } else { try { msg.setMsgNr((int)++total); counter++; if (debug) { printArray(msg.getMessage()); } channel.send(channel.getMembers(), msg, channelOptions); if ( pause > 0 ) { if ( debug) System.out.println("Pausing sender for "+pause+" ms."); Thread.sleep(pause); } } catch (ChannelException x) { log.error("Unable to send message:"+x.getMessage(),x); Member[] faulty = x.getFaultyMembers(); for (int i=0; i<faulty.length; i++ ) log.error("Faulty: "+faulty[i]); --counter; if ( this.breakonChannelException ) throw x; } } if ( (counter % statsInterval) == 0 && (counter > 0)) { //add to the global counter counter = addSendStats(counter); //print from the global counter //printSendStats(LoadTest.messagesSent, LoadTest.messageSize, LoadTest.messageSendTime); printSendStats(LoadTest.messagesSent, LoadTest.messageSize); } } }catch ( Exception x ) { log.error("Captured error while sending:"+x.getMessage()); if ( debug ) log.error("",x); printSendStats(LoadTest.messagesSent, LoadTest.messageSize); } endTest(); } /** * memberAdded * * @param member Member * @todo Implement this org.apache.catalina.tribes.MembershipListener * method */ public void memberAdded(Member member) { log.info("Member added:"+member); synchronized (mutex) { mutex.notifyAll(); } } /** * memberDisappeared * * @param member Member * @todo Implement this org.apache.catalina.tribes.MembershipListener * method */ public void memberDisappeared(Member member) { log.info("Member disappeared:"+member); } public boolean accept(Serializable msg, Member mbr){ return (msg instanceof LoadMessage) || (msg instanceof ByteMessage); } public void messageReceived(Serializable msg, Member mbr){ if ( receiveStart == 0 ) receiveStart = System.currentTimeMillis(); if ( debug ) { if ( msg instanceof LoadMessage ) { printArray(((LoadMessage)msg).getMessage()); } } if ( msg instanceof ByteMessage && !(msg instanceof LoadMessage)) { LoadMessage tmp = new LoadMessage();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -