📄 httptransport.java
字号:
}
if (failureMode >= MAXFAILURES ) {
// This is the second repeated failure. Let's assume that BlockingSend
// cannot be used on this connection, and fail over to PollingSend.
// Add this connection to the PollingSend protocol.
if (LOG.isEnabledFor(Priority.INFO)) LOG.info( "HttpTransport reverting to polling mode--too many failures" );
tp.startPolling();
tp.pollingThread.addPollingHttp(routerName);
break;
}
// The repeated failure occurred less than PollingDelay. This could either
// be a temporary problem, or simply that the BlockingSend protocol
// cannot be run on this connection.
// Wait for half
failureMode++;
// Wait for PollingDelay: if once in a while, we have to wait for PollingDelay,
// this protocol is still more efficient.
try {
Thread.sleep(PollingDelay);
} catch ( InterruptedException woken ) {
Thread.interrupted();
}
// Assume that the previous failure happened now (forgetting about
// the forced delay).
lastFailure = System.currentTimeMillis();
} else {
failureMode = 0; // we got a message, reset failures
lastReceivedMsg = System.currentTimeMillis();
}
}
// Forget the connection
tp.forgetConnection(routerName);
} catch ( Throwable all ) {
if (LOG.isEnabledFor(Priority.FATAL)) LOG.fatal( "Uncaught Throwable in thread : " + Thread.currentThread().getName(), all );
}
}
}
/**
 * Creates a transport instance without configuring it.
 *
 * This constructor is part of the API that may become unnecessary once
 * the configuration part of the JXTA platform is completed.
 * XXX: to revisit.
 */
public HttpTransport() {
    super();
}
/**
 * Tells whether this protocol accepts being overloaded, i.e. whether a
 * protocol with the same protocol name may be registered in a
 * descendant group.
 *
 * @return boolean always false: overloading is not allowed.
 */
public boolean allowOverLoad() {
    final boolean overloadPermitted = false;
    return overloadPermitted;
}
/**
 * Marks this module as up and running. No work is done here and the
 * argument array is ignored.
 *
 * @param arg start-up arguments (unused).
 * @return int always 0.
 */
public int startApp(String[] arg) {
    final int status = 0;
    return status;
}
/**
 * Closes this TransportProtocol: the transport is first removed from
 * the endpoint it was registered with, then polling is stopped.
 */
public void stopApp() {
    // Unregister before stopping the polling machinery; keep this order.
    this.endpoint.removeEndpointProtocol(this);
    this.stopPolling();
}
/**
 * Initializes the transport now that an endpoint is available.
 *
 * The method reads the transport configuration from the implementation
 * advertisement, applies the router/proxy/server sanity rules, builds
 * the public address, prepares the spool directory and finally starts
 * either the HTTP server (server mode) or the client connections to the
 * configured routers (client mode) before registering this transport
 * with the endpoint service.
 *
 * @param g          the peer group this transport belongs to; supplies the
 *                   endpoint service, the config advertisement and the peer ID.
 * @param assignedID the ID assigned to this module (not used by this method).
 * @param impl       the implementation advertisement; it is cast to
 *                   ModuleImplAdvertisement and its param element must carry
 *                   the transport configuration.
 *
 * @throws PeerGroupException if any step of the initialization fails; the
 *                   message is taken from the underlying exception.
 **/
public void init(PeerGroup g, ID assignedID, Advertisement impl)
    throws PeerGroupException
{
    try {
        group = g;
        endpoint = g.getEndpointService();

        ModuleImplAdvertisement implAdv = (ModuleImplAdvertisement) impl;
        // NOTE(review): configAdv is never read below. It is kept because
        // the cast still validates the type of the config advertisement;
        // confirm whether it can be dropped.
        PeerAdvertisement configAdv =
            (PeerAdvertisement) g.getConfigAdvertisement();

        // Get our invariable parameters from the implAdv.
        Element param = implAdv.getParam();
        if (param == null) {
            // Everything below dereferences param; fail with a clear
            // message rather than with a bare NullPointerException.
            throw new IllegalArgumentException(
                "HTTP transport implementation advertisement has no parameters");
        }

        Enumeration list = param.getChildren("Proto");
        if (list.hasMoreElements()) {
            TextElement pname = (TextElement) list.nextElement();
            protocolName = pname.getTextValue();
        }

        // FIXME 20011220 bondolo@jxta.org Temporarily accept both nodes of
        // type HTTPAdv and TransportAdvertisement.
        Enumeration httpChilds = param.getChildren(
            TransportAdvertisement.getAdvertisementType());

        // Get the TransportAdv from either TransportAdv or HttpAdv.
        if (httpChilds.hasMoreElements()) {
            param = (Element) httpChilds.nextElement();
        } else {
            httpChilds = param.getChildren(
                HTTPAdv.getAdvertisementType());
            if (httpChilds.hasMoreElements()) {
                param = (Element) httpChilds.nextElement();
            }
        }

        // For now, re-use the HttpAdv code...less coding.
        HTTPAdv adv = (HTTPAdv)
            AdvertisementFactory.newAdvertisement((TextElement) param);

        interfaceAddress = adv.getInterfaceAddress();
        if (adv.getRouterEnabled()) routers = adv.getRouters();
        if (adv.getProxyEnabled()) proxyName = adv.getProxy();
        if (adv.getServerEnabled()) {
            serverName = adv.getServer();
            try {
                serverSocketPort = Integer.parseInt(adv.getPort());
            } catch (Exception e) {
                if (LOG.isEnabledFor(Priority.ERROR)) LOG.error("Bad ServerPort from advertisement");
                // We're left with the builtin default.
            }
        }

        // Apply our current sanity rules. They are:
        // If we need a router we cannot be a routing server.
        // If we do not need a router, then we cannot need a proxy.
        //
        // If there is a contradiction in the config, we give priority to
        // the need-a-router setting. The reason for this choice is
        // pragmatic:
        // If there is a proxy setting but no router setting, we do not
        // have the info to make up a router setting, but we can ignore
        // the proxy. As for the server, well, it's a toss-up, and if the
        // user does not seem to know what he's doing, he probably did not
        // really need to set up a server.
        if (routers == null) {
            if (proxyName != null) {
                if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("This system does not use "
                    + "an external router; ignoring proxy settings.");
            }
            proxyName = null;
        } else {
            if (serverName != null) {
                if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("This system needs an external router."
                    + " It cannot be a routing server.");
            }
            serverName = null;
        }

        // If no interface address has been specified, select one that
        // might work.
        if (interfaceAddress == null) {
            interfaceAddress =
                InetAddress.getLocalHost().getHostAddress();
        }
        usingInterface = InetAddress.getByName(interfaceAddress);

        localClientId = g.getPeerID().getUniqueValue().toString();

        if (serverName != null) {
            // Server side: build the public address from the server name.
            publicServerAddress =
                endpoint.newEndpointAddress(protocolName + "://"
                                            + serverName);
            publicAddress = publicServerAddress;
        } else {
            // Client side: build the public address from the local client id.
            publicClientAddress =
                endpoint.newEndpointAddress(protocolName + "://"
                                            + MagicWord
                                            + localClientId);
            publicAddress = publicClientAddress;

            // Make the proxy url if need be. The proxy name is "host" or
            // "host:port"; the port defaults to 8080.
            if (null != proxyName) {
                try {
                    String host = proxyName;
                    int port = 8080;
                    int sepAt = proxyName.lastIndexOf(':');
                    if (-1 != sepAt) {
                        host = proxyName.substring(0, sepAt);
                        port = Integer.parseInt(proxyName.substring(sepAt + 1));
                    }
                    proxyHost = new URL("http", host, port, "/");
                } catch (NumberFormatException badport) {
                    // A non-numeric port is just another kind of invalid
                    // proxy name: ignore the proxy instead of aborting init.
                    if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("proxy name was not valid");
                    proxyHost = null;
                } catch (MalformedURLException badurl) {
                    if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("proxy name was not valid");
                    proxyHost = null;
                }
            }
        }

        if (LOG.isEnabledFor(Priority.INFO)) LOG.info("Configuring HTTP Transport");
        if (LOG.isEnabledFor(Priority.INFO)) LOG.info("Public address : " + publicAddress);
        if (LOG.isEnabledFor(Priority.INFO)) LOG.info("Using interface : " + usingInterface.getHostAddress());

        if (null != serverName) {
            if (LOG.isEnabledFor(Priority.INFO)) LOG.info("HTTP in server mode. ");
            // serverName is known to be non-null in this branch.
            if (LOG.isEnabledFor(Priority.INFO)) LOG.info("HTTP server name: "
                + serverName);
            if (LOG.isEnabledFor(Priority.INFO)) LOG.info("Server port:"
                + ((serverSocketPort < 0) ? "(none)" : Integer.toString(serverSocketPort)));
        } else {
            if (LOG.isEnabledFor(Priority.INFO)) LOG.info("HTTP in client mode. "
                + (useBlockingSend ? "(blocking)" : "(polling)"));
            if (LOG.isEnabledFor(Priority.INFO)) LOG.info("HTTP routers: "
                + (routers == null ? "(none)" : routers.toString()));
            if (LOG.isEnabledFor(Priority.INFO)) LOG.info("HTTP proxy: "
                + (proxyHost == null ? "(none)" : proxyHost.toString()));
        }

        myThreadGroup = new ThreadGroup("HttpTransport "
            + usingInterface.getHostAddress());

        HttpSpool = "httpspool_"
            + usingInterface.getHostAddress()
            + ((serverSocketPort < 0) ? "" : "_" + Integer.toString(serverSocketPort));

        // Create the spool folder. Without it the transport is left
        // unregistered; say so instead of returning silently.
        if (!initDir(HttpSpool)) {
            if (LOG.isEnabledFor(Priority.ERROR)) LOG.error("Could not initialize spool directory "
                + HttpSpool + "; HTTP transport not started");
            return;
        }

        // Cleanup the spooler. All messages waiting for clients are removed.
        // Messages can be lost that way, but messages can be lost anyway.
        // XXX: something less brutal can be done, but this seems to be reasonable
        // for the time being. lomax@jxta.org
        cleanupAllClients();

        if (serverName != null) {
            // Server side
            if (LOG.isEnabledFor(Priority.INFO)) LOG.info("Starting server : " + serverName);

            // Create listening Thread
            server = new HttpServer(myThreadGroup, this,
                                    usingInterface,
                                    serverSocketPort);
            server.start();
        } else {
            if (!useBlockingSend) startPolling();

            // Client side
            if (routers != null) {
                Enumeration e = routers.elements();
                while (e.hasMoreElements()) {
                    String routerName = (String) e.nextElement();

                    // Register itself to remote site
                    if (LOG.isEnabledFor(Priority.INFO)) LOG.info("Starting client connection with " + routerName);
                    registerWithRemote(routerName);

                    if (useBlockingSend) {
                        // One dedicated connection thread per router.
                        Thread th = new ServerConnection(
                            myThreadGroup,
                            this,
                            routerName,
                            localClientId);
                        th.start();
                    } else {
                        pollingThread.addPollingHttp(routerName);
                    }
                }
            } else {
                if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("There are no HTTP routers defined, no communication "
                    + "will occurr until some are defined.");
            }
        }

        // We're ready to go.
        endpoint.addEndpointProtocol(this);
    } catch (Exception e) {
        if (LOG.isEnabledFor(Priority.INFO)) LOG.info("Not initialized: " + e.toString());
        if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Initialization exception", e);
        if (LOG.isEnabledFor(Priority.WARN)) LOG.warn("FIXME: there may be threads that need killing.");

        // Some exceptions (e.g. NullPointerException) carry no message;
        // fall back to toString() so the caller always gets a reason.
        String reason = e.getMessage();
        throw new PeerGroupException(reason != null ? reason : e.toString());
    }
}
/**
 * Reports the port number this transport uses for incoming connections.
 *
 * @return int the current value of the server socket port.
 **/
public int getPort() {
    final int incomingPort = this.serverSocketPort;
    return incomingPort;
}
/**
 * Reports the name of the protocol this transport supports.
 *
 * @return String the protocol name currently held by this transport.
 **/
public String getProtocolName() {
    return this.protocolName;
}
public EndpointAddress getPublicAddress() {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -