basicconnectiontable.java
来自「JGRoups源码」· Java 代码 · 共 783 行 · 第 1/2 页
JAVA
783 行
package org.jgroups.blocks;
import org.jgroups.Address;
import org.jgroups.Version;
import org.jgroups.stack.IpAddress;
import org.jgroups.util.Queue;
import org.jgroups.util.QueueClosedException;
import org.jgroups.util.Util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.net.Socket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.ServerSocket;
import java.io.DataOutputStream;
import java.io.DataInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.EOFException;
import java.util.*;
/**
* Shared class for TCP connection tables.
* @author Scott Marlow
*/
public abstract class BasicConnectionTable {
final HashMap conns=new HashMap(); // keys: Addresses (peer address), values: Connection
Receiver receiver=null;
boolean use_send_queues=true;
InetAddress bind_addr=null;
Address local_addr=null; // bind_addr + port of srv_sock
int srv_port=7800;
int recv_buf_size=120000;
int send_buf_size=60000;
final Vector conn_listeners=new Vector(); // listeners to be notified when a conn is established/torn down
final Object recv_mutex=new Object(); // to serialize simultaneous access to receive() from multiple Connections
Reaper reaper=null; // closes conns that have been idle for more than n secs
long reaper_interval=60000; // reap unused conns once a minute
long conn_expire_time=300000; // connections can be idle for 5 minutes before they are reaped
int sock_conn_timeout=1000; // max time in millis to wait for Socket.connect() to return
ThreadGroup thread_group=null;
protected final Log log= LogFactory.getLog(getClass());
final byte[] cookie={'b', 'e', 'l', 'a'};
boolean use_reaper=false; // by default we don't reap idle conns
static final int backlog=20; // 20 conn requests are queued by ServerSocket (addtl will be discarded)
ServerSocket srv_sock=null;
boolean reuse_addr=false;
boolean tcp_nodelay=false;
int linger=-1;
/**
* The address which will be broadcast to the group (the externally visible address which this host should
* be contacted on). If external_addr is null, it will default to the same address that the server socket is bound to.
*/
InetAddress external_addr=null;
int max_port=0; // maximum port to bind to (if < srv_port, no limit)
Thread acceptor=null; // continuously calls srv_sock.accept()
boolean running=false;
final static long MAX_JOIN_TIMEOUT=10000;
public final void setReceiver(Receiver r) {
receiver=r;
}
public void addConnectionListener(ConnectionListener l) {
if(l != null && !conn_listeners.contains(l))
conn_listeners.addElement(l);
}
public void removeConnectionListener(ConnectionListener l) {
if(l != null) conn_listeners.removeElement(l);
}
public Address getLocalAddress() {
if(local_addr == null)
local_addr=bind_addr != null ? new IpAddress(bind_addr, srv_port) : null;
return local_addr;
}
public int getSendBufferSize() {
return send_buf_size;
}
public void setSendBufferSize(int send_buf_size) {
this.send_buf_size=send_buf_size;
}
public int getReceiveBufferSize() {
return recv_buf_size;
}
public void setReceiveBufferSize(int recv_buf_size) {
this.recv_buf_size=recv_buf_size;
}
public int getSocketConnectionTimeout() {
return sock_conn_timeout;
}
public void setSocketConnectionTimeout(int sock_conn_timeout) {
this.sock_conn_timeout=sock_conn_timeout;
}
public int getNumConnections() {
return conns.size();
}
public boolean getTcpNodelay() {
return tcp_nodelay;
}
public void setTcpNodelay(boolean tcp_nodelay) {
this.tcp_nodelay=tcp_nodelay;
}
public int getLinger() {
return linger;
}
public void setLinger(int linger) {
this.linger=linger;
}
public boolean getUseSendQueues() {return use_send_queues;}
public void setUseSendQueues(boolean flag) {this.use_send_queues=flag;}
public void start() throws Exception {
running=true;
}
public void stop() {
running=false;
}
/**
Remove <code>addr</code>from connection table. This is typically triggered when a member is suspected.
*/
public void remove(Address addr) {
Connection conn;
synchronized(conns) {
conn=(Connection)conns.remove(addr);
}
if(conn != null) {
try {
conn.destroy(); // won't do anything if already destroyed
}
catch(Exception e) {
}
}
if(log.isTraceEnabled()) log.trace("removed " + addr + ", connections are " + toString());
}
/**
* Calls the receiver callback. We serialize access to this method because it may be called concurrently
* by several Connection handler threads. Therefore the receiver doesn't need to synchronize.
*/
public void receive(Address sender, byte[] data, int offset, int length) {
if(receiver != null) {
synchronized(recv_mutex) {
receiver.receive(sender, data, offset, length);
}
}
else
if(log.isErrorEnabled()) log.error("receiver is null (not set) !");
}
public String toString() {
StringBuffer ret=new StringBuffer();
Address key;
Connection val;
Map.Entry entry;
HashMap copy;
synchronized(conns) {
copy=new HashMap(conns);
}
ret.append("connections (" + copy.size() + "):\n");
for(Iterator it=copy.entrySet().iterator(); it.hasNext();) {
entry=(Map.Entry)it.next();
key=(Address)entry.getKey();
val=(Connection)entry.getValue();
ret.append("key: " + key + ": " + val + '\n');
}
ret.append('\n');
return ret.toString();
}
void notifyConnectionOpened(Address peer) {
if(peer == null) return;
for(int i=0; i < conn_listeners.size(); i++)
((ConnectionListener)conn_listeners.elementAt(i)).connectionOpened(peer);
}
void notifyConnectionClosed(Address peer) {
if(peer == null) return;
for(int i=0; i < conn_listeners.size(); i++)
((ConnectionListener)conn_listeners.elementAt(i)).connectionClosed(peer);
}
void addConnection(Address peer, Connection c) {
conns.put(peer, c);
if(reaper != null && !reaper.isRunning())
reaper.start();
}
public void send(Address dest, byte[] data, int offset, int length) throws Exception {
Connection conn;
if(dest == null) {
if(log.isErrorEnabled())
log.error("destination is null");
return;
}
if(data == null) {
log.warn("data is null; discarding packet");
return;
}
if(!running) {
if(log.isWarnEnabled())
log.warn("connection table is not running, discarding message to " + dest);
return;
}
// 1. Try to obtain correct Connection (or create one if not yet existent)
try {
conn=getConnection(dest);
if(conn == null) return;
}
catch(Throwable ex) {
throw new Exception("connection to " + dest + " could not be established", ex);
}
// 2. Send the message using that connection
try {
conn.send(data, offset, length);
}
catch(Throwable ex) {
if(log.isTraceEnabled())
log.trace("sending msg to " + dest + " failed (" + ex.getClass().getName() + "); removing from connection table", ex);
remove(dest);
}
}
abstract Connection getConnection(Address dest) throws Exception;
/**
* Removes all connections from ConnectionTable which are not in c
* @param c
*/
//public void retainAll(Collection c) {
// conns.keySet().retainAll(c);
//}
/**
* Removes all connections from ConnectionTable which are not in current_mbrs
* @param current_mbrs
*/
public void retainAll(Collection current_mbrs) {
if(current_mbrs == null) return;
HashMap copy;
synchronized(conns) {
copy=new HashMap(conns);
conns.keySet().retainAll(current_mbrs);
}
// All of the connections that were not retained must be destroyed
// so that their resources are cleaned up.
Map.Entry entry;
for(Iterator it=copy.entrySet().iterator(); it.hasNext();) {
entry=(Map.Entry)it.next();
Object oKey=entry.getKey();
if(!current_mbrs.contains(oKey)) { // This connection NOT in the resultant connection set
Connection conn=(Connection)entry.getValue();
if(null != conn) { // Destroy this connection
if(log.isTraceEnabled())
log.trace("Destroy this orphaned connection: " + conn);
conn.destroy();
}
}
}
copy.clear();
}
/** Used for message reception. */
public interface Receiver {
void receive(Address sender, byte[] data, int offset, int length);
}
/** Used to be notified about connection establishment and teardown. */
public interface ConnectionListener {
void connectionOpened(Address peer_addr);
void connectionClosed(Address peer_addr);
}
class Connection implements Runnable {
Socket sock=null; // socket to/from peer (result of srv_sock.accept() or new Socket())
String sock_addr=null; // used for Thread.getName()
DataOutputStream out=null; // for sending messages
DataInputStream in=null; // for receiving messages
Thread receiverThread=null; // thread for receiving messages
Address peer_addr=null; // address of the 'other end' of the connection
final Object send_mutex=new Object(); // serialize sends
long last_access=System.currentTimeMillis(); // last time a message was sent or received
/** Queue<byte[]> of data to be sent to the peer of this connection */
Queue send_queue=new Queue();
Sender sender=new ConnectionTable.Connection.Sender();
boolean is_running=false;
private String getSockAddress() {
if(sock_addr != null)
return sock_addr;
if(sock != null) {
StringBuffer sb=new StringBuffer();
sb.append(sock.getLocalAddress().getHostAddress()).append(':').append(sock.getLocalPort());
sb.append(" - ").append(sock.getInetAddress().getHostAddress()).append(':').append(sock.getPort());
sock_addr=sb.toString();
}
return sock_addr;
}
Connection(Socket s, Address peer_addr) {
sock=s;
this.peer_addr=peer_addr;
try {
// out=new DataOutputStream(sock.getOutputStream());
// in=new DataInputStream(sock.getInputStream());
// The change to buffered input and output stream yielded a 400% performance gain !
// bela Sept 7 2006
out=new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()));
in=new DataInputStream(new BufferedInputStream(sock.getInputStream()));
}
catch(Exception ex) {
if(log.isErrorEnabled()) log.error("exception is " + ex);
}
}
boolean established() {
return receiverThread != null;
}
void setPeerAddress(Address peer_addr) {
this.peer_addr=peer_addr;
}
Address getPeerAddress() {return peer_addr;}
void updateLastAccessed() {
last_access=System.currentTimeMillis();
}
void init() {
is_running=true;
if(receiverThread == null || !receiverThread.isAlive()) {
// Roland Kurmann 4/7/2003, put in thread_group
receiverThread=new Thread(thread_group, this, "ConnectionTable.Connection.Receiver [" + getSockAddress() + "]");
receiverThread.setDaemon(true);
receiverThread.start();
if(log.isTraceEnabled())
log.trace("ConnectionTable.Connection.Receiver started");
}
}
void destroy() {
is_running=false;
closeSocket(); // should terminate handler as well
sender.stop();
Thread tmp=receiverThread;
receiverThread=null;
if(tmp != null) {
try {
tmp.interrupt();
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?