📄 basicresourcepool.java
字号:
/* * Distributed as part of c3p0 v.0.9.1-pre6 * * Copyright (C) 2005 Machinery For Change, Inc. * * Author: Steve Waldman <swaldman@mchange.com> * * This library is free software; you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License version 2.1, as * published by the Free Software Foundation. * * This software is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with this software; see the file LICENSE. If not, write to the * Free Software Foundation, Inc., 59 Temple Place, Suite 330, * Boston, MA 02111-1307, USA. */package com.mchange.v2.resourcepool;import java.util.*;import com.mchange.v2.async.*;import com.mchange.v2.log.*;import com.mchange.v2.holders.SynchronizedIntHolder;import com.mchange.v2.util.ResourceClosedException;class BasicResourcePool implements ResourcePool{ private final static MLogger logger = MLog.getLogger( BasicResourcePool.class ); final static int CULL_FREQUENCY_DIVISOR = 8; final static int MAX_CULL_FREQUENCY = (15 * 60 * 1000); //15 mins //MT: unchanged post c'tor Manager mgr; BasicResourcePoolFactory factory; AsynchronousRunner taskRunner; //MT: protected by this' lock RunnableQueue asyncEventQueue; Timer cullAndIdleRefurbishTimer; TimerTask cullTask; TimerTask idleRefurbishTask; HashSet acquireWaiters = new HashSet(); HashSet otherWaiters = new HashSet(); int target_pool_size; /* keys are all valid, managed resources, value is a Date */ HashMap managed = new HashMap(); /* all valid, managed resources currently available for checkout */ LinkedList unused = new LinkedList(); /* resources which have been invalidated somehow, but which are */ /* still checked out and in use. */ HashSet excluded = new HashSet(); Set idleCheckResources = new HashSet(); ResourcePoolEventSupport rpes = new ResourcePoolEventSupport(this); boolean force_kill_acquires = false; boolean broken = false; //DEBUG only! Object exampleResource; // // members below are unchanging // int start; int min; int max; int inc; int num_acq_attempts; int acq_attempt_delay; long check_idle_resources_delay; //milliseconds long max_resource_age; //milliseconds boolean age_is_absolute; boolean break_on_acquisition_failure; // // end unchanging members // // --- // // members below are changing but protected // by their own locks // int pending_acquires; int pending_removes; // SynchronizedIntHolder pendingAcquiresCounter = new SynchronizedIntHolder();// SynchronizedIntHolder pendingRemovesCounter = new SynchronizedIntHolder();// {// public synchronized void increment() // {// super.increment();// System.err.println("increment() --> " + getValue()); // }// public synchronized void decrement() // {// super.decrement();// System.err.println("decrement() --> " + getValue()); // }// }; // // end changing but protected members // /** * @param factory may be null */ public BasicResourcePool(Manager mgr, int start, int min, int max, int inc, int num_acq_attempts, int acq_attempt_delay, long check_idle_resources_delay, long max_resource_age, boolean age_is_absolute, boolean break_on_acquisition_failure, AsynchronousRunner taskRunner, RunnableQueue asyncEventQueue, Timer cullAndIdleRefurbishTimer, BasicResourcePoolFactory factory) throws ResourcePoolException { try { this.mgr = mgr; this.start = start; this.min = min; this.max = max; this.inc = inc; this.num_acq_attempts = num_acq_attempts; this.acq_attempt_delay = acq_attempt_delay; this.check_idle_resources_delay = check_idle_resources_delay; this.max_resource_age = max_resource_age; this.age_is_absolute = age_is_absolute; this.factory = factory; this.taskRunner = taskRunner; this.asyncEventQueue = asyncEventQueue; this.cullAndIdleRefurbishTimer = cullAndIdleRefurbishTimer; this.pending_acquires = 0; this.pending_removes = 0; this.target_pool_size = Math.max(start, min); //start acquiring our initial resources ensureStartResources(); if (max_resource_age > 0) { long cull_frequency = Math.min( max_resource_age / CULL_FREQUENCY_DIVISOR, MAX_CULL_FREQUENCY ) ; this.cullTask = new CullTask(); cullAndIdleRefurbishTimer.schedule( cullTask, max_resource_age, cull_frequency ); } else age_is_absolute = false; // there's no point keeping track of // the absolute age of things if we // aren't even culling. if (check_idle_resources_delay > 0) { this.idleRefurbishTask = new CheckIdleResourcesTask(); cullAndIdleRefurbishTimer.schedule( idleRefurbishTask, check_idle_resources_delay, check_idle_resources_delay ); } } catch (Exception e) { throw ResourcePoolUtils.convertThrowable( e ); } } public Object checkoutResource() throws ResourcePoolException, InterruptedException { try { return checkoutResource( 0 ); } catch (TimeoutException e) { //this should never happen //e.printStackTrace(); if ( logger.isLoggable( MLevel.WARNING ) ) logger.log( MLevel.WARNING, "Huh??? TimeoutException with no timeout set!!!", e); throw new ResourcePoolException("Huh??? TimeoutException with no timeout set!!!", e); } } // must be called from synchronized method, idempotent private void _recheckResizePool() { if (! broken) { int msz = managed.size(); //int expected_size = msz + pending_acquires - pending_removes;// System.err.print("target: " + target_pool_size);// System.err.println(" (msz: " + msz + "; pending_acquires: " + pending_acquires + "; pending_removes: " + pending_removes + ')'); //new Exception( "_recheckResizePool() STACK TRACE" ).printStackTrace(); int shrink_count; int expand_count; if ((shrink_count = msz - pending_removes - target_pool_size) > 0) shrinkPool( shrink_count ); else if ((expand_count = target_pool_size - (msz + pending_acquires)) > 0) expandPool( expand_count ); } } private synchronized void incrementPendingAcquires() { ++pending_acquires; //new Exception("ACQUIRE SOURCE STACK TRACE").printStackTrace(); } private synchronized void incrementPendingRemoves() { ++pending_removes; //new Exception("REMOVE SOURCE STACK TRACE").printStackTrace(); } private synchronized void decrementPendingAcquires() { --pending_acquires; } private synchronized void decrementPendingRemoves() { --pending_removes; } // idempotent private synchronized void recheckResizePool() { _recheckResizePool(); } // must be called from synchronized method private void expandPool(int count) { for (int i = 0; i < count; ++i) taskRunner.postRunnable( new AcquireTask() ); } // must be called from synchronized method private void shrinkPool(int count) { for (int i = 0; i < count; ++i) taskRunner.postRunnable( new RemoveTask() ); } /* * This function recursively calls itself... under nonpathological * situations, it shouldn't be a problem, but if resources can never * successfully check out for some reason, we might blow the stack... * * by the semantics of wait(), a timeout of zero means forever. */ public synchronized Object checkoutResource( long timeout ) throws TimeoutException, ResourcePoolException, InterruptedException { try { ensureNotBroken(); int available = unused.size(); if (available == 0) { int msz = managed.size(); if (msz < max && msz >= target_pool_size) { target_pool_size = Math.max( Math.min( max, target_pool_size + inc ), min ); //System.err.println("updated target_pool_size: " + target_pool_size); _recheckResizePool(); } awaitAvailable(timeout); //throws timeout exception } Object resc = unused.get(0); unused.remove(0); // this is a hack -- but "doing it right" adds a lot of complexity, and collisions between // an idle check and a checkout should be relatively rare. anyway, it should work just fine. if ( idleCheckResources.contains( resc ) ) { if (Debug.DEBUG && logger.isLoggable( MLevel.FINER)) logger.log( MLevel.FINER, "Resource we want to check out is in idleCheck! (waiting until idle-check completes.) [" + this + "]"); //System.err.println("c3p0-JENNIFER: INFO: Resource we want to check out is in idleCheck! (waiting until idle-check completes.)" + " [" + this + "]"); unused.add(0, resc ); // we'll wait for "something to happen" -- probably an idle check to // complete -- then we'll try again and hope for the best. Thread t = Thread.currentThread(); try { otherWaiters.add ( t ); this.wait( timeout ); ensureNotBroken(); } finally { otherWaiters.remove( t ); } return checkoutResource( timeout ); } if (isExpired( resc ) || !attemptRefurbishResourceOnCheckout( resc )) { removeResource( resc ); ensureMinResources(); return checkoutResource( timeout ); } else { asyncFireResourceCheckedOut( resc, managed.size(), unused.size(), excluded.size() ); if (Debug.DEBUG && Debug.TRACE == Debug.TRACE_MAX) trace(); return resc; } } catch ( ResourceClosedException e ) // one of our async threads died { //System.err.println(this + " -- the pool was found to be closed or broken during an attempt to check out a resource."); //e.printStackTrace(); if (logger.isLoggable( MLevel.SEVERE )) logger.log( MLevel.SEVERE, this + " -- the pool was found to be closed or broken during an attempt to check out a resource.", e ); this.unexpectedBreak(); throw e; } catch ( InterruptedException e ) {// System.err.println(this + " -- an attempt to checkout a resource was interrupted: some other thread " +// "must have either interrupted the Thread attempting checkout, or close() was called on the pool.");// e.printStackTrace(); if (broken) { if (logger.isLoggable( MLevel.FINER )) logger.log(MLevel.FINER, this + " -- an attempt to checkout a resource was interrupted, because the pool is now closed. " + "[Thread: " + Thread.currentThread().getName() + ']', e ); else if (logger.isLoggable( MLevel.INFO )) logger.log(MLevel.INFO, this + " -- an attempt to checkout a resource was interrupted, because the pool is now closed. " + "[Thread: " + Thread.currentThread().getName() + ']'); } else { if (logger.isLoggable( MLevel.WARNING )) { logger.log(MLevel.WARNING, this + " -- an attempt to checkout a resource was interrupted, and the pool is still live: some other thread " + "must have either interrupted the Thread attempting checkout!", e ); } } throw e; } } public synchronized void checkinResource( Object resc ) throws ResourcePoolException { try { //we permit straggling resources to be checked in //without exception even if we are broken if (managed.keySet().contains(resc)) doCheckinManaged( resc ); else if (excluded.contains(resc)) doCheckinExcluded( resc ); else throw new ResourcePoolException("ResourcePool" + (broken ? " [BROKEN!]" : "") + ": Tried to check-in a foreign resource!"); if (Debug.DEBUG && Debug.TRACE == Debug.TRACE_MAX) trace(); } catch ( ResourceClosedException e ) // one of our async threads died {// System.err.println(this + // " - checkinResource( ... ) -- even broken pools should allow checkins without exception. probable resource pool bug.");// e.printStackTrace(); if ( logger.isLoggable( MLevel.SEVERE ) ) logger.log( MLevel.SEVERE, this + " - checkinResource( ... ) -- even broken pools should allow checkins without exception. probable resource pool bug.", e); this.unexpectedBreak(); throw e; } } public synchronized void checkinAll() throws ResourcePoolException { try { Set checkedOutNotExcluded = new HashSet( managed.keySet() ); checkedOutNotExcluded.removeAll( unused ); for (Iterator ii = checkedOutNotExcluded.iterator(); ii.hasNext(); ) doCheckinManaged( ii.next() ); for (Iterator ii = excluded.iterator(); ii.hasNext(); ) doCheckinExcluded( ii.next() ); } catch ( ResourceClosedException e ) // one of our async threads died {// System.err.println(this + // " - checkinAll() -- even broken pools should allow checkins without exception. probable resource pool bug.");// e.printStackTrace(); if ( logger.isLoggable( MLevel.SEVERE ) ) logger.log( MLevel.SEVERE, this + " - checkinAll() -- even broken pools should allow checkins without exception. probable resource pool bug.", e ); this.unexpectedBreak(); throw e; } } public synchronized int statusInPool( Object resc ) throws ResourcePoolException { try { if ( unused.contains( resc ) ) return KNOWN_AND_AVAILABLE; else if ( managed.keySet().contains( resc ) || excluded.contains( resc ) ) return KNOWN_AND_CHECKED_OUT; else return UNKNOWN_OR_PURGED; } catch ( ResourceClosedException e ) // one of our async threads died {// e.printStackTrace(); if ( logger.isLoggable( MLevel.SEVERE ) ) logger.log( MLevel.SEVERE, "Apparent pool break.", e ); this.unexpectedBreak(); throw e; } } public synchronized void markBroken(Object resc) { try { if (Debug.DEBUG && Debug.TRACE == Debug.TRACE_MAX && logger.isLoggable( MLevel.FINER )) logger.log( MLevel.FINER, "Resource " + resc + " marked broken by pool (" + this + ")."); _markBroken( resc ); ensureMinResources(); } catch ( ResourceClosedException e ) // one of our async threads died { //e.printStackTrace(); if ( logger.isLoggable( MLevel.SEVERE ) ) logger.log( MLevel.SEVERE, "Apparent pool break.", e ); this.unexpectedBreak(); } } //min is immutable, no need to synchronize public int getMinPoolSize() { return min; } //max is immutable, no need to synchronize public int getMaxPoolSize() { return max; } public synchronized int getPoolSize() throws ResourcePoolException { return managed.size(); }// //i don't think i like the async, no-guarantees approach// public synchronized void requestResize( int req_sz )// {// if (req_sz > max)// req_sz = max;// else if (req_sz < min)// req_sz = min;// int sz = managed.size();// if (req_sz > sz)// postAcquireUntil( req_sz );// else if (req_sz < sz)// postRemoveTowards( req_sz );// } public synchronized int getAvailableCount() { return unused.size(); } public synchronized int getExcludedCount() { return excluded.size(); } public synchronized int getAwaitingCheckinCount() { return managed.size() - unused.size() + excluded.size(); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -