📄 pollingthread.java
字号:
/* * Copyright 2004-2005 Sun Microsystems, Inc. All rights reserved. * Use is subject to license terms. *//* * @(#)JavaMailServiceImpl.java */package samples.connectors.mailconnector.ra.inbound;import javax.resource.NotSupportedException;import javax.resource.spi.*;import javax.resource.spi.endpoint.*;import javax.resource.spi.work.*;import javax.resource.cci.*;import javax.resource.*;import javax.mail.*;import javax.mail.internet.*;import java.util.*;import java.util.logging.*;/** * * @author Alejandro Murillo * */public class PollingThread implements Work { public static final Logger logger = Logger.getLogger("samples.connectors.mailconnector.ra.inbound"); static ResourceBundle resource = java.util.ResourceBundle.getBundle("samples.connectors.mailconnector.ra.inbound.LocalStrings"); private boolean active = false; protected transient WorkManager workManager; private transient HashMap endpointConsumers = null; private static int QUANTUM = 10000; // 10 Seconds /** * Constructor. */ public PollingThread(WorkManager workManager) { this.active = true; this.workManager = workManager; /* Set up the hash tables for the use of the resource adapter. * These tables hold references to MessageEndpointFactory and * endpointConsumers. The factoryToConsumer table links the Message * factory id to the Consumer Id. */ endpointConsumers = new HashMap(10); logger.info("[PollinThread::Constructor] Leaving"); } /** * release: called by the WorkerManager */ public void release() { logger.info("[S] Worker Manager called release for PollingThread "); active = false; } /** * run */ public void run() { logger.info("[PT] WorkManager started polling thread "); // do not overuse system resources //setPriority(Thread.MIN_PRIORITY); while (active) { try { pollEndpoints(); Thread.sleep(QUANTUM); } catch(Exception e) { e.printStackTrace(); } } logger.info("[PT] Polling Thread Leaving"); } private void pollEndpoints() { //logger.info("[PT] polling endpoints entering"); synchronized(endpointConsumers) { Collection consumers = endpointConsumers.entrySet(); if ( consumers != null ) { Iterator iter = consumers.iterator(); while (iter.hasNext()) { Map.Entry entry = (Map.Entry) iter.next(); EndpointConsumer ec = (EndpointConsumer) entry.getValue(); try { if (ec.hasNewMessages()) scheduleMessageDeliveryThread(ec); } catch(Exception e) { e.printStackTrace(); } } } } //logger.info("[PT] Polling endpoints Leaving"); } /** * @param message the message to be delivered */ private void scheduleMessageDeliveryThread(EndpointConsumer ec) throws Exception { logger.info("[PT] scheduling a delivery FROM: " + ec.getUniqueKey()); try { Work deliveryThread = new DeliveryThread(ec); workManager.scheduleWork(deliveryThread); } catch (WorkRejectedException ex) { NotSupportedException newEx = new NotSupportedException( java.text.MessageFormat.format( resource.getString( "resourceadapterimpl.worker_activation_rejected"), new Object[] { ex.getMessage() })); newEx.initCause(ex); throw newEx; } catch (Exception ex) { NotSupportedException newEx = new NotSupportedException( java.text.MessageFormat.format( resource.getString( "resourceadapterimpl.worker_activation_failed"), new Object[] { ex.getMessage() })); newEx.initCause(ex); throw newEx; } } public void stopPolling() { removeAllEndpointConsumers(); this.active = false; } public void addEndpointConsumer(MessageEndpointFactory endpointFactory, EndpointConsumer ec) { logger.finest("[PT.addEndpointConsumer()] Entered"); synchronized(endpointConsumers) { endpointConsumers.put(endpointFactory, ec); } } public void removeEndpointConsumer(MessageEndpointFactory endpointFactory) { logger.finest("[PT.removeEndpointConsumer()] Entered"); EndpointConsumer ec = (EndpointConsumer) endpointConsumers.get(endpointFactory); synchronized (endpointConsumers) { endpointConsumers.remove(ec); } } /** * Iterates through the endpointConsumers, shutting them down * and preparing for stopping the Resource Adapter. */ private void removeAllEndpointConsumers() { synchronized(endpointConsumers) { Collection consumers = endpointConsumers.entrySet(); if ( consumers != null ) { Iterator iter = consumers.iterator(); while (iter.hasNext()) { Map.Entry entry = (Map.Entry) iter.next(); EndpointConsumer ec = (EndpointConsumer) entry.getValue(); try { endpointConsumers.remove(ec); } catch(Exception e) { e.printStackTrace(); } } } } endpointConsumers = null; }}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -