
rdbmsadapter.java

Source code for a JMS server implementation; supports multiple adapters (DB, FTP) and multiple databases.
Language: Java

    // implementation of PersistenceAdapter.removeExpiredMessages
    public void removeExpiredMessages(Connection connection)
        throws PersistenceException {

        Messages.instance().removeExpiredMessages(connection);
    }

    // implementation of PersistenceAdapter.removeExpiredMessageHandles
    public void removeExpiredMessageHandles(Connection connection,
                                            String consumer)
        throws PersistenceException {

        MessageHandles.instance().removeExpiredMessageHandles(connection,
            consumer);
    }

    // implementation of PersistenceAdapter.getNonExpiredMessages
    public Vector getNonExpiredMessages(Connection connection,
                                        JmsDestination destination)
        throws PersistenceException {

        return Messages.instance().getNonExpiredMessages(
            connection, destination);
    }

    // implementation of EventHandler.getHandle
    public HandleIfc getHandle() {
        return null;
    }

    // implementation of EventHandler.handleEvent
    public void handleEvent(int event, Object callback, long time) {
        // disabled, as per bug 816895 - Exception in purgeMessages
//          if (event == COLLECT_DATABASE_GARBAGE_EVENT) {
//              // collect garbage now, but before doing so change the thread
//              // priority to low.
//              try {
//                  Thread.currentThread().setPriority(_gcThreadPriority);
//                  purgeMessages();
//              } finally {
//                  Thread.currentThread().setPriority(Thread.NORM_PRIORITY);
//                  registerEvent();
//              }
//          }
    }

    /**
     * Return a connection to the database from the pool of connections.
     * Throws a PersistenceException if a connection cannot be retrieved.
     * The client should close the connection normally, since the pool is a
     * connection event listener.
     *
     * @return a pooled connection, or null
     * @throws PersistenceException if it cannot retrieve a connection
     */
    public Connection getConnection()
        throws PersistenceException {
        return _connectionManager.getConnection();
    }

    /**
     * Return a reference to the DBConnectionManager
     *
     * @return DBConnectionManager
     */
    public DBConnectionManager getDBConnectionManager() {
        return _connectionManager;
    }

    // adds a user, delegating to the Users singleton
    public void addUser(Connection connection, User user)
        throws PersistenceException {
        Users.instance().add(connection, user);
    }

    // returns an enumeration over all users, delegating to the Users singleton
    public Enumeration getAllUsers(Connection connection)
        throws PersistenceException {
        return Users.instance().getAllUsers(connection).elements();
    }

    // looks up a user, delegating to the Users singleton
    public User getUser(Connection connection, User user)
        throws PersistenceException {
        return Users.instance().get(connection, user);
    }

    // removes a user, delegating to the Users singleton
    public void removeUser(Connection connection, User user)
        throws PersistenceException {
        Users.instance().remove(connection, user);
    }

    // updates a user, delegating to the Users singleton
    public void updateUser(Connection connection, User user)
        throws PersistenceException {
        Users.instance().update(connection, user);
    }

    /**
     * Purge expired messages from the database.
     *
     * TODO: this needs to be revisited. See bug 816895.
     * - existing expired messages are purged at startup
     * - messages received that subsequently expire while the server is
     *   running are removed individually
     * - it is not clear how the previous implementation (incrementally
     *   purging all processed messages) ever worked:
     *   Messages.getMessageIds() returns all messages, not just those
     *   processed, nor is it clear that the processed flag is ever non-zero
     * - the current implementation (a fix for bug 816895 - Exception in
     *   purgeMessages) simply delegates to removeExpiredMessages()
     *
     * @return always 0; the number of messages deleted is not tracked
     */
    public synchronized int purgeMessages() {
        Connection connection = null;
        try {
            connection = getConnection();
            removeExpiredMessages(connection);
            connection.commit();
        } catch (Exception exception) {
            _log.error("Exception in purgeMessages", exception);
        } finally {
            SQLHelper.close(connection);
        }
        // removeExpiredMessages() does not report a count, so 0 is returned
        return 0;

//          if (connection == null) {
//              return 0;
//          }

//          // we have a valid connection so we can proceed
//          try {
//              long stime = System.currentTimeMillis();
//              HashMap msgids = Messages.instance().getMessageIds(
//                  connection, _lastTime, _gcBlockSize);

//              // if there are no messages then reset the last time to
//              // 0 and break;
//              if (msgids.size() > 0) {
//                  // find the minimum and maximum..we can improve the way we
//                  // do this.
//                  Iterator iter = msgids.values().iterator();
//                  long min = -1;
//                  long max = -1;

//                  while (iter.hasNext()) {
//                      Long id = (Long) iter.next();
//                      if ((min == -1) &&
//                          (max == -1)) {
//                          min = id.longValue();
//                          max = id.longValue();
//                      }

//                      if (id.longValue() < min) {
//                          min = id.longValue();
//                      } else if (id.longValue() > max) {
//                          max = id.longValue();
//                      }
//                  }

//                  // set the last time for the next iteration unless the
//                  // the size of the msgids is less than the gcBlockSize.
//                  // If the later is the case then reset the last time.
//                  // This is in preparation for the next pass through this
//                  // method.
//                  if (msgids.size() < _gcBlockSize) {
//                      _lastTime = 0;
//                  } else {
//                      _lastTime = max;
//                  }

//                  // now iterate through the message list and delete the
//                  // messages that do not have corresponding handles.
//                  Vector hdlids = MessageHandles.instance().getMessageIds(connection, min, max);
//                  iter = msgids.keySet().iterator();
//                  while (iter.hasNext()) {
//                      String id = (String) iter.next();
//                      if (!hdlids.contains(id)) {
//                          // this message is not referenced by anyone so we can
//                          // delete it
//                          Messages.instance().remove(connection, id);
//                          deleted++;
//                      }
//                  }
//                  connection.commit();
//              } else {
//                  // reset the lastTime
//                  _lastTime = 0;
//              }
//              _log.debug("DBGC Deleted " + deleted + " messages and took "
//                  + (System.currentTimeMillis() - stime) +
//                  "ms to complete.");
//          } catch (Exception exception) {
//              try {
//                  connection.rollback();
//              } catch (Exception nested) {
//                  // ignore this exception
//              }
//              _log.error("Exception in purgeMessages", exception);
//              deleted = 0;
//          } finally {
//              try {
//                  connection.close();
//              } catch (Exception nested) {
//                  // ignore
//              }
//          }
//
//        return deleted;
    }

    /**
     * Get the schema version
     *
     * @param connection the connection to use
     * @return the schema version, or null, if no version has been initialised
     * @throws PersistenceException for any related persistence exception
     */
    private String getSchemaVersion(Connection connection)
        throws PersistenceException {

        String version = null;
        PreparedStatement query = null;
        ResultSet result = null;
        try {
            query = connection.prepareStatement(
                "select * from system_data where id = 1");
            result = query.executeQuery();
            if (result.next()) {
                version = result.getString("version");
            }
        } catch (SQLException exception) {
            throw new PersistenceException(
                "Failed to get the schema version", exception);
        } finally {
            SQLHelper.close(result);
            SQLHelper.close(query);
        }
        return version;
    }

    /**
     * Initialise the schema version
     *
     * @param connection the connection to use
     * @throws PersistenceException if the schema version cannot be inserted
     */
    private void initSchemaVersion(Connection connection)
        throws PersistenceException {

        _log.info("Initialising schema version " + SCHEMA_VERSION);
        PreparedStatement insert = null;
        try {
            insert = connection.prepareStatement(
                "insert into system_data values (?,?,?)");
            insert.setInt(1, 1);
            insert.setString(2, SCHEMA_VERSION);
            insert.setDate(3, new Date(System.currentTimeMillis()));
            insert.executeUpdate();
        } catch (SQLException exception) {
            throw new PersistenceException(
                "Failed to initialise schema version", exception);
        } finally {
            SQLHelper.close(insert);
        }
    }

    /**
     * Register an event to collect and remove processed messages with the
     * {@link BasicEventManager}.
     * <p>
     * Currently a no-op: registration is disabled, as per bug 816895 -
     * Exception in purgeMessages.
     */
    private void registerEvent() {
        // disabled, as per bug 816895 - Exception in purgeMessages
//          try {
//              BasicEventManager.instance().registerEventRelative(
//                  new Event(COLLECT_DATABASE_GARBAGE_EVENT, this, null),
//                  _gcInterval);
//          } catch (IllegalEventDefinedException exception) {
//              _log.error("registerEvent failed", exception);
//          }
    }

    /**
     * Creates a {@link DBConnectionManager} using its fully qualified class
     * name
     *
     * @param className the fully qualified class name
     * @return a new instance of the named connection manager
     * @throws PersistenceException if it cannot be created
     */
    private DBConnectionManager getConnectionManager(String className)
        throws PersistenceException {

        DBConnectionManager result = null;
        Class clazz = null;
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        try {
            if (loader != null) {
                clazz = loader.loadClass(className);
            }
        } catch (ClassNotFoundException ignore) {
            // not visible to the context class loader; fall back to
            // Class.forName() below
        }
        try {
            if (clazz == null) {
                clazz = Class.forName(className);
            }
        } catch (ClassNotFoundException exception) {
            throw new PersistenceException(
                "Failed to locate connection manager implementation: "
                + className, exception);
        }

        try {
            result = (DBConnectionManager) clazz.newInstance();
        } catch (Exception exception) {
            throw new PersistenceException(
                "Failed to create connection manager", exception);
        }

        return result;
    }

}
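
The class-loading fallback used by getConnectionManager() can be sketched in isolation as follows. This is a minimal illustration and not part of the original source: the names LoaderSketch and instantiate are hypothetical, and the sketch assumes a newer JDK (generics, multi-catch, and getDeclaredConstructor().newInstance() in place of the Class.newInstance() call used above). It tries the thread context class loader first, falls back to Class.forName(), and then creates an instance through the no-argument constructor.

```java
// Hypothetical helper, for illustration only; not part of the original class.
final class LoaderSketch {

    private LoaderSketch() {
    }

    /**
     * Loads the named class and instantiates it via its no-arg constructor.
     * Unlike the original method, failures are reported with unchecked
     * exceptions rather than PersistenceException.
     */
    static <T> T instantiate(String className, Class<T> type) {
        Class<?> clazz = null;

        // First attempt: the thread context class loader, if one is set.
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader != null) {
            try {
                clazz = loader.loadClass(className);
            } catch (ClassNotFoundException ignore) {
                // fall back to Class.forName() below
            }
        }

        // Second attempt: the class loader that loaded this class.
        if (clazz == null) {
            try {
                clazz = Class.forName(className);
            } catch (ClassNotFoundException exception) {
                throw new IllegalArgumentException(
                    "Failed to locate implementation: " + className,
                    exception);
            }
        }

        // Instantiate and check that the result has the expected type.
        try {
            return type.cast(clazz.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException | ClassCastException exception) {
            throw new IllegalStateException(
                "Failed to create instance of " + className, exception);
        }
    }
}
```

For example, LoaderSketch.instantiate("java.util.ArrayList", java.util.List.class) returns a new ArrayList, while an unknown class name results in an IllegalArgumentException.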
