📄 blockingqueuewrapper.java
字号:
/*------------------------------------------------------------------------------Name: BlockingQueueWrapper.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE file------------------------------------------------------------------------------*/package org.xmlBlaster.util.queue;import java.util.ArrayList;import java.util.logging.Level;import java.util.logging.Logger;import org.xmlBlaster.util.Global;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.def.ErrorCode;/** * BlockingQueueWrapper is a wrapper to I_Queue which can be used to perform a * blocking peek on an I_Queue. * * @author <a href="mailto:michele@laghi.eu">Michele Laghi</a> */public class BlockingQueueWrapper implements I_StorageSizeListener { private static Logger log = Logger.getLogger(BlockingQueueWrapper.class.getName()); private long pollInterval = 1000L; private I_Queue queue; private boolean isRegistered; private boolean waiting; public interface I_BlockingQueueCb { ArrayList queueOperation(I_Queue queue, int numEntries, long numBytes, int minPrio, int maxPrio, I_QueueEntry limitEntry) throws XmlBlasterException; } /** * Constructor * @param pollInterval time in milliseconds to wait before a check about the * queue size is done. */ public BlockingQueueWrapper(long pollInterval) { if (pollInterval > 0L) this.pollInterval = pollInterval; else log.warning("The requested pollInterval is negative, will set it to the default value '" + this.pollInterval + "'"); } public BlockingQueueWrapper() { this(1000L); } public synchronized void init(I_Queue queue) throws XmlBlasterException { if (queue == null) throw new XmlBlasterException(Global.instance(), ErrorCode.USER_CONFIGURATION, "The queue passed is null"); this.queue = queue; } public void clear() { I_Queue q = this.queue; if (q != null) q.clear(); } /** * @return null after shutdown */ public I_Queue getQueue() { return this.queue; } public synchronized void shutdown() { I_Queue q = this.queue; if (q != null) { if (this.isRegistered) q.removeStorageSizeListener(this); } this.queue = null; } /** * Enforced by I_StorageSizeListener. * * @param queue * @param numEntries * @param numBytes * @param isShutdown */ public void changed(I_Storage storage, long numEntries, long numBytes, boolean isShutdown) { if (this.waiting) { // to optimize performance we check if really needed synchronized(this) { try { this.notify(); } catch (IllegalMonitorStateException ex) { if (log.isLoggable(Level.INFO)) { log.warning("A notify occured when the object was not synchronized"); ex.printStackTrace(); } } } } } /** * Blocks until at least numOfEntries are found in the queue, or the timeout has occured. This method can return partial results, * i.e. if the requested amout of entries is 10 and the number of entries in the queue is 4 when the timeout occurs, then the * four entries found are returned. * * This method works best if the queue performs its puts without inhibiting queueSizeEvents (the second argument in the put method), * since the put would be intercepted directly. However even if the putter decides to inhibit this event, this method will poll * according to what specified in the constructor and will work anyway (possibly with a slight offset after the put has occured). * * @param numOfEntries the number of entries to return (or the maximum number of entries if the timeout occurs). * @param timeout The timeout in milliseconds to wait until to return, timeout=0: no blocking, timeout<0:Infinite blocking * @return The ArrayList containing the I_Entry entries of the queue found. * @throws XmlBlasterException if the queue is null or if the backend queue throws an Exception. */ private final synchronized ArrayList blockingQueueOperation(int numOfEntries, long timeout, int minPrio, int maxPrio, I_QueueEntry limitEntry, I_BlockingQueueCb cb) throws XmlBlasterException { I_Queue q = this.queue; if (q == null) throw new XmlBlasterException(Global.instance(), ErrorCode.USER_JDBC_INVALID, "The invoked queue is null (already shutdown ?)"); ArrayList ret = q.peek(numOfEntries, -1L); // TODO: if numOfEntries == -1 und timeout > 0L we expect at least one if ((ret.size() > 0 && ret.size() >= numOfEntries) || timeout == 0L) // should be sufficient a == return ret; try { this.waiting = true; if (!this.isRegistered) { this.isRegistered = true; q.addStorageSizeListener(this); } long endTime = System.currentTimeMillis() + timeout; long remainingTime = 0L; boolean infiniteBlocking = (timeout < 0L); while ( (remainingTime=endTime-System.currentTimeMillis()) > 0L || infiniteBlocking) { if ((numOfEntries != -1 && q.getNumOfEntries() >= numOfEntries) || numOfEntries == -1 && q.getNumOfEntries() > 0) { return cb.queueOperation(q, numOfEntries, -1L, minPrio, maxPrio, limitEntry); } long sleepTime = Math.max(remainingTime, this.pollInterval); this.wait(sleepTime); } return cb.queueOperation(q, numOfEntries, -1L, minPrio, maxPrio, limitEntry); } catch (InterruptedException ex) { ex.printStackTrace(); return cb.queueOperation(q, numOfEntries, -1L, minPrio, maxPrio, limitEntry); } finally { this.waiting = false; } } public ArrayList blockingPeek(int numOfEntries, long timeout) throws XmlBlasterException { return blockingQueueOperation(numOfEntries, timeout, 0, 0, null, new I_BlockingQueueCb() { public ArrayList queueOperation(I_Queue queue, int numEntries, long numBytes, int minPrio, int maxPrio, I_QueueEntry limitEntry) throws XmlBlasterException { return queue.peek(numEntries, numBytes); } }); } public ArrayList blockingTakeLowest(int numOfEntries, long timeout, I_QueueEntry limitEntry) throws XmlBlasterException { return blockingQueueOperation(numOfEntries, timeout, 0, 0, limitEntry, new I_BlockingQueueCb() { public ArrayList queueOperation(I_Queue queue, int numEntries, long numBytes, int minPrio, int maxPrio, I_QueueEntry limitEntry) throws XmlBlasterException { boolean leaveOne = false; return queue.takeLowest(numEntries, numBytes, limitEntry, leaveOne); } }); } public ArrayList blockingPeekLowest(int numOfEntries, long timeout, I_QueueEntry limitEntry) throws XmlBlasterException { return blockingQueueOperation(numOfEntries, timeout, 0, 0, limitEntry, new I_BlockingQueueCb() { public ArrayList queueOperation(I_Queue queue, int numEntries, long numBytes, int minPrio, int maxPrio, I_QueueEntry limitEntry) throws XmlBlasterException { boolean leaveOne = false; return queue.peekLowest(numEntries, numBytes, limitEntry, leaveOne); } }); } public ArrayList blockingTakeWithPriority(int numOfEntries, long timeout, int minPrio, int maxPrio) throws XmlBlasterException { return blockingQueueOperation(numOfEntries, timeout, minPrio, maxPrio, null, new I_BlockingQueueCb() { public ArrayList queueOperation(I_Queue queue, int numEntries, long numBytes, int minPrio, int maxPrio, I_QueueEntry limitEntry) throws XmlBlasterException { return queue.takeWithPriority(numEntries, numBytes, minPrio, maxPrio); } }); } public ArrayList blockingPeekWithPriority(int numOfEntries, long timeout, int minPrio, int maxPrio) throws XmlBlasterException { return blockingQueueOperation(numOfEntries, timeout, minPrio, maxPrio, null, new I_BlockingQueueCb() { public ArrayList queueOperation(I_Queue queue, int numEntries, long numBytes, int minPrio, int maxPrio, I_QueueEntry limitEntry) throws XmlBlasterException { return queue.peekWithPriority(numEntries, numBytes, minPrio, maxPrio); } }); } public ArrayList blockingPeekSamePriority(int numOfEntries, long timeout) throws XmlBlasterException { return blockingQueueOperation(numOfEntries, timeout, 0, 0, null, new I_BlockingQueueCb() { public ArrayList queueOperation(I_Queue queue, int numEntries, long numBytes, int minPrio, int maxPrio, I_QueueEntry limitEntry) throws XmlBlasterException { return queue.peekSamePriority(numEntries, numBytes); } }); } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -