main.java

来自「JGRoups源码」· Java 代码 · 共 289 行

JAVA
289
字号
package org.jgroups.tests;


import java.awt.FlowLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

import javax.swing.JButton;
import javax.swing.JFrame;

import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.MessageListener;
import org.jgroups.View;
import org.jgroups.blocks.PullPushAdapter;


//start java -cp .;C:\JGroups-2.2.9.1.bin\commons-logging.jar;C:\JGroups-2.2.9.1.bin\concurrent.jar;C:\JGroups-2.2.9.1.bin\jgroups-all.jar;C:\JGroups-2.2.9.1.binjmxri.jar;C:\JGroups-2.2.9.1.bin\log4j-1.2.6.jar mygroup.Main





public class Main implements MessageListener, MembershipListener { 
	
	
	private Channel channel; 
	private PullPushAdapter adapter; 
	private int port;

    final int NUM=10000;
	
      String props=      "UDP(bind_addr=192.168.5.2;mcast_addr=224.10.10.10;mcast_port=5555;ip_ttl=1;" +
	  "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" +
	  "PING(timeout=2000;num_initial_members=3):" +
	  "MERGE2(min_interval=5000;max_interval=10000):" + "FD_SOCK:" +
	  "VERIFY_SUSPECT(timeout=1500):" +
	  "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):" +
	  "UNICAST(timeout=5000):" + "pbcast.STABLE(desired_avg_gossip=20000):" +
	  "FRAG(frag_size=8096;down_thread=false;up_thread=false):" +
	  // "CAUSAL:" +
	  "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
	  "shun=false;print_local_addr=false)";

	
		
	public Main(int port, boolean isgui) {
    	this.port = port;
    	    	
    	if (isgui) {
    		redirectToSocket = false;
    		JFrame frm = new JFrame();
    		frm.addWindowListener(new WindowAdapter(){
    			public void windowClosing(WindowEvent e) {		
    				super.windowClosing(e);
    			};
        	});
        	        
        	JButton bpub = new JButton("Start Publisher");
        	bpub.addActionListener(new ActionListener(){
    			public void actionPerformed(ActionEvent e) {
    				publish();
    			}
        	});
        	JButton bsub = new JButton("Start Subscriber");
        	bsub.addActionListener(new ActionListener(){
    			public void actionPerformed(ActionEvent e) {
    				subscribe();
    			}
        	});
        	
        	frm.getContentPane().setLayout(new FlowLayout(FlowLayout.CENTER));
        	frm.getContentPane().add(bpub);
        	frm.getContentPane().add(bsub);
        	
        	
        	frm.setBounds(100, 100, 400, 300);
        	frm.setVisible(true);	
    	}    	    	
    	else
    	{
    		redirectToSocket = true;
    	}
    }
	 
	
	//Thread t;
	private void publish() {
		init(false);		
		Thread t = new Thread(new Runnable(){			
			public void run() {				
				loop();
			}
		});
		t.setDaemon(true);
		t.start();
	}
	
	
	private void subscribe() {
		init(true);
	}
	
	
	private void loop() {
		int cpt= 0;
		
		for(int i=0; i<NUM; i++) {
			try {
				String msg = "Message "+cpt++;
				adapter.send(new Message(null, null, msg));
				//Thread.sleep(750);
			} catch (Exception e) {
				e.printStackTrace();
			}	
		}		
	}
	
	
	
	
	private void init(boolean isConsumer) {
		try {
			channel=new JChannel(props); 
			channel.connect("MyTest"); 
			adapter=new PullPushAdapter(channel);
			adapter.addMembershipListener(this);
			if (isConsumer) adapter.setListener(this); 						
		} catch(Exception e) {
			e.printStackTrace();
		}				    	
	}
	
	
	public static void main(String[] args) {	
		
		boolean isgui = true;
		if (isgui) {
			int port = Integer.parseInt(args[1]);
			Main m = new Main(port, true);	
		} else {
			if (args.length>1) {
				int port = Integer.parseInt(args[1]);
				if (args[0].equals("server")) {				
					Main m = new Main(port, false);
					m.process(false);
				} else if (args[0].equals("client")) {				
					Main m = new Main(port, false);
					m.process(true);
				} 
			}		
		}					
	}
	
	
	
	
	public void process(boolean isConsumer) {
		
		try {
			channel=new JChannel(props); 
			channel.connect("MyTest"); 
			adapter=new PullPushAdapter(channel);
			adapter.addMembershipListener(this);
			if (isConsumer) adapter.setListener(this); 
			
			Thread t = new Thread(new Runnable(){
				public void run() {
					listenFromSocket();
				}
			});
			t.setDaemon(true);
			t.start();
			
			
			
			blockThread();
		} catch(Exception e) {
			e.printStackTrace();
		}				    	        	 
    }
	
	
	private void blockThread() { 
		synchronized (this) { 
			try { wait(); } 
			catch (InterruptedException e) { e.printStackTrace(); } 
		} 
	}

	private boolean redirectToSocket = false;
	private int cpt = 0;

    private long start, stop;
    public void receive(Message arg0) {
		
		String data = new String(arg0.getBuffer()) ;

        if(cpt == 0)
            start=System.currentTimeMillis();

        cpt++;
		if (cpt%1000==0)
			System.out.println(cpt);			
					
		if (log) System.out.println("Lu sur le Bus "+port+" : "+data);		
		if (redirectToSocket)
			writeToSocket( data);

        if(cpt >= NUM) {
            stop=System.currentTimeMillis();
            long diff=stop-start;
            double msgs_sec=NUM / (diff/1000.0);
            System.out.println("received " + NUM + " msgs in " + diff + "ms (" + msgs_sec + " msgs/sec)");
        }
    }
	
	public byte[] getState() { return null; }
	public void setState(byte[] arg0) {}
	

	
	/* SOCKET COMMUNICATIONS */
	private boolean log = false;
	private PrintWriter pwriter;
	
	private void writeToSocket(String message) { 
		pwriter.write(message+"\n");
		pwriter.flush(); 
	}
	 

	public void listenFromSocket() {
		try {
			ServerSocket ss = new ServerSocket(port);
			Socket sock = ss.accept();
			System.out.println("Listing messages for the group on " + port);
			BufferedReader r = new BufferedReader(new InputStreamReader(sock
					.getInputStream()));
			pwriter = new PrintWriter(sock.getOutputStream());
			String line = "";
			while ((line = r.readLine()) != null) {
				if (log)
					System.out.println("A envoyer au Bus : " + line);
				adapter.send( new Message(null, null, line.getBytes()));				
			}
			pwriter.close();
			r.close();
			sock.close();
			ss.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}



	public void viewAccepted(View arg0) {
		// TODO Auto-generated method stub
		System.out.println("**** MEMBERS STATUS ****");
		for(int i=0; i<arg0.getMembers().size(); i++) {
			System.out.println(arg0.getMembers().elementAt(i));		
		}
		System.out.println("****");
	}

	
	public void suspect(Address arg0) {}
	public void block() {}
	
	
	
	
}




⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?