📄 httprelayclientmessagereceiver.java
字号:
}
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("starting blocking receiver");
blockingReceiver = new BlockingMessageReceiver();
receiverThread = new Thread(blockingReceiver, "BlockingMessageReceiver" );
receiverThread.setDaemon(true);
receiverThread.start();
}
} else if(requestMode==MODE_POLL) {
if (pollingTask == null) { // we weren't polling so far
if (receiverThread != null) { // Stop the blocking mode
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("stopping blocking receiver");
blockingReceiver.stop();
receiverThread.interrupt();
blockingReceiver = null;
receiverThread = null;
}
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("starting polling task");
if (pollTimer == null) pollTimer = new Timer(true);
pollingTask = new PollingMessageReceiverTask();
pollTimer.schedule(pollingTask, 0, POLL_INTERVAL);
}
}
}
/** stop the underlying server **/
public void stop() throws InterruptedException {
super.stop();
timer.cancel();
if (blockingReceiver!=null) {
blockingReceiver.stop();
}
}
public String toString() {
return "HttpRelayClientMessageReceiver connected to " + url;
}
/**
* Makes an HTTP POST to the relay, sending the queryString. Returns
* the Response object.
* @param queryString The command to POST
* @param timeout How long to wait for a response. NOTE: currently unused,
* as URL/HttpURLConnection do not support the functionality.
*/
private HttpURLConnection postCommand(String queryString,
int timeout)
throws IOException, InterruptedException {
if (LOG.isEnabledFor(Priority.DEBUG)) {
LOG.debug("posting command to relay " + url + ": " + queryString);
}
HttpURLConnection urlConn = (HttpURLConnection) url.openConnection();
urlConn.setDoOutput(true);
urlConn.setRequestMethod("POST");
urlConn.setAllowUserInteraction(false);
urlConn.setRequestProperty("Content-Length",
Integer.toString(queryString.length()));
urlConn.connect();
BufferedOutputStream out = new BufferedOutputStream(urlConn.getOutputStream());
out.write(queryString.getBytes(), 0, queryString.length() );
out.flush();
out.close();
return urlConn;
}
/** Pulls the value for 'key' out of the map as an Integer. If the value
* does not exist, or the value is not a valid Integer, null is returned.
**/
Integer extractInteger(HashMap map, String key) {
String str = (String) map.get(key);
if (str==null)
return null;
else {
try {
Integer retval = new Integer(str);
return retval;
} catch(NumberFormatException e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug( "Relay misbheaving; reported non-integer for: "
+ key);
}
return null;
}
}
/**
* Takes a response object, buffers the contents, constructs a message,
* and hands the message over to the endpoint.
*/
private void processRelayedMessage(HttpURLConnection conn)
throws IOException {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("processing relayed message");
int contentLen = conn.getContentLength();
if (contentLen<0) {
throw new IOException("Content-length must be specified");
}
InputStream in = conn.getInputStream();
try {
Message msg = endpoint.newMessage();
MessageWireFormatFactory.newMessageWireFormat(
new MimeMediaType( "application/x-jxta-msg" )).readMessage(in, msg);
endpoint.demux(msg);
// Make sure we are not hogging the cpu with input
// thus preventing it from being processed.
// It is better to slow the sending side a bit.
Thread.yield();
} catch (IOException e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Error reading message ");
throw e;
}
}
/** simple reader that fills buf from in up to maxLen bytes **/
private int read(InputStream in, byte[] buf, int maxLen)
throws IOException {
int pos = 0;
int n;
while ((n = in.read(buf, pos, maxLen - pos)) > -1)
pos += n;
return pos;
}
/** Runnable that blocks for messages. Stays in infinite loop until
* the receiver is shut down. **/
private class BlockingMessageReceiver implements Runnable {
boolean isRunning = true;
void stop() {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Trying to stop: " + url );
isRunning = false;
}
public void run() {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("BlockingMessageReceiver starting");
while (isRunning) {
try {
// we have to reconstruct the query every time in case the
// the lease was renewed
QueryString query = getLeaseIdQueryString();
query.add(HttpUtil.COMMAND_NAME, HttpUtil.COMMAND_VALUE_BLOCK);
String command = query.toString();
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("POSTing block command to relay: " + url);
HttpURLConnection conn = postCommand(command,
BLOCK_TIMEOUT);
if (isRunning) {
int responseCode = conn.getResponseCode();
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("response code == " + responseCode);
if (responseCode == HttpUtil.HTTP_SC_OK) {
if (conn.getContentLength() == 0) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Relayed sent back no message");
} else {
processRelayedMessage(conn);
}
} else {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Non-OK response code from relay; "+
"waiting for interval: " +
BLOCK_RETRY_INTERVAL);
Thread.sleep(BLOCK_RETRY_INTERVAL);
}
}
conn.disconnect();
} catch(FileNotFoundException e) {
// This can also happen if the lease is broken because
// of a "feature" of JDK's HttpUrlConnection which
// pretends to be a filesystem and has getResponseCode
// throw FileNotFoundExc with some combinations of urls
// and response code !!!
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Exception during relay request: " + e);
lostLease();
// sleep before we start trying again
try {
Thread.sleep(BLOCK_RETRY_INTERVAL);
} catch (InterruptedException e2) {
// no code intended
Thread.interrupted();
}
} catch(IOException ioe) {
// sleep before we start trying again
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("IOException during relay request: " + ioe);
try {
Thread.sleep(BLOCK_RETRY_INTERVAL);
} catch (InterruptedException e2) {
Thread.interrupted();
// no code intended
}
} catch(RelayLeaseException e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("Exception: Lease is invalid");
// We're impatient, speed-up the normal schedule if needed.
lostLease();
// sleep before we start trying again
try {
Thread.sleep(BLOCK_RETRY_INTERVAL);
} catch (InterruptedException e2) {
Thread.interrupted();
// no code intended
}
} catch( Throwable e ) {
if (LOG.isEnabledFor(Priority.DEBUG))
LOG.debug("Throwable caught during blocking request to "+
"relay; waiting for an interval: " + BLOCK_RETRY_INTERVAL, e);
try {
Thread.sleep(BLOCK_RETRY_INTERVAL);
} catch (InterruptedException e2) {
Thread.interrupted();
// no code intended
}
}
}
}
}
/** polls for messages. XXX untested. ***/
private class PollingMessageReceiverTask extends TimerTask {
public void run() {
try {
QueryString query = getLeaseIdQueryString();
query.add(HttpUtil.COMMAND_NAME, HttpUtil.COMMAND_VALUE_POLL);
String command = query.toString();
HttpURLConnection conn = postCommand(command,
POLL_INTERVAL);
if (conn.getResponseCode() == HttpUtil.HTTP_SC_OK) {
processRelayedMessage(conn);
}
conn.disconnect();
} catch(RelayLeaseException e) {
// We're impatient, speed-up the normal schedule.
lostLease();
} catch(FileNotFoundException e) {
lostLease();
} catch(IOException e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("IOException during poll request to relay: "+e);
} catch(InterruptedException e) {
if (LOG.isEnabledFor(Priority.DEBUG)) LOG.debug("InterruptedException while polling");
}
}
}
/** Returns a new QueryString object with the lease session id added **/
private QueryString getLeaseIdQueryString() throws RelayLeaseException {
QueryString query = new QueryString();
String leaseId = getLeaseId();
if (leaseId!=null) {
query.add("leaseId", getLeaseId() );
return query;
} else {
throw new RelayLeaseException("Lease expired");
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -