📄 jdbcmanagercommontable.java
字号:
} finally { try { if (preStatement != null) preStatement.close(); } catch (Throwable ex) { success = false; log.warning("addEntries: throwable when closing the statement: " + ex.toString()); } try { if (conn != null) { if (!conn.getAutoCommit()) conn.setAutoCommit(true); } } catch (Throwable ex) { success = false; log.warning("addEntries: throwable when closing the connection: " + ex.toString()); } if (conn != null) this.pool.releaseConnection(conn, success); } } /** * Cleans up the specified queue. It deletes all queue entries in the 'entries' table. * @return the number of queues deleted (not the number of entries). */ public final int cleanUp(String queueName) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("Entering"); if (!this.isConnected) { if (log.isLoggable(Level.FINE)) log.fine("Currently not possible. No connection to the DB"); return 0; } try { String req = "delete from " + this.entriesTableName + " where queueName='" + queueName + "'"; if (log.isLoggable(Level.FINE)) log.fine(" request is '" + req + "'"); int num = update(req); return (num > 0) ? 1 : 0; } catch (XmlBlasterException ex) { throw ex; } catch (Throwable ex) { Connection conn = null; boolean success = true; try { conn = this.pool.getConnection(); if (checkIfDBLoss(conn, getLogId(queueName, "deleteEntries"), ex)) throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, ME + ".deleteEntries", "", ex); } catch (XmlBlasterException e) { success = false; throw e; } finally { if (conn != null) this.pool.releaseConnection(conn, success); } throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNKNOWN, ME + ".deleteEntries", "", ex); } } /** * Wipes out the entire DB. * i.e. it deletes the three tables from the DB. IMPORTANT: * @param doSetupNewTables if set to 'true' it recreates the necessary empty tables. */ public int wipeOutDB(boolean doSetupNewTables) throws XmlBlasterException { // retrieve all tables to delete if (log.isLoggable(Level.FINER)) log.finer("wipeOutDB"); int count = 0; Connection conn = null; boolean success = true; try { try { String req = "DROP TABLE " + this.entriesTableName; if (log.isLoggable(Level.FINER)) log.finer("wipeOutDB " + req + " will be invoked on the DB"); conn = this.pool.getConnection(); if (conn.getAutoCommit()) conn.setAutoCommit(false); this.update(req, conn); count++; } catch (SQLException ex) { success = false; if (checkIfDBLoss(conn, getLogId(null, "wipeOutDB"), ex)) throw new XmlBlasterException(glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, getLogId(null, "wipeOutDB"), "SQLException when wiping out DB", ex); else { log.warning("Exception occurred when trying to drop the table '" + this.entriesTableName + "', it probably is already dropped. Reason: " + ex.toString()); } } if (!conn.getAutoCommit()) conn.commit(); } catch (Throwable ex) { success = false; try { if (conn != null) conn.rollback(); } catch (Throwable ex1) { success = false; log.severe("wipeOutDB: exception occurred when rolling back: " + ex1.toString()); } if (ex instanceof XmlBlasterException) throw (XmlBlasterException)ex; throw new XmlBlasterException(glob, ErrorCode.RESOURCE_DB_UNKNOWN, getLogId(null, "wipeOutDB"), "wipeOutDB: exception ", ex); } finally { try { if (conn != null) { if (!conn.getAutoCommit()) conn.setAutoCommit(true); } } catch (Throwable ex) { success = false; throw new XmlBlasterException(glob, ErrorCode.RESOURCE_DB_UNKNOWN, getLogId(null, "wipeOutDB"), "wipeOutDB: exception when closing the query", ex); } finally { if (conn != null) this.pool.releaseConnection(conn, success); } } try { if (doSetupNewTables) setUp(); } catch (Throwable ex) { log.severe("SQLException occured when cleaning up the table. Reason " + ex.toString()); } return count; } /** * Dumps the metadata to the log object. The log will write out this * information as an info. This means you don't need to have the switch * 'dump' set to true to see this information. */ public void dumpMetaData() { this.pool.dumpMetaData(); } /** * returns the amount of bytes currently in the specified queue * TODO: Replace all four selects with one: * "select count(*), sum(byteSize), durable from XB_ENTRIES WHERE queueName='history_xmlBlaster_192_168_1_4_3412Hello' GROUP BY durable;" * @param tableName the name of the table in which to count * @return the current amount of bytes used in the table. */ public long getNumOfBytes(String queueName) throws XmlBlasterException { if (!this.isConnected) throw new XmlBlasterException(glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, getLogId(queueName, "getNumOfBytes"), "The DB is disconnected. Handling queue '" + queueName + "' is currently not possible"); Connection conn = null; boolean success = true; PreparedStatement st = null; try { String req = "SELECT sum(" + this.byteSizeColName + ") from " + this.entriesTableName + " where queueName='" + queueName + "'"; conn = this.pool.getConnection(); st = conn.prepareStatement(req); st.setQueryTimeout(this.pool.getQueryTimeout()); ResultSet rs = st.executeQuery(); rs.next(); return rs.getLong(1); } catch (XmlBlasterException ex) { success = false; throw ex; } catch (Throwable ex) { success = false; try { if (st != null) st.close(); st = null; } catch (Throwable ex1) { log.warning(".getNumOfBytes: exception when closing statement: " + ex1.toString()); } if (checkIfDBLoss(conn, getLogId(queueName, "getNumOfBytes"), ex)) throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, ME + ".getNumOfBytes", "", ex); else throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNKNOWN, ME + ".getNumOfBytes", "", ex); } finally { try { if (st != null) st.close(); } catch (Throwable ex) { success = false; log.warning(".getNumOfBytes: exception when closing statement: " + ex.toString()); } if (conn != null) this.pool.releaseConnection(conn, success); } } /** * Sets up a table for the queue specified by this queue name. * If one already exists (i.e. if it recovers from a crash) its associated * table name is returned. */ public final void setUp() throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("setUp"); if (log.isLoggable(Level.FINE)) log.fine("Initializing the first time the pool"); tablesCheckAndSetup(this.pool.isDbAdmin()); } private final ArrayList processResultSet(ResultSet rs, StorageId storageId, int numOfEntries, long numOfBytes, boolean onlyId, I_EntryFilter entryFilter) throws SQLException, XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("Entering"); ArrayList entries = new ArrayList(); int count = 0; long amount = 0L; while ( (rs.next()) && ((count < numOfEntries) || (numOfEntries < 0)) && ((amount < numOfBytes) || (numOfBytes < 0))) { if(onlyId) { entries.add(new Long(rs.getLong(1))); } else { long dataId = rs.getLong(DATA_ID); // preStatement.setLong(1, dataId); String queueName = rs.getString(QUEUE_NAME); // preStatement.setString(3, queueName); int prio = rs.getInt(PRIO); // preStatement.setInt(4, prio); String typeName = rs.getString(TYPE_NAME); // preStatement.setString(5, typeName); if (typeName != null) typeName = typeName.trim(); //this only to make ORACLE happy since it does not support BOOLEAN String persistentAsChar = rs.getString(PERSISTENT); if (persistentAsChar != null) persistentAsChar = persistentAsChar.trim(); boolean persistent = false; if ("T".equalsIgnoreCase(persistentAsChar)) persistent = true; long sizeInBytes = rs.getLong(SIZE_IN_BYTES); InputStream is = rs.getBinaryStream(BLOB); // byte[] blob = rs.getBytes(7); // preStatement.setObject(5, blob); if (storageId == null) storageId = StorageId.valueOf(queueName); if (is == null) { String txt = "dataId='" + dataId + "' prio='" + prio + "' typeName='" + typeName + "' persistent='" + persistent + "' sizeInBytes='" + sizeInBytes + "'"; log.warning("The stream for the blob of data: " + txt + " is null"); } if ( (numOfBytes < 0) || (sizeInBytes+amount < numOfBytes) || (count == 0)) { if (log.isLoggable(Level.FINEST)) log.finest("processResultSet: dataId: " + dataId + ", prio: " + prio + ", typeName: " + typeName + " persistent: " + persistent);// entries.add(this.factory.createEntry(prio, dataId, typeName, persistent, sizeInBytes, blob, storageId)); I_Entry entry = this.factory.createEntry(prio, dataId, typeName, persistent, sizeInBytes, is, storageId); if (entryFilter != null) entry = entryFilter.intercept(entry, this.storage); entries.add(entry); amount += sizeInBytes; } } count++; } return entries; } /** * It accepts result sets with (long dataId, long size) * @param numOfBytes as input is the maximum number of bytes to process. As * output it stores the number of bytes processed. * @param numOfEntries the maximum number of entries to process * */ private final ReturnDataHolder processResultSetForDeleting(ResultSet rs, int numOfEntries, long numOfBytes) throws SQLException, XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("processResultSetForDeleting invoked"); ReturnDataHolder ret = new ReturnDataHolder(); long currentAmount = 0L; while ( (rs.next()) && ((ret.countEntries < numOfEntries) || (numOfEntries < 0)) && ((ret.countBytes < numOfBytes) || (numOfBytes < 0))) { currentAmount = rs.getLong(2); if ( (numOfBytes < 0) || (ret.countBytes+currentAmount < numOfBytes) || (ret.countEntries == 0)) { ret.list.add(new Long(rs.getLong(1))); ret.countBytes += currentAmount; ret.countEntries++; } } return ret; } /** * Returns the pool associated to this object. * @return JdbcConnectionPool the pool managing the connections to the DB */ public JdbcConnectionPool getPool() { return this.pool; } /** * A generic invocation for a simple update on the database. It will use * a normal statement (not a prepared statement), * @param request the request string * @param fetchSize the number of entries to fetch, i.e. to update */ private final int update(String request)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -