📄 quotaincomingmessagelistener.java
字号:
// Make sure account deletion cannot become a denial of service.
// Give away a time slice before draining, so other threads can run.
Thread.yield();
synchronized (messageDispatcher) {
    // Explicitly release each message in the queue
    // so that the per-peer accounting is maintained.
    while (!rmdMessages.isEmpty()) {
        MessageFromSource mfs = (MessageFromSource) rmdMessages.removeFirst();

        if (EndpointMeterBuildSettings.ENDPOINT_METERING && (incomingMessageListenerMeter != null)) {
            incomingMessageListenerMeter.inboundMessageDropped(mfs.msg,
                System.currentTimeMillis() - mfs.timeReceived);
        }

        // Return the quota this message was holding to its source's account.
        mfs.src.inNeed(false);
        mfs.src.releaseQuantity(mfs.size);

        // Check src account idleness here. Idleness status is
        // stable under messageDispatcher synchronization.
        // An idle account's cache entry is made non-sticky so it can be purged.
        if (mfs.src.isIdle()) {
            allSources.stickyCacheEntry((CacheEntry) mfs.src.getUserObject(), false);
        }
    }
}
rmdMessages = null;
}

/**
 * Process one queued message and move on.
 *
 * <p/>Dequeues at most one message from this listener's queue, returns its
 * quota to the source peer's account, invokes the application listener (if
 * still attached), and finally releases this thread's item from
 * {@code myAccount}.
 *
 * @return the next {@code QuotaIncomingMessageListener} this thread should
 *         service (as chosen by the dispatcher via
 *         {@code myAccount.releaseItem()}), or {@code null} if there is
 *         nothing more for this thread to do.
 **/
public QuotaIncomingMessageListener doOne() {
    MessageFromSource mfs = null;

    // Dequeue a message and update the thread's account "need" status.
    synchronized (threadDispatcher) {
        mfs = (MessageFromSource) messageQueue.pop();
        myAccount.inNeed(messageQueue.getCurrentInQueue() != 0);
        threadDispatcher.notify(); // makes a contender run earlier
    }

    // Release the resource to the message account and process.
    // Msg can be null on occasions since we release the lock between
    // picking a listener and dequeuing... more than one thread
    // could decide it has work, while there's a single
    // message. Not too easy to avoid.
    if (mfs != null) {
        // We discount that message right now, because we have no idea
        // what resources are going to be kept, freed, allocated in
        // relation to that message or not, until the listener comes
        // back. We cannot assume anything.
        synchronized (messageDispatcher) {
            mfs.src.inNeed(false); // Make sure we won't get to keep it.
            mfs.src.releaseQuantity(mfs.size);

            // Check idleness here. Idleness is stable under
            // messageDispatcher synchronization.
            if (mfs.src.isIdle()) {
                CacheEntry ce = (CacheEntry) mfs.src.getUserObject();
                if (null != ce) {
                    allSources.stickyCacheEntry(ce, false);
                }
            }
        }

        long timeDequeued = 0;
        if (EndpointMeterBuildSettings.ENDPOINT_METERING && (incomingMessageListenerMeter != null)) {
            timeDequeued = System.currentTimeMillis();
            incomingMessageListenerMeter.inboundMessageDeQueued(mfs.msg,
                timeDequeued - mfs.timeReceived);
        }

        // Call the listener for this message.
        EndpointListener l = listener;
        try {
            // Latch the listener and test it before use. Close() may be racing with us.
            // If it turns out that the application has closed this quota listener by now,
            // do not invoke the app listener. We cannot be holding the lock while
            // invoking the listener. So, it is possible for this QuotaListener to close
            // between the time we latch the application listener and the time we invoke it.
            // As a result, it is possible, though unlikely, that the application listener
            // is invoked after removal. Applications must expect it. If an application is
            // bogus in that respect we make it unlikely that the bug will ever show itself.
            // This is as far as we can go without creating deadlocks.
            if (l != null) {
                l.processIncomingMessage(mfs.msg, mfs.srcAddress, mfs.destAddress);
            }
        } catch (Throwable ignored) {
            // Catch everything: an application listener must never be able to
            // kill a dispatcher thread. The throwable is logged, not rethrown.
            if (LOG.isEnabledFor(Level.ERROR)) {
                LOG.error("Uncaught Throwable in listener : " + this + "(" + l.getClass().getName() + ")", ignored);
            }
        } finally {
            if (EndpointMeterBuildSettings.ENDPOINT_METERING && (incomingMessageListenerMeter != null)) {
                incomingMessageListenerMeter.inboundMessageProcessed(mfs.msg,
                    System.currentTimeMillis() - timeDequeued);
            }
        }
    }

    ResourceAccount next;
    synchronized (threadDispatcher) {
        // Re-evaluate need (the queue may have grown while we processed),
        // then give our thread item back; the dispatcher picks who runs next.
        myAccount.inNeed(messageQueue.getCurrentInQueue() > 0);
        next = myAccount.releaseItem();
        if ((messageQueue.isClosed()) && myAccount.isIdle()) {
            // We have been laid off and it looks like all threads
            // have returned. We can close the shop.
            myAccount.close();
        }
        threadDispatcher.notify(); // makes a contender run earlier
    }

    if (next == null) {
        return null;
    }
    return (QuotaIncomingMessageListener) next.getUserObject();
}

/**
 * {@inheritDoc}
 *
 * <p/>Try to give a new thread for this message (this listener).
 * Subsequently it will run other listeners according to what the dispatcher
 * says.
 *
 * <p/>Enforces per-source-peer quotas: the message's size is charged against
 * a per-source {@code ResourceAccount} before it is queued; if the source
 * exceeds its backlog limit the message is silently discarded (at INFO).
 **/
public void processIncomingMessage(Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) {
    if (messageQueue.isClosed()) {
        return;
    }

    long timeReceived = 0;
    if (EndpointMeterBuildSettings.ENDPOINT_METERING) {
        timeReceived = System.currentTimeMillis();
    }

    ResourceAccount msgSrcAccount;

    // XXX 20040930 bondolo why not use UnmodifiableEndpointAddress as key?
    String srcAddrStr = srcAddr.toString();
    CacheEntry ce = null;
    long msgSize = message.getByteLength();
    int attempt = 0;

    // Phase 1: under the messageDispatcher monitor, find (or create) the
    // source peer's account and charge msgSize against it. One retry with
    // a yield in between if the quota is momentarily exhausted.
    while (true) {
        if (attempt > 0) {
            // Give some cpu to upper layers.
            Thread.yield();
        }
        synchronized (messageDispatcher) {
            ce = allSources.getCacheEntry(srcAddrStr);
            if (ce == null) {
                // Cross-ref the cache entry as the cookie in the account.
                // We'll need it to efficiently manipulate the purgeability
                // of the cache entry. Each time we need the cache entry, it
                // costs us a lookup. Rather do it just once.
                // At first the user object in the account is just a string
                // for traces. We change it when we know what to set.
                msgSrcAccount = (ResourceAccount) messageDispatcher.newAccount(4*10240, -1, srcAddrStr);
                if (msgSrcAccount.getNbReserved() < 1) {
                    // That's bad! We must get rid of some stale
                    // accounts. Purge 1/10 of the idle accounts and retry.
                    msgSrcAccount.close();
                    allSources.purge(10);
                    msgSrcAccount = (ResourceAccount) messageDispatcher.newAccount(4*10240, -1, "retrying:" + srcAddrStr);
                }
                allSources.put(srcAddrStr, msgSrcAccount);
                ce = allSources.getCacheEntry(srcAddrStr);
                msgSrcAccount.setUserObject(ce);
            } else {
                msgSrcAccount = (ResourceAccount) ce.getValue();
            }

            if (!msgSrcAccount.obtainQuantity(msgSize)) {
                if (++attempt < 2) {
                    // During the retry, we'll give up the cpu. It helps a lot
                    // because otherwise input threads can run non-stop
                    // and nothing runs up-top.
                    continue;
                }
                // Too many backlogged messages from there.
                // Discard right away.
                if (LOG.isEnabledFor(Level.INFO)) {
                    LOG.info("Peer exceeds queuing limits; msg discarded.");
                }
                return;
            }

            // Now, we hold a message resource for that source, so it
            // cannot be purged from the cache.
            allSources.stickyCacheEntry(ce, true);
            break;
        }
    }

    // Phase 2: under the threadDispatcher monitor, enqueue the message and
    // try to obtain a thread item for this listener.
    boolean obtained = false;
    boolean pushed = false;
    synchronized (threadDispatcher) {
        do {
            pushed = messageQueue.push(new MessageFromSource(message, srcAddr, dstAddr, msgSrcAccount, timeReceived, msgSize));
            if ((!pushed) && messageQueue.isClosed()) {
                if (LOG.isEnabledFor(Level.DEBUG)) {
                    LOG.debug("queue closed, message discarded");
                }
                break;
            }
        } while (!pushed);

        // Rate-limited warning (at most one per second) when the queue backs up.
        if (LOG.isEnabledFor(Level.WARN)) {
            int queueLen = messageQueue.getCurrentInQueue();
            long timeNow = TimeUtils.timeNow();
            if ((queueLen > 100) && (TimeUtils.toRelativeTimeMillis(timeNow, lastLongQueueNotification) > TimeUtils.ASECOND)) {
                lastLongQueueNotification = timeNow;
                LOG.warn("Very long queue (" + queueLen + ") for listener: " + this);
            }
        }

        if (EndpointMeterBuildSettings.ENDPOINT_METERING && (incomingMessageListenerMeter != null)) {
            incomingMessageListenerMeter.inboundMessageQueued(message);
        }

        if (pushed) {
            obtained = myAccount.obtainItem();
        }
        threadDispatcher.notify(); // makes a contender run earlier
    }

    if (!pushed) {
        // The push failed (queue closed): we must release the quota that we
        // obtained in phase 1. The account cannot have possibly been purged;
        // it is marked sticky and we hold resources. So, we can re-use the
        // cache entry we obtained the last time we had the messageDispatcher
        // monitor.
        synchronized (messageDispatcher) {
            msgSrcAccount.inNeed(false); // Make sure we won't get to keep it.
            msgSrcAccount.releaseQuantity(msgSize);
            // Check idleness here. Idleness is stable under
            // messageDispatcher synchronization.
            if (msgSrcAccount.isIdle()) {
                allSources.stickyCacheEntry(ce, false);
            }
        }
        return;
    }

    if (obtained) {
        // We got a thread item: spin up a thread to service this listener.
        ListenerThread.newListenerThread(this);
    } else {
        // No thread budget left: the message stays queued until an existing
        // thread picks this listener via doOne().
        if (LOG.isEnabledFor(Level.INFO)) {
            LOG.info("Listener '" + this + "' exceeds thread's limits; msg waits.");
        }
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -