⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 loadtest.java

📁 业界著名的tomcat服务器的最新6.0的源代码。
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.catalina.tribes.demos;

import java.io.Serializable;
import java.util.Random;

import org.apache.catalina.tribes.ByteMessage;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ChannelListener;
import org.apache.catalina.tribes.ManagedChannel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.MembershipListener;
import org.apache.catalina.tribes.io.XByteBuffer;
import org.apache.catalina.tribes.Channel;
import java.io.Externalizable;


/**
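 * <p>Title: Tribes channel load test</p>
 *
 * <p>Description: Demo that repeatedly sends a <code>LoadMessage</code> to the
 * current members of a Tribes channel from sender threads and logs send
 * statistics (message count, bytes and bytes per second).</p>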
 *
 * <p>Copyright: Copyright (c) 2005</p>
 *
 * <p>Company: </p>
 *
 * @author not attributable
 * @version 1.0
 */
public class LoadTest implements MembershipListener,ChannelListener, Runnable {
    // Logger shared by every LoadTest instance.
    protected static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(LoadTest.class);
    // NOTE(review): not referenced by the code visible in this part of the file;
    // presumably a default payload size in bytes - confirm against the rest of the file.
    public static int size = 24000;
    // Monitor that run() waits on while there is nobody to send to (or sending is
    // disabled); memberAdded() calls notifyAll() on it.
    public static Object mutex = new Object();
    // NOTE(review): not read by the code visible in this part of the file.
    public boolean doRun = true;
    
    // Receive-side statistics. NOTE(review): their updates are not visible in this
    // part of the file - confirm units and semantics in messageReceived().
    public long bytesReceived = 0;
    public float mBytesReceived = 0;
    public int  messagesReceived = 0;
    // When false, run() never sends and only waits on the mutex.
    public boolean send = true;
    // Enables payload dumps via printArray(), the "Pausing sender" console line and
    // stack traces in error logs.
    public boolean debug = false;
    // Bound that run() compares its local "total" against (total < msgCount).
    public int msgCount = 100;
    // Channel used by run() to look up members and to send messages.
    ManagedChannel channel=null;
    // run() folds its local counter into messagesSent and prints stats whenever the
    // counter is a positive multiple of this value.
    public int statsInterval = 10000;
    // Milliseconds run() sleeps after each send; values <= 0 disable the pause.
    public long pause = 0;
    // When true, run() rethrows a ChannelException instead of continuing.
    public boolean breakonChannelException = false;
    // NOTE(review): not read by the code visible in this part of the file.
    public boolean async = false;
    // System.currentTimeMillis() of the first call to messageReceived(); 0 until then.
    public long receiveStart = 0;
    // Options argument passed to channel.send() in run().
    public int channelOptions = Channel.SEND_OPTIONS_DEFAULT;
    
    // Per-message size handed to printSendStats(). NOTE(review): where this is
    // assigned is not visible in this part of the file.
    static int messageSize = 0;
    
    // Global number of messages sent; only increased through addSendStats().
    public static long messagesSent = 0;
    // Set by startTest() the first time it runs while the value is still 0.
    public static long messageStartSendTime = 0;
    // Set by endTest() when threadCount drops to 0 and the value is still 0.
    public static long messageEndSendTime = 0;
    // Incremented by startTest(), decremented by endTest().
    public static int  threadCount = 0;
    
    /**
     * Registers one more running test thread. If no send start time has been
     * recorded yet (the value is still 0), the current time is stored as the
     * start time.
     */
    public static synchronized void startTest() {
        final boolean startTimeUnset = (messageStartSendTime == 0);
        threadCount += 1;
        if (startTimeUnset) {
            messageStartSendTime = System.currentTimeMillis();
        }
    }
    
    /**
     * Unregisters one running test thread. When the thread count reaches 0 and
     * no end time has been recorded yet, the current time is stored as the send
     * end time.
     */
    public static synchronized void endTest() {
        threadCount -= 1;
        if (threadCount != 0 || messageEndSendTime != 0) {
            return;
        }
        messageEndSendTime = System.currentTimeMillis();
    }

    
    /**
     * Adds a thread-local message count to the global {@code messagesSent}
     * total.
     *
     * @param count number of messages to add to the global total
     * @return always 0, so the caller can assign the result back to its local
     *         counter to reset it
     */
    public static synchronized long addSendStats(long count) {
        messagesSent = messagesSent + count;
        return 0L;
    }
    
    /**
     * Logs a send-statistics report at info level: message count, total bytes,
     * seconds elapsed since {@code messageStartSendTime}, and the resulting
     * bytes/second and MBytes/second rates.
     *
     * @param counter     number of messages to report
     * @param messageSize size of a single message, multiplied by the count to
     *                    obtain the byte total
     */
    private static void printSendStats(long counter, int messageSize) {
        final float elapsedSeconds = (float)(System.currentTimeMillis()-messageStartSendTime) / 1000f;
        final float totalBytes = ((float)messageSize) * ((float)counter);
        final float bytesPerSecond = totalBytes / elapsedSeconds;
        final float mBytesPerSecond = bytesPerSecond / 1024f / 1024f;

        final StringBuilder report = new StringBuilder();
        report.append("****SEND STATS-").append(Thread.currentThread().getName()).append("*****");
        report.append("\n\tMessage count:").append(counter);
        report.append("\n\tTotal bytes  :").append((long)totalBytes);
        report.append("\n\tTotal seconds:").append(elapsedSeconds);
        report.append("\n\tBytes/second :").append(bytesPerSecond);
        report.append("\n\tMBytes/second:").append(mBytesPerSecond);
        log.info(report.toString());
    }

    
    
    /**
     * Creates a load tester bound to the given channel.
     *
     * @param channel   channel used to look up members and send messages
     * @param send      whether {@code run()} should send messages at all
     * @param msgCount  message-count bound checked by the send loop
     * @param debug     enables verbose diagnostic output
     * @param pause     milliseconds to sleep after each send; no pause when
     *                  not positive
     * @param stats     value stored as {@code statsInterval}
     * @param breakOnEx whether a {@code ChannelException} aborts the send loop
     */
    public LoadTest(ManagedChannel channel, boolean send, int msgCount, boolean debug,
                    long pause, int stats, boolean breakOnEx) {
        // What to send through, and how much.
        this.channel = channel;
        this.msgCount = msgCount;
        this.pause = pause;

        // Behaviour switches.
        this.send = send;
        this.debug = debug;
        this.breakonChannelException = breakOnEx;

        // Reporting.
        this.statsInterval = stats;
    }
    
    
    
    /**
     * Sender loop of one load-test thread.
     *
     * <p>Until {@code msgCount} messages have been sent successfully, each
     * iteration either waits on {@code mutex} (when the channel has no members
     * or {@code send} is false) or sends one {@code LoadMessage} to the current
     * members, optionally sleeping {@code pause} milliseconds afterwards.
     * Whenever the local counter reaches a positive multiple of
     * {@code statsInterval}, it is folded into the global counter and the
     * statistics are logged.</p>
     *
     * <p>A {@code ChannelException} is logged and the failed message is not
     * counted; it ends the loop only when {@code breakonChannelException} is
     * set. Any other exception ends the loop. On every exit path the sends not
     * yet reported are added to the global counter, {@code endTest()} is called
     * and the statistics are logged.</p>
     */
    public void run() {
        // Successful sends not yet folded into the global LoadTest.messagesSent.
        long counter = 0;
        // Successful sends made by this thread; this is what bounds the loop.
        long total = 0;
        LoadMessage msg = new LoadMessage();

        // Registered before the try so that it is always paired with the endTest() in finally.
        startTest();
        try {
            while (total < msgCount) {
                // One membership snapshot per iteration, so the emptiness check
                // and the send operate on the same set of members.
                Member[] members = channel.getMembers();
                if (members.length == 0 || (!send)) {
                    synchronized (mutex) {
                        try {
                            // Woken by memberAdded(); the loop then re-checks membership.
                            mutex.wait();
                        } catch (InterruptedException x) {
                            log.info("Thread interrupted from wait");
                        }
                    }
                } else {
                    try {
                        if (debug) {
                            printArray(msg.getMessage());
                        }
                        channel.send(members, msg, channelOptions);
                        // Count only after the send returned without a ChannelException.
                        counter++;
                        total++;
                        if ( pause > 0 ) {
                            if ( debug) System.out.println("Pausing sender for "+pause+" ms.");
                            Thread.sleep(pause);
                        }
                    } catch (ChannelException x) {
                        if ( debug ) log.error("Unable to send message:"+x.getMessage(),x);
                        log.error("Unable to send message:"+x.getMessage());
                        ChannelException.FaultyMember[] faulty = x.getFaultyMembers();
                        for (int i=0; i<faulty.length; i++ ) log.error("Faulty: "+faulty[i]);
                        if ( this.breakonChannelException ) throw x;
                    }
                }
                // counter > 0 and statsInterval != 0 are tested first so that an idle
                // iteration or a zero interval can never reach the modulo and divide by zero.
                if ( counter > 0 && statsInterval != 0 && (counter % statsInterval) == 0 ) {
                    // add to the global counter (resets the local one to 0)
                    counter = addSendStats(counter);
                    // print from the global counter
                    printSendStats(LoadTest.messagesSent, LoadTest.messageSize);
                }
            }
        } catch ( Exception x ) {
            log.error("Captured error while sending:"+x.getMessage());
            if ( debug ) log.error("",x);
        } finally {
            // Fold in the sends made since the last stats interval so the final
            // figures cover every message that actually went out.
            if ( counter > 0 ) addSendStats(counter);
            endTest();
            printSendStats(LoadTest.messagesSent, LoadTest.messageSize);
        }
    }

    

    /**
     * Membership callback: logs the new member and wakes every thread that is
     * currently waiting on {@code mutex}.
     *
     * @param member the member that was added
     */
    public void memberAdded(Member member) {
        final String text = "Member added:" + member;
        log.info(text);
        synchronized (mutex) {
            mutex.notifyAll();
        }
    }

    /**
     * Membership callback: logs the member that disappeared. No other action
     * is taken.
     *
     * @param member the member that disappeared
     */
    public void memberDisappeared(Member member) {
        final String text = "Member disappeared:" + member;
        log.info(text);
    }
    
    /**
     * Tells whether this listener accepts the given message.
     *
     * @param msg the incoming message
     * @param mbr the sending member (not consulted)
     * @return {@code true} when the message is a {@code LoadMessage} or a
     *         {@code ByteMessage}, {@code false} otherwise
     */
    public boolean accept(Serializable msg, Member mbr) {
        if (msg instanceof LoadMessage) {
            return true;
        }
        return msg instanceof ByteMessage;
    }
    
    public void messageReceived(Serializable msg, Member mbr){ 
        if ( receiveStart == 0 ) receiveStart = System.currentTimeMillis();
        if ( debug ) {
            if ( msg instanceof LoadMessage ) {
                printArray(((LoadMessage)msg).getMessage());
            }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -