📄 quotaincomingmessagelistener.java
字号:
this.src = src; this.srcAddress = srcAddress; this.destAddress = destAddress; this.timeReceived = timeReceived; this.size = size; } }

    /**
     * The thread which services removing items from the queue.
     *
     * <p/>Each instance is a daemon thread that repeatedly calls
     * {@code doOne()} on its current listener. When it runs out of work it
     * parks itself on a shared idle list, from which
     * {@link #newListenerThread} may pick it up again; if no job arrives
     * before the wait in {@link #getJob} expires, the thread removes itself
     * from the idle list and exits.
     **/
    private static class ListenerThread extends Thread {

        /**
         * We put all of the listeners into a single thread group
         *
         **/
        // FIXME 20020910 bondolo@jxta.org We need a way to make this group appear in the top group
        private final static ThreadGroup listenerGroup = new ThreadGroup( "Quota Incoming Message Listeners" );

        // Threads currently waiting for a job. getJob() inserts at index 0 and
        // newListenerThread() removes index 0, so the most recently idled
        // thread is reused first. This list is also the monitor under which
        // "terminated" is written (terminate()) and read (getJob()).
        private final static List idleThreads = new LinkedList();

        // The listener this thread is (or is about to be) servicing; null
        // when the thread has no work. Assigned by the constructor, by
        // newJob() and by run().
        private QuotaIncomingMessageListener current;

        // Set by terminate(); checked by getJob() the next time the thread
        // is about to become idle.
        private boolean terminated = false;

        /**
         * Obtains a thread to service the given listener: the first idle
         * thread if the idle list is not empty, otherwise a newly created
         * (and already started) thread.
         *
         * @param current the listener the returned thread must service.
         * @return the thread that has been given the job.
         **/
        static ListenerThread newListenerThread(QuotaIncomingMessageListener current) {
            ListenerThread lt = null;
            synchronized(idleThreads) {
                if (idleThreads.isEmpty()) {
                    return new ListenerThread(current);
                }
                lt = (ListenerThread) idleThreads.remove(0);
            }
            // As long as the listenerThread is not in the idle list, it is not
            // allowed to die. It will keep waiting for that new job.
            // Since we removed it from the list, only we can give it a new job.
            lt.newJob(current);
            return lt;
        }

        /**
         * Creates a daemon thread named "QuotaListenerThread" in the shared
         * listener thread group and starts it immediately.
         *
         * <p/>NOTE(review): start() is invoked from within the constructor,
         * so the new thread may begin executing run() before this
         * constructor returns. {@code current} is assigned before start().
         *
         * @param current the first listener this thread must service.
         **/
        private ListenerThread(QuotaIncomingMessageListener current) {
            super( listenerGroup, "QuotaListenerThread" );
            this.current = current;
            setDaemon( true );
            start();
        }

        // Hands a new job to a thread that was taken off the idle list and
        // wakes it up if it is waiting in getJob().
        //
        // Not thread safe: only one thread should be able to give a
        // job at any point in time. The use of "synchronized" here,
        // is for handshake with wait.
        void newJob(QuotaIncomingMessageListener current) {
            synchronized(this) {
                this.current = current;
                notify();
            }
        }

        // Requests that this thread retire. The flag is set under the idle
        // list lock (the same lock getJob() holds when it tests the flag),
        // then the thread is interrupted so that a wait in progress in
        // getJob() ends early.
        void terminate() {
            synchronized(idleThreads) {
                terminated = true;
            }
            interrupt();
        }

        // Wait for a job until we are allowed to retire.
        // We are forbidden to retire before we can
        // prove that we can no longer be given a job: that is, hold
        // the idle list lock, verify that we're still in the list,
        // remove ourselves from the idle list, release the lock. If
        // we believe we're unemployed and then discover that we've
        // been removed from the idle list, we have or will have a job
        // soon.
        //
        // Returns true when "current" holds a job to process, false when
        // the thread must exit (termination was requested while idle, or
        // the thread successfully removed itself from the idle list).
        boolean getJob() {

            // We cannot get a job if we do not register
            // with the employment agency. It is guaranteed that we're not in it
            // at this point: the only add() is below and we never leave this
            // routine while being on the list.
            synchronized(idleThreads) {
                // Since we're perfectly idle and immune from getting a
                // new job at this point. This is the best time to comply
                // with a pending termination request. If the request happens
                // while we wait, then interrupt will expedite the process.
                // If, waking up due to that, we find that we have a job,
                // then we will comply the next time we are idle again and come
                // back here.
                if (terminated) {
                    return false;
                }
                idleThreads.add(0, this); // == generic list parlance for addFirst.
            }

            // From this point, we could have a new job

            // Wait until we're given something to do again or
            // we've waited too long. Three sources of waking
            // up: timeout, new job, interrupted. In all cases we
            // move on. If we want to retire, we must check that we
            // truly have nothing to do. In case of collision,
            // go back to work.
            while (true) {
                synchronized(this) {
                    if (current != null) {
                        return true; // Have job already. Go back to work.
                    }

                    // Wait for a job until we're so bored that we try
                    // to retire.
                    // NOTE(review): TimeUtils.ASECOND is defined elsewhere;
                    // presumably one second in milliseconds, making this a
                    // four second wait - confirm.
                    try {
                        wait(4 * TimeUtils.ASECOND);
                    } catch(InterruptedException ie) {
                        // Interruption is only used as a wake-up call here;
                        // Thread.interrupted() clears the interrupt status
                        // so that it is not left pending.
                        Thread.interrupted();
                    }

                    // Ok, did we wake up because we have a job or what ?
                    // If we have job, no matter what "terminated" or the clock says,
                    // we have to process it.
                    if (current != null) {
                        return true;
                    }
                }

                // Can't get a job. We're supposed to go away, but we
                // must make sure that we are not going to be given a
                // job while we do it. For that, we need the lock on
                // the idleList, which means we must release the lock
                // on this.
                synchronized(idleThreads) {
                    if (idleThreads.remove(this)) {
                        // Good, we did remove ourselves; which means no other
                        // thread had nor can. Which means that we have not
                        // and will not be given a job. So we can go away.
                        return false;
                    }
                    // We did not remove ourselves; we were already out.
                    // this means that we have been given a job or will be
                    // given one for sure. Go back and wait for it.
                }
            }
        }

        // When we first run, we always have a job. When we have a job
        // we can treat current as a local variable. As long as we
        // have chained job it is forbidden to quit. Once we run out
        // of chained job, we return to wait for a new one or for a
        // reason to quit.
        //
        // Each doOne() call yields the next listener to service, or null
        // when the chain is exhausted. Any Throwable escaping the loop is
        // logged at fatal level, after which run() returns and the thread
        // ends.
        public void run() {
            try {
                do {
                    // Process chained jobs.
                    while (current != null) {
                        current = current.doOne();
                    }
                    // Ran out. Time to deal with employment.
                } while(getJob());
            } catch ( Throwable all ) {
                LOG.fatal( "Uncaught Throwable in thread :" + Thread.currentThread().getName(), all );
            }
        }
    }

    /**
     * Constructor for the QuotaIncomingMessageListener object.
     * Equivalent to the three-argument constructor with a null meter.
     *
     * @param name a unique name for this listener
     * @param listener the recipient listener.
     **/
    public QuotaIncomingMessageListener( String name, EndpointListener listener ) {
        this(name, listener, null);
    }

    /**
     * Constructor for the QuotaIncomingMessageListener object
     *
     * @param name a unique name for this listener
     * @param listener the recipient listener.
     * @param incomingMessageListenerMeter metering handler.
     **/
    public QuotaIncomingMessageListener( String name, EndpointListener listener, InboundMeter incomingMessageListenerMeter) {
        this.listener = listener;
        this.name = name;
        this.incomingMessageListenerMeter = incomingMessageListenerMeter;

        // Register an account for this listener with the shared dispatcher.
        // NOTE(review): the meaning of the (1, -1) arguments is defined by
        // threadDispatcher.newAccount(), which is not visible here - confirm.
        synchronized(threadDispatcher) {
            myAccount = threadDispatcher.newAccount(1, -1, this);
            threadDispatcher.notify(); // makes a contender run earlier
        }

        // Make sure account creation cannot become a denial of service.
        // Give away a time slice.
        Thread.yield();
    }

    /**
     * {@inheritDoc}
     *
     * <p/>Returns our name
     **/
    public String toString() {
        return name;
    }

    /**
     * Gets the listener attribute of the QuotaIncomingMessageListener object.
     * close() sets the attribute to null, so this returns null once the
     * listener has been closed.
     *
     * @return The listener value
     */
    public EndpointListener getListener() {
        return listener;
    }

    /**
     * Close this listener and release all of its resources.
     **/
    public void close() {
        LinkedList rmdMessages = new LinkedList();

        synchronized(threadDispatcher) {
            if (closed) {
                return;
            }
            closed = true;

            // Unplug the listener right away. We no longer want to invoke it, even if another
            // thread has picked up a message and is about to process it. (This circumvents
            // bugs in applications that may not expect the listener to be invoked during or
            // slightly after closure. We still make no absolute guarantee about that. Making it
            // 100% sure would create deadlocks).
            listener = null;

            messageQueue.close();

            // close myAccount
            if (myAccount.isIdle()){
                myAccount.close();
            }

            // Drain the queue into a local list
            // Do not use (pop(0));
            // we do not need to block and since we're not using a
            // synchronizedUnbiasedQueue, we're not supposed to use the
            // timeout based routines; they're inconsistent with the
            // non timeout ones, synch-wise
            MessageFromSource mfs = null;
            while ((mfs = (MessageFromSource) messageQueue.pop()) != null) {
                rmdMessages.add(mfs);
            }

            threadDispatcher.notify(); // makes a contender run earlier
        }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -