rdbmsadapter.java
来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 959 行 · 第 1/3 页
JAVA
959 行
destination, name); }

    /**
     * Removes expired messages by delegating to
     * {@code _messages.removeExpiredMessages(connection)}.
     * <p>
     * NOTE(review): the original comment labelled this method as the
     * implementation of PersistenceAdapter.getQueueMessageCount, which looks
     * like a copy/paste slip; presumably it implements
     * PersistenceAdapter.removeExpiredMessages - confirm against the
     * interface.
     *
     * @param connection the connection to use
     * @throws PersistenceException if the delegate fails
     */
    public void removeExpiredMessages(Connection connection)
        throws PersistenceException {
        _messages.removeExpiredMessages(connection);
    }

    // implementation of PersistenceAdapter.removeExpiredMessageHandles
    /**
     * Removes the expired message handles of a consumer by delegating to
     * {@code _handles.removeExpiredMessageHandles(connection, consumer)}.
     *
     * @param connection the connection to use
     * @param consumer   the consumer whose expired handles are removed
     * @throws PersistenceException if the delegate fails
     */
    public void removeExpiredMessageHandles(Connection connection,
                                            String consumer)
        throws PersistenceException {
        _handles.removeExpiredMessageHandles(connection, consumer);
    }

    // implementation of PersistenceAdapter.getNonExpiredMessages
    /**
     * Returns the non-expired messages for a destination by delegating to
     * {@code _messages.getNonExpiredMessages(connection, destination)}.
     *
     * @param connection  the connection to use
     * @param destination the destination to query
     * @return the value returned by the delegate
     * @throws PersistenceException if the delegate fails
     */
    public Vector getNonExpiredMessages(Connection connection,
                                        JmsDestination destination)
        throws PersistenceException {
        return _messages.getNonExpiredMessages(connection, destination);
    }

    // implementation of EventHandler.handleEvent
    /**
     * Handles an event. This is intentionally a no-op.
     * <p>
     * The former handling of COLLECT_DATABASE_GARBAGE_EVENT (lower the thread
     * priority to _gcThreadPriority, call {@link #purgeMessages()}, restore
     * Thread.NORM_PRIORITY and re-register the event) is disabled, as per
     * bug 816895 - Exception in purgeMessages.
     *
     * @param event    the event identifier (ignored)
     * @param callback the callback object (ignored)
     * @param time     the event time (ignored)
     */
    public void handleEvent(int event, Object callback, long time) {
        // disabled, as per bug 816895 - Exception in purgeMessages
    }

    /**
     * Return a connection to the database from the pool of connections. It will
     * throw an PersistenceException if it cannot retrieve a connection. The
     * client should close the connection normally, since the pool is a
     * connection event listener.
     *
     * @return Connection - a pooled connection or null
     * @throws PersistenceException - if it cannot retrieve a connection
     */
    public Connection getConnection() throws PersistenceException {
        return _connectionManager.getConnection();
    }

    /**
     * Return a reference to the DBConnectionManager
     *
     * @return DBConnectionManager
     */
    public DBConnectionManager getDBConnectionManager() {
        return _connectionManager;
    }

    /**
     * Adds a user by delegating to {@code _users.add(connection, user)}.
     *
     * @param connection the connection to use
     * @param user       the user to add
     * @throws PersistenceException if the delegate fails
     */
    public void addUser(Connection connection, User user)
        throws PersistenceException {
        _users.add(connection, user);
    }

    /**
     * Returns an enumeration over the elements of
     * {@code _users.getAllUsers(connection)}.
     *
     * @param connection the connection to use
     * @return an enumeration of the users returned by the delegate
     * @throws PersistenceException if the delegate fails
     */
    public Enumeration getAllUsers(Connection connection)
        throws PersistenceException {
        return _users.getAllUsers(connection).elements();
    }

    /**
     * Looks up a user by delegating to {@code _users.get(connection, user)}.
     *
     * @param connection the connection to use
     * @param user       the user to look up
     * @return the value returned by the delegate
     * @throws PersistenceException if the delegate fails
     */
    public User getUser(Connection connection, User user)
        throws PersistenceException {
        return _users.get(connection, user);
    }

    /**
     * Removes a user by delegating to {@code _users.remove(connection, user)}.
     *
     * @param connection the connection to use
     * @param user       the user to remove
     * @throws PersistenceException if the delegate fails
     */
    public void removeUser(Connection connection, User user)
        throws PersistenceException {
        _users.remove(connection, user);
    }

    /**
     * Updates a user by delegating to {@code _users.update(connection, user)}.
     *
     * @param connection the connection to use
     * @param user       the user to update
     * @throws PersistenceException if the delegate fails
     */
    public void updateUser(Connection connection, User user)
        throws PersistenceException {
        _users.update(connection, user);
    }

    /**
     * Incrementally purge all processed messages from the database.
     * @todo this needs to be revisited. See bug 816895
     * - existing expired messages are purged at startup
     * - messages received that subsequently expire while the server is
     *   running are removed individually.
     * - not clear how the previous implementation ever worked.
     *   The Messages.getMessageIds() method returns all messages, not
     *   just those processed, nor is it clear that the processed flag
     *   is ever non-zero.
     * The current implementation (as a fix for bug 816895 - Exception in
     * purgeMessages) simply delegates to removeExpiredMessages()
     * <p>
     * The previous implementation (fetch a block of message ids, determine
     * their minimum and maximum, and delete every message without a
     * corresponding handle) was disabled as part of that fix.
     * <p>
     * Failures are logged, not propagated. On failure the transaction is
     * rolled back (best effort) before the connection is closed.
     *
     * @return always 0; the current implementation does not track the number
     *         of messages deleted
     */
    public synchronized int purgeMessages() {
        Connection connection = null;
        try {
            connection = getConnection();
            removeExpiredMessages(connection);
            connection.commit();
        } catch (Exception exception) {
            _log.error("Exception in purgeMessages", exception);
            // Do not hand a connection with a half-finished transaction
            // back to the connection manager.
            if (connection != null) {
                try {
                    connection.rollback();
                } catch (Exception ignored) {
                    // the original failure has already been logged; a
                    // failing rollback adds nothing actionable
                }
            }
        } finally {
            SQLHelper.close(connection);
        }
        return 0;
    }

    /**
     * Get the schema version
     *
     * @param connection the connection to use
     * @return the schema version, or null, if no version has been initialised
     * @throws PersistenceException for any related persistence exception
     */
    private String getSchemaVersion(Connection connection)
        throws PersistenceException {
        String version = null;
        PreparedStatement query = null;
        ResultSet result = null;
        try {
            query = connection.prepareStatement(
                "select version from system_data where id = 1");
            result = query.executeQuery();
            if (result.next()) {
                version = result.getString(1);
            }
        } catch (SQLException exception) {
            throw new PersistenceException("Failed to get the schema version",
                                           exception);
        } finally {
            // close in reverse order of creation
            SQLHelper.close(result);
            SQLHelper.close(query);
        }
        return version;
    }

    /**
     * Initialise the schema version, by inserting a row with id 1, the
     * version SCHEMA_VERSION and the current date into system_data.
     *
     * @param connection the connection to use
     * @throws PersistenceException if the insert fails
     */
    private void initSchemaVersion(Connection connection)
        throws PersistenceException {
        _log.info("Initialising schema version " + SCHEMA_VERSION);
        PreparedStatement insert = null;
        try {
            insert = connection.prepareStatement("insert into system_data (id, version, creationdate) "
                                                 + "values (?,?,?)");
            insert.setInt(1, 1);
            insert.setString(2, SCHEMA_VERSION);
            insert.setDate(3, new Date(System.currentTimeMillis()));
            insert.executeUpdate();
        } catch (SQLException exception) {
            throw new PersistenceException(
                "Failed to initialise schema version", exception);
        } finally {
            SQLHelper.close(insert);
        }
    }

    // registerEvent(), which registered a COLLECT_DATABASE_GARBAGE_EVENT with
    // the BasicEventManager to fire after _gcInterval, is disabled, as per
    // bug 816895 - Exception in purgeMessages.

    /**
     * Creates a {@link DBConnectionManager} using its fully qualified class
     * name
     * <p>
     * The class is looked up via the current thread's context class loader
     * first, falling back to {@code Class.forName} when that loader is null
     * or cannot find the class.
     *
     * @param className the fully qualified class name
     * @return a new instance of the named class
     * @throws PersistenceException if it cannot be created
     */
    private DBConnectionManager getConnectionManager(String className)
        throws PersistenceException {
        DBConnectionManager result = null;
        Class clazz = null;
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        try {
            if (loader != null) {
                clazz = loader.loadClass(className);
            }
        } catch (ClassNotFoundException ignore) {
            // deliberate: fall through to Class.forName() below
        }
        try {
            if (clazz == null) {
                clazz = Class.forName(className);
            }
        } catch (ClassNotFoundException exception) {
            throw new PersistenceException("Failed to locate connection manager implementation: "
                                           + className, exception);
        }

        try {
            result = (DBConnectionManager) clazz.newInstance();
        } catch (Exception exception) {
            // covers instantiation, access and class cast failures alike
            throw new PersistenceException(
                "Failed to create connection manager", exception);
        }

        return result;
    }
}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?