📄 inmemoryspoolrepository.java
字号:
/** * Removes a Collection of mails from the repository * @param mails The Collection of <code>MailImpl</code>'s to delete * @throws MessagingException * @since 2.2.0 */ public void remove(Collection mails) throws MessagingException { Iterator delList = mails.iterator(); while (delList.hasNext()) { remove((Mail)delList.next()); } } /** * Removes a message identified by key. * * @param key the key of the message to be removed from the repository */ public void remove(String key) throws MessagingException { if (lock(key)) { try { if (spool != null) spool.remove(key); } finally { unlock(key); } } else { StringBuffer exceptionBuffer = new StringBuffer(64) .append("Cannot lock ") .append(key) .append(" to remove it"); throw new MessagingException(exceptionBuffer.toString()); } } /** * List string keys of messages in repository. * * @return an <code>Iterator</code> over the list of keys in the repository * */ public Iterator list() { // Fix ConcurrentModificationException by cloning // the keyset before getting an iterator final ArrayList clone; synchronized(spool) { clone = new ArrayList(spool.keySet()); } return clone.iterator(); } /** * <p>Returns an arbitrarily selected mail deposited in this Repository. * Usage: SpoolManager calls accept() to see if there are any unprocessed * mails in the spool repository.</p> * * <p>Synchronized to ensure thread safe access to the underlying spool.</p> * * @return the mail */ public synchronized Mail accept() throws InterruptedException { if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) { getLogger().debug("Method accept() called"); } return accept(new SpoolRepository.AcceptFilter () { public boolean accept (String _, String __, long ___, String ____) { return true; } public long getWaitTime () { return 0; } }); } /** * <p>Returns an arbitrarily selected mail deposited in this Repository that * is either ready immediately for delivery, or is younger than it's last_updated plus * the number of failed attempts times the delay time. * Usage: RemoteDeliverySpool calls accept() with some delay and should block until an * unprocessed mail is available.</p> * * <p>Synchronized to ensure thread safe access to the underlying spool.</p> * * @return the mail */ public synchronized Mail accept(final long delay) throws InterruptedException { if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) { getLogger().debug("Method accept(delay) called"); } return accept(new SpoolRepository.AcceptFilter () { long youngest = 0; public boolean accept (String key, String state, long lastUpdated, String errorMessage) { if (state.equals(Mail.ERROR)) { //Test the time... long timeToProcess = delay + lastUpdated; if (System.currentTimeMillis() > timeToProcess) { //We're ready to process this again return true; } else { //We're not ready to process this. if (youngest == 0 || youngest > timeToProcess) { //Mark this as the next most likely possible mail to process youngest = timeToProcess; } return false; } } else { //This mail is good to go... return the key return true; } } public long getWaitTime () { if (youngest == 0) { return 0; } else { long duration = youngest - System.currentTimeMillis(); youngest = 0; //get ready for next round return duration <= 0 ? 1 : duration; } } }); } /** * Returns an arbitrarily select mail deposited in this Repository for * which the supplied filter's accept method returns true. * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines * based on number of retries if the mail is ready for processing. * If no message is ready the method will block until one is, the amount of time to block is * determined by calling the filters getWaitTime method. * * <p>Synchronized to ensure thread safe access to the underlying spool.</p> * * @return the mail */ public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException { if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) { getLogger().debug("Method accept(Filter) called"); } while (!Thread.currentThread().isInterrupted()) try { for (Iterator it = list(); it.hasNext(); ) { String s = it.next().toString(); if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) { StringBuffer logBuffer = new StringBuffer(64) .append("Found item ") .append(s) .append(" in spool."); getLogger().debug(logBuffer.toString()); } if (lock(s)) { if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) { getLogger().debug("accept(Filter) has locked: " + s); } try { Mail mail = retrieve(s); // Retrieve can return null if the mail is no longer on the spool // (i.e. another thread has gotten to it first). // In this case we simply continue to the next key if (mail == null || !filter.accept (mail.getName(), mail.getState(), mail.getLastUpdated().getTime(), mail.getErrorMessage())) { unlock(s); continue; } return mail; } catch (javax.mail.MessagingException e) { unlock(s); getLogger().error("Exception during retrieve -- skipping item " + s, e); } } } //We did not find any... let's wait for a certain amount of time wait (filter.getWaitTime()); } catch (InterruptedException ex) { throw ex; } catch (ConcurrentModificationException cme) { // Should never get here now that list methods clones keyset for iterator getLogger().error("CME in spooler - please report to http://james.apache.org", cme); } throw new InterruptedException(); } /** * */ public InMemorySpoolRepository() { spool = new Hashtable(); lock = new Lock(); } public int size() { return spool.size(); } public void clear() { spool.clear(); } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -