📄 basicresourcepool.java
字号:
public synchronized void resetPool() { try { for (Iterator ii = cloneOfManaged().keySet().iterator(); ii.hasNext();) markBrokenNoEnsureMinResources(ii.next()); ensureMinResources(); } catch ( ResourceClosedException e ) // one of our async threads died { //e.printStackTrace(); if ( logger.isLoggable( MLevel.SEVERE ) ) logger.log( MLevel.SEVERE, "Apparent pool break.", e ); this.unexpectedBreak(); } } public synchronized void close() throws ResourcePoolException { //we permit closes when we are already broken, so //that resources that were checked out when the break //occured can still be cleaned up close( true ); } public void finalize() throws Throwable { //obviously, clients mustn't rely on finalize, //but must close pools ASAP after use. //System.err.println("finalizing..." + this); if (! broken ) this.close(); } public void addResourcePoolListener(ResourcePoolListener rpl) { if ( asyncEventQueue == null ) throw new RuntimeException(this + " does not support ResourcePoolEvents. " + "Probably it was constructed by a BasicResourceFactory configured not to support such events."); else rpes.addResourcePoolListener(rpl); } public void removeResourcePoolListener(ResourcePoolListener rpl) { if ( asyncEventQueue == null ) throw new RuntimeException(this + " does not support ResourcePoolEvents. " + "Probably it was constructed by a BasicResourceFactory configured not to support such events."); else rpes.removeResourcePoolListener(rpl); } private synchronized boolean isForceKillAcquiresPending() { return force_kill_acquires; } // this is designed as a response to a determination that our resource source is down. // rather than declaring ourselves broken in this case (as we did previously), we // kill all pending acquisition attempts, but retry on new acqusition requests. private synchronized void forceKillAcquires() throws InterruptedException { Thread t = Thread.currentThread(); try { force_kill_acquires = true; this.notifyAll(); //wake up any threads waiting on an acquire, and force them all to die. while (acquireWaiters.size() > 0) //we want to let all the waiting acquires die before we unset force_kill_acquires { otherWaiters.add( t ); this.wait(); } force_kill_acquires = false; } finally { otherWaiters.remove( t ); } } //same as close(), but we do not destroy checked out //resources private synchronized void unexpectedBreak() { if ( logger.isLoggable( MLevel.SEVERE ) ) logger.log( MLevel.SEVERE, this + " -- Unexpectedly broken!!!", new ResourcePoolException("Unexpected Break Stack Trace!") ); close( false ); } private boolean canFireEvents() { return (! broken && asyncEventQueue != null); } private void asyncFireResourceAcquired( final Object resc, final int pool_size, final int available_size, final int removed_but_unreturned_size ) { if ( canFireEvents() ) { Runnable r = new Runnable() { public void run() {rpes.fireResourceAcquired(resc, pool_size, available_size, removed_but_unreturned_size);} }; asyncEventQueue.postRunnable(r); } } private void asyncFireResourceCheckedIn( final Object resc, final int pool_size, final int available_size, final int removed_but_unreturned_size ) { if ( canFireEvents() ) { Runnable r = new Runnable() { public void run() {rpes.fireResourceCheckedIn(resc, pool_size, available_size, removed_but_unreturned_size);} }; asyncEventQueue.postRunnable(r); } } private void asyncFireResourceCheckedOut( final Object resc, final int pool_size, final int available_size, final int removed_but_unreturned_size ) { if ( canFireEvents() ) { Runnable r = new Runnable() { public void run() {rpes.fireResourceCheckedOut(resc,pool_size,available_size,removed_but_unreturned_size);} }; asyncEventQueue.postRunnable(r); } } private void asyncFireResourceRemoved( final Object resc, final boolean checked_out_resource, final int pool_size, final int available_size, final int removed_but_unreturned_size ) { if ( canFireEvents() ) { //System.err.println("ASYNC RSRC REMOVED"); //new Exception().printStackTrace(); Runnable r = new Runnable() { public void run() { rpes.fireResourceRemoved(resc, checked_out_resource, pool_size,available_size,removed_but_unreturned_size); } }; asyncEventQueue.postRunnable(r); } } // needn't be called from a sync'ed method private void destroyResource(final Object resc) { destroyResource( resc, false ); } // needn't be called from a sync'ed method private void destroyResource(final Object resc, boolean synchronous) { Runnable r = new Runnable() { public void run() { try { if (Debug.DEBUG && Debug.TRACE == Debug.TRACE_MAX && logger.isLoggable( MLevel.FINER )) logger.log(MLevel.FINER, "Preparing to destroy resource: " + resc); mgr.destroyResource(resc); if (Debug.DEBUG && Debug.TRACE == Debug.TRACE_MAX && logger.isLoggable( MLevel.FINER )) logger.log(MLevel.FINER, "Successfully destroyed resource: " + resc); } catch ( Exception e ) { if ( logger.isLoggable( MLevel.WARNING ) ) logger.log( MLevel.WARNING, "Failed to destroy resource: " + resc, e );// System.err.println("Failed to destroy resource: " + resc);// e.printStackTrace(); } } }; if ( synchronous || broken ) //if we're broken, our taskRunner may be dead, so we destroy synchronously r.run(); else { try { taskRunner.postRunnable( r ); } catch (Exception e) { if (logger.isLoggable(MLevel.FINER)) logger.log( MLevel.FINER, "AsynchronousRunner refused to accept task to destroy resource. " + "It is probably shared, and has probably been closed underneath us. " + "Reverting to synchronous destruction. This is not usually a problem.", e ); destroyResource( resc, true ); } } } //this method SHOULD NOT be invoked from a synchronized //block!!!! private void doAcquire() throws Exception { Object resc = mgr.acquireResource(); //note we acquire the resource while we DO NOT hold the pool's lock! boolean destroy = false; int msz; synchronized(this) //assimilate resc if we do need it { msz = managed.size(); if (msz < target_pool_size) assimilateResource(resc); else destroy = true; } if (destroy) { mgr.destroyResource( resc ); //destroy resc if superfluous, without holding the pool's lock if (logger.isLoggable( MLevel.FINER)) logger.log(MLevel.FINER, "destroying overacquired resource: " + resc); } } public synchronized void setPoolSize( int sz ) throws ResourcePoolException { try { setTargetPoolSize( sz ); while ( managed.size() != sz ) this.wait(); } catch (Exception e) { String msg = "An exception occurred while trying to set the pool size!"; if ( logger.isLoggable( MLevel.FINER ) ) logger.log( MLevel.FINER, msg, e ); throw ResourcePoolUtils.convertThrowable( msg, e ); } } public synchronized void setTargetPoolSize(int sz) { if (sz > max) { throw new IllegalArgumentException("Requested size [" + sz + "] is greater than max [" + max + "]."); } else if (sz < min) { throw new IllegalArgumentException("Requested size [" + sz + "] is less than min [" + min + "]."); } this.target_pool_size = sz; _recheckResizePool(); }// private void acquireUntil(int num) throws Exception// {// int msz = managed.size();// for (int i = msz; i < num; ++i)// assimilateResource();// } //the following methods should only be invoked from //sync'ed methods / blocks...// private Object useUnusedButNotInIdleCheck()// {// for (Iterator ii = unused.iterator(); ii.hasNext(); )// {// Object maybeOut = ii.next();// if (! idleCheckResources.contains( maybeOut ))// {// ii.remove();// return maybeOut;// }// }// throw new RuntimeException("Internal Error -- the pool determined that it did have a resource available for checkout, but was unable to find one.");// }// private int actuallyAvailable()// { return unused.size() - idleCheckResources.size(); } private void markBrokenNoEnsureMinResources(Object resc) { try { _markBroken( resc ); } catch ( ResourceClosedException e ) // one of our async threads died { //e.printStackTrace(); if ( logger.isLoggable( MLevel.SEVERE ) ) logger.log( MLevel.SEVERE, "Apparent pool break.", e ); this.unexpectedBreak(); } } private void _markBroken( Object resc ) { if ( unused.contains( resc ) ) removeResource( resc ); else excludeResource( resc ); } //DEBUG //Exception firstClose = null; public synchronized void close( boolean close_checked_out_resources ) { if (! broken ) //ignore repeated calls to close { //DEBUG //firstClose = new Exception("First close() -- debug stack trace [CRAIG]"); //firstClose.printStackTrace(); this.broken = true; final Collection cleanupResources = ( close_checked_out_resources ? (Collection) cloneOfManaged().keySet() : (Collection) cloneOfUnused() ); if ( cullTask != null ) cullTask.cancel(); if (idleRefurbishTask != null) idleRefurbishTask.cancel(); // we destroy resources asynchronously, but with a dedicated one-off Thread, rather than // our asynchronous runner, because our asynchrous runner may be shutting down. The // destruction is asynchrounous because destroying a resource might require the resource's // lock, and we already have the pool's lock. But client threads may well have the resource's // lock while they try to check-in to the pool. The async destruction of resources avoids // the possibility of deadlock. managed.keySet().removeAll( cleanupResources ); unused.removeAll( cleanupResources ); Thread resourceDestroyer = new Thread("Resource Destroyer in BasicResourcePool.close()") { public void run() { for (Iterator ii = cleanupResources.iterator(); ii.hasNext();) { try { Object resc = ii.next(); //System.err.println("Destroying resource... " + resc); destroyResource( resc, true ); } catch (Exception e) { if (Debug.DEBUG) { //e.printStackTrace(); if ( logger.isLoggable( MLevel.FINE ) ) logger.log( MLevel.FINE, "BasicResourcePool -- A resource couldn't be cleaned up on close()", e ); } } } } }; resourceDestroyer.start(); for (Iterator ii = acquireWaiters.iterator(); ii.hasNext(); ) ((Thread) ii.next()).interrupt(); for (Iterator ii = otherWaiters.iterator(); ii.hasNext(); ) ((Thread) ii.next()).interrupt(); if (factory != null) factory.markBroken( this ); // System.err.println(this + " closed."); } else { if ( logger.isLoggable( MLevel.WARNING ) ) logger.warning(this + " -- close() called multiple times."); //System.err.println(this + " -- close() called multiple times."); //DEBUG //firstClose.printStackTrace(); //new Exception("Repeat close() [CRAIG]").printStackTrace(); } } private void doCheckinManaged( final Object resc ) throws ResourcePoolException { if (unused.contains(resc)) { if ( Debug.DEBUG ) throw new ResourcePoolException("Tried to check-in an already checked-in resource: " + resc); } else { Runnable doMe = new Runnable() { public void run() { boolean resc_okay = attemptRefurbishResourceOnCheckin( resc ); synchronized( BasicResourcePool.this ) { if ( resc_okay ) { unused.add(0, resc ); if (! age_is_absolute ) //we need to reset the clock, 'cuz we are counting idle time managed.put( resc, new Date() ); } else { removeResource( resc ); ensureMinResources(); } asyncFireResourceCheckedIn( resc, managed.size(), unused.size(), excluded.size() ); BasicResourcePool.this.notifyAll(); } } }; taskRunner.postRunnable( doMe ); } } private void doCheckinExcluded( Object resc ) { excluded.remove(resc); destroyResource(resc); } /* * by the semantics of wait(), a timeout of zero means forever. */ private void awaitAvailable(long timeout) throws InterruptedException, TimeoutException, ResourcePoolException { if (force_kill_acquires) throw new ResourcePoolException("A ResourcePool cannot acquire a new resource -- the factory or source appears to be down."); Thread t = Thread.currentThread(); try { acquireWaiters.add( t ); int avail; long start = ( timeout > 0 ? System.currentTimeMillis() : -1); if (Debug.DEBUG && Debug.TRACE == Debug.TRACE_MAX) { if ( logger.isLoggable( MLevel.FINE ) ) logger.fine("awaitAvailable(): " + (exampleResource != null ? exampleResource : "[unknown]") ); trace(); } while ((avail = unused.size()) == 0) { // the if case below can only occur when 1) a user attempts a // checkout which would provoke an acquire; 2) this // increments the pending acquires, so we go to the // wait below without provoking postAcquireMore(); 3) // the resources are acquired; 4) external management // of the pool (via for instance unpoolResource() // depletes the newly acquired resources before we // regain this' monitor; 5) we fall into wait() with // no acquires being scheduled, and perhaps a managed.size() // of zero, leading to deadlock. This could only occur in // fairly pathological situations where the pool is being // externally forced to a very low (even zero) size, but // since I've seen it, I've fixed it. if (pending_acquires == 0 && managed.size() < max) recheckResizePool(); this.wait(timeout); if (timeout > 0 && System.currentTimeMillis() - start > timeout) throw new TimeoutException("internal -- timeout at awaitAvailable()"); if (force_kill_acquires) throw new CannotAcquireResourceException("A ResourcePool could not acquire a resource from its primary factory or source."); ensureNotBroken(); } } finally { acquireWaiters.remove( t ); if (acquireWaiters.size() == 0) this.notifyAll(); } } private void assimilateResource( Object resc ) throws Exception { managed.put(resc, new Date()); unused.add(0, resc); //System.err.println("assimilate resource... unused: " + unused.size()); asyncFireResourceAcquired( resc, managed.size(), unused.size(), excluded.size() ); this.notifyAll(); if (Debug.DEBUG && Debug.TRACE == Debug.TRACE_MAX) trace(); if (Debug.DEBUG && exampleResource == null)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -