main.java
来自「JGRoups源码」· Java 代码 · 共 289 行
JAVA
289 行
package org.jgroups.tests;
import java.awt.FlowLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import javax.swing.JButton;
import javax.swing.JFrame;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.MessageListener;
import org.jgroups.View;
import org.jgroups.blocks.PullPushAdapter;
//start java -cp .;C:\JGroups-2.2.9.1.bin\commons-logging.jar;C:\JGroups-2.2.9.1.bin\concurrent.jar;C:\JGroups-2.2.9.1.bin\jgroups-all.jar;C:\JGroups-2.2.9.1.bin\jmxri.jar;C:\JGroups-2.2.9.1.bin\log4j-1.2.6.jar org.jgroups.tests.Main
/**
 * Small JGroups test harness that can publish and/or receive messages on the
 * group "MyTest".
 *
 * Two modes are supported:
 * <ul>
 * <li>GUI mode: a frame with a "Start Publisher" and a "Start Subscriber"
 *     button. The publisher sends {@link #NUM} text messages; the subscriber
 *     counts received messages and prints the throughput.</li>
 * <li>Headless mode ({@link #process(boolean)}): a TCP server socket on
 *     <code>port</code> bridges a single client to the group. Every line read
 *     from the socket is sent to the group and, for a consumer, every message
 *     received from the group is written back to the socket.</li>
 * </ul>
 */
public class Main implements MessageListener, MembershipListener {
    private Channel channel;
    private PullPushAdapter adapter;
    private int port;
    /** Number of messages sent by the publisher and expected per measurement. */
    final int NUM=10000;
    /** Protocol stack definition. NOTE: bind_addr is a fixed, site-specific address. */
    String props= "UDP(bind_addr=192.168.5.2;mcast_addr=224.10.10.10;mcast_port=5555;ip_ttl=1;" +
            "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" +
            "PING(timeout=2000;num_initial_members=3):" +
            "MERGE2(min_interval=5000;max_interval=10000):" + "FD_SOCK:" +
            "VERIFY_SUSPECT(timeout=1500):" +
            "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):" +
            "UNICAST(timeout=5000):" + "pbcast.STABLE(desired_avg_gossip=20000):" +
            "FRAG(frag_size=8096;down_thread=false;up_thread=false):" +
            // "CAUSAL:" +
            "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
            "shun=false;print_local_addr=false)";

    /** When true, received messages are forwarded to the connected socket client. */
    private boolean redirectToSocket = false;
    /** Number of messages received in the current measurement. */
    private int cpt = 0;
    private long start, stop;
    /** Enables per-message console tracing. */
    private boolean log = false;
    /**
     * Writer towards the socket client; null while no client is connected.
     * Written by the socket thread and read by the message-delivery thread.
     */
    private volatile PrintWriter pwriter;

    /**
     * Creates the harness.
     *
     * @param port  TCP port used by the socket bridge in headless mode; in GUI
     *              mode it only appears in trace output
     * @param isgui true to open the control frame, false for headless mode
     *              (received messages are then redirected to the socket)
     */
    public Main(int port, boolean isgui) {
        this.port = port;
        redirectToSocket = !isgui;
        if (!isgui) {
            return;
        }

        JFrame frm = new JFrame();
        frm.addWindowListener(new WindowAdapter() {
            public void windowClosing(WindowEvent e) {
                // Leave the group and terminate; without this the frame is
                // merely hidden and the process keeps running.
                try {
                    shutdown();
                } finally {
                    System.exit(0);
                }
            }
        });

        JButton bpub = new JButton("Start Publisher");
        bpub.addActionListener(new ActionListener() {
            public void actionPerformed(ActionEvent e) {
                publish();
            }
        });

        JButton bsub = new JButton("Start Subscriber");
        bsub.addActionListener(new ActionListener() {
            public void actionPerformed(ActionEvent e) {
                subscribe();
            }
        });

        frm.getContentPane().setLayout(new FlowLayout(FlowLayout.CENTER));
        frm.getContentPane().add(bpub);
        frm.getContentPane().add(bsub);
        frm.setBounds(100, 100, 400, 300);
        frm.setVisible(true);
    }

    /** Closes the channel, if one was opened. */
    private void shutdown() {
        Channel ch = channel;
        if (ch != null) {
            ch.close();
        }
    }

    /** Joins the group (if not done yet) and sends NUM messages from a daemon thread. */
    private void publish() {
        if (!init(false)) {
            return; // no adapter to send with; the failure was already reported
        }
        Thread t = new Thread(new Runnable() {
            public void run() {
                loop();
            }
        });
        t.setDaemon(true);
        t.start();
    }

    /** Joins the group (if not done yet) and registers this object as message listener. */
    private void subscribe() {
        init(true);
    }

    /** Sends "Message 0" .. "Message NUM-1" to all group members. */
    private void loop() {
        for (int i = 0; i < NUM; i++) {
            try {
                String msg = "Message " + i;
                // Send raw bytes: receive() decodes getBuffer() directly, so the
                // payload must not go through the Serializable constructor.
                adapter.send(new Message(null, null, msg.getBytes()));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Ensures the channel and adapter exist and optionally registers this object
     * as the message listener. The channel is created only once, so calling this
     * repeatedly does not abandon a previously connected channel.
     *
     * @param isConsumer true to receive group messages through {@link #receive(Message)}
     * @return true when the adapter is ready for use, false when setup failed
     */
    private boolean init(boolean isConsumer) {
        Channel ch = null;
        try {
            if (adapter == null) {
                ch = new JChannel(props);
                ch.connect("MyTest");
                PullPushAdapter created = new PullPushAdapter(ch);
                created.addMembershipListener(this);
                channel = ch;
                adapter = created;
            }
            if (isConsumer) {
                adapter.setListener(this);
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            if (ch != null && ch != channel) {
                ch.close(); // do not leak a channel that never became usable
            }
            return false;
        }
    }

    /**
     * Entry point.
     *
     * <pre>
     *   Main                      GUI mode
     *   Main &lt;any&gt; &lt;port&gt;         GUI mode, port used for trace output only
     *   Main server &lt;port&gt;        headless bridge, socket -&gt; group
     *   Main client &lt;port&gt;        headless bridge, socket &lt;-&gt; group
     * </pre>
     *
     * @param args optional mode and port as described above
     */
    public static void main(String[] args) {
        boolean server = args.length > 0 && args[0].equals("server");
        boolean client = args.length > 0 && args[0].equals("client");
        boolean headless = server || client;

        int port = 0;
        if (args.length > 1) {
            try {
                port = Integer.parseInt(args[1]);
            } catch (NumberFormatException e) {
                printUsage("invalid port: " + args[1]);
                return;
            }
        } else if (headless) {
            printUsage("missing port");
            return;
        }

        if (!headless) {
            new Main(port, true);
            return;
        }
        Main m = new Main(port, false);
        m.process(client);
    }

    /** Prints a short usage message preceded by the given problem description. */
    private static void printUsage(String problem) {
        System.err.println(problem);
        System.err.println("usage: Main [server|client <port>]");
    }

    /**
     * Runs the headless socket bridge: joins the group, starts the socket
     * listener on a daemon thread and then blocks the calling thread.
     *
     * @param isConsumer true to also forward group messages to the socket client
     */
    public void process(boolean isConsumer) {
        if (!init(isConsumer)) {
            return;
        }
        Thread t = new Thread(new Runnable() {
            public void run() {
                listenFromSocket();
            }
        });
        t.setDaemon(true);
        t.start();
        blockThread();
    }

    /** Parks the calling thread until it is interrupted. */
    private void blockThread() {
        synchronized (this) {
            try {
                // Loop so that a spurious wakeup does not release the thread.
                while (true) {
                    wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            }
        }
    }

    /**
     * Handles a message delivered from the group: counts it, optionally forwards
     * its text to the socket client, and prints the throughput once NUM messages
     * have been received. The counter is then reset so a following batch is
     * measured on its own.
     *
     * @param arg0 the received message; a message without a buffer is treated as empty text
     */
    public void receive(Message arg0) {
        byte[] raw = arg0.getBuffer();
        String data = (raw != null) ? new String(raw) : "";

        if (cpt == 0) {
            start = System.currentTimeMillis();
        }
        cpt++;
        if (cpt % 1000 == 0) {
            System.out.println(cpt);
        }
        if (log) {
            System.out.println("Lu sur le Bus "+port+" : "+data);
        }
        if (redirectToSocket) {
            writeToSocket(data);
        }
        if (cpt == NUM) {
            stop = System.currentTimeMillis();
            long diff = stop - start;
            double msgs_sec = NUM / (diff / 1000.0);
            System.out.println("received " + NUM + " msgs in " + diff + "ms (" + msgs_sec + " msgs/sec)");
            cpt = 0;
        }
    }

    /** No state is transferred by this test. */
    public byte[] getState() { return null; }

    /** No state is transferred by this test. */
    public void setState(byte[] arg0) {}

    /* SOCKET COMMUNICATIONS */

    /**
     * Writes one line to the socket client. The message is dropped when no
     * client is connected.
     */
    private void writeToSocket(String message) {
        PrintWriter out = pwriter;
        if (out == null) {
            return;
        }
        out.write(message+"\n");
        out.flush();
    }

    /**
     * Accepts a single client on <code>port</code> and sends every line it
     * writes to the group until the client closes the connection. All socket
     * resources are released on exit, including after a failure.
     */
    public void listenFromSocket() {
        ServerSocket ss = null;
        Socket sock = null;
        try {
            ss = new ServerSocket(port);
            sock = ss.accept();
            System.out.println("Listing messages for the group on " + port);
            BufferedReader r = new BufferedReader(new InputStreamReader(sock.getInputStream()));
            pwriter = new PrintWriter(sock.getOutputStream());
            String line;
            while ((line = r.readLine()) != null) {
                if (log) {
                    System.out.println("A envoyer au Bus : " + line);
                }
                adapter.send(new Message(null, null, line.getBytes()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Unpublish the writer first so receive() stops using it.
            PrintWriter out = pwriter;
            pwriter = null;
            if (out != null) {
                out.close();
            }
            if (sock != null) {
                try {
                    sock.close(); // also closes the socket's streams
                } catch (java.io.IOException ignored) {
                    // nothing useful can be done while cleaning up
                }
            }
            if (ss != null) {
                try {
                    ss.close();
                } catch (java.io.IOException ignored) {
                    // nothing useful can be done while cleaning up
                }
            }
        }
    }

    /** Prints the members of the newly installed view. */
    public void viewAccepted(View arg0) {
        System.out.println("**** MEMBERS STATUS ****");
        for (int i = 0; i < arg0.getMembers().size(); i++) {
            System.out.println(arg0.getMembers().elementAt(i));
        }
        System.out.println("****");
    }

    /** Suspicion notifications are ignored. */
    public void suspect(Address arg0) {}

    /** Block notifications are ignored. */
    public void block() {}
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?