unicasttest.java

来自「JGRoups源码」· Java 代码 · 共 351 行

JAVA
351
字号
// $Id: UnicastTest.java,v 1.8 2005/08/18 09:45:25 belaban Exp $package org.jgroups.tests;import org.jgroups.*;import org.jgroups.util.Util;import java.io.*;import java.util.Vector;/** * Tests the UNICAST by sending unicast messages between a sender and a receiver * * @author Bela Ban */public class UnicastTest implements Runnable {    UnicastTest test;    JChannel channel;    final String groupname="UnicastTest-Group";    Thread t=null;    long sleep_time=0;    boolean exit_on_end=false, busy_sleep=false;    public static class Data implements Externalizable {        public Data() {        }        public void writeExternal(ObjectOutput out) throws IOException {        }        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {        }    }    public static class StartData extends Data {        long num_values=0;        public StartData() {            super();        }        StartData(long num_values) {            this.num_values=num_values;        }        public void writeExternal(ObjectOutput out) throws IOException {            out.writeLong(num_values);        }        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {            num_values=in.readLong();        }    }    public static class Value extends Data {        long value=0;        public Value() {            super();        }        Value(long value) {            this.value=value;        }        public void writeExternal(ObjectOutput out) throws IOException {            out.writeLong(value);        }        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {            value=in.readLong();        }    }    public void init(String props, long sleep_time, boolean exit_on_end, boolean busy_sleep) throws Exception {        this.sleep_time=sleep_time;        this.exit_on_end=exit_on_end;        this.busy_sleep=busy_sleep;        channel=new JChannel(props);        channel.connect(groupname);        t=new Thread(this, "UnicastTest - receiver thread");        t.start();    }    public void run() {        Data data;        Message msg;        Object obj;        boolean started=false;        long start=0, stop=0;        long current_value=0, tmp=0, num_values=0;        long total_msgs=0, total_time=0, msgs_per_sec;        while(true) {            try {                obj=channel.receive(0);                if(obj instanceof View)                    System.out.println("** view: " + obj);                else                    if(obj instanceof Message) {                        msg=(Message)obj;                        data=(Data)msg.getObject();                        if(data instanceof StartData) {                            if(started) {                                System.err.println("UnicastTest.run(): received START data, but am already processing data");                            }                            else {                                started=true;                                current_value=0; // first value to be received                                tmp=0;                                num_values=((StartData)data).num_values;                                start=System.currentTimeMillis();                            }                        }                        else                            if(data instanceof Value) {                                tmp=((Value)data).value;                                if(current_value + 1 != tmp) {                                    System.err.println("-- message received (" + tmp + ") is not 1 greater than " + current_value);                                }                                else {                                    current_value++;                                    if(current_value % 1000 == 0)                                        System.out.println("received " + current_value);                                    if(current_value >= num_values) {                                        stop=System.currentTimeMillis();                                        total_time=stop - start;                                        msgs_per_sec=(long)(num_values / (total_time / 1000.0));                                        System.out.println("-- received " + num_values + " messages in " + total_time +                                                           " ms (" + msgs_per_sec + " messages/sec)");                                        started=false;                                        if(exit_on_end)                                            System.exit(0);                                    }                                }                            }                    }            }            catch(ChannelNotConnectedException not_connected) {                System.err.println(not_connected);                break;            }            catch(ChannelClosedException closed_ex) {                System.err.println(closed_ex);                break;            }            catch(TimeoutException timeout) {                System.err.println(timeout);                break;            }            catch(Throwable t) {                System.err.println(t);                started=false;                current_value=0;                tmp=0;                Util.sleep(1000);            }        }        // System.out.println("UnicastTest.run(): receiver thread terminated");    }    public void eventLoop() throws Exception {        int c;        while(true) {            System.out.print("[1] Send msgs [2] Print view [q] Quit ");            System.out.flush();            c=System.in.read();            switch(c) {            case -1:                break;            case '1':                sendMessages();                break;            case '2':                printView();                break;            case '3':                break;            case '4':                break;            case '5':                break;            case '6':                break;            case 'q':                channel.close();                return;            default:                break;            }        }    }    void sendMessages() throws Exception {        long num_msgs=getNumberOfMessages();        Address receiver=getReceiver();        Message msg;        Value val=new Value(1);        if(receiver == null) {            System.err.println("UnicastTest.sendMessages(): receiver is null, cannot send messages");            return;        }        System.out.println("sending " + num_msgs + " messages to " + receiver);        msg=new Message(receiver, null, new StartData(num_msgs));        channel.send(msg);        for(int i=1; i <= num_msgs; i++) {            val=new Value(i);            msg=new Message(receiver, null, val);            if(i % 1000 == 0)                System.out.println("-- sent " + i);            channel.send(msg);            if(sleep_time > 0)                Util.sleep(sleep_time, busy_sleep);        }        System.out.println("done sending " + num_msgs + " to " + receiver);    }    void printView() {        System.out.println("\n-- view: " + channel.getView() + '\n');        try {            System.in.skip(System.in.available());        }        catch(Exception e) {        }    }    long getNumberOfMessages() {        BufferedReader reader=null;        String tmp=null;        try {            System.out.print("Number of messages to send: ");            System.out.flush();            System.in.skip(System.in.available());            reader=new BufferedReader(new InputStreamReader(System.in));            tmp=reader.readLine().trim();            return Long.parseLong(tmp);        }        catch(Exception e) {            System.err.println("UnicastTest.getNumberOfMessages(): " + e);            return 0;        }    }    Address getReceiver() {        Vector mbrs=null;        int index;        BufferedReader reader;        String tmp;        try {            mbrs=channel.getView().getMembers();            System.out.println("pick receiver from the following members:");            for(int i=0; i < mbrs.size(); i++) {                if(mbrs.elementAt(i).equals(channel.getLocalAddress()))                    System.out.println("[" + i + "]: " + mbrs.elementAt(i) + " (self)");                else                    System.out.println("[" + i + "]: " + mbrs.elementAt(i));            }            System.out.flush();            System.in.skip(System.in.available());            reader=new BufferedReader(new InputStreamReader(System.in));            tmp=reader.readLine().trim();            index=Integer.parseInt(tmp);            return (Address)mbrs.elementAt(index); // index out of bounds caught below        }        catch(Exception e) {            System.err.println("UnicastTest.getReceiver(): " + e);            return null;        }    }    public static void main(String[] args) {        long sleep_time=0;        boolean exit_on_end=false;        boolean busy_sleep=false;        String udp_props="UDP(mcast_addr=228.8.8.8;mcast_port=45566;ip_ttl=32;" +                "ucast_recv_buf_size=32000;ucast_send_buf_size=64000;" +                "mcast_send_buf_size=32000;mcast_recv_buf_size=64000;loopback=true):";        String regular_props="PING(timeout=1000;num_initial_members=2):" +                "MERGE2(min_interval=5000;max_interval=10000):" +                "FD_SOCK:" +                "VERIFY_SUSPECT(timeout=1500):" +                "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):" +                "UNICAST(timeout=2000,4000,6000;window_size=100;min_threshold=10;use_gms=false):" +                "pbcast.STABLE(desired_avg_gossip=20000):" +                "FRAG(frag_size=8192;down_thread=false;up_thread=false):" +                "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)";        String props=udp_props + regular_props;        String loopback_props="LOOPBACK:" + regular_props;        for(int i=0; i < args.length; i++) {            if("-help".equals(args[i])) {                help();                return;            }            if("-props".equals(args[i])) {                props=args[++i];                continue;            }            if("-sleep".equals(args[i])) {                sleep_time=Long.parseLong(args[++i]);                continue;            }            if("-loopback".equals(args[i])) {                props=loopback_props;                continue;            }            if("-exit_on_end".equals(args[i])) {                exit_on_end=true;                continue;            }            if("-busy_sleep".equals(args[i])) {                busy_sleep=true;                continue;            }        }        try {            UnicastTest test=new UnicastTest();            test.init(props, sleep_time, exit_on_end, busy_sleep);            test.eventLoop();        }        catch(Exception ex) {            System.err.println(ex);        }    }    static void help() {        System.out.println("UnicastTest [-help] [-props <props>] [-sleep <time in ms between msg sends] " +                           "[-loopback] [-exit_on_end] [-busy-sleep]");    }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?