📄 loadtest.java
字号:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.catalina.tribes.demos;
import java.io.Serializable;
import java.util.Random;
import org.apache.catalina.tribes.ByteMessage;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.Channel;
import java.io.Externalizable;
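/**
* <p>Title: Tribes channel load test</p>
*
* <p>Description: Sends <code>LoadMessage</code> instances to every member of a
* <code>ManagedChannel</code> from a sender thread and logs aggregate send
* statistics. The same object is also registered-capable as a membership
* listener and a channel listener, so it can react to members joining and to
* incoming messages.</p>
*
* <p>Copyright: Copyright (c) 2005</p>
*
* <p>Company: </p>
*
* @author not attributable
* @version 1.0
*/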
public class LoadTest implements MembershipListener,ChannelListener, Runnable {
/** Logger shared by all LoadTest instances. */
protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(LoadTest.class);
// NOTE(review): not read in the visible part of this file; presumably the
// payload size used when building a LoadMessage - confirm against LoadMessage.
public static int size = 24000;
/**
* Monitor shared by every instance. run() waits on it while the channel has no
* members or sending is disabled; memberAdded() calls notifyAll() on it.
*/
public static Object mutex = new Object();
// NOTE(review): not read in the visible part of this file - confirm its use.
public boolean doRun = true;
// Receive-side counters. NOTE(review): they are not updated in the visible part
// of this file; presumably maintained by messageReceived() - confirm.
public long bytesReceived = 0;
public float mBytesReceived = 0;
public int messagesReceived = 0;
/** When false, run() parks on the mutex instead of sending. */
public boolean send = true;
/** Enables the extra diagnostic output (payload dumps, pause notices, stack traces). */
public boolean debug = false;
/** Upper bound compared against the sent-message total in the run() loop condition. */
public int msgCount = 100;
/** Channel whose members receive the messages sent by run(). */
ManagedChannel channel=null;
/** Number of locally counted messages after which run() flushes and prints send statistics. */
public int statsInterval = 10000;
/** Milliseconds run() sleeps after each send; no sleep when it is not greater than 0. */
public long pause = 0;
/** When true, run() rethrows a ChannelException raised by a send, which ends the send loop. */
public boolean breakonChannelException = false;
// NOTE(review): not read in the visible part of this file - confirm its use.
public boolean async = false;
/** Timestamp (System.currentTimeMillis()) recorded by messageReceived() while it is still 0. */
public long receiveStart = 0;
/** Options value passed as the third argument of channel.send() in run(). */
public int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
// Per-message size handed to printSendStats(). It is only read in the visible
// code; NOTE(review): presumably assigned during setup elsewhere - confirm.
static int messageSize = 0;
/** Global sent-message count, accumulated through addSendStats(). */
public static long messagesSent = 0;
/** Time of the first startTest() call; 0 means no test has started yet. */
public static long messageStartSendTime = 0;
/** Time at which endTest() first brought threadCount back to 0; 0 until then. */
public static long messageEndSendTime = 0;
/** Incremented by startTest() and decremented by endTest(). */
public static int threadCount = 0;
/**
 * Registers one more running sender and, on the very first call, records the
 * time at which sending started. Synchronized on the class so the shared
 * counters are updated atomically.
 */
public static synchronized void startTest() {
    threadCount += 1;
    if (messageStartSendTime != 0) {
        // The start time has already been captured by an earlier caller.
        return;
    }
    messageStartSendTime = System.currentTimeMillis();
}
/**
 * Unregisters one running sender. When the sender count drops to zero and no
 * end time has been recorded yet, the current time is stored as the end of
 * the send phase. Synchronized on the class so the shared counters are
 * updated atomically.
 */
public static synchronized void endTest() {
    threadCount -= 1;
    boolean lastSenderDone = (threadCount == 0);
    boolean endNotRecorded = (messageEndSendTime == 0);
    if (endNotRecorded && lastSenderDone) {
        messageEndSendTime = System.currentTimeMillis();
    }
}
/**
 * Adds a sender's locally counted messages to the global
 * {@code messagesSent} total.
 *
 * @param count number of messages to add to the global total
 * @return always {@code 0}; run() assigns the result back to its local
 *         counter, which resets that counter after the flush
 */
public static synchronized long addSendStats(long count) {
    messagesSent = messagesSent + count;
    final long resetValue = 0L;
    return resetValue;
}
/**
 * Logs the send statistics for the given message count: total bytes, the
 * seconds elapsed since {@code messageStartSendTime}, and the resulting
 * throughput in bytes and megabytes per second.
 *
 * @param counter     number of messages sent
 * @param messageSize size of a single message in bytes
 */
private static void printSendStats(long counter, int messageSize) {
    // Exact integer product. The previous float arithmetic only carries a
    // 24-bit mantissa, so byte totals above ~16.7 million were silently rounded.
    long totalBytes = counter * (long) messageSize;
    // double keeps the elapsed time and the derived rates precise as well.
    double seconds = (System.currentTimeMillis() - messageStartSendTime) / 1000d;
    double bytesPerSecond = totalBytes / seconds;
    log.info("****SEND STATS-"+Thread.currentThread().getName()+"*****"+
            "\n\tMessage count:"+counter+
            "\n\tTotal bytes :"+totalBytes+
            "\n\tTotal seconds:"+seconds+
            "\n\tBytes/second :"+bytesPerSecond+
            "\n\tMBytes/second:"+(bytesPerSecond/1024d/1024d));
}
/**
 * Creates a load tester bound to the given channel. Every argument is stored
 * in the matching instance field; nothing else happens at construction time.
 *
 * @param channel   channel used by run() to send messages
 * @param send      stored in {@code send}; when false, run() waits instead of sending
 * @param msgCount  stored in {@code msgCount}, the bound of the run() loop
 * @param debug     stored in {@code debug}; enables extra diagnostic output
 * @param pause     stored in {@code pause}; milliseconds to sleep after each send
 * @param stats     stored in {@code statsInterval}; messages between stats reports
 * @param breakOnEx stored in {@code breakonChannelException}; rethrow send failures
 */
public LoadTest(ManagedChannel channel,
                boolean send,
                int msgCount,
                boolean debug,
                long pause,
                int stats,
                boolean breakOnEx) {
    // Sending target and switches
    this.channel = channel;
    this.send = send;
    this.debug = debug;
    this.breakonChannelException = breakOnEx;

    // Volume and pacing
    this.msgCount = msgCount;
    this.statsInterval = stats;
    this.pause = pause;
}
/**
 * Sender loop. Sends the same {@code LoadMessage} to all current channel
 * members until {@code msgCount} messages have been sent successfully.
 *
 * <p>While the channel has no members, or {@code send} is false, the thread
 * waits on the shared {@code mutex} (memberAdded() notifies it). Every
 * {@code statsInterval} successful sends the local count is flushed into the
 * global counter and the statistics are logged. When the loop ends, normally
 * or through an exception, the not-yet-flushed remainder is added to the
 * global counter and the statistics are logged one last time.</p>
 *
 * <p>A {@code ChannelException} from a send is logged and the loop continues,
 * unless {@code breakonChannelException} is set, in which case it ends the
 * loop. {@code endTest()} is always called, paired with {@code startTest()}.</p>
 */
public void run() {
    long counter = 0; // successful sends since the last flush into the global counter
    long total = 0;   // successful sends by this thread; bounds the loop
    LoadMessage msg = new LoadMessage();
    startTest();
    try {
        try {
            while (total < msgCount) {
                if (channel.getMembers().length == 0 || (!send)) {
                    synchronized (mutex) {
                        try {
                            mutex.wait();
                        } catch (InterruptedException x) {
                            // Logged only; the loop condition is re-evaluated afterwards.
                            log.info("Thread interrupted from wait");
                        }
                    }
                } else {
                    try {
                        if (debug) {
                            printArray(msg.getMessage());
                        }
                        channel.send(channel.getMembers(), msg, channelOptions);
                        // Count only after the send returned without a ChannelException.
                        // Previously 'total' was never advanced, so the loop could not
                        // terminate and msgCount had no effect.
                        counter++;
                        total++;
                        if ( pause > 0 ) {
                            if ( debug) System.out.println("Pausing sender for "+pause+" ms.");
                            Thread.sleep(pause);
                        }
                    } catch (ChannelException x) {
                        if ( debug ) log.error("Unable to send message:"+x.getMessage(),x);
                        log.error("Unable to send message:"+x.getMessage());
                        ChannelException.FaultyMember[] faulty = x.getFaultyMembers();
                        for (int i=0; i<faulty.length; i++ ) log.error("Faulty: "+faulty[i]);
                        if ( this.breakonChannelException ) throw x;
                    }
                }
                if ( (counter % statsInterval) == 0 && (counter > 0)) {
                    // add to the global counter (returns 0, which resets the local one)
                    counter = addSendStats(counter);
                    // print from the global counter
                    printSendStats(LoadTest.messagesSent, LoadTest.messageSize);
                }
            }
        } catch ( Exception x ) {
            log.error("Captured error while sending:"+x.getMessage());
            if ( debug ) log.error("",x);
        }
        // Flush the sends made since the last interval so the final report
        // includes them, then report once more.
        if ( counter > 0 ) addSendStats(counter);
        printSendStats(LoadTest.messagesSent, LoadTest.messageSize);
    } finally {
        // Always balance startTest(), even if an Error escapes the loop.
        endTest();
    }
}
/**
 * Membership callback for a member joining. Logs the new member and then
 * wakes every thread waiting on the shared {@code mutex}, which is where
 * run() parks while the channel has no members.
 *
 * @param member the member that was added
 */
public void memberAdded(Member member) {
    final String notice = "Member added:" + member;
    log.info(notice);
    synchronized (mutex) {
        mutex.notifyAll();
    }
}
/**
 * Membership callback for a member leaving. Only logs the event.
 *
 * @param member the member that disappeared
 */
public void memberDisappeared(Member member) {
    final String notice = "Member disappeared:" + member;
    log.info(notice);
}
/**
 * Tells the channel whether this listener wants the given message.
 *
 * @param msg the incoming message
 * @param mbr the sending member (not inspected)
 * @return true when the message is a {@code LoadMessage} or a
 *         {@code ByteMessage}, false otherwise
 */
public boolean accept(Serializable msg, Member mbr){
    if (msg instanceof LoadMessage) {
        return true;
    }
    return msg instanceof ByteMessage;
}
public void messageReceived(Serializable msg, Member mbr){
if ( receiveStart == 0 ) receiveStart = System.currentTimeMillis();
if ( debug ) {
if ( msg instanceof LoadMessage ) {
printArray(((LoadMessage)msg).getMessage());
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -