📄 jdbcconnectionpool.java
字号:
/*------------------------------------------------------------------------------Name: JdbcConnectionPool.javaProject: xmlBlaster.orgCopyright: xmlBlaster.org, see xmlBlaster-LICENSE fileComment: JDBC Connection Pool for persistent queues.Author: michele@laghi.eu------------------------------------------------------------------------------*/package org.xmlBlaster.util.queue.jdbc;import org.apache.commons.lang.Tokenizer;import java.util.logging.Logger;import java.util.logging.Level;import org.xmlBlaster.util.ReplaceVariable;import org.xmlBlaster.util.ThreadLister;import org.xmlBlaster.util.XmlBlasterException;import org.xmlBlaster.util.Global;import java.util.Properties;import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;import java.sql.SQLWarning;import java.sql.DatabaseMetaData;import java.util.Hashtable;import org.xmlBlaster.util.I_Timeout;import org.xmlBlaster.util.def.ErrorCode;import org.xmlBlaster.util.queue.I_StorageProblemListener;import org.xmlBlaster.util.queue.I_StorageProblemNotifier;import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;/** * A Pool of connections to the database to be used for a persistent queue. To * keep genericity, queries and update strings are read from properties. * @see <a href="http://www.xmlBlaster.org/xmlBlaster/doc/requirements/queue.jdbc.commontable.html">The queue.jdbc.commontable requirement</a> */public class JdbcConnectionPool implements I_Timeout, I_StorageProblemNotifier { private static String ME = "JdbcConnectionPool"; private static Logger log = Logger.getLogger(JdbcConnectionPool.class.getName()); private Global glob = null; private BoundedLinkedQueue connections; /** the initial capacity of this pool. */ private int capacity; private int waitingCalls = 0; private long connectionBusyTimeout = 20*60*1000L; /* On high load we wait up to 20 min until xmlBlaster shuts down */ private int maxWaitingThreads = 200; private Hashtable mapping = null; private boolean initialized = false; private String tableNamePrefix = "XB"; // stands for "XMLBLASTER", it is chosen short for Postgres max. eval length = 26 chars (timestamp has already 19 chars) private String colNamePrefix = ""; // SQLServer does not allow column name 'byteSize' and 'dataId', so we can add a token e.g. XBbyteSize, XBdataId private int tableAllocationIncrement = 2; /** will be set when a connecton is broken */ private int status = I_StorageProblemListener.UNDEF; private boolean waitingForReentrantConnections = false; private String url; private String user; private String password; private long reconnectionTimeout = 10000L; private I_StorageProblemListener storageProblemListener = null; private static boolean firstConnectError = true; private Properties pluginProp = null; private boolean dbAdmin = true; /** Sets the number of seconds the driver will wait for a * <code>Statement</code> object to execute to the given number of seconds. * If the limit is exceeded, an <code>SQLException</code> is thrown. * * @param seconds the new query timeout limit in seconds; zero means * there is no limit */ private int queryTimeout = 0; // wait indefinitely private int managerCount = 0; private boolean isShutdown = false; private boolean enableBatchMode; private String configurationIdentifier; private boolean cascadeDeleteSupported; private boolean nestedBracketsSupported; private int forceIsoaltionLevel = -1; private boolean debug; private final int MIN_POOL_SIZE = 1; /** * returns the plugin properties, i.e. the specific properties passed to the jdbc queue plugin. * These are commonly used by the jdbc manager. */ public Properties getPluginProperties() { return this.pluginProp; } /** * Invoked by the timer when a check for reconnection is wanted again * @see I_Timeout#timeout(Object) */ public void timeout(Object userData) { log.info("Timeout, trying DB reconnect, current index: " + this.connections.size() + ", waiting for reentrant connections: " + this.waitingForReentrantConnections); synchronized (this) { if (this.waitingForReentrantConnections) { if (this.connections.size() == this.capacity) this.waitingForReentrantConnections = false; else { // respan the timer ... this.glob.getJdbcConnectionPoolTimer().addTimeoutListener(this, this.reconnectionTimeout, null); return; // wait until all connections have been given back to the pool ... // in case a connection is blocking this could be blocking everything: find a better way to // throw away the blocking connection and avoid to get it back later. } } // try a connection ... try { if (log.isLoggable(Level.FINE)) log.fine("timeout:retrying to establish connections"); // initializing and establishing of connections to DB but first clearing the connections ... connect(false, false); } catch (Throwable ex) { // clean up the connections which might have been established //for (int i = 0; i < this.connections.size(); i++) disconnect(i); // respan the timer ... disconnect(-1L, true); this.glob.getJdbcConnectionPoolTimer().addTimeoutListener(this, this.reconnectionTimeout, null); } } } /** * Sets the connection listener. Only one is allowed at a time. So if there is * already a connection listener, it will be overwritten (and the old one will * not get anyu notification anymore). */ synchronized public boolean registerStorageProblemListener(I_StorageProblemListener storageProblemListener) { this.storageProblemListener = storageProblemListener; return true; // always true } /** * Unregisters the storageProblemListener. If no one has been defined, or * if the one you want to unregister is different from the one you have * registered, nothing is done and 'false' is returned. */ synchronized public boolean unRegisterStorageProblemListener(I_StorageProblemListener storageProblemListener) { if ((this.storageProblemListener == null) || (this.storageProblemListener != storageProblemListener)) return false; this.storageProblemListener = null; return true; } /** * returns null if no connection available */ private Connection get(long delay) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("get invoked"); Connection conn = null; if (this.connections.size() > this.capacity) throw new XmlBlasterException(this.glob, ErrorCode.INTERNAL_UNKNOWN, ME, "get: Inconsistency in connection index: a negative one is not possible: '" + this.connections.size() + "'"); if (log.isLoggable(Level.FINE)) log.fine("going to retreive a connection"); try { conn = (Connection)this.connections.poll(delay); if (conn != null) { // assert code setInPool(conn, false); try { if (!conn.getAutoCommit()) { log.severe("Get error, expected autoCommit=true but was false" + ThreadLister.getAllStackTraces()); conn.setAutoCommit(true); } } catch (Throwable e) { log.severe("Get error, expected autoCommit=true but got exception: " + e.toString() + ThreadLister.getAllStackTraces()); } } } catch (InterruptedException ex) { log.warning("the waiting for a connection was interrupted: " + ex.getMessage()); } if (log.isLoggable(Level.FINE)) log.fine("retreived the connection"); if (conn == null) throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, ME, "get: connectionBusyTimeout=" + this.connectionBusyTimeout + " occured when waiting for a free DB connection (see xmlBlaster.properties)." + " Either the timeout is too short or other connections are blocking, waitingCalls=" + this.waitingCalls + ", connectionPoolSize=" + this.capacity); return conn; } private boolean put(Connection conn) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) { String warning = ""; try { SQLException warn = conn.getWarnings(); warning = warn.getMessage(); } catch (Throwable e) {} log.finer("put invoked " + warning); } if (conn == null) return false; if (isInPool(conn)) { // assert code log.severe("Put error, the returned connection is already in the pool: " + ThreadLister.getAllStackTraces()); throw new XmlBlasterException(this.glob, ErrorCode.INTERNAL_ILLEGALSTATE, ME, "Put error, the returned connection is already in the pool"); } try { if (!conn.getAutoCommit()) { // assert log.severe("Put error, expected autoCommit=true but was false" + ThreadLister.getAllStackTraces()); conn.setAutoCommit(true); } } catch (Throwable e) { log.severe("Put error, expected autoCommit=true but got exception: " + e.toString() + ThreadLister.getAllStackTraces()); throw new XmlBlasterException(this.glob, ErrorCode.INTERNAL_ILLEGALSTATE, ME, "Put error, expected autoCommit=true but got exception", e); } try { setInPool(conn, true); boolean tmp = this.connections.offer(conn, 5L); return tmp; } catch (InterruptedException ex) { log.warning("put: an interruption occured: " + ex.getMessage()); boolean ret = false; // we do this loop since a CTRL-C could cause an interrupted exception // and thereby cause the loss of one entry in the connection pool. for (int i=0; i < 3; i++) { try { ret = this.connections.offer(conn, 5L); setInPool(conn, true); break; } catch (InterruptedException e) { log.warning("put: an interruption occured #" + i + " : " + e.getMessage()); } } //if (i >= 3) is logged by calling function return ret; } } /** * Returns the global object associated with this pool. */ public Global getGlobal() { return this.glob; } /** The default constructor currently does nothing. Initialization is done in the initialize() method. */ public JdbcConnectionPool() { //this.connections = new BoundedLinkedQueue(); } /** * returns true if the pool already is initialized, false otherwise. */ public boolean isInitialized() { return this.initialized; } /** * Discards the passed connection from the pool and instead adds a new fresh connection to the pool.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -