📄 jmsconnectormanager.java
字号:
ShareableObjectPool vendorConnectors = null; synchronized (vendorConnectorPools) { String vendorId = conn.getVendorAdapter().getVendorId(); vendorConnectors = getVendorPool(vendorId); // it's possible the pool does not yet exist (if, for example, the connector // is created before invoking the call/JMSTransport, as is the case with // SimpleJMSListener) if (vendorConnectors == null) { vendorConnectors = new ShareableObjectPool(); vendorConnectorPools.put(vendorId, vendorConnectors); } } synchronized (vendorConnectors) { vendorConnectors.addObject(conn); } if (log.isDebugEnabled()) { log.debug("Exit: JMSConnectorManager::addConnectorToPool"); } } /** * Removes a JMSConnector from the appropriate vendor pool */ public void removeConnectorFromPool(JMSConnector conn) { if (log.isDebugEnabled()) { log.debug("Enter: JMSConnectorManager::removeConnectorFromPool"); } ShareableObjectPool vendorConnectors = null; synchronized (vendorConnectorPools) { vendorConnectors = getVendorPool(conn.getVendorAdapter().getVendorId()); } if (vendorConnectors == null) return; synchronized (vendorConnectors) { // first release, to decrement the ref count (it is automatically incremented when // the connector is matched) vendorConnectors.release(conn); vendorConnectors.removeObject(conn); } if (log.isDebugEnabled()) { log.debug("Exit: JMSConnectorManager::removeConnectorFromPool"); } } /** * Performs a non-exclusive checkout of the JMSConnector */ public void reserve(JMSConnector connector) throws Exception { ShareableObjectPool pool = null; synchronized (vendorConnectorPools) { pool = getVendorPool(connector.getVendorAdapter().getVendorId()); } if (pool != null) pool.reserve(connector); } /** * Performs a non-exclusive checkin of the JMSConnector */ public void release(JMSConnector connector) { ShareableObjectPool pool = null; synchronized (vendorConnectorPools) { pool = getVendorPool(connector.getVendorAdapter().getVendorId()); } if (pool != null) pool.release(connector); } /** * A simple 
non-blocking pool impl for objects that can be shared.
     * Only a ref count is necessary to prevent collisions at shutdown.
     * <p>
     * Locking: both internal maps ({@code m_elements} and {@code m_expiring})
     * are guarded by the monitor of {@code m_elements}; every read or write of
     * either map happens while holding that monitor.
     * <p>
     * Todo: max size, cleanup stale connections
     */
    public class ShareableObjectPool {
        // maps object to ref count wrapper
        private java.util.HashMap m_elements;
        // holds objects which should no longer be leased (pending removal)
        private java.util.HashMap m_expiring;
        // NOTE(review): not read or written anywhere in the visible code
        private int m_numElements = 0;

        /** Creates an empty pool. */
        public ShareableObjectPool() {
            m_elements = new java.util.HashMap();
            m_expiring = new java.util.HashMap();
        }

        /**
         * Adds the object to the pool with a reference count of zero, unless it
         * is already pooled or is currently pending removal.
         *
         * @param obj the object to pool
         */
        public void addObject(Object obj) {
            ReferenceCountedObject ref = new ReferenceCountedObject(obj);
            synchronized (m_elements) {
                if (!m_elements.containsKey(obj) && !m_expiring.containsKey(obj)) {
                    m_elements.put(obj, ref);
                }
            }
        }

        /**
         * Removes the object from the pool. If the object is still reserved it
         * is marked as expiring (so it can no longer be reserved) and this call
         * blocks until the reference count reaches zero or {@code waitTime} has
         * elapsed, whichever comes first; the object is then dropped regardless.
         * <p>
         * The reference count is polled, so a release may be noticed up to one
         * poll interval (5 seconds) late, but never later than {@code waitTime}.
         * If the calling thread is interrupted while waiting, the wait continues
         * and the interrupt status is restored before returning.
         * <p>
         * Todo: check expirations with the next request instead of holding up
         * the current request
         *
         * @param obj      the object to remove; unknown objects are ignored
         * @param waitTime maximum time to wait for outstanding reservations, in
         *                 milliseconds; zero or negative means do not wait
         */
        public void removeObject(Object obj, long waitTime) {
            ReferenceCountedObject ref = null;
            synchronized (m_elements) {
                ref = (ReferenceCountedObject) m_elements.get(obj);
                if (ref == null) {
                    return;
                }

                m_elements.remove(obj);

                if (ref.count() == 0) {
                    return;
                }
                // mark the object for expiration
                m_expiring.put(obj, ref);
            }

            // connector is now marked for expiration. wait for the ref count to drop to zero
            final long pollInterval = 5000L;
            final long start = System.currentTimeMillis();
            boolean interrupted = false;
            try {
                while (ref.count() > 0) {
                    // Elapsed-time arithmetic avoids the overflow of "now + waitTime"
                    // for very large wait times.
                    long remaining = waitTime - (System.currentTimeMillis() - start);
                    if (remaining <= 0) {
                        break;
                    }
                    try {
                        // Never sleep past the caller's deadline.
                        Thread.sleep(Math.min(remaining, pollInterval));
                    } catch (InterruptedException e) {
                        // Keep waiting for the deadline, but remember the interrupt
                        // so it is not lost to the caller.
                        interrupted = true;
                    }
                }
            } finally {
                // also clear from the expiring list (under the same lock as every
                // other access to the map)
                synchronized (m_elements) {
                    m_expiring.remove(obj);
                }
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Removes the object from the pool, waiting up to
         * {@code DEFAULT_WAIT_FOR_SHUTDOWN} for outstanding reservations.
         *
         * @param obj the object to remove
         */
        public void removeObject(Object obj) {
            removeObject(obj, DEFAULT_WAIT_FOR_SHUTDOWN);
        }

        /**
         * Marks the connector as in use by incrementing the connector's reference count
         *
         * @param obj the pooled object to reserve
         * @throws Exception with message "resourceUnavailable" if the object is
         *                   pending removal or is not in the pool at all
         */
        public void reserve(Object obj) throws Exception {
            synchronized (m_elements) {
                if (m_expiring.containsKey(obj)) {
                    throw new Exception("resourceUnavailable");
                }

                ReferenceCountedObject ref = (ReferenceCountedObject) m_elements.get(obj);
                if (ref == null) {
                    // Not pooled: report it the same way as an expiring object
                    // instead of failing with a NullPointerException.
                    throw new Exception("resourceUnavailable");
                }
                ref.increment();
            }
        }

        /**
         * Decrements the connector's reference count. Objects that are pending
         * removal can still be released, which is what lets
         * {@link #removeObject(Object, long)} finish before its timeout. Releasing
         * an object that is unknown to the pool is a no-op.
         *
         * @param obj the object to release
         */
        public void release(Object obj) {
            synchronized (m_elements) {
                ReferenceCountedObject ref = (ReferenceCountedObject) m_elements.get(obj);
                if (ref == null) {
                    // removeObject moves reserved entries to the expiring map
                    ref = (ReferenceCountedObject) m_expiring.get(obj);
                }
                if (ref != null) {
                    ref.decrement();
                }
            }
        }

        /**
         * Returns the pooled objects (those not pending removal).
         * <p>
         * The returned set is the live key view of the internal map, not a copy;
         * it reflects later additions and removals.
         *
         * @return live view of the pooled objects
         */
        public synchronized java.util.Set getElements() {
            synchronized (m_elements) {
                return m_elements.keySet();
            }
        }

        /**
         * @return the number of pooled objects, not counting those pending removal
         */
        public synchronized int size() {
            synchronized (m_elements) {
                return m_elements.size();
            }
        }

        /**
         * Wrapper to track the use count of an object
         */
        public class ReferenceCountedObject {
            private Object m_object;
            private int m_refCount;

            /**
             * @param obj the object being counted; the count starts at zero
             */
            public ReferenceCountedObject(Object obj) {
                m_object = obj;
                m_refCount = 0;
            }

            /** Adds one reference. */
            public synchronized void increment() {
                m_refCount++;
            }

            /** Drops one reference; the count never goes below zero. */
            public synchronized void decrement() {
                if (m_refCount > 0) {
                    m_refCount--;
                }
            }

            /** @return the current number of references */
            public synchronized int count() {
                return m_refCount;
            }
        }
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -