📄 tcptransport.java
字号:
// The multicast socket needs first to be allocated.
try {
multicastSocket = new MulticastSocket(propagatePort);
multicastSocket.setInterface( usingInterface );
multicastSocket.joinGroup(propagateInetAddress);
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(e);
}
}
try {
multicastSocket.send(packet);
//XXX: Shouldn't we close the socket?!
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(e);
}
}
/**
 * Gives the name of the protocol served by this transport instance.
 *
 * The value is read straight from the {@code protocolName} field. Per the
 * original note, the TCP transport supports a single protocol, so this is
 * expected to match what getSupportedProtocolName() reports.
 *
 * @return String holding the protocol name of this transport.
 **/
public String getProtocolName() {
    return this.protocolName;
}
/**
 * Gives the endpoint address under which this peer can be reached through
 * this transport.
 *
 * @return the {@code publicAddress} currently held by this transport.
 */
public EndpointAddress getPublicAddress() {
    return this.publicAddress;
}
/**
 * Tells whether this transport is connection oriented.
 *
 * @return boolean always true for this transport.
 */
public boolean isConnectionOriented() {
    final boolean connectionOriented = true;
    return connectionOriented;
}
/**
 * Tells whether the EndpointRouter is allowed to use this endpoint protocol.
 *
 * @return boolean always true: this protocol may be used by the EndpointRouter.
 */
public boolean allowRouting() {
    final boolean routable = true;
    return routable;
}
/**
 * Ping a remote host.
 *
 * The connection manager is asked for a TcpConnection to the given address
 * and the connected state of that connection is the answer. Per the original
 * note, no new connection is created when one is already open (this depends
 * on connManager.getTcpConnection, which is not visible here).
 *
 * Addresses whose protocol address contains "127.0.0.1" are always rejected.
 *
 * @param addr the endpoint address of the host to ping.
 * @return boolean true if a connection was obtained and reports itself
 *         connected, false in every other case (local address, failure to
 *         read the address, or no connection available).
 **/
public boolean ping(EndpointAddress addr) {

    // Tracing the target is best effort: a failure to print the address
    // must never abort the ping itself.
    try {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("ping to " + addr.toString());
        }
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug("ping cannot print address because: ", e);
        }
    }

    // XXX: the following is a work around in order to
    // strip out peers that advertise a local IP address
    // as their EndpointService (127.0.0.1)
    // lomax@jxta.org
    try {
        String protocolAddress = addr.getProtocolAddress();
        boolean advertisesLocalhost = protocolAddress.indexOf("127.0.0.1") != -1;
        if (advertisesLocalhost) {
            if (LOG.isEnabledFor(Priority.DEBUG)) {
                LOG.debug(" pinging localhost - discard");
            }
            return false;
        }
    } catch (Exception e) {
        // Covers, among others, a null address or a null protocol address.
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" failed ", e);
        }
        return false;
    }

    TcpConnection connection = connManager.getTcpConnection(addr);
    if (connection != null) {
        if (LOG.isEnabledFor(Priority.DEBUG)) {
            LOG.debug(" return " + connection.isConnected());
        }
        return connection.isConnected();
    }

    if (LOG.isEnabledFor(Priority.DEBUG)) {
        LOG.debug(" no connection return false ");
    }
    return false;
}
/**
 * Signals activity to the TcpConnection associated with the given host, if
 * the connection manager returns one. Per the original note, this lets a
 * connection that was disconnected learn that the link might be up again.
 * When no connection is returned there is nothing to do.
 *
 * @param addr the endpoint address of the host on which activity was seen.
 */
private void checkConnection(EndpointAddress addr) {
    TcpConnection existing = connManager.getTcpConnection(addr);
    if (existing != null) {
        existing.notifyActivity();
    }
}
public class IncomingUnicastThreads implements Runnable {
protected Vector sockets = null;
protected TcpTransport tp = null;
private volatile int waitingThreads = 0;
private int nbOfThreads = 0;
private int nbOfKeepAlive = 0;
public IncomingUnicastThreads(TcpTransport tp, int initialNumber) {
this.tp = tp;
sockets = new Vector();
for (int i = 0; i < initialNumber; ++i) {
++nbOfThreads;
Thread t = new Thread(this, "TCP Incoming Unicast:" + nbOfThreads);
t.start();
}
}
public void run() {
try {
Socket socket = null;
while (true) {
socket = waitForSocket();
if (socket == null) {
// That means that this thread must die
decThreads();
return;
}
boolean keep = doesKeepAlive();
tp.runReceive(socket, keep);
if (keep) {
stopKeepAlive();
}
}
} catch ( Throwable all ) {
if (LOG.isEnabledFor(Priority.FATAL)) LOG.fatal( "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all );
}
}
synchronized private void decThreads() {
--nbOfThreads;
}
synchronized protected Socket waitForSocket() {
Socket socket = null;
while (true) {
if (sockets.size() > 0) {
try {
socket = (Socket) sockets.elementAt(0);
} catch (Exception e) {
// Strange error. XXX: to handle better
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForSocket error: " + e);
}
if (socket != null) {
try {
sockets.removeElementAt(0);
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForSocket error: " + e);
}
return socket;
}
// We should not get here.
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForSocket cannot get incoming socket");
}
// No more socket to consume... Just wait.
// Check if this thread must die or wait for a new socket
if (mustDie()) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForSocket: killing thread");
return null;
}
try {
++waitingThreads;
wait();
--waitingThreads;
} catch (Exception e) {
}
}
}
synchronized private boolean mustDie() {
if ((nbOfThreads > MaxNbOfUnicastThreads) ||
(waitingThreads > 1)) {
return true;
}
return false;
}
synchronized private boolean doesKeepAlive() {
if (nbOfKeepAlive >= MaxNbOfUnicastKeepAliveThreads) {
return false;
}
++nbOfKeepAlive;
return true;
}
synchronized private void stopKeepAlive() {
--nbOfKeepAlive;
}
synchronized protected void newSocket(Socket socket) {
if (sockets.size() > MaxNbOfPendingSockets) {
// Too many request. Just drop it.
try {
socket.close();
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("newSocket socket.close failed with " + e);
}
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("newSocket: too many incoming request, dropped incoming message");
return;
}
sockets.addElement(socket);
// Check if we need to allocate a new Thread
if (waitingThreads == 0) {
++nbOfThreads;
// Allocate a new thread.
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("newSocket: create a new Thread");
Thread t = new Thread(this, "TCP Incoming Unicast:" + nbOfThreads );
t.start();
}
try {
notify();
} catch (Exception e) {
}
}
}
/**
 * Hands an incoming unicast socket over to the unicast worker threads.
 *
 * @param socket the socket to be processed.
 */
void processReceivingSocket(Socket socket) {
    this.unicastThreads.newSocket(socket);
}
public class IncomingMulticastThreads implements Runnable {
protected Vector buffers = null;
protected TcpTransport tp = null;
private volatile int waitingThreads = 0;
private int nbOfThreads = 0;
public IncomingMulticastThreads(TcpTransport tp, int initialNumber) {
this.tp = tp;
buffers = new Vector();
for (int i = 0; i < initialNumber; ++i) {
++nbOfThreads;
Thread t = new Thread(this, "TCP Incoming Multicast:" + nbOfThreads);
t.start();
}
}
public void run() {
try {
byte[] buffer = null;
while (true) {
buffer = waitForBuffer();
if (buffer == null) {
// That means that this thread must die
decThreads();
return;
}
tp.processMulticast(buffer);
}
} catch ( Throwable all ) {
if (LOG.isEnabledFor(Priority.FATAL)) LOG.fatal( "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all );
}
}
synchronized private void decThreads() {
--nbOfThreads;
}
synchronized protected byte[] waitForBuffer() {
byte[] buffer = null;
while (true) {
if (buffers.size() > 0) {
try {
buffer = (byte[]) buffers.elementAt(0);
} catch (Exception e) {
// Strange error. XXX: to handle better
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForBuffer error: " + e);
}
if (buffer != null) {
try {
buffers.removeElementAt(0);
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForBuffer error: " + e);
}
return buffer;
}
// We should not get here.
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForBuffer cannot get incoming buffer");
}
// No more socket to consume... Just wait.
// Check if this thread must die or wait for a new socket
if (mustDie()) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("waitForBuffer: killing thread " + Thread.currentThread().getName() );
return null;
}
try {
++waitingThreads;
wait();
--waitingThreads;
} catch (Exception e) {
}
}
}
synchronized private boolean mustDie() {
if ((nbOfThreads > MaxNbOfMulticastThreads) ||
(waitingThreads > 0)) {
return true;
}
return false;
}
synchronized protected void newBuffer(byte[] buffer) {
buffers.addElement(buffer);
// Check if we need to allocate a new Thread
if (waitingThreads == 0) {
++nbOfThreads;
// Allocate a new thread.
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("newBuffer: create a new Multicast receive Thread");
Thread t = new Thread(this, "TCP Incoming Multicast:" + nbOfThreads);
t.start();
}
try {
notify();
} catch (Exception e) {
}
}
}
/**
 * Hands an incoming multicast buffer over to the multicast worker threads.
 *
 * @param buffer the received data to be processed.
 */
void processReceivingBuffer(byte[] buffer) {
    this.multicastThreads.newBuffer(buffer);
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -