⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 test.java

📁 JGroups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
package org.jgroups.tests.perf;import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.jgroups.Version;import org.jgroups.util.Util;import java.io.BufferedReader;import java.io.FileReader;import java.io.FileWriter;import java.io.IOException;import java.text.NumberFormat;import java.util.*;/**  You start the test by running this class. * @author Bela Ban (belaban@yahoo.com) */public class Test implements Receiver {    String          props=null;    Properties      config;    boolean         sender=false;    Transport       transport=null;    Object          local_addr=null;    /** Map<Object,MemberInfo> members. Keys=member addresses, value=MemberInfo */    Map             senders=new ConcurrentReaderHashMap(10);    /** Keeps track of members. ArrayList<SocketAddress> */    final ArrayList       members=new ArrayList();    /** Set when first message is received */    long            start=0;    /** Set when last message is received */    long            stop=0;    int             num_members=0;    int             num_senders=0;    long            num_msgs_expected=0;    long            num_msgs_received=0;  // from everyone    long            num_bytes_received=0; // from everyone    Log             log=LogFactory.getLog(getClass());    boolean         all_received=false;    boolean         final_results_received=false;    /** Map<Object, MemberInfo>. 
A hashmap of senders, each value is the 'senders' hashmap */    Map             results=new HashMap();    private ResultsPublisher publisher=new ResultsPublisher();    List            heard_from=new ArrayList();    boolean         dump_transport_stats=false;    /** Log every n msgs received */    long            log_interval=1000;    long            counter=1;    long            msg_size=1000;    boolean         jmx=false;    /** Number of ms to wait at the receiver to simulate processing of the received message (0 == don't wait) */    long            processing_delay=0;    FileWriter      output=null;    QueuedExecutor  response_sender=new QueuedExecutor();    static  NumberFormat f;    static {        f=NumberFormat.getNumberInstance();        f.setGroupingUsed(false);        f.setMaximumFractionDigits(2);    }    public void start(Properties c, boolean verbose, boolean jmx, String output) throws Exception {        String          config_file="config.txt";        BufferedReader  fileReader;        String          line;        String          key, val;        StringTokenizer st;        Properties      tmp=new Properties();        if(output != null)            this.output=new FileWriter(output, false);        response_sender.setThreadFactory(new ThreadFactory() {            public Thread newThread(Runnable runnable) {                return new Thread(runnable, "Test.ResponseSender");            }        });        config_file=c.getProperty("config");        fileReader=new BufferedReader(new FileReader(config_file));        while((line=fileReader.readLine()) != null) {            if(line.startsWith("#"))                continue;            line=line.trim();            if(line.length() == 0)                continue;            st=new StringTokenizer(line, "=", false);            key=st.nextToken().toLowerCase();            val=st.nextToken();            tmp.put(key, val);        }        fileReader.close();        // 'tmp' now contains all properties from the file, 
now we need to override the ones        // passed to us by 'c'        tmp.putAll(c);        this.config=tmp;        StringBuffer sb=new StringBuffer();        sb.append("\n\n----------------------- TEST -----------------------\n");        sb.append("Date: ").append(new Date()).append('\n');        sb.append("Run by: ").append(System.getProperty("user.name")).append("\n\n");        if(verbose)            sb.append("Properties: ").append(printProperties()).append("\n-------------------------\n\n");        for(Iterator it=this.config.entrySet().iterator(); it.hasNext();) {            Map.Entry entry=(Map.Entry)it.next();            sb.append(entry.getKey()).append(":\t").append(entry.getValue()).append('\n');        }        sb.append("JGroups version: ").append(Version.description).append('\n');        System.out.println("Configuration is: " + sb);        output(sb.toString());        props=this.config.getProperty("props");        num_members=Integer.parseInt(this.config.getProperty("num_members"));        num_senders=Integer.parseInt(this.config.getProperty("num_senders"));        long num_msgs=Long.parseLong(this.config.getProperty("num_msgs"));        this.num_msgs_expected=num_senders * num_msgs;        sender=Boolean.valueOf(this.config.getProperty("sender")).booleanValue();        msg_size=Long.parseLong(this.config.getProperty("msg_size"));        String tmp2=this.config.getProperty("dump_transport_stats", "false");        if(Boolean.valueOf(tmp2).booleanValue())            this.dump_transport_stats=true;        tmp2=this.config.getProperty("log_interval");        if(tmp2 != null)            log_interval=Long.parseLong(tmp2);        sb=new StringBuffer();        sb.append("\n##### msgs_received");        sb.append(", current time (in ms)");        sb.append(", msgs/sec");        sb.append(", throughput/sec [KB]");        sb.append(", free_mem [KB] ");        sb.append(", total_mem [KB] ");        output(sb.toString());        if(jmx) {            
this.config.setProperty("jmx", "true");        }        this.jmx=new Boolean(this.config.getProperty("jmx")).booleanValue();        String tmp3=this.config.getProperty("processing_delay");        if(tmp3 != null)            this.processing_delay=Long.parseLong(tmp3);        String transport_name=this.config.getProperty("transport");        transport=(Transport)Util.loadClass(transport_name, this.getClass()).newInstance();        transport.create(this.config);        transport.setReceiver(this);        transport.start();        local_addr=transport.getLocalAddress();    }    private void output(String msg) {        // if(log.isInfoEnabled())           // log.info(msg);        if(this.output != null) {            try {                this.output.write(msg + "\n");                this.output.flush();            }            catch(IOException e) {            }        }    }    private String printProperties() {        StringBuffer sb=new StringBuffer();        Properties p=System.getProperties();        for(Iterator it=p.entrySet().iterator(); it.hasNext();) {            Map.Entry entry=(Map.Entry)it.next();            sb.append(entry.getKey()).append(": ").append(entry.getValue()).append('\n');        }        return sb.toString();    }    public void stop() {        if(transport != null) {            transport.stop();            transport.destroy();        }        if(response_sender != null) {            response_sender.shutdownNow();        }        if(this.output != null) {            try {                this.output.close();            }            catch(IOException e) {            }        }    }    public void receive(Object sender, byte[] payload) {        if(payload == null || payload.length == 0) {            System.err.println("payload is incorrect (sender=" + sender + "): " + payload);            return;        }        try {            int type=payload[0];            if(type == 1) { // DATA                int len=payload.length -1;                
handleData(sender, len);                return;            }            byte[] tmp=new byte[payload.length-1];            System.arraycopy(payload, 1, tmp, 0, tmp.length);            Data d=(Data)Util.streamableFromByteBuffer(Data.class, tmp);            switch(d.getType()) {            case Data.DISCOVERY_REQ:                // System.out.println("-- received discovery request");                sendDiscoveryResponse();                break;            case Data.DISCOVERY_RSP:                // System.out.println("-- received discovery response from " + sender);                synchronized(this.members) {                    if(!this.members.contains(sender)) {                        this.members.add(sender);                        System.out.println("-- " + sender + " joined");                        if(d.sender) {                            synchronized(this.members) {                                if(!this.senders.containsKey(sender)) {                                    this.senders.put(sender, new MemberInfo(d.num_msgs));                                }                            }                        }                        this.members.notifyAll();                    }                }                break;            case Data.FINAL_RESULTS:                publisher.stop();                if(!final_results_received) {                    dumpResults(d.results);                    final_results_received=true;                }                synchronized(this) {                    this.notifyAll();                }                break;            case Data.RESULTS:                results.put(sender, d.result);                heard_from.remove(sender);                if(heard_from.size() == 0) {                    for(int i=0; i < 3; i++) {                        sendFinalResults();                        Util.sleep(100);                    }                }                break;            default:                log.error("received invalid data type: " 
+ payload[0]);                break;            }        }        catch(Exception e) {            e.printStackTrace();        }    }    private void handleData(Object sender, int num_bytes) {        if(all_received)            return;        if(start == 0) {            start=System.currentTimeMillis();        }        num_msgs_received++;        num_bytes_received+=num_bytes;        if(num_msgs_received >= num_msgs_expected) {            if(stop == 0)                stop=System.currentTimeMillis();            all_received=true;        }        if(num_msgs_received % log_interval == 0)            System.out.println(new StringBuffer("-- received ").append(num_msgs_received).append(" messages"));        if(counter % log_interval == 0) {            output(dumpStats(counter));        }        MemberInfo info=(MemberInfo)this.senders.get(sender);        if(info != null) {            if(info.start == 0)                info.start=System.currentTimeMillis();            info.num_msgs_received++;            counter++;            info.total_bytes_received+=num_bytes;            if(info.num_msgs_received >= info.num_msgs_expected) {                info.done=true;                if(info.stop == 0)                    info.stop=System.currentTimeMillis();            }            else {                if(processing_delay > 0)                    Util.sleep(processing_delay);            }        }        else {            log.error("-- sender " + sender + " not found in senders hashmap");        }        if(all_received) {            if(!this.sender)                dumpSenders();            publisher.start();        }    }    private void sendResults() throws Exception {        Data d=new Data(Data.RESULTS);        byte[] buf;        MemberInfo info=new MemberInfo(num_msgs_expected);        info.done=true;        info.num_msgs_received=num_msgs_received;        info.start=start;        info.stop=stop;        info.total_bytes_received=this.num_bytes_received;        d.result=info;     
   buf=generatePayload(d, null);        transport.send(null, buf);    }    private void sendFinalResults() throws Exception {        Data d=new Data(Data.FINAL_RESULTS);        d.results=new ConcurrentReaderHashMap(this.results);        final byte[] buf=generatePayload(d, null);        // transport.send(null, buf);        response_sender.execute(new Runnable() {            public void run() {                try {                    transport.send(null, buf);                }                catch(Exception e) {                    log.error("failed sending discovery response", e);                }            }        });    }    boolean allReceived() {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -