rdbmsadapter.java

来自「OpenJMS是一个开源的Java Message Service API 1.」· Java 代码 · 共 959 行 · 第 1/3 页

JAVA
959
字号
                                                         destination, name);    }    // implementation of PersistenceAdapter.getQueueMessageCount    public void removeExpiredMessages(Connection connection)            throws PersistenceException {        _messages.removeExpiredMessages(connection);    }    // implementation of PersistenceAdapter.removeExpiredMessageHandles    public void removeExpiredMessageHandles(Connection connection,                                            String consumer)            throws PersistenceException {        _handles.removeExpiredMessageHandles(connection,                                                              consumer);    }    // implementation of PersistenceAdapter.getNonExpiredMessages    public Vector getNonExpiredMessages(Connection connection,                                        JmsDestination destination)            throws PersistenceException {        return _messages.getNonExpiredMessages(connection,                                                         destination);    }    // implementation of EventHandler.handleEvent    public void handleEvent(int event, Object callback, long time) {        // disabled, as per bug 816895 - Exception in purgeMessages//          if (event == COLLECT_DATABASE_GARBAGE_EVENT) {//              // collect garbage now, but before doing so change the thread//              // priority to low.//              try {//                  Thread.currentThread().setPriority(_gcThreadPriority);//                  purgeMessages();//              } finally {//                  Thread.currentThread().setPriority(Thread.NORM_PRIORITY);//                  registerEvent();//              }//          }    }    /**     * Return a connection to the database from the pool of connections. It will     * throw an PersistenceException if it cannot retrieve a connection. The     * client should close the connection normally, since the pool is a     * connection event listener.     
*     * @return Connection - a pooled connection or null     * @throws PersistenceException - if it cannot retrieve a connection     */    public Connection getConnection()            throws PersistenceException {        return _connectionManager.getConnection();    }    /**     * Return a reference to the DBConnectionManager     *     * @return DBConnectionManager     */    public DBConnectionManager getDBConnectionManager() {        return _connectionManager;    }    public void addUser(Connection connection, User user)            throws PersistenceException {        _users.add(connection, user);    }    public Enumeration getAllUsers(Connection connection)            throws PersistenceException {        return _users.getAllUsers(connection).elements();    }    public User getUser(Connection connection, User user)            throws PersistenceException {        return _users.get(connection, user);    }    public void removeUser(Connection connection, User user)            throws PersistenceException {        _users.remove(connection, user);    }    public void updateUser(Connection connection, User user)            throws PersistenceException {        _users.update(connection, user);    }    /**     * Incrementally purge all processed messages from the database.     * @todo this needs to be revisited. See bug 816895     * - existing expired messages are purged at startup     * - messages received that subsequently expire while the server is     *   running are removed individually.     * - not clear how the previous implementation ever worked.     *   The Messages.getMessageIds() method returns all messages, not     *   just those processed, nor is it clear that the processed flag     *   is ever non-zero.     
*   The current implementation (as a fix for bug 816895 - Exception in     *   purgeMessages) simply delegates to removeExpiredMessages()     *     * @return the number of messages deleted     */    public synchronized int purgeMessages() {        // int deleted = 0;        Connection connection = null;        try {            connection = getConnection();            removeExpiredMessages(connection);            connection.commit();        } catch (Exception exception) {            _log.error("Exception in purgeMessages", exception);        } finally {            SQLHelper.close(connection);        }        return 0;//          if (connection == null) {//              return 0;//          }//          // we have a valid connection so we can proceed//          try {//              long stime = System.currentTimeMillis();//              HashMap msgids = _messages.getMessageIds(//                  connection, _lastTime, _gcBlockSize);//              // if there are no messages then reset the last time to//              // 0 and break;//              if (msgids.size() > 0) {//                  // find the minimum and maximum..we can improve the way we//                  // do this.//                  Iterator iter = msgids.values().iterator();//                  long min = -1;//                  long max = -1;//                  while (iter.hasNext()) {//                      Long id = (Long) iter.next();//                      if ((min == -1) &&//                          (max == -1)) {//                          min = id.longValue();//                          max = id.longValue();//                      }//                      if (id.longValue() < min) {//                          min = id.longValue();//                      } else if (id.longValue() > max) {//                          max = id.longValue();//                      }//                  }//                  // set the last time for the next iteration unless the//                  // the size of the 
msgids is less than the gcBlockSize.//                  // If the later is the case then reset the last time.//                  // This is in preparation for the next pass through this//                  // method.//                  if (msgids.size() < _gcBlockSize) {//                      _lastTime = 0;//                  } else {//                      _lastTime = max;//                  }//                  // now iterate through the message list and delete the//                  // messages that do not have corresponding handles.//                  Vector hdlids = _handles.getMessageIds(connection, min, max);//                  iter = msgids.keySet().iterator();//                  while (iter.hasNext()) {//                      String id = (String) iter.next();//                      if (!hdlids.contains(id)) {//                          // this message is not referenced by anyone so we can//                          // delete it//                          _messages.remove(connection, id);//                          deleted++;//                      }//                  }//                  connection.commit();//              } else {//                  // reset the lastTime//                  _lastTime = 0;//              }//              _log.debug("DBGC Deleted " + deleted + " messages and took "//                  + (System.currentTimeMillis() - stime) +//                  "ms to complete.");//          } catch (Exception exception) {//              try {//                  connection.rollback();//              } catch (Exception nested) {//                  // ignore this exception//              }//              _log.error("Exception in purgeMessages", exception);//              deleted = 0;//          } finally {//              try {//                  connection.close();//              } catch (Exception nested) {//                  // ignore//              }//          }////        return deleted;    }    /**     * Get the schema version     *     * 
@param connection the connection to use
     * @return the schema version, or null, if no version has been initialised
     * @throws PersistenceException for any related persistence exception
     */
    private String getSchemaVersion(Connection connection)
            throws PersistenceException {
        String version = null;
        PreparedStatement query = null;
        ResultSet result = null;
        try {
            query = connection.prepareStatement(
                    "select version from system_data where id = 1");
            result = query.executeQuery();
            // the version stays null when the table holds no row with id 1
            if (result.next()) {
                version = result.getString(1);
            }
        } catch (SQLException exception) {
            throw new PersistenceException("Failed to get the schema version",
                                           exception);
        } finally {
            SQLHelper.close(result);
            SQLHelper.close(query);
        }
        return version;
    }

    /**
     * Initialise the schema version, by inserting a row with id 1, the
     * value of SCHEMA_VERSION and the current time into system_data.
     *
     * @param connection the connection to use
     * @throws PersistenceException if the insert fails
     */
    private void initSchemaVersion(Connection connection)
            throws PersistenceException {
        _log.info("Initialising schema version " + SCHEMA_VERSION);
        PreparedStatement insert = null;
        try {
            insert = connection.prepareStatement("insert into system_data (id, version, creationdate) "
                                                 + "values (?,?,?)");
            insert.setInt(1, 1);
            insert.setString(2, SCHEMA_VERSION);
            insert.setDate(3, new Date(System.currentTimeMillis()));
            insert.executeUpdate();
        } catch (SQLException exception) {
            throw new PersistenceException(
                    "Failed to initialise schema version", exception);
        } finally {
            SQLHelper.close(insert);
        }
    }

    // Register an event to collect and remove processed messages with the
    // BasicEventManager.
    // disabled, as per bug 816895 - Exception in purgeMessages
//   private void registerEvent() {
//        try {
//              BasicEventManager.instance().registerEventRelative(
//                  new Event(COLLECT_DATABASE_GARBAGE_EVENT, this, null),
//                  _gcInterval);
//          } catch (IllegalEventDefinedException exception) {
//              _log.error("registerEvent failed", exception);
//          }
//   }

    /**
     * Creates a {@link DBConnectionManager} using its fully qualified class
     * name. The thread context class loader is tried first (when there is
     * one); if it cannot find the class, {@link Class#forName(String)} is
     * used as a fallback.
     *
     * @param className the fully qualified class name
     * @return a new instance of the named class
     * @throws PersistenceException if it cannot be created
     */
    private DBConnectionManager getConnectionManager(String className)
            throws PersistenceException {
        Class clazz = null;
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader != null) {
            try {
                clazz = loader.loadClass(className);
            } catch (ClassNotFoundException ignore) {
                // deliberately ignored: fall back to Class.forName() below
            }
        }
        if (clazz == null) {
            try {
                clazz = Class.forName(className);
            } catch (ClassNotFoundException exception) {
                throw new PersistenceException("Failed to locate connection manager implementation: "
                                               + className, exception);
            }
        }
        try {
            return (DBConnectionManager) clazz.newInstance();
        } catch (Exception exception) {
            // also covers a class that is not a DBConnectionManager
            throw new PersistenceException(
                    "Failed to create connection manager", exception);
        }
    }
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?