📄 jdbcmanagercommontable.java
字号:
throws SQLException, XmlBlasterException { if (log.isLoggable(Level.FINER)) log.finer("Entering request=" + request); Connection conn = null; boolean success = false; Statement statement = null; int ret = 0; try { conn = pool.getConnection(); statement = conn.createStatement(); statement.setQueryTimeout(this.pool.getQueryTimeout()); ret = statement.executeUpdate(request); if (log.isLoggable(Level.FINE)) log.fine("Executed statement '" + request + "', number of changed entries=" + ret); success = true; } finally { try { if (statement !=null) statement.close(); } catch (Throwable ex) { success = false; log.warning("update: throwable when closing statement: " + ex.toString()); } if (conn != null) this.pool.releaseConnection(conn, success); } return ret; } /** * This version makes an update but does no connection management, i.e. a * connection must be established outside this method and passed in the * argument list. The closing and cleaning up of the connection must also * be handled outside. * * @param request the string specifying which request to make * @param conn the connection to use for this request */ private final int update(String request, Connection conn) throws SQLException { if (log.isLoggable(Level.FINER)) log.finer("Request=" + request + " and connection " + conn); Statement statement = null; int ret = 0; try { statement = conn.createStatement(); statement.setQueryTimeout(this.pool.getQueryTimeout()); ret = statement.executeUpdate(request); if (log.isLoggable(Level.FINE)) log.fine("Executed statement '" + request + "', number of changed entries=" + ret); } finally { try { if (statement !=null) statement.close(); } catch (Throwable ex) { log.warning("update: throwable when closing statement: " + ex.toString()); } } return ret; } /** * deletes all transient messages */ public int deleteAllTransient(String queueName) throws XmlBlasterException { try { if (log.isLoggable(Level.FINER)) log.finer("deleteAllTransient"); if (!this.isConnected) { if (log.isLoggable(Level.FINE)) log.fine("Currently not possible. No connection to the DB"); return 0; } String req = "delete from " + this.entriesTableName + " where queueName='" + queueName + "' AND durable='F'"; return update(req); } catch (XmlBlasterException ex) { throw ex; } catch (Throwable ex) { Connection conn = null; boolean success = false; try { conn = this.pool.getConnection(); if (checkIfDBLoss(conn, getLogId(queueName, "deleteAllTransient"), ex)) throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, ME + ".deleteAllTransient", "", ex); success = true; } finally { if (conn != null) this.pool.releaseConnection(conn, success); } throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNKNOWN, ME + ".deleteAllTransient", "", ex); } } /** * The prefix is the initial part of the SQL update/query. Note that this * method can be used both for SELECT statements as for updates such as * DELETE or UPDATE. * An example of prefix: * "delete from tableName where dataId in("; */ private final ArrayList whereInStatement(String reqPrefix, long[] uniqueIds) { if (log.isLoggable(Level.FINER)) log.finer("whereInStatement"); final String reqPostfix = ")"; boolean isFirst = true; int initialLength = reqPrefix.length() + reqPostfix.length() + 2; StringBuffer buf = new StringBuffer(); int length = initialLength; int currentLength = 0; ArrayList ret = new ArrayList(); int count = 0; for (int i=0; i<uniqueIds.length; i++) { String req = null; String entryId = Long.toString(uniqueIds[i]); currentLength = entryId.length(); length += currentLength; if ((length > this.maxStatementLength) || (i == (uniqueIds.length-1)) || count >= this.maxNumStatements) { // then make the update if (i == (uniqueIds.length-1)) { if (!isFirst) buf.append(","); count++; buf.append(entryId); } req = reqPrefix + buf.toString() + reqPostfix; if (count > 0) ret.add(req); length = initialLength + currentLength; buf = new StringBuffer(); count = 0; isFirst = true; } else count++; if (!isFirst) { buf.append(","); length++; } else isFirst = false; count++; buf.append(entryId); } return ret; } /** * Helper method to find out if still to retrieve entries in getAndDeleteLowest or not. */ private final boolean isInsideRange(int numEntries, int maxNumEntries, long numBytes, long maxNumBytes) { if (maxNumEntries < 0) { if (maxNumBytes <0L) return true; return numBytes < maxNumBytes; } // then maxNumEntries >= 0 if (maxNumBytes <0L) return numEntries < maxNumEntries; // then the less restrictive of both is used (since none is negative) return numEntries < maxNumEntries || numBytes < maxNumBytes; } /** * Under the same transaction it gets and deletes all the entries which fit * into the constrains specified in the argument list. * The entries are really deleted only if doDelete is true, otherwise they are left untouched on the queue * @see org.xmlBlaster.util.queue.I_Queue#takeLowest(int, long, org.xmlBlaster.util.queue.I_QueueEntry, boolean) */ public ReturnDataHolder getAndDeleteLowest(StorageId storageId, int numOfEntries, long numOfBytes, int maxPriority, long minUniqueId, boolean leaveOne, boolean doDelete) throws XmlBlasterException { String queueName = storageId.getStrippedId(); if (log.isLoggable(Level.FINER)) log.finer("Entering"); ReturnDataHolder ret = new ReturnDataHolder(); if (!this.isConnected) { if (log.isLoggable(Level.FINE)) log.fine("Currently not possible. No connection to the DB"); return ret; } PreparedQuery query = null; boolean success = true; try { String req = "select * from " + this.entriesTableName + " WHERE queueName='" + queueName + "' ORDER BY prio ASC, " + this.dataIdColName + " DESC"; query = new PreparedQuery(pool, req, false, -1); // process the result set. Give only back what asked for (and only delete that) ResultSet rs = query.rs; int count = 0; long amount = 0L; boolean doContinue = true; boolean stillEntriesInQueue = false; while ( (stillEntriesInQueue=rs.next()) && doContinue) { long dataId = rs.getLong(DATA_ID); // preStatement.setLong(1, dataId); /*String queueName =*/ rs.getString(QUEUE_NAME); // preStatement.setString(3, queueName); int prio = rs.getInt(PRIO); // preStatement.setInt(4, prio); String typeName = rs.getString(TYPE_NAME); // preStatement.setString(5, typeName); //boolean persistent = rs.getBoolean(PERSISTENT); // preStatement.setBoolean(4, persistent); //this only to make ORACLE happy since it does not support BOOLEAN String persistentAsChar = rs.getString(PERSISTENT); boolean persistent = false; if ("T".equalsIgnoreCase(persistentAsChar)) persistent = true; long sizeInBytes = rs.getLong(SIZE_IN_BYTES); if (!isInsideRange(count, numOfEntries, amount, numOfBytes)) break; // byte[] blob = rs.getBytes(7); // preStatement.setObject(5, blob); InputStream is = rs.getBinaryStream(BLOB); // check if allowed or already outside the range ... if ((prio<maxPriority) || ((prio==maxPriority)&&(dataId>minUniqueId)) ) { if (log.isLoggable(Level.FINEST)) log.finest("dataId: " + dataId + ", prio: " + prio + ", typeName: " + typeName + " persistent: " + persistent);// ret.list.add(this.factory.createEntry(prio, dataId, typeName, persistent, sizeInBytes, blob, storageId)); ret.list.add(this.factory.createEntry(prio, dataId, typeName, persistent, sizeInBytes, is, storageId)); amount += sizeInBytes; } else doContinue = false; count++; } ret.countBytes = amount; ret.countEntries = count; // prepare for deleting (we don't use deleteEntries since we want // to use the same transaction (and the same connection) if (leaveOne) { // leave at least one entry if (stillEntriesInQueue) stillEntriesInQueue = rs.next(); if ((!stillEntriesInQueue) && (ret.list.size()>0)) { ret.countEntries--; I_Entry entryToDelete = (I_Entry)ret.list.remove(ret.list.size()-1); ret.countBytes -= entryToDelete.getSizeInBytes(); if (log.isLoggable(Level.FINE)) log.fine("takeLowest size to delete: " + entryToDelete.getSizeInBytes()); } } if (doDelete) { //first strip the unique ids: long[] uniqueIds = new long[ret.list.size()]; for (int i=0; i < uniqueIds.length; i++) uniqueIds[i] = ((I_Entry)ret.list.get(i)).getUniqueId(); String reqPrefix = "delete from " + this.entriesTableName + " where queueName='" + queueName + "' AND " + this.dataIdColName + " in("; ArrayList reqList = this.whereInStatement(reqPrefix, uniqueIds); for (int i=0; i < reqList.size(); i++) { req = (String)reqList.get(i); if (log.isLoggable(Level.FINE)) log.fine("'delete from " + req + "'"); update(req, query.conn); } } if (!query.conn.getAutoCommit()) query.conn.commit(); return ret; } catch (XmlBlasterException ex) { success = false; throw ex; } catch (Throwable ex) { success = false; try { if (query != null && query.rs != null) { query.rs.close(); query.rs = null; } } catch (Throwable ex1) { log.severe("exception occured when closing query: " + ex1.toString()); } try { if (query != null && query.conn != null) query.conn.rollback(); } catch (Throwable ex1) { log.severe("could not rollback: " + ex.toString()); ex1.printStackTrace(); } Connection tmpConn = null; if (query != null) { tmpConn = query.conn; query.closeStatement(); } if (checkIfDBLoss(tmpConn, getLogId(queueName, "getAndDeleteLowest"), ex)) throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNAVAILABLE, ME + ".getAndDeleteLowest", "", ex); else throw new XmlBlasterException(this.glob, ErrorCode.RESOURCE_DB_UNKNOWN, ME + ".getAndDeleteLowest", "", ex); } finally { try { if (query != null) query.close(success); } catch (Throwable ex1) { log.severe("exception when closing query: " + ex1.toString()); ex1.printStackTrace(); } } } /** * Deletes the entries specified by the entries array. Since all entries * are deleted at the same time, in case of an exception the result * is uncertain (it d
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -