
📄 quotaincomingmessagelistener.java

📁 jxta_src_2.41b: JXTA 2.41b latest source code, from www.jxta.org
💻 JAVA
📖 Page 1 of 3
/*
 *  $Id: QuotaIncomingMessageListener.java,v 1.36 2006/03/24 21:55:16 bondolo Exp $
 *
 *  Copyright (c) 2001 Sun Microsystems, Inc.  All rights reserved.
 *
 *  Redistribution and use in source and binary forms, with or without
 *  modification, are permitted provided that the following conditions
 *  are met:
 *
 *  1. Redistributions of source code must retain the above copyright
 *  notice, this list of conditions and the following disclaimer.
 *
 *  2. Redistributions in binary form must reproduce the above copyright
 *  notice, this list of conditions and the following disclaimer in
 *  the documentation and/or other materials provided with the
 *  distribution.
 *
 *  3. The end-user documentation included with the redistribution,
 *  if any, must include the following acknowledgment:
 *  "This product includes software developed by the
 *  Sun Microsystems, Inc. for Project JXTA."
 *  Alternately, this acknowledgment may appear in the software itself,
 *  if and wherever such third-party acknowledgments normally appear.
 *
 *  4. The names "Sun", "Sun Microsystems, Inc.", "JXTA" and "Project JXTA"
 *  must not be used to endorse or promote products derived from this
 *  software without prior written permission. For written
 *  permission, please contact Project JXTA at http://www.jxta.org.
 *
 *  5. Products derived from this software may not be called "JXTA",
 *  nor may "JXTA" appear in their name, without prior written
 *  permission of Sun.
 *
 *  THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
 *  WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 *  OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 *  DISCLAIMED.
 *  IN NO EVENT SHALL SUN MICROSYSTEMS OR
 *  ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 *  USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 *  ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 *  OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 *  OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 *  SUCH DAMAGE.
 *
 *  ====================================================================
 *
 *  This software consists of voluntary contributions made by many
 *  individuals on behalf of Project JXTA.  For more
 *  information on Project JXTA, please see
 *  <http://www.jxta.org/>.
 *
 *  This license is based on the BSD license adopted by the Apache Foundation.
 */
package net.jxta.impl.endpoint;

import java.util.List;
import java.util.LinkedList;

import org.apache.log4j.Logger;
import org.apache.log4j.Level;

import net.jxta.endpoint.EndpointAddress;
import net.jxta.endpoint.EndpointListener;
import net.jxta.endpoint.Message;

import net.jxta.impl.endpoint.endpointMeter.EndpointMeterBuildSettings;
import net.jxta.impl.endpoint.endpointMeter.InboundMeter;
import net.jxta.impl.util.UnbiasedQueue;
import net.jxta.impl.util.ResourceDispatcher;
import net.jxta.impl.util.ResourceAccount;
import net.jxta.impl.util.Cache;
import net.jxta.impl.util.CacheEntry;
import net.jxta.impl.util.CacheEntryListener;
import net.jxta.impl.util.TimeUtils;

/**
 *  A wrapper around an EndpointListener which imposes fair sharing quotas.
 *
 * <p/><b>NOTE</b>: 20020526 jice
 * There would be great value in making such an object the explicit interface
 * between the endpoint and its clients, rather than a bland listener interface
 * The client would have the ability to specify the threads limit, possibly
 * setting it zero, and then could have access to the buffer.
 * To implement that with a simple listener would mean that the endpoint
 * has to TRUST the client that the listener does no more than queuing and
 * signaling or else, force a full and possibly redundant hand-shake in all
 * cases, as is the case now.  To be improved.
 **/
public class QuotaIncomingMessageListener implements EndpointListener {

    /**
     *  Log4J Logger
     **/
    private final static Logger LOG = Logger.getLogger(QuotaIncomingMessageListener.class.getName());

    /**
     * All QuotaIncomingMessageListener share one global resource manager for
     * threads. Its budget is hardcoded for now.
     *
     * <p/>Parameters read as follows:
     *
     * <p/><pre>
     * new ResourceDispatcher(
     *      100,  // support at least that many listeners
     *      1,    // each with at least that many reserved threads
     *      5,    // let a listener reserve up to that many threads
     *      150,  // additional un-reserved threads
     *      50,   // any listener can have up to this many un-reserved threads
     *      20,   // threads that can never be reserved
     *      true  // use round robin
     *      );
     * </pre>
     *
     * <p/>It means that we will authorize up to 650 threads total (<code>
     * 100 listeners * 5 reserved threads + 150 additional unreserved threads
     * </code>).
     *
     * <p/>We can support more than 100 listeners, but we cannot guarantee
     * that we will be able to reserve even 1 thread for them. If we do
     * it will be by pulling it out of the un-reserved set if there are
     * any available at that time. If every listener uses only the minimal
     * guaranteed 1 thread, then we can support 250 listeners (<code>
     * 100 listeners + 150 additional unreserved threads</code>).
     *
     * <p/>Round robin means that listeners that want to use threads beyond
     * their reservation are queued waiting for an extra thread to become
     * available.
     **/
    static private ResourceDispatcher threadDispatcher =
            new ResourceDispatcher(100, 1, 5, 150, 50, 20, true, "threadDispatcher");

    /*
       All next hop peers that send us messages share one global resource
       manager for message queuing. Its budget is hardcoded for now.
       If you tune only the first two values, the derived ones
       compute to a total commitment of GmaxSenders * GmaxMsgSize * 10

       Examples:
       150 senders * 20K messages * 10 max allocation => 30M
       350 senders * 300K messages * 10 max allocation => 1G
       3500 senders * 8k messages * 10 max allocation => 280M
       10000 senders * 8k messages * 20 max allocation => 1.6GB
     */

    /**
     * Max guaranteed supported message size (bytes). This is a theoretical
     * maximum message size and should reflect the maximum size of messages
     * used in relevant protocols. All other calculations assume this as the
     * "default" message size. If message size is very variable then this should
     * be the median message size rather than the maximum.
     **/
    static int GmaxMsgSize = 6 * 1024;

    /**
     * Max guaranteed senders (integer). Expected number of message sources
     * amongst whom resources are to be shared.
     **/
    static int GmaxSenders = 300;

    /**
     * Every sender account will always be granted 2 messages worth of queue
     * size.
     **/
    static int GminResPerSender = 2 * GmaxMsgSize;

    /**
     * Every sender account can over allocate up to 6 messages worth of queue
     * size (3 * GminResPerSender) if the space is available. If peers are
     * very bursty with messages then this should be higher.
     **/
    static int GmaxResPerSender = 3 * GminResPerSender;

    /**
     * Additional resources in reserve, to be allocated on the fly. Available
     * reservations beyond GminResPerSender are taken from there, so, we
     * must have enough. This space is fairly shared by all senders who are
     * over their minimum reserved allocation.
     **/
    static int TotalExtra = 2 * GmaxResPerSender * GmaxSenders;

    /**
     * There is a limit to the amount of on-the-fly that a single sender can
     * hog. If peers are very bursty with messages then this should be higher.
     **/
    static int MaxExtraPerSender = 8 * GmaxResPerSender;

    /**
     * There is a part of the non-reserved resources that we will never use for
     * reservation by senders in excess of GmaxSenders even if the number of
     * accounts is way beyond the max guaranteed. Instead we'll prefer to grant
     * 0 reserved items to additional senders.
     **/
    static int NeverReserved = TotalExtra / 8;

    private final static ResourceDispatcher messageDispatcher =
            new ResourceDispatcher(GmaxSenders,
                    GminResPerSender,
                    GmaxResPerSender,
                    TotalExtra,
                    MaxExtraPerSender,
                    NeverReserved,
                    false,                // No RoundRobin
                    "messageDispatcher");

    /**
     * A canonical mapping of all the message originators.
     * This is a cache, since peers can disappear silently.
     *
     * <p/>Note: the number of accounts might not be the best criterion though,
     * since just a few stale accounts could hog a lot of resources.
     * Maybe we should just make that cache sensitive to the inconvenience
     * endured by live accounts.
     **/
    static class MyCacheListener implements CacheEntryListener {
        // This may only be called when something gets added to the
        // cache or when an item is made purgeable. In our case that means
        // all the synchro we need is already there.
        public void purged(CacheEntry entry) {
            ((ResourceAccount) entry.getValue()).close();
        }
    }

    /**
     * We put a large hard limit on the cache because we detect the
     * need for purging normally before that limit is reached, and we purge
     * it explicitly.
     *
     * <p/>The number 100 is the maximum number of idle accounts that
     * we keep around in case the peer comes back.
     **/
    private final static Cache allSources = new Cache(100, new MyCacheListener());

    private final UnbiasedQueue messageQueue = new UnbiasedQueue( Integer.MAX_VALUE, false, new LinkedList() );

    private final String name;
    private final InboundMeter incomingMessageListenerMeter;

    private final ResourceAccount myAccount;

    /**
     *  The "real" listener.
     **/
    private volatile EndpointListener listener = null;

    // Close may be called redundantly and it costs.
    private boolean closed = false;

    /**
     *  The last time we warned about having a long queue.
     **/
    private long lastLongQueueNotification = 0L;

    /**
     *  An incoming message in the queue with its addresses and accounting
     **/
    private static class MessageFromSource {
        final Message msg;
        final EndpointAddress srcAddress;
        final EndpointAddress destAddress;
        final ResourceAccount src;
        final long timeReceived;
        final long size;

        MessageFromSource( Message msg, EndpointAddress srcAddress,
        EndpointAddress destAddress, ResourceAccount src, long timeReceived,
        long size) {
            this.msg = msg;
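
The quota constants in the listing are all derived from two tunables, GmaxMsgSize and GmaxSenders, so it can help to work the arithmetic through once. The sketch below is a standalone illustration and is not part of JXTA: the class name `QuotaBudgetSketch` and its method names are hypothetical, and it only repeats the multiplications from the listing with the hardcoded values (6 KB messages, 300 senders, and the thread dispatcher arguments 100, 5, 150). It does not call ResourceDispatcher, whose internal behavior is not shown on this page.

```java
/**
 * Hypothetical helper (not part of JXTA) that recomputes the derived
 * quota constants from the listing so the byte budgets are visible.
 */
final class QuotaBudgetSketch {

    // The two tunables, copied from the listing.
    static final int MAX_MSG_SIZE = 6 * 1024; // GmaxMsgSize, in bytes
    static final int MAX_SENDERS = 300;       // GmaxSenders

    /** GminResPerSender: 2 messages worth of queue per sender. */
    static int minResPerSender() {
        return 2 * MAX_MSG_SIZE;
    }

    /** GmaxResPerSender: 3 times the minimum reservation. */
    static int maxResPerSender() {
        return 3 * minResPerSender();
    }

    /** TotalExtra: shared pool handed out on the fly. */
    static int totalExtra() {
        return 2 * maxResPerSender() * MAX_SENDERS;
    }

    /** MaxExtraPerSender: cap on what one sender may take from the pool. */
    static int maxExtraPerSender() {
        return 8 * maxResPerSender();
    }

    /** NeverReserved: slice of the pool never used for reservations. */
    static int neverReserved() {
        return totalExtra() / 8;
    }

    /**
     * Upper bound on threads using the formula from the listing's javadoc:
     * listeners * max reserved per listener + un-reserved extra.
     */
    static int maxThreads(int listeners, int maxReservedPerListener, int extra) {
        return listeners * maxReservedPerListener + extra;
    }

    public static void main(String[] args) {
        System.out.println("GminResPerSender  = " + minResPerSender());
        System.out.println("GmaxResPerSender  = " + maxResPerSender());
        System.out.println("TotalExtra        = " + totalExtra());
        System.out.println("MaxExtraPerSender = " + maxExtraPerSender());
        System.out.println("NeverReserved     = " + neverReserved());
        System.out.println("max threads       = " + maxThreads(100, 5, 150));
    }
}
```

With the values in the listing, the shared extra pool comes to roughly 21 MB, and a single sender can take at most 288 KB of it. All intermediate products stay well inside the `int` range, but a much larger GmaxSenders or GmaxMsgSize could overflow, so switching to `long` would be a reasonable precaution when experimenting with other settings.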
