📄 tcpconnection.java
字号:
/*
*
* $Id: TcpConnection.java,v 1.15 2002/06/11 19:26:39 hamada Exp $
*
* Copyright (c) 2001 Sun Microsystems, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Sun Microsystems, Inc. for Project JXTA."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA"
* must not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact Project JXTA at http://www.jxta.org.
*
* 5. Products derived from this software may not be called "JXTA",
* nor may "JXTA" appear in their name, without prior written
* permission of Sun.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL SUN MICROSYSTEMS OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of Project JXTA. For more
* information on Project JXTA, please see
* <http://www.jxta.org/>.
*
* This license is based on the BSD license adopted by the Apache Foundation.
*/
package net.jxta.impl.endpoint.tcp;
import java.net.*;
import java.io.*;
import java.util.*;
import org.apache.log4j.Category;
import org.apache.log4j.Priority;
import net.jxta.document.MimeMediaType;
import net.jxta.endpoint.*;
import net.jxta.impl.endpoint.MessageImpl;
import net.jxta.impl.endpoint.EndpointReceiveQueue;
import net.jxta.impl.endpoint.MessageWireFormat;
import net.jxta.impl.endpoint.MessageWireFormatFactory;
/**
* Low-level TcpMessenger
*
* @since JXTA 1.0
*/
public class TcpConnection implements Runnable {
private static final Category LOG = Category.getInstance(TcpConnection.class.getName());
public static final int MaxNbOfMessages = 40;
private InetAddress inetAddress = null;
private int port = 0;
private TcpTransport proto = null;
private Header header = null;
private TcpSocket tcpSocket = null;
private boolean waiting = false;
private EndpointAddress dstAddress = null;
private long lastUsed = 0;
private String dstAddrString = null;
private Thread thread = null;
private EndpointReceiveQueue queue = null;
private long nbOfMessagesSent = 0;
private MimeMediaType appMsg = new MimeMediaType( "application/x-jxta-msg");
public TcpConnection (EndpointAddress destaddr,
TcpTransport p)
throws IOException {
// Not much has to be done: the connection with the remote node
// will happen only when the data will be sent.
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug("Constructor\n addr = " + destaddr.toString());
header = new Header();
proto = p;
this.dstAddress = (EndpointAddress) destaddr.clone();
try {
String tmp = destaddr.getProtocolAddress();
int portIndex = tmp.indexOf(":");
if (portIndex == -1) {
throw new IOException("Invalid EndpointAddress" + tmp);
}
port = Integer.valueOf(tmp.substring(portIndex + 1)).intValue();
inetAddress = InetAddress.getByName(tmp.substring(0, portIndex));
dstAddrString = inetAddress.getHostAddress();
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug("Constructor using: " + inetAddress.getHostAddress());
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.INFO))
LOG.info("Bad address " + destaddr.toString(), e);
throw new IOException("Bad address " + destaddr.toString());
}
tcpSocket = new TcpSocket (inetAddress, port);
}
// Try to connect. This operation is idempotent.
private void connect () {
synchronized (this) {
if (queue == null) {
// There is new queue. Create one.
queue = new EndpointReceiveQueue();
queue.setMaxNbOfMessages (MaxNbOfMessages);
}
// Start the background thread.
if (thread != null) {
// Nothing to do
return;
}
thread = new Thread (this, "TCP Connection to " + dstAddrString + " [Unused]");
thread.start();
}
Thread.yield();
}
private synchronized void setThreadName () {
if (thread != null) {
try {
thread.setName ("TCP Connection to " + dstAddrString + " [" +
queue.getNbOfQueuedMessages() + ", " +
nbOfMessagesSent + ", " +
(isConnected() ? "Connected" : "Disconnected") +
" ]");
} catch (Exception ez1) {
if (LOG.isEnabledFor(Priority.ERROR))
LOG.error("Cannot change thread name", ez1);
}
}
}
// Public method that sends messages to the remote peer.
// Only queue the message and let the background thread actually sends the message.
public void sendMessage (Message msg) throws IOException {
// Try to get a Socket just to check that a connection is possible.
if ( tcpSocket.getSocket (5000) == null) {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug("sendMessage() cannot get a socket for " + dstAddrString);
}
throw new IOException("sendMessage() cannot get a socket for " + dstAddrString);
}
connect();
// Queue the message.
synchronized (this) {
queue.push (msg);
setThreadName();
}
Thread.yield();
}
// This is the background Thread. While the connection is active, takes
// messages from the queue and send it.
public void run() {
Message msg = null;
while (true) {
try {
// Wait for messages to be sent.
msg = queue.waitForMessage();
if (msg == null) {
// If the message is null, the queue has been closed.
thread = null;
return;
}
setThreadName();
// Send the message.
doSendMessage (msg);
} catch (InterruptedException ez1) {
// The connection is just being close. This Thread must exit.
thread = null;
return;
} catch (IOException ez2) {
// The message failed. Notify the failure
notifyFailure();
thread = null;
return;
}
}
}
/**
* Send a TransportMessage onto that link.
*
* @param message TransportMessage to be sent.
*/
private void doSendMessage(Message message) throws IOException {
if (dstAddress == null) {
if (LOG.isEnabledFor(Priority.WARN))
LOG.warn("sendMessage: no destination address");
throw new IOException( "sendMessage: no destination address" );
}
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug("sendMessage\n to = " + dstAddress.toString());
// Build the protocol header
// Allocate a buffer to contain the message and the header
ByteArrayOutputStream baos = new ByteArrayOutputStream();
MessageWireFormatFactory.newMessageWireFormat(
new MimeMediaType( "application/x-jxta-msg" ) )
.writeMessage(baos, message);
baos.close();
byte bytes[] = baos.toByteArray();
Vector msgBuffers = new Vector();
msgBuffers.addElement( new byte[header.length] );
msgBuffers.addElement( bytes );
// First build the header
header.cmd = Header.UNICAST;
header.srcAddr = proto.usingInterface.getAddress();
header.srcPort = proto.serverSocketPort;
header.option = Header.NONBLOCKING;
header.size = bytes.length;
header.buildForNetwork( (byte []) msgBuffers.elementAt(0), 0);
int retryAttempt = 0;
boolean success = false;
while ( retryAttempt < 2 ) {
// Open a socket with the destination and send the message
Socket socket = tcpSocket.getSocket(5000);
if (socket == null) {
if (LOG.isEnabledFor(Priority.WARN)) {
LOG.warn("sendMessage() cannot get a socket for " + dstAddress.toString());
}
throw new IOException("sendMessage() cannot get a socket for " + dstAddress.toString());
}
try {
OutputStream outputStream = socket.getOutputStream();
InputStream ip = socket.getInputStream();
if (retryAttempt == 0) {
// Drain the back channel, just in case some keep alive
// hint is hanging there. Use the last one we find.
// Normaly the hint arrives too late for use at the end
// of transmission and we do not want to wait for it. The hint
// never arrives before we send something. As a result, we
// can always assume that we get that hint at the begining of
// the next transaction. If it tells us that the connection
// should be closed, we can assume that the other side has
// closed already and that we must re-open.
int rep = 1;
for (int i = ip.available(); i--> 0;)
rep = ip.read();
if (rep == 0) {
// close and re-open.
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug("Other party wanted the connection closed.");
}
try {
outputStream.close();
ip.close();
} catch (Exception ignored) {}
tcpSocket.close();
continue;
}
}
for( int eachBuffer = 0; eachBuffer < msgBuffers.size();eachBuffer++ ) {
// Send the message
outputStream.write( (byte[]) msgBuffers.elementAt(eachBuffer) );
}
outputStream.flush();
setLastUsed(System.currentTimeMillis());
success = true;
break;
} catch (Exception e) {
// The connection has failed. Try to reconnects before failing.
++retryAttempt;
tcpSocket.close();
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug("sendMessage current socket is closed.");
continue;
}
}
if (success) {
// we force a resheduling here, allowing other threads to run
++nbOfMessagesSent;
Thread.yield();
} else {
tcpSocket.setLastFailed(System.currentTimeMillis());
if (LOG.isEnabledFor(Priority.WARN))
LOG.warn("sendMessage failed - exit");
throw new IOException( "sendMessage failed - exit" );
}
}
public void notifyActivity() {
tcpSocket.notifyActivity();
}
private synchronized void notifyFailure() {
close();
}
public synchronized void close() {
tcpSocket.close();
if (queue != null) {
queue.close();
queue = null;
}
if (thread != null) {
try {
thread.interrupt();
thread = null;
} catch (Exception ez1) {
if (LOG.isEnabledFor(Priority.WARN))
LOG.warn("Cannot interrupt thread for " + dstAddrString, ez1);
}
}
}
public synchronized boolean isConnected() {
Socket socket = tcpSocket.getSocket(1000);
return socket != null;
}
public synchronized long getLastUsed () {
return lastUsed;
}
public synchronized void setLastUsed (long time) {
lastUsed = time;
}
// Just in case the code that allocated an instance of this object forgot to do "close".
public void finalize() {
close();
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -