⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 test.java

📁 JGRoups源码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
        return all_received;
    }

    /**
     * Reports whether the final results have arrived.
     *
     * @return the current value of the {@code final_results_received} field
     */
    boolean receivedFinalResults() {
        return final_results_received;
    }

    /**
     * Sends {@code num_msgs} messages of {@code msg_size} bytes each over the transport.
     * Both values are read from the configuration properties. The same pre-built payload
     * is sent every time, and progress is printed every {@code log_interval} messages.
     *
     * @param interval   pause between two sends, forwarded to {@code Util.sleep}
     * @param nanos      additional pause forwarded to {@code Util.sleep} when
     *                   {@code busy_sleep} is false
     * @param busy_sleep when true, {@code Util.sleep(interval, busy_sleep)} is used and
     *                   {@code nanos} is not passed on
     * @throws Exception if a configuration value cannot be parsed, or if building or
     *                   sending the payload fails
     */
    void sendMessages(long interval, int nanos, boolean busy_sleep) throws Exception {
        long total_msgs=0;
        int msgSize=Integer.parseInt(config.getProperty("msg_size"));
        int num_msgs=Integer.parseInt(config.getProperty("num_msgs"));
        byte[] buf=new byte[msgSize];
        for(int k=0; k < msgSize; k++)
            buf[k]='.';
        Data d=new Data(Data.DATA);
        byte[] payload=generatePayload(d, buf);
        System.out.println("-- sending " + num_msgs + " " + Util.printBytes(msgSize) + " messages");
        for(int i=0; i < num_msgs; i++) {
            transport.send(null, payload);
            total_msgs++;
            if(total_msgs % log_interval == 0) {
                System.out.println("++ sent " + total_msgs);
            }
            // Only pause when the caller asked for some delay between sends.
            if(interval > 0 || nanos > 0) {
                if(busy_sleep)
                    Util.sleep(interval, busy_sleep);
                else
                    Util.sleep(interval, nanos);
            }
        }
    }

    /**
     * Builds a payload consisting of one type byte followed by the message body.
     *
     * @param d   supplies the type byte (via {@link #intToByte(int)}) and, when
     *            {@code buf} is null, the body (via {@code Util.streamableToByteBuffer})
     * @param buf the body to use, or null to serialize {@code d} instead
     * @return a new array of length {@code body.length + 1}
     * @throws Exception if serializing {@code d} fails
     */
    byte[] generatePayload(Data d, byte[] buf) throws Exception {
        byte[] tmp=buf != null? buf : Util.streamableToByteBuffer(d);
        byte[] payload=new byte[tmp.length +1];
        payload[0]=intToByte(d.getType());
        System.arraycopy(tmp, 0, payload, 1, tmp.length);
        return payload;
    }

    /**
     * Maps a {@code Data} type constant to the byte stored at index 0 of a payload.
     *
     * @param type one of the {@code Data} type constants
     * @return 1 to 5 for the known types, 0 for anything else
     */
    private byte intToByte(int type) {
        switch(type) {
            case Data.DATA: return 1;
            case Data.DISCOVERY_REQ: return 2;
            case Data.DISCOVERY_RSP: return 3;
            case Data.RESULTS: return 4;
            case Data.FINAL_RESULTS: return 5;
            default: return 0;
        }
    }

    /**
     * Formats the per-member results plus a combined average, prints the text to
     * stdout and passes it to {@code output(String)}.
     *
     * @param final_results map whose values are {@code MemberInfo} instances; the keys
     *                      are compared against {@code local_addr} to mark the local member
     */
    private void dumpResults(Map final_results) {
        Object      member;
        Map.Entry   entry;
        MemberInfo  val;
        double      combined_msgs_sec, tmp=0;
        long        combined_tp;
        StringBuffer sb=new StringBuffer();
        sb.append("\n-- results:\n");
        for(Iterator it=final_results.entrySet().iterator(); it.hasNext();) {
            entry=(Map.Entry)it.next();
            member=entry.getKey();
            val=(MemberInfo)entry.getValue();
            tmp+=val.getMessageSec();
            sb.append("\n").append(member);
            if(member.equals(local_addr))
                sb.append(" (myself)");
            sb.append(":\n");
            sb.append(val);
            sb.append('\n');
        }
        // An empty map would otherwise yield 0.0 / 0 == NaN.
        combined_msgs_sec=final_results.isEmpty()? 0 : tmp / final_results.size();
        // Multiply before truncating: casting first would drop the fractional part of
        // the message rate before it is scaled by the message size.
        combined_tp=(long)(combined_msgs_sec * msg_size);
        sb.append("\ncombined: ").append(f.format(combined_msgs_sec)).
                append(" msgs/sec averaged over all receivers (throughput=" + Util.printBytes(combined_tp) + "/sec)\n");
        System.out.println(sb.toString());
        output(sb.toString());
    }

    /** Prints the per-sender information held in the {@code senders} map to stdout. */
    private void dumpSenders() {
        StringBuffer sb=new StringBuffer();
        dump(this.senders, sb);
        System.out.println(sb.toString());
    }

    /**
     * Appends a "local results" header and one line per entry of {@code map} to {@code sb}.
     *
     * @param map map whose values are {@code MemberInfo} instances
     * @param sb  buffer receiving the text
     */
    private void dump(Map map, StringBuffer sb) {
        Map.Entry  entry;
        Object     mySender;
        MemberInfo mi;
        // NOTE(review): 'combined' is accumulated below but never appended to sb or
        // returned. It looks like a summary line was intended; confirm before changing
        // the output format.
        MemberInfo combined=new MemberInfo(0);
        combined.start = Long.MAX_VALUE;
        combined.stop = Long.MIN_VALUE;
        sb.append("\n-- local results:\n");
        for(Iterator it2=map.entrySet().iterator(); it2.hasNext();) {
            entry=(Map.Entry)it2.next();
            mySender=entry.getKey();
            mi=(MemberInfo)entry.getValue();
            combined.start=Math.min(combined.start, mi.start);
            combined.stop=Math.max(combined.stop, mi.stop);
            combined.num_msgs_expected+=mi.num_msgs_expected;
            combined.num_msgs_received+=mi.num_msgs_received;
            combined.total_bytes_received+=mi.total_bytes_received;
            sb.append("sender: ").append(mySender).append(": ").append(mi).append('\n');
        }
    }

    /**
     * Builds a space-separated statistics line: received message count, current time
     * in milliseconds, messages/sec, throughput/sec, free memory and total memory
     * (both divided by 1000.0). Transport statistics are appended when
     * {@code dump_transport_stats} is set and the transport returns a non-null map.
     *
     * @param received_msgs number of messages received so far
     * @return the formatted statistics
     */
    private String dumpStats(long received_msgs) {
        double msgs_sec, throughput_sec;
        long   current;
        StringBuffer sb=new StringBuffer();
        sb.append(received_msgs).append(' ');
        current=System.currentTimeMillis();
        sb.append(current).append(' ');
        // NOTE(review): if current == start the divisor is 0.0 and the rate becomes
        // Infinity (or NaN when received_msgs is 0).
        msgs_sec=received_msgs / ((current - start) / 1000.0);
        throughput_sec=msgs_sec * msg_size;
        sb.append(f.format(msgs_sec)).append(' ').append(f.format(throughput_sec)).append(' ');
        sb.append(Runtime.getRuntime().freeMemory() / 1000.0).append(' ');
        sb.append(Runtime.getRuntime().totalMemory() / 1000.0);
        if(dump_transport_stats) {
            Map stats=transport.dumpStats();
            if(stats != null) {
                print(stats, sb);
            }
        }
        return sb.toString();
    }

    /**
     * Renders the map returned by {@code transport.dumpStats()} as text. Each top-level
     * key is written as a heading, followed by one line per entry of the nested map
     * stored under that key.
     *
     * @return the formatted statistics, or an empty string when the transport returns null
     */
    public String dumpTransportStats() {
        Map stats=transport.dumpStats();
        StringBuffer sb=new StringBuffer(128);
        if(stats != null) {
            Map.Entry entry;
            String key;
            Map value;
            for(Iterator it=stats.entrySet().iterator(); it.hasNext();) {
                entry=(Map.Entry)it.next();
                key=(String)entry.getKey();
                value=(Map)entry.getValue();
                sb.append("\n").append(key).append(":\n");
                for(Iterator it2=value.entrySet().iterator(); it2.hasNext();) {
                    sb.append(it2.next()).append("\n");
                }
            }
        }
        return sb.toString();
    }

    /**
     * Appends a "Transport stats" header and one {@code key: value} line per entry.
     *
     * @param stats the entries to print
     * @param sb    buffer receiving the text
     */
    private void print(Map stats, StringBuffer sb) {
        sb.append("\nTransport stats:\n\n");
        Map.Entry entry;
        Object key, val;
        for(Iterator it=stats.entrySet().iterator(); it.hasNext();) {
            entry=(Map.Entry)it.next();
            key=entry.getKey();
            val=entry.getValue();
            sb.append(key).append(": ").append(val).append("\n");
        }
    }

    /**
     * Sends a discovery request and response, then waits on the {@code members} monitor
     * until at least {@code num_members} members are present. Both messages are re-sent
     * after every wait of up to 2000 ms. Finally all members are added to
     * {@code heard_from}.
     *
     * @throws Exception if sending fails or the waiting thread is interrupted
     */
    void runDiscoveryPhase() throws Exception {
        sendDiscoveryRequest();
        sendDiscoveryResponse();
        synchronized(this.members) {
            System.out.println("-- waiting for " + num_members + " members to join");
            while(this.members.size() < num_members) {
                this.members.wait(2000);
                sendDiscoveryRequest();
                sendDiscoveryResponse();
            }
            heard_from.addAll(members);
            System.out.println("-- members: " + this.members.size());
        }
    }

    /**
     * Sends a {@code Data.DISCOVERY_REQ} message over the transport.
     *
     * @throws Exception if building or sending the payload fails
     */
    void sendDiscoveryRequest() throws Exception {
        Data d=new Data(Data.DISCOVERY_REQ);
        transport.send(null, generatePayload(d, null));
    }

    /**
     * Hands a {@code Data.DISCOVERY_RSP} message to {@code response_sender} for sending.
     * When this instance is a sender, the response is flagged as such and carries the
     * configured {@code num_msgs}. Failures inside the submitted task are logged rather
     * than propagated.
     *
     * @throws Exception if {@code num_msgs} cannot be parsed or the task cannot be submitted
     */
    void sendDiscoveryResponse() throws Exception {
        final Data d2=new Data(Data.DISCOVERY_RSP);
        if(sender) {
            d2.sender=true;
            d2.num_msgs=Long.parseLong(config.getProperty("num_msgs"));
        }
        response_sender.execute(new Runnable() {
            public void run() {
                try {
                    transport.send(null, generatePayload(d2, null));
                }
                catch(Exception e) {
                    log.error("failed sending discovery response", e);
                }
            }
        });
    }

    /**
     * Command-line entry point: parses the options, starts the test, runs the discovery
     * phase, optionally sends messages, and waits until the final results have been
     * received. See {@link #help()} for the accepted options. An unrecognized option,
     * or a value-taking option with no value, prints the usage text and returns.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        Properties config=new Properties();
        boolean sender=false, verbose=false, jmx=false, dump_stats=false; // dumps at end of run
        Test t=null;
        String output=null;
        long interval=0;
        int interval_nanos=0;
        boolean busy_sleep=false;
        for(int i=0; i < args.length; i++) {
            String arg=args[i];
            // Options that consume the following argument must not be last; otherwise
            // args[++i] would throw ArrayIndexOutOfBoundsException.
            boolean needs_value="-config".equals(arg) || "-props".equals(arg) || "-interval".equals(arg)
                    || "-nanos".equals(arg) || "-f".equals(arg);
            if(needs_value && i + 1 >= args.length) {
                System.err.println("option " + arg + " requires a value");
                help();
                return;
            }
            if("-sender".equals(arg)) {
                config.put("sender", "true");
                sender=true;
                continue;
            }
            if("-receiver".equals(arg)) {
                config.put("sender", "false");
                sender=false;
                continue;
            }
            if("-config".equals(arg)) {
                String config_file=args[++i];
                config.put("config", config_file);
                continue;
            }
            if("-props".equals(arg)) {
                String props=args[++i];
                config.put("props", props);
                continue;
            }
            if("-verbose".equals(arg)) {
                verbose=true;
                continue;
            }
            if("-jmx".equals(arg)) {
                jmx=true;
                continue;
            }
            if("-dump_stats".equals(arg)) {
                dump_stats=true;
                continue;
            }
            if("-interval".equals(arg)) {
                interval=Long.parseLong(args[++i]);
                continue;
            }
            if("-nanos".equals(arg)) {
                interval_nanos=Integer.parseInt(args[++i]);
                continue;
            }
            if("-busy_sleep".equals(arg)) {
                busy_sleep=true;
                continue;
            }
            if("-f".equals(arg)) {
                output=args[++i];
                continue;
            }
            help();
            return;
        }
        try {
            t=new Test();
            t.start(config, verbose, jmx, output);
            t.runDiscoveryPhase();
            if(sender) {
                t.sendMessages(interval, interval_nanos, busy_sleep);
            }
            // Poll every 2 s until the final results have been received.
            synchronized(t) {
                while(t.receivedFinalResults() == false) {
                    t.wait(2000);
                }
            }
            if(dump_stats) {
                String stats=t.dumpTransportStats();
                System.out.println("\nTransport statistics:\n" + stats);
            }
            if(t.jmx) {
                System.out.println("jmx=true: not terminating");
                // NOTE(review): the test is stopped here even though the message says
                // "not terminating"; the loop below only keeps the JVM alive.
                t.stop();
                t=null; // prevents a second stop() in the finally block
                while(true) {
                    Util.sleep(60000);
                }
            }
        }
        catch(Exception e) {
            e.printStackTrace();
        }
        finally {
            if(t != null) {
                t.stop();
            }
        }
    }

    /** Prints the command-line usage to stdout. */
    static void help() {
        System.out.println("Test [-help] ([-sender] | [-receiver]) " +
                "[-config <config file>] " +
                "[-props <stack config>] [-verbose] [-jmx] " +
                "[-dump_stats] [-f <filename>] [-interval <ms between sends>] " +
                "[-nanos <additional nanos to sleep in interval>] [-busy_sleep (cancels out -nanos)]");
    }

    /**
     * Background task that calls {@code sendResults()} and then sleeps for
     * {@code interval} milliseconds, repeating until {@link #stop()} clears the
     * thread reference.
     */
    private class ResultsPublisher implements Runnable {
        final long interval=1000;
        boolean running=true;
        // Written by stop() and read by the publisher thread in run(); volatile so the
        // loop condition observes the update.
        volatile Thread t;

        /** Starts the daemon publisher thread unless one has already been created. */
        void start() {
            if(t == null) {
                t=new Thread(this, "ResultsPublisher");
                t.setDaemon(true);
                t.start();
            }
        }

        /** Clears the thread reference, which ends the loop in run(), and interrupts the thread. */
        void stop() {
            if(t != null && t.isAlive()) {
                Thread tmp=t;
                t=null;
                tmp.interrupt();
            }
        }

        /** Publishes results once per interval until the thread reference is cleared. */
        public void run() {
            try {
                while(t != null) {
                    sendResults();
                    Util.sleep(interval);
                }
            }
            catch(Exception e) {
                e.printStackTrace();
            }
        }
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -