📄 jdbcmanagercommontable.java
字号:
} else log.fine("modifyEntry: exception, the error reason: '" + ex.toString()); } try { preStatement.close(); preStatement = null; } catch (Throwable ex1) { log.severe("modifyEntry: Exception when closing the statement: " + ex1.toString()); }// if (!conn.getAutoCommit()) conn.rollback(); // DANGER !!!!!!! NOT SAFE YET log.warning("Could not update entry '" + entry.getClass().getName() + "'-'" + entry.getLogId() + "-" + entry.getUniqueId() + "': " + ex.toString()); if (checkIfDBLoss(conn, getLogId(queueName, "modifyEntry"), ex)) { throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, ME + ".modifyEntry", "", ex); } else { throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNKNOWN, ME + ".modifyEntry", "", ex); } } finally { try { if (exStatement != null) exStatement.close(); } catch (Throwable ex) { log.warning("modifyEntry: throwable when closing the connection: " + ex.toString()); } try { if (preStatement != null) preStatement.close(); } catch (Throwable ex) { success = false; log.warning("modifyEntry: throwable when closing the connection: " + ex.toString()); } if (conn != null) this.pool.releaseConnection(conn, success); } return ret; } /** * * Adds a row to the specified queue table * @param queueName The name of the queue on which to perform the operation * @param entry the object to be stored. * * @return true on success false if the entry was already in the table. * * @throws SQLException if an error other than double entry occured while adding the row * @throws XmlBlasterException if an error occured when trying to get a connection */ private final boolean addSingleEntry(String queueName, I_Entry entry, Connection conn) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("Entering"); if (!this.isConnected) { if (log.isLoggable(Level.FINE)) log.fine("For entry '" + entry.getUniqueId() + "' currently not possible. No connection to the DB"); throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, ME + ".addSingleEntry", " the connection to the DB is unavailable already before trying to add an entry"); } PreparedStatement preStatement = null;// PreparedStatement exStatement = null; Statement exStatement = null; boolean ret = false; long dataId = entry.getUniqueId(); int prio = entry.getPriority(); byte[] blob = this.factory.toBlob(entry); String typeName = entry.getEmbeddedType(); boolean persistent = entry.isPersistent(); long sizeInBytes = entry.getSizeInBytes(); if (log.isLoggable(Level.FINEST)) log.finest("addition. dataId: " + dataId + ", prio: " + prio + ", typeName: " + typeName + ", byteSize in bytes: " + sizeInBytes); try { String req = "INSERT INTO " + this.entriesTableName + " VALUES ( ?, ?, ?, ?, ?, ?, ?)"; if (log.isLoggable(Level.FINE)) log.fine(req); preStatement = conn.prepareStatement(req); preStatement.setQueryTimeout(this.pool.getQueryTimeout()); preStatement.setLong(DATA_ID, dataId); preStatement.setString(QUEUE_NAME, queueName); preStatement.setInt(PRIO, prio); preStatement.setString(TYPE_NAME, typeName); if (persistent == true) preStatement.setString(PERSISTENT, "T"); else preStatement.setString(PERSISTENT, "F"); preStatement.setLong(SIZE_IN_BYTES, sizeInBytes); ByteArrayInputStream blob_stream = new ByteArrayInputStream(blob); preStatement.setBinaryStream(BLOB, blob_stream, blob.length); //(int)sizeInBytes); // preStatement.setBytes(BLOB, blob); if (log.isLoggable(Level.FINE)) log.fine(preStatement.toString()); int num = preStatement.executeUpdate(); if (log.isLoggable(Level.FINE)) log.fine("Added " + num + " entries, entryId='" + entry.getUniqueId() + "'"); ret = true; } catch (Throwable ex) { String originalExceptionStack = Global.getStackTraceAsString(ex); String originalExceptionReason = ex.getMessage(); if (log.isLoggable(Level.FINE)) { if (ex instanceof SQLException) { log.fine("addEntry: sql exception, the sql state: '" + ((SQLException)ex).getSQLState() ); log.fine("addEntry: sql exception, the error code: '" + ((SQLException)ex).getErrorCode() ); } } try { if (preStatement != null) { preStatement.close(); preStatement = null; } } catch (Throwable ex1) { log.severe("exception when closing statement: " + ex1.toString()); ex1.printStackTrace(); }// if (!conn.getAutoCommit()) conn.rollback(); // DANGER !!!!!!! NOT SAFE YET log.warning("Could not insert entry '" + entry.getClass().getName() + "'-'" + entry.getLogId() + "-" + entry.getUniqueId() + "': " + ex.toString()); if (checkIfDBLoss(conn, getLogId(queueName, "addEntry"), ex)) { throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, ME + ".addEntry", "", ex); } // check if the exception was due to an existing entry. If yes, no exception will be thrown try { String req = "SELECT count(*) from " + this.entriesTableName + " where (" + this.dataIdColName + "='" + dataId + "')"; if (log.isLoggable(Level.FINE)) log.fine("addEntry: checking if entry already in db: request='" + req + "'"); exStatement = conn.createStatement();// exStatement.setQueryTimeout(this.pool.getQueryTimeout()); ResultSet rs = exStatement.executeQuery(req); rs.next(); int size = rs.getInt(1); if (size < 1) throw ex; } catch (Throwable ex1) { String secondException = Global.getStackTraceAsString(ex1); log.warning("The exception '" + ex1.getMessage() + "' occured at " + secondException + "'. The original exception was '" + originalExceptionReason + "' at '" + originalExceptionStack); if (log.isLoggable(Level.FINE)) log.fine("addEntry: checking if entry already in db: exception in select: '" + ex.toString() + "'"); if (checkIfDBLoss(conn, getLogId(queueName, "addEntry"), ex1)) throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, ME + ".addEntry", "", ex1); else throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNKNOWN, ME + ".addEntry", "", ex1); } ret = false; } finally { try { if (exStatement != null) exStatement.close(); } catch (Throwable ex) { log.warning("addEntry: throwable when closing the connection: " + ex.toString()); } try { if (preStatement != null) preStatement.close(); } catch (Throwable ex) { log.warning("addEntry: throwable when closing the connection: " + ex.toString()); } } return ret; } /** * Adds a row to the specified queue table * @param queueName The name of the queue on which to perform the operation * @param entry the object to be stored. * * @return true on success * * @throws SQLException if an error occured while adding the row * @throws XmlBlasterException if an error occured when trying to get a connection */ public final boolean addEntry(String queueName, I_Entry entry) throws XmlBlasterException { Connection conn = null; boolean success = true; try { conn = this.pool.getConnection(); return addSingleEntry(queueName, entry, conn); } catch (XmlBlasterException ex) { success = false; throw ex; } finally { if (conn != null) this.pool.releaseConnection(conn, success); } } private int[] addEntriesSingleMode(Connection conn, String queueName, I_Entry[] entries) throws XmlBlasterException { // due to a problem in Postgres (implicit abortion of transaction by exceptions) // we can't do everything in the same transaction. That's why we simulate a single // transaction by deleting the processed entries in case of a failure other than // double entries exception. int i = 0; int[] ret = new int[entries.length]; try { if (log.isLoggable(Level.FINE)) log.fine("addEntriesSingleMode adding each entry in single mode since an exception occured when using 'batch mode'"); for (i=0; i < entries.length; i++) { if (addSingleEntry(queueName, entries[i], conn)) ret[i] = 1; else ret[i] = 0; if (log.isLoggable(Level.FINE)) log.fine("addEntriesSingleMode adding entry '" + i + "' in single mode succeeded"); } if (!conn.getAutoCommit()) conn.commit(); return ret; } catch (XmlBlasterException ex1) { // conn.rollback(); try { for (int ii=0; ii < i; ii++) { if (ret[ii] > 0) deleteEntry(queueName, entries[ii].getUniqueId()); // this could be collected and done in one shot } } catch (Throwable ex2) { log.severe("addEntriesSingleMode exception occured when rolling back (this could generate inconsistencies in the data) : " + ex2.toString()); } throw ex1; } catch (Throwable ex1) { // conn.rollback(); try { for (int ii=0; ii < i; ii++) { if (ret[ii] > 0) deleteEntry(queueName, entries[ii].getUniqueId()); // this could be collected and done in one shot } } catch (Throwable ex2) { log.severe("addEntriesSingleMode exception occured when rolling back (this could generate inconsistencies in the data) : " + ex2.toString()); } throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNKNOWN, ME + ".addEntriesSingleMode", "", ex1); } } /** * * Adds several rows to the specified queue table in batch mode to improve performance * @param queueName The name of the queue on which to perform the operation * @param entries the entries to store * @return array of boolean telling which entries where stored and which not. * * @throws SQLException if an error occured while adding the row * @throws XmlBlasterException if an error occured when trying to get a connection */ public int[] addEntries(String queueName, I_Entry[] entries) throws XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("Entering"); if (!this.isConnected) { if (log.isLoggable(Level.FINE)) log.fine(" for '" + entries.length + "' currently not possible. No connection to the DB"); throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, ME + ".addEntries", " the connection to the DB is unavailable already before trying to add entries"); } PreparedStatement preStatement = null; Connection conn = null; boolean success = true; try { conn = this.pool.getConnection(); if (!this.supportsBatch || !this.enableBatchMode) return addEntriesSingleMode(conn, queueName, entries); if (conn.getAutoCommit()) conn.setAutoCommit(false); String req = "INSERT INTO " + this.entriesTableName + " VALUES ( ?, ?, ?, ?, ?, ?, ?)"; if (log.isLoggable(Level.FINE)) log.fine(req); preStatement = conn.prepareStatement(req); preStatement.setQueryTimeout(this.pool.getQueryTimeout()); for (int i=0; i < entries.length; i++) { I_Entry entry = entries[i]; long dataId = entry.getUniqueId(); int prio = entry.getPriority(); byte[] blob = this.factory.toBlob(entry); String typeName = entry.getEmbeddedType(); boolean persistent = entry.isPersistent(); long sizeInBytes = entry.getSizeInBytes(); preStatement.setLong(DATA_ID, dataId); preStatement.setString(QUEUE_NAME, queueName); preStatement.setInt(PRIO, prio); preStatement.setString(TYPE_NAME, typeName); if (persistent == true) preStatement.setString(PERSISTENT, "T"); else preStatement.setString(PERSISTENT, "F"); preStatement.setLong(SIZE_IN_BYTES, sizeInBytes); ByteArrayInputStream blob_stream = new ByteArrayInputStream(blob); preStatement.setBinaryStream(BLOB, blob_stream, blob.length); //(int)sizeInBytes); //preStatement.setBytes(7, blob); if (log.isLoggable(Level.FINE)) log.fine(preStatement.toString()); preStatement.addBatch(); } int[] ret = preStatement.executeBatch(); if (!conn.getAutoCommit()) conn.commit(); return ret; } catch (Throwable ex) { success = false; try { if (!conn.getAutoCommit()) { conn.rollback(); // rollback the original request ... conn.setAutoCommit(true); // since if an exeption occurs it infects future queries within the same transaction } } catch (Throwable ex1) { log.severe("error occured when trying to rollback after exception: reason: " + ex1.toString() + " original reason:" + ex.toString()); ex.printStackTrace(); // original stack trace } if (log.isLoggable(Level.FINE)) log.fine("Could not insert entries: " + ex.toString()); if ((!this.supportsBatch || !this.enableBatchMode) || checkIfDBLoss(conn, getLogId(queueName, "addEntries"), ex)) throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, ME + ".addEntries", "", ex); else { // check if the exception was due to an already existing entry by re return addEntriesSingleMode(conn, queueName, entries); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -