📄 quotaincomingmessagelistener.java

📁 jxta_src_2.41b: JXTA 2.41b, latest source code, from www.jxta.org
💻 JAVA
📖 Page 1 of 3
        // Make sure account deletion cannot become a denial of service.
        // Give away a time slice.
        Thread.yield();

        synchronized(messageDispatcher) {

            // Explicitly release each message in the queue
            // so that the per-peer accounting is maintained.
            while (! rmdMessages.isEmpty()) {

                MessageFromSource mfs =
                    (MessageFromSource) rmdMessages.removeFirst();

                if (EndpointMeterBuildSettings.ENDPOINT_METERING && (incomingMessageListenerMeter != null)) {
                    incomingMessageListenerMeter.inboundMessageDropped(mfs.msg, System.currentTimeMillis() - mfs.timeReceived);
                }

                mfs.src.inNeed(false);
                mfs.src.releaseQuantity(mfs.size);
                // Check src account idleness here. Idleness status is
                // stable under messageDispatcher synchronization.
                if (mfs.src.isIdle()) {
                    allSources.stickyCacheEntry(
                        (CacheEntry) mfs.src.getUserObject(),
                        false);
                }
            }
        }

        rmdMessages = null;
    }

    /**
     * Process one message and move on.
     **/
    public QuotaIncomingMessageListener doOne() {
        MessageFromSource mfs = null;

        // Dequeue a message and update the thread's account "need" status.
        synchronized(threadDispatcher) {
            mfs = (MessageFromSource) messageQueue.pop( );
            myAccount.inNeed(messageQueue.getCurrentInQueue() != 0);
            threadDispatcher.notify(); // makes a contender run earlier
        }

        // Release the resource to the message account and process.

        // Msg can be null on occasions since we release the lock between
        // picking a listener and dequeuing: more than one thread
        // could decide it has work while there is a single
        // message. Not too easy to avoid.
        if (mfs != null) {
            // We discount that message right now, because we have no idea
            // what resources are going to be kept, freed, or allocated in
            // relation to that message until the listener comes
            // back. We cannot assume anything.
            synchronized(messageDispatcher) {
                mfs.src.inNeed(false); // Make sure we won't get to keep it.
                mfs.src.releaseQuantity(mfs.size);
                // Check idleness here. Idleness is stable under
                // messageDispatcher synchronization.
                if (mfs.src.isIdle()) {
                    CacheEntry ce = (CacheEntry) mfs.src.getUserObject();

                    if( null != ce ) {
                        allSources.stickyCacheEntry(ce, false);
                    }
                }
            }

            long timeDequeued = 0;

            if (EndpointMeterBuildSettings.ENDPOINT_METERING && (incomingMessageListenerMeter != null))  {
                timeDequeued = System.currentTimeMillis();
                incomingMessageListenerMeter.inboundMessageDeQueued(mfs.msg, timeDequeued - mfs.timeReceived);
            }

            // Call the listener for this message.
            EndpointListener l = listener;
            try {
                // Latch the listener and test it before use. Close() may be racing with us.
                // If it turns out that the application has closed this quota listener by now,
                // do not invoke the app listener. We cannot be holding the lock while
                // invoking the listener. So, it is possible for this QuotaListener to close
                // between the time we latch the application listener and the time we invoke it.
                // As a result, it is possible, though unlikely, that the application listener
                // is invoked after removal. Applications must expect it. If an application is
                // bogus in that respect, we make it unlikely that the bug will ever show itself.
                // This is as far as we can go without creating deadlocks.
                if (l != null) {
                    l.processIncomingMessage( mfs.msg, mfs.srcAddress, mfs.destAddress);
                }
            } catch (Throwable ignored) {
                if (LOG.isEnabledFor(Level.ERROR)) {
                    LOG.error("Uncaught Throwable in listener : " + this + "(" + l.getClass().getName() + ")" , ignored);
                }
            } finally {
                if (EndpointMeterBuildSettings.ENDPOINT_METERING && (incomingMessageListenerMeter != null)) {
                    incomingMessageListenerMeter.inboundMessageProcessed(mfs.msg, System.currentTimeMillis() - timeDequeued);
                }
            }
        }

        ResourceAccount next;
        synchronized(threadDispatcher) {
            myAccount.inNeed(messageQueue.getCurrentInQueue() > 0);
            next = myAccount.releaseItem();
            if ((messageQueue.isClosed()) && myAccount.isIdle()) {

                // We have been laid off and it looks like all threads
                // have returned. We can close the shop.
                myAccount.close();
            }
            threadDispatcher.notify(); // makes a contender run earlier
        }

        if (next == null) {
            return null;
        }

        return (QuotaIncomingMessageListener) next.getUserObject();
    }

    /**
     *  {@inheritDoc}
     *
     * <p/>Try to give a new thread for this message (this listener).
     * Subsequently it will run other listeners according to what the dispatcher
     * says.
     **/
    public void processIncomingMessage( Message message, EndpointAddress srcAddr, EndpointAddress dstAddr) {
        if( messageQueue.isClosed() ) {
            return;
        }

        long timeReceived = 0;

        if (EndpointMeterBuildSettings.ENDPOINT_METERING) {
            timeReceived = System.currentTimeMillis();
        }

        ResourceAccount msgSrcAccount;
        // XXX 20040930 bondolo why not use UnmodifiableEndpointAddress as key?
        String srcAddrStr = srcAddr.toString();
        CacheEntry ce = null;
        long msgSize = message.getByteLength();

        int attempt = 0;
        while (true) {
            if (attempt > 0) {
                // Give some cpu to upper layers.
                Thread.yield();
            }

            synchronized(messageDispatcher) {
                ce = allSources.getCacheEntry(srcAddrStr);

                if (ce == null) {
                    // Cross-ref the cache entry as the cookie in the account.
                    // We'll need it to efficiently manipulate the purgeability
                    // of the cache entry. Each time we need the cache entry, it
                    // costs us a lookup. Rather do it just once.
                    // At first the user object in the account is just a string
                    // for traces.
                    // We change it when we know what to set.
                    msgSrcAccount = (ResourceAccount)
                        messageDispatcher.newAccount(4*10240, -1, srcAddrStr);
                    if (msgSrcAccount.getNbReserved() < 1) {
                        // That's bad! We must get rid of some stale
                        // accounts. Purge 1/10 of the idle accounts.
                        msgSrcAccount.close();
                        allSources.purge(10);
                        msgSrcAccount = (ResourceAccount)
                            messageDispatcher.newAccount(4*10240, -1, "retrying:" + srcAddrStr);
                    }

                    allSources.put(srcAddrStr, msgSrcAccount);

                    ce = allSources.getCacheEntry(srcAddrStr);
                    msgSrcAccount.setUserObject(ce);
                } else {
                    msgSrcAccount = (ResourceAccount) ce.getValue();
                }

                if (! msgSrcAccount.obtainQuantity(msgSize)) {
                    if (++attempt < 2) {
                        // During the retry, we'll give up the cpu. It helps a lot
                        // because otherwise input threads can run non-stop
                        // and nothing runs up-top.
                        continue;
                    }

                    // Too many backlogged messages from there.
                    // Discard right away.
                    if (LOG.isEnabledFor(Level.INFO)) {
                        LOG.info("Peer exceeds queuing limits; msg discarded.");
                    }
                    return;
                }

                // Now, we hold a message resource for that source, so it
                // cannot be purged from the cache.
                allSources.stickyCacheEntry(ce, true);
                break;
            }
        }

        boolean obtained = false;
        boolean pushed = false;

        synchronized(threadDispatcher) {
            do {
                pushed = messageQueue.push( new MessageFromSource(message, srcAddr, dstAddr, msgSrcAccount, timeReceived, msgSize) );

                if( (!pushed) && messageQueue.isClosed() ) {
                    if (LOG.isEnabledFor(Level.DEBUG)) {
                        LOG.debug("queue closed, message discarded");
                    }
                    break;
                }
            } while( !pushed );

            if (LOG.isEnabledFor(Level.WARN)) {
                int queueLen = messageQueue.getCurrentInQueue();
                long timeNow = TimeUtils.timeNow();

                if ( (queueLen > 100) && (TimeUtils.toRelativeTimeMillis(timeNow, lastLongQueueNotification) > TimeUtils.ASECOND) ) {
                    lastLongQueueNotification = timeNow;
                    LOG.warn("Very long queue (" + queueLen + ") for listener: " + this);
                }
            }

            if (EndpointMeterBuildSettings.ENDPOINT_METERING && (incomingMessageListenerMeter != null))  {
                incomingMessageListenerMeter.inboundMessageQueued(message);
            }

            if (pushed) {
                obtained = myAccount.obtainItem();
            }
            threadDispatcher.notify();  // makes a contender run earlier
        }

        if (! pushed) {
            // We need to release the resources that we have obtained.
            // The account cannot possibly have been purged; it is marked
            // sticky and we hold resources. So, we can re-use the cache
            // entry we obtained the last time we held the messageDispatcher
            // monitor.
            synchronized(messageDispatcher) {
                msgSrcAccount.inNeed(false); // Make sure we won't get to keep it.
                msgSrcAccount.releaseQuantity(msgSize);

                // Check idleness here. Idleness is stable under
                // messageDispatcher synchronization.
                if (msgSrcAccount.isIdle()) {
                    allSources.stickyCacheEntry(ce, false);
                }
            }

            return;
        }

        if (obtained) {
            ListenerThread.newListenerThread(this);
        } else {
            if (LOG.isEnabledFor(Level.INFO)) {
                LOG.info("Listener '" + this +  "' exceeds thread's limits; msg waits.");
            }
        }
    }
}
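
The admission step in processIncomingMessage (reserve quota for the message's source, retry once after yielding the CPU, otherwise discard) can be illustrated in isolation. The following is a minimal sketch only, not the JXTA implementation: the class SourceQuotaSketch, its methods, and the plain HashMap standing in for the real ResourceAccount and allSources cache are all hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of per-source quota accounting.
 * It stands in for ResourceAccount/allSources; it is not JXTA code.
 */
final class SourceQuotaSketch {
    private final long limitPerSource;
    // Bytes currently reserved per source address (assumed bookkeeping).
    private final Map<String, Long> inUse = new HashMap<>();

    SourceQuotaSketch(long limitPerSource) {
        this.limitPerSource = limitPerSource;
    }

    /**
     * Try to reserve size bytes for source. On failure, yield once so
     * that consumers get a chance to release quota, retry, then give up.
     */
    boolean tryAdmit(String source, long size) {
        for (int attempt = 0; attempt < 2; attempt++) {
            if (attempt > 0) {
                Thread.yield(); // give some CPU to the consuming side
            }
            synchronized (this) {
                long used = inUse.getOrDefault(source, 0L);
                if (used + size <= limitPerSource) {
                    inUse.put(source, used + size);
                    return true;
                }
            }
        }
        return false; // the caller is expected to discard the message
    }

    /** Release a reservation; an idle source loses its entry. */
    synchronized void release(String source, long size) {
        long remaining = inUse.getOrDefault(source, 0L) - size;
        if (remaining <= 0) {
            inUse.remove(source); // idle, so the entry may be purged
        } else {
            inUse.put(source, remaining);
        }
    }

    /** True when the source holds no reservation. */
    synchronized boolean isIdle(String source) {
        return !inUse.containsKey(source);
    }

    public static void main(String[] args) {
        SourceQuotaSketch quota = new SourceQuotaSketch(100);
        System.out.println("admitted: " + quota.tryAdmit("peerA", 60));
        System.out.println("admitted: " + quota.tryAdmit("peerA", 60));
        quota.release("peerA", 60);
        System.out.println("idle: " + quota.isIdle("peerA"));
    }
}
```

As in the listing, the lock is dropped before yielding, so a consumer thread can release quota between the two attempts. Unlike the listing, the sketch removes idle entries immediately instead of marking a cache entry as purgeable.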