📄 httptransport.java
字号:
String fn = cm.getOldestFile(dn);
if (fn == null) {
// No more messages in the spooler for that peer
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("No messages for client");
if (!blocking) {
return HttpMessage.EMPTYMESSAGE;
} else {
// if we are blocking then we return no message at all. the
// client will simply have to wait until something happens.
return null;
}
}
// There is a message to be sent in the spooler. Send it.
// For the time being, just return a message at the time.
// Maybe a good optimization would be to return a bunch
// of them...
// XXX: to revist. lomax@jxta.org
byte[] buffer = (new HttpMessage(dn, fn)).getBytes();
// Remove the file.
// XXX: maybe it would be wise to wait for an OK from the other
// side before removing the file, but that might be too heavy.
// To be revisited. lomax@jxta.org
try {
cm.remove(dn, fn);
} catch( IOException failed ) {
if (LOG.isEnabledFor(Priority.WARN)) LOG.warn( "failed to remove cm file : " + dn +
File.separatorChar + fn );
}
try {
client = server.getClientConnection(clientId);
if (client != null) {
Vector buffers = new Vector(1);
buffers.addElement(buffer);
client.sendToClient( buffers, buffer.length );
} else {
Vector buffers = new Vector();
buffers.addElement(buffer);
sendResponse(outputStream, buffers, buffer.length);
closeSocket(inputSocket, inputStream, outputStream);
}
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.WARN)) LOG.warn( "couldnt send reply?" );
return null;
}
return null;
}
/**
* Given a serialized message stored in a string create a message object
* and then provide it to the endpoint for processing.
*
* @param data the message in wire format.
* @return HttpMessage message indicating the result of the processing.
*
**/
private HttpMessage processIncoming(String data) {
try {
byte[] buffer = data.getBytes();
ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
Message msg = endpoint.newMessage();
MessageWireFormatFactory.newMessageWireFormat(
new MimeMediaType( "text/xml" ) ).readMessage(bais, msg);
endpoint.demux(msg);
} catch (Exception ignored) {
// Just discard message
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("failure processing incoming", ignored );
return HttpMessage.FAILEDMESSAGE;
}
return HttpMessage.OKMESSAGE;
}
/**
* Creates and return an EndpointMessenger
*/
public EndpointMessenger getMessenger( EndpointAddress dst)
throws IOException {
String tmp = dst.getProtocolAddress();
if ((tmp.length() > MagicWord.length()) &&
(tmp.substring(0, MagicWord.length()).equals(MagicWord))) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" getting a server messenger");
return new HttpServerMessenger( dst, this);
} else {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" getting a client messenger");
return new HttpNonBlockingMessenger(dst, this);
}
}
/**
* Propagates a TransportMessage on this TransportProtocol
*
* @param msg the TransportMessage to be propagated
*/
public void propagate(Message message,
String pName,
String pParams,
String prunePeer) throws IOException {
// HttpTransport does not implement propagate
return;
}
private void startPolling() {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("startPolling");
// Sanity check
if (pollingThread != null) {
pollingThread.setPolling( true );
return;
}
// Polling means that we are a client
publicAddress = publicClientAddress;
// Create listening Thread
pollingThread = new HttpClient( myThreadGroup, this );
pollingThread.start();
}
private void stopPolling() {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("stopPolling");
// Sanity check
if (pollingThread == null) {
return;
}
// tell the thread to quit
pollingThread.endPolling();
pollingThread = null;
// We stopped polling, maybe are we server now
publicAddress = publicServerAddress;
}
/**
* Poll the remote addr for messages for this peer.
*
* @param addr URL of the remote server to poll
* @param type type of polling to perform, blocking or non-blocking
* @return boolean true if a message was received.
**/
boolean pollRemote( URL addr, int type ) {
HttpMessage httpMsg = null;
byte[] buffer = null;
String content = null;
try {
content = sendHttpGET( addr, type, true );
if (content == null) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" no message to poll (no reponse)");
return false;
}
httpMsg = new HttpMessage(content);
if ( HttpMessage.Empty == httpMsg.getCode() ) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" no message to poll(by server code)");
return false;
}
buffer = httpMsg.getData();
if (buffer == null) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" no message to poll(empty msg body)");
return false;
}
ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
Message msg = endpoint.newMessage();
MessageWireFormatFactory.newMessageWireFormat(
new MimeMediaType( "text/xml" ) ).readMessage(bais, msg);
endpoint.demux(msg);
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug(" polled one message");
return true;
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("pollRemote failed", e);
return false;
}
}
/**
* register this client with the server named by the string authority
*
* @param authority the server this client should try to register with
* @return boolean true if the register was acknowledged otherwise
* false
**/
private boolean registerWithRemote( String authority ) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "Register with Remote : " + authority );
try {
String host = authority;
int port = 80;
int sepAt = authority.lastIndexOf( ':' );
if( -1 != sepAt ) {
host = authority.substring(0, sepAt );
port = Integer.parseInt( authority.substring(sepAt + 1) );
}
URL addr = new URL( "http", host, port, "/reg/" + localClientId
+ "/" );
String result = sendHttpGET( addr, PollingSend, true );
if (result != null) {
int code = (new HttpMessage( result )).getCode();
if( HttpMessage.Ok == code ) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "register with remote succeeded" );
return true;
} else {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "register with remote failed with code : " + code );
}
}
else {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "register with remote got no response" );
// FIXME 20010910 bondolo@jxta.org This occurs when we are
// talking to an old server.
return true;
}
} catch (IOException e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "register with remote failed for some reason", e );
}
return false;
}
/**
* Read a message from the input stream.
*
* @param ip the InputStream to read from.
* @param blocking read until until EOF
* @param String the http response or null if none could retrieved.
**/
private String getResult( InputStream ip, boolean blocking ) {
final int MAXRETRIES = 8;
final int RETRYINTERVAL = 2 * 1000;
String tmp = "";
byte[] buffer = null;
if (!blocking) {
// non-blocking receive. We never block on I/O unless available() lies
try {
int cnt = 0;
while (true) {
int avail = ip.available();
// are we at the end of the stream?
if (ip.available() == -1) {
// we never saw the tags we were looking for
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "getResult:EOF before we found the message end" );
return null;
}
if (ip.available() == 0) {
try {
if (++cnt > MAXRETRIES ) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "getResult:Retry count exceeded. Had " +
tmp.length() + " bytes" );
return null;
}
Thread.sleep( RETRYINTERVAL );
} catch (InterruptedException e) {
// if we are woken prematurely, give up
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "getResult:Thread interrupted, giving up" );
Thread.interrupted();
return null;
}
continue;
}
buffer = new byte [avail];
int got = ip.read(buffer);
// if we didnt read anything, just loop.
if (got == 0)
continue;
// reset the retry. we are still being fed
cnt = 0;
//if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "getResult : non-blocking read of " + got + " bytes" );
tmp += new String(buffer, 0, got);
// are we done?
if (tmp.startsWith("GET")) {
break;
}
// are we done?
if (tmp.lastIndexOf("</HTML>") >= 0) {
break;
}
}
} catch (Exception e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "getResult:caught exception, giving up.", e );
return null;
}
} else {
// blocking receive. We block on I/O
// FIXME 20010918 bondolo@jxta.org The way in which we termintate
// waiting is not really appropriate. What we should be doing here is
// read the http
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -