⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 quotaincomingmessagelistener.java

📁 jxta_src_2.41b jxta 2.41b 最新版源码 from www.jxta.org
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
            this.src = src;            this.srcAddress = srcAddress;            this.destAddress = destAddress;            this.timeReceived = timeReceived;            this.size = size;        }    }        /**     *  The thread which services removing items from the queue     **/    private static class ListenerThread extends Thread {                /**         *  We put all of the listeners into a single thread group         *         **/        // FIXME 20020910 bondolo@jxta.org We need a way to make this group appear in the top group        private final static ThreadGroup listenerGroup = new ThreadGroup( "Quota Incoming Message Listeners" );                private final static List idleThreads = new LinkedList();                private QuotaIncomingMessageListener current;                private boolean terminated = false;                static ListenerThread newListenerThread(QuotaIncomingMessageListener current) {                        ListenerThread lt = null;            synchronized(idleThreads) {                if (idleThreads.isEmpty()) {                    return new ListenerThread(current);                }                                lt = (ListenerThread) idleThreads.remove(0);            }                        // As long as the listenerThread is not in the idle list, it is not            // allowed top die. It will keep waiting for that new job.            // Since we removed it from the list, only us can give it a new job.            lt.newJob(current);            return lt;        }                private ListenerThread(QuotaIncomingMessageListener current) {            super( listenerGroup, "QuotaListenerThread" );            this.current = current;            setDaemon( true );            start();        }                        // Not thread safe: only one thread should be able to give a        // job at any point in time.  The use of "synchronized" here,        // is for handshake with wait.        void newJob(QuotaIncomingMessageListener current) {            synchronized(this) {                this.current = current;                notify();            }        }                void terminate() {            synchronized(idleThreads) {                terminated = true;            }            interrupt();        }                // Wait for a job until we are allowed to retire.        // We are forbidden to retire before we can        // prove that we can no-longer be given a job: that is, hold        // the idle list lock, verify that we're still in the list,        // remove ourselves from the idle list, release the lock. If        // we believe we're unemployed and then discover that we've        // been removed from the idle list, we have or will have a job        // soon.                boolean getJob() {                        // We cannot get a job if we do not register            // with the employment agency. It is garanteed that we're not in it            // at this point: the only add() is below and we never leave this            // routine while being on the list.                        synchronized(idleThreads) {                // Since we're perfectly idle and immune from getting a                // new job at this point. This is the best time to comply                // with a pending termination request. If the request happens                // while we wait, then interrupt will expedite the process.                // If, wakeing up due to that, we find that we have a job,                // then we will comply the next time we are idle again and come                // back here.                                if (terminated) {                    return false;                }                                idleThreads.add(0, this); // == generic list parlance for addFirst.            }                        // From this point, we could have a new job                        // Wait until we're given something to do again or            // we've waited too long.  Three sources of wakeing            // up: timeout, new job, interrupted. In all cases we            // move on. If we want to retire, we must check that we            // truely have nothing to do. In case of collision,            // go back to work.                        while (true) {                synchronized(this) {                    if (current != null) {                        return true; // Have job already. Go back to work.                    }                                        // Wait for a job until we're so bored that we try                    // to retire.                    try {                        wait(4 * TimeUtils.ASECOND);                    } catch(InterruptedException ie) {                        Thread.interrupted();                    }                                        // Ok, did we wake up because we have a job or what ?                                        // If we have job, nomatter what "terminated" or the clock says,                    // we have to process it.                                        if (current != null) {                        return true;                    }                }                                // Can't get a job. We're supposed to go away, but we                // must make sure that we are not going to be given a                // job while we do it.  For that, we need the lock on                // the idleList, which means we must release the lock                // on this.                                synchronized(idleThreads) {                                        if (idleThreads.remove(this)) {                        // Good, we did remove ourselves; which means no other                        // thread had nor can. Which means that we have not                        // and will not be given a job. So we can go away.                        return false;                    }                                        // We did not remove ourselves; we were already out.                    // this means that we have been given a job or will be                    // given one for sure. Go back and wait for it.                }            }        }                // When we first run, we always have a job. When we have a job        // we can treat current as a local variable. As long as we        // have chained job it is forbidden to quit.  Once we run out        // of chained job, we return to wait for a new one or for a        // reason to quit.                public void run() {                        try {                do {                    // Process chained jobs.                                        while (current != null) {                        current = current.doOne();                    }                                        // Ran out. Time to deal with employment.                } while(getJob());                            } catch ( Throwable all ) {                LOG.fatal( "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all );            }        }    }        /**     * Constructor for the QuotaIncomingMessageListener object     *     * @param  name     a unique name for this listener     * @param  listener the recipient listener.     **/    public QuotaIncomingMessageListener( String name, EndpointListener listener ) {        this(name, listener, null);    }        /**     * Constructor for the QuotaIncomingMessageListener object     *     * @param  name             a unique name for this listener     * @param  listener         the recipient listener.     * @param  incomingMessageListenerMeter     metering handler.     **/    public QuotaIncomingMessageListener( String name, EndpointListener listener, InboundMeter incomingMessageListenerMeter) {        this.listener = listener;        this.name = name;        this.incomingMessageListenerMeter = incomingMessageListenerMeter;                synchronized(threadDispatcher) {            myAccount = threadDispatcher.newAccount(1, -1, this);            threadDispatcher.notify(); // makes a contender run earlier        }                // Make sure account creation cannot become a denial of service.        // Give away a time slice.        Thread.yield();    }        /**     *  {@inheritDoc}     *     *  <p/>Returns our name     **/    public String toString() {        return name;    }        /**     * Gets the listener attribute of the QuotaIncomingMessageListener object     *     * @return    The listener value     */    public EndpointListener getListener() {        return listener;    }        /**     *  Close this listener and release all of its resources.     **/    public void close() {                LinkedList rmdMessages = new LinkedList();                synchronized(threadDispatcher) {            if (closed) {                return;            }            closed = true;                        // Unplug the listener right away. We no longer want to invoke it, even if another            // thread has picked up a message and is about to process it. (this is circumvents            // bugs in applications that may not expect the listener to be invoked during or            // slightly after closure. We still make no absolute guarantee about that. Making it            // 100% sure would create deadlocks).                        listener = null;                        messageQueue.close();                        // close myAccount            if (myAccount.isIdle()){                myAccount.close();            }                        // Drain the queue into a local list            // Do not use (pop(0));            // we do not need to block and since we're not using a            // synchronizedUnbiasedQueue, we're not supposed to use the            // timeout based routines; they're inconsistent with the            // non timeout ones, synch-wise                        MessageFromSource mfs = null;            while ((mfs = (MessageFromSource) messageQueue.pop()) != null) {                rmdMessages.add(mfs);            }                        threadDispatcher.notify(); // makes a contender run earlier        }        

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -