📄 httpservletmessenger.java
字号:
@Override public boolean isIdleImpl() { // We do not use self destruction. return false; } /** * Send messages. Messages are queued and processed by a thread * running HttpClientConnection. */ @Override public synchronized void sendMessageBImpl(Message message, String service, String serviceParam) throws IOException { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Send " + message + " to " + dstAddress.toString() + "\n\t" + toString()); } if (isClosed()) { IOException failure = new IOException("Messenger was closed, it cannot be used to send messages."); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, failure.getMessage(), failure); } throw failure; } // Set the message with the appropriate src and dest address message.replaceMessageElement(EndpointServiceImpl.MESSAGE_SOURCE_NS, srcAddressElement); EndpointAddress destAddressToUse = getDestAddressToUse(service, serviceParam); MessageElement dstAddressElement = new StringMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NAME, destAddressToUse.toString(), null); message.replaceMessageElement(EndpointServiceImpl.MESSAGE_DESTINATION_NS, dstAddressElement); // doSend returns false only when this messenger is closed. if (!doSend(message)) { // send message failed IOException failure = new IOException("Messenger was closed, it cannot be used to send messages."); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "sendMessage failed (messenger closed).\n\t" + toString(), failure); } throw failure; } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("sendMessage successful for " + message + "\n\t" + toString()); } } // Must be called from synchronized context only. private boolean doSend(Message message) { // No need to wait for the messenger to be free. Transport // messengers have no obligation to behave nicely if they're // used by mltiple threads. If a thread comes here while // we're already busy sending, then that's a congestion. Just // drop the new message (pretend it went out). // This is not even so nasty, because jetty has a sizeable // output buffer. As long as that buffer is not full, sending // is instantaneou. If sending starts blocking, then we can honestly // drop messages. if (isClosed()) { return false; } long now = TimeUtils.timeNow(); if (sendResult != SEND_IDLE) { // check if that connection is a lemon if ((sendResult == SEND_TOOLONG) && (now > (sendingSince + MAX_SENDING_BLOCK))) { close(); } return true; } // put the message on the outgoing "queue" of size 1 outgoingMessage = message; sendResult = SEND_INPROGRESS; sendingSince = now; if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Queued " + message); } // notify the servlet if it was waiting for a message notifyAll(); // wait for the result of the send Since there is ample // buffering underneath, we're not supposed to wait for long; // unless we outdo the connection, in which case, we might as // well start dropping. No point in blocking. // FIXME: jice@jxta.org - this is a rudimentary fix. We need // something like watchedOutputStream instead. Being forced // to do a two-way handshake with a servlet thread is pretty // ridiculous too. We need a simpler http transport. long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis(MAX_SENDING_WAIT); while (!isClosed()) { if (sendResult != SEND_INPROGRESS) { break; } long waitfor = TimeUtils.toRelativeTimeMillis(absoluteTimeOut); if (waitfor <= 0) { break; } try { wait(waitfor); } catch (InterruptedException e) { Thread.interrupted(); if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "InterruptedException timeout = " + MAX_SENDING_WAIT + "\n\t" + toString(), e); } break; } } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Got result\n\t" + toString()); } if (isClosed() && (SEND_INPROGRESS == sendResult)) { return false; } // If the operation completed just confirm that we're done // reading the result too. Else mark that we don't care. // When it completes the result will be discarded. By default, // the we return ok. If the operation did not complete fast // enough, that's what we return. boolean result = (sendResult != SEND_FAIL); if (sendResult == SEND_INPROGRESS) { sendResult = SEND_TOOLONG; outgoingMessage = null; } else { sendResult = SEND_IDLE; } // Let another contending thread use this messenger. notifyAll(); return result; } /** * Retrieve a message from the "queue" of messages for the servlet. * * @param timeout Number of milliseconds to wait for a message. Per Java * convention 0 (zero) means wait forever. * @return the message or <code>null</code> if no message was available * before the timeout was reached. * @throws InterruptedException If the thread is interrupted while waiting. */ protected synchronized Message waitForMessage(long timeout) throws InterruptedException { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Waiting (" + (0 == timeout ? "forever" : Long.toString(timeout)) + ") for message\n\t" + toString()); } if (0 == timeout) { // it's forever timeout = Long.MAX_VALUE; } long absoluteTimeOut = TimeUtils.toAbsoluteTimeMillis(timeout); while (!isClosed() && (null == outgoingMessage)) { long waitfor = TimeUtils.toRelativeTimeMillis(absoluteTimeOut); if (waitfor <= 0) { break; } wait(waitfor); } // get the message Message result = outgoingMessage; outgoingMessage = null; // Msg can only be picked-up once. if (!isClosed() && (result == null)) { // ABSURD, but seems to happen: the message sent was NULL // and the sender thread is waiting for the completion event. // There would be no such thing, but we can make sure it stops // waiting and still leave the messenger in a sane state if there // was no such absurdity going on. sendResult = SEND_IDLE; notifyAll(); } if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("Returning " + result + "\n\t" + toString()); } return result; } protected synchronized void messageSent(boolean wasSuccessful) { if (Logging.SHOW_FINE && LOG.isLoggable(Level.FINE)) { LOG.fine("messageSent(" + wasSuccessful + ")\n\t" + toString()); } if (SEND_TOOLONG == sendResult) { // No-one cares for the result any more. Let the next send go. sendResult = SEND_IDLE; } else { sendResult = wasSuccessful ? SEND_SUCCESS : SEND_FAIL; } notifyAll(); } /** * {@inheritDoc} * * <p/>An implementation for debugging. Do not depend on the format. */ @Override public String toString() { return "[" + super.toString() + "] isClosed=" + isClosed() + " sendResult=" + sendResult + " outmsg=" + outgoingMessage; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -