📄 tcptransport.java
字号:
/*
*
* $Id: TcpTransport.java,v 1.84 2002/06/07 21:11:00 hamada Exp $
*
* Copyright (c) 2001 Sun Microsystems, Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Sun Microsystems, Inc. for Project JXTA."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA"
* must not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact Project JXTA at http://www.jxta.org.
*
* 5. Products derived from this software may not be called "JXTA",
* nor may "JXTA" appear in their name, without prior written
* permission of Sun.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL SUN MICROSYSTEMS OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of Project JXTA. For more
* information on Project JXTA, please see
* <http://www.jxta.org/>.
*
* This license is based on the BSD license adopted by the Apache Foundation.
*/
package net.jxta.impl.endpoint.tcp;
import java.net.*;
import java.io.*;
import java.util.*;
import org.apache.log4j.Category;
import org.apache.log4j.Priority;
import net.jxta.document.MimeMediaType;
import net.jxta.endpoint.*;
import net.jxta.protocol.*;
import net.jxta.peergroup.PeerGroup;
import net.jxta.id.ID;
import net.jxta.document.Element;
import net.jxta.document.TextElement;
import net.jxta.document.Advertisement;
import net.jxta.document.AdvertisementFactory;
import net.jxta.exception.PeerGroupException;
import net.jxta.impl.endpoint.MessageImpl;
import net.jxta.impl.endpoint.MessageWireFormat;
import net.jxta.impl.endpoint.MessageWireFormatFactory;
import net.jxta.impl.protocol.*;
import net.jxta.impl.endpoint.Address;
import net.jxta.platform.Module;
/***
* This class implements the TCP Transport Protocol
*/
public class TcpTransport implements EndpointProtocol, Runnable, Module {
private static final Category LOG = Category.getInstance(TcpTransport.class.getName());
public ServerSocket unicastSocket;
public InetAddress usingInterface;
private InetAddress propagateInetAddress;
private int propagatePort;
private int propagateSize;
public int serverSocketPort;
private Thread multicastThread = null;
private Thread unicastThread = null;
private MulticastSocket multicastSocket = null;
private PeerGroup group = null;
public EndpointService endpoint = null;
private EndpointAddress publicAddress = null;
private EndpointAdvertisement epadv= null;
private EndpointAddress mAddress = null;
private String localSubnet = null;
private String protocolName = "tcp";
private boolean allowMulticast = true;
private TcpConnectionManager connManager = null;
/**
* This is a portion of the API which might not be useful when
* the configuration part of the JXTA platform will be completed.
* XXX: to revisit.
*/
public static int unicastPortNb = 9701;
public static String multicastAddress = "224.0.1.85";
public static int multicastPortNb = 1234;
public static int multicastPacketSize = 16384;
public static String serverName = null;
public static String interfaceAddress = null;
private IncomingUnicastThreads unicastThreads = null;
public static final int DefaultNbOfUnicastThreads = 1;
public static final int MaxNbOfUnicastThreads = 50;
public static final int MaxNbOfUnicastKeepAliveThreads = 40;
public static final int MaxKeepAliveDelay = 15 * 60 * 1000; // 15 Minute
private IncomingMulticastThreads multicastThreads = null;
public static final int DefaultNbOfMulticastThreads = 0;
public static final int MaxNbOfMulticastThreads = 20;
public static final int MaxNbOfPendingSockets = 100;
public static final int MaxCnxBacklog = 50; // Java's default is 50
public TcpTransport() {
}
public int getPort() {
return serverSocketPort;
}
public InetAddress getMcastAddr() {
return propagateInetAddress;
}
public int getMcastPort() {
return propagatePort;
}
public int getMcastSize() {
return propagateSize;
}
/**
* Returns true if this protocol accepts to be overloaded.
* That is let a protocol with the name protocol name in a
* descendant group be registered.
*
* @return boolean true if over load is allowed.
*/
public boolean allowOverLoad() {
return false;
}
/**
* Mark this module as up and running.
*/
public int startApp(String[] arg) {
return 0;
}
/**
* closes this TransportProtocol.
*/
public void stopApp() {
endpoint.removeEndpointProtocol(this);
if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("There may be more things to be done here");
}
/**
* Initialization of the TcpTransport (called by Platform)
*/
public void init(PeerGroup g, ID assignedID, Advertisement impl)
throws PeerGroupException {
// First, create our TcpConnection Manager
connManager = new TcpConnectionManager(this);
try {
ModuleImplAdvertisement implAdv = (ModuleImplAdvertisement) impl;
PeerAdvertisement configAdv = (PeerAdvertisement)
g.getConfigAdvertisement();
// Get out invariable parameters from the implAdv
Element param = implAdv.getParam();
if (param != null) {
Enumeration list = param.getChildren("Proto");
if (list.hasMoreElements()) {
TextElement pname = (TextElement) list.nextElement();
protocolName = pname.getTextValue();
}
}
// Get our peer-defined parameters in the configAdv
param = configAdv.getServiceParam(assignedID);
// FIXME 20011220 bondolo@jxta.org Temporarily accept both nodes of
// type TCPAdv and TransportAdvertisement.
Enumeration tcpChilds = param.getChildren(
TransportAdvertisement.getAdvertisementType());
// get the TransportAdv from either TransportAdv or HttpAdv
if( tcpChilds.hasMoreElements() ) {
param = (Element) tcpChilds.nextElement();
} else {
tcpChilds = param.getChildren(
TCPAdv.getAdvertisementType());
if( tcpChilds.hasMoreElements() ) {
param = (Element) tcpChilds.nextElement();
}
}
TCPAdv adv = (TCPAdv)
AdvertisementFactory.newAdvertisement((TextElement) param);
unicastPortNb = new Integer(adv.getPort()).intValue();
// FIXME: these three should not be peer configurable but
// that will do for now (no worse than before).
allowMulticast = adv.getMulticastState();
if (allowMulticast) {
multicastAddress = adv.getMulticastAddr();
multicastPortNb = new Integer(adv.getMulticastPort()).intValue();
multicastPacketSize = new Integer(adv.getMulticastSize()).intValue();
}
interfaceAddress = adv.getInterfaceAddress();
serverName = adv.getServer();
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info("Configuring TCP Transport" +
"\nUnicastPort:"+ unicastPortNb +
"\nMulticastState:" + allowMulticast +
"\nMulticastAddr:" + multicastAddress +
"\n MulticastPort:" + multicastPortNb +
"\n MulticastPacketSize:" + multicastPacketSize +
"\n Interface address:" + (interfaceAddress == null ? "(unspecified)" : interfaceAddress )+
"\n Public address:"+ (serverName == null ? "(unspecified)" : serverName ) );
}
unicastThreads = new IncomingUnicastThreads(this, DefaultNbOfUnicastThreads);
if (allowMulticast) {
multicastThreads = new IncomingMulticastThreads(this, DefaultNbOfMulticastThreads);
}
/* Open the incoming socket. */
if (interfaceAddress == null) {
interfaceAddress = InetAddress.getLocalHost().getHostAddress();
}
usingInterface = InetAddress.getByName( interfaceAddress );
unicastSocket = new ServerSocket(unicastPortNb, MaxCnxBacklog, usingInterface);
// First create a listening thread waiting for incoming message
// on serverSocket
serverSocketPort = unicastPortNb;
// Build the actual publicAddress and locally bound InetAddress
if (serverName == null) {
serverName = usingInterface.getHostAddress() + ":" + unicastPortNb;
}
endpoint = g.getEndpointService();
publicAddress = endpoint.newEndpointAddress(protocolName
+ "://"
+ serverName);
mAddress = endpoint.newEndpointAddress(protocolName
+ "://"
+ multicastAddress);
if (LOG.isEnabledFor(Priority.INFO)) {
LOG.info( "Binding to : " + usingInterface.getHostAddress() +
"\nPublishing Address : " + publicAddress.toString() );
}
unicastThread = new Thread(this,"TcpTransport:unicast server thread" );
unicastThread.start();
// Create the multicast input socket
if (allowMulticast) {
propagatePort = multicastPortNb;
propagateSize = multicastPacketSize;
propagateInetAddress = InetAddress.getByName(multicastAddress);
multicastSocket = new MulticastSocket(propagatePort);
multicastSocket.setInterface( usingInterface );
multicastSocket.joinGroup(propagateInetAddress);
if (LOG.isEnabledFor(Priority.INFO))
LOG.info( "Multicast : " + propagateInetAddress.getHostAddress() +
":" + propagatePort +
" on interface " + multicastSocket.getInterface().getHostAddress() );
multicastThread = new Thread(this, "TcpTransport:multicast server thread" );
multicastThread.start();
}
// We're fully ready to function.
endpoint.addEndpointProtocol(this);
} catch(Exception e) {
if (LOG.isEnabledFor(Priority.INFO)) LOG.info("Not initialized: " + e.toString());
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Initialization exception", e);
if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("FIXME: there may be threads that need killing.");
throw new PeerGroupException(e.getMessage());
}
}
public void run() {
try {
Thread current = Thread.currentThread();
if (current.equals(unicastThread)) {
runUnicastServer();
}
if (current.equals(multicastThread)) {
runMulticastServer();
}
} catch ( Throwable all ) {
if (LOG.isEnabledFor(Priority.FATAL))
LOG.fatal( "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all );
}
}
public void runUnicastServer() {
Socket socket = null;
for (;;) {
try {
socket = unicastSocket.accept();
// Configure the socket
socket.setSoTimeout(MaxKeepAliveDelay);
socket.setKeepAlive(true);
} catch (IOException e1) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("ServerSocket.accept() on port " +
serverSocketPort +
" has failed with: " + e1.toString());
continue;
} catch (SecurityException e2) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("ServerSocket.accept() on port " +
serverSocketPort +
" has failed : " + e2.toString());
continue;
}
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug("runUnicastServer: received a connection from " + socket.getInetAddress().getHostAddress() );
processReceivingSocket(socket);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -