📄 rdbmsadapter.java
字号:
// implementation of PersistenceAdapter.getQueueMessageCount
public void removeExpiredMessages(Connection connection)
throws PersistenceException {
Messages.instance().removeExpiredMessages(connection);
}
// implementation of PersistenceAdapter.removeExpiredMessageHandles
public void removeExpiredMessageHandles(Connection connection,
String consumer)
throws PersistenceException {
MessageHandles.instance().removeExpiredMessageHandles(connection,
consumer);
}
// implementation of PersistenceAdapter.getNonExpiredMessages
public Vector getNonExpiredMessages(Connection connection,
JmsDestination destination)
throws PersistenceException {
return Messages.instance().getNonExpiredMessages(
connection, destination);
}
// implementation of EventHandler.getHandle
public HandleIfc getHandle() {
return null;
}
// implementation of EventHandler.handleEvent
public void handleEvent(int event, Object callback, long time) {
// disabled, as per bug 816895 - Exception in purgeMessages
// if (event == COLLECT_DATABASE_GARBAGE_EVENT) {
// // collect garbage now, but before doing so change the thread
// // priority to low.
// try {
// Thread.currentThread().setPriority(_gcThreadPriority);
// purgeMessages();
// } finally {
// Thread.currentThread().setPriority(Thread.NORM_PRIORITY);
// registerEvent();
// }
// }
}
/**
* Return a connection to the database from the pool of connections. It
* will throw an PersistenceException if it cannot retrieve a connection.
* The client should close the connection normally, since the pool is a
* connection event listener.
*
* @return Connection - a pooled connection or null
* @exception PersistenceException - if it cannot retrieve a connection
*/
public Connection getConnection()
throws PersistenceException {
return _connectionManager.getConnection();
}
/**
* Return a reference to the DBConnectionManager
*
* @return DBConnectionManager
*/
public DBConnectionManager getDBConnectionManager() {
return _connectionManager;
}
public void addUser(Connection connection, User user)
throws PersistenceException {
Users.instance().add(connection, user);
}
public Enumeration getAllUsers(Connection connection)
throws PersistenceException {
return Users.instance().getAllUsers(connection).elements();
}
public User getUser(Connection connection, User user)
throws PersistenceException {
return Users.instance().get(connection, user);
}
public void removeUser(Connection connection, User user)
throws PersistenceException {
Users.instance().remove(connection, user);
}
public void updateUser(Connection connection, User user)
throws PersistenceException {
Users.instance().update(connection, user);
}
/**
* Incrementally purge all processed messages from the database.
* @TODO this needs to be revisited. See bug 816895
* - existing expired messages are purged at startup
* - messages received that subsequently expire while the server is
* running are removed individually.
* - not clear how the previous implementation ever worked.
* The Messages.getMessageIds() method returns all messages, not
* just those processed, nor is it clear that the processed flag
* is ever non-zero.
* The current implementation (as a fix for bug 816895 - Exception in
* purgeMessages) simply delegates to removeExpiredMessages()
*
* @return the number of messages deleted
*/
public synchronized int purgeMessages() {
int deleted = 0;
Connection connection = null;
try {
connection = getConnection();
removeExpiredMessages(connection);
connection.commit();
} catch (Exception exception) {
_log.error("Exception in purgeMessages", exception);
} finally {
SQLHelper.close(connection);
}
return 0;
// if (connection == null) {
// return 0;
// }
// // we have a valid connection so we can proceed
// try {
// long stime = System.currentTimeMillis();
// HashMap msgids = Messages.instance().getMessageIds(
// connection, _lastTime, _gcBlockSize);
// // if there are no messages then reset the last time to
// // 0 and break;
// if (msgids.size() > 0) {
// // find the minimum and maximum..we can improve the way we
// // do this.
// Iterator iter = msgids.values().iterator();
// long min = -1;
// long max = -1;
// while (iter.hasNext()) {
// Long id = (Long) iter.next();
// if ((min == -1) &&
// (max == -1)) {
// min = id.longValue();
// max = id.longValue();
// }
// if (id.longValue() < min) {
// min = id.longValue();
// } else if (id.longValue() > max) {
// max = id.longValue();
// }
// }
// // set the last time for the next iteration unless the
// // the size of the msgids is less than the gcBlockSize.
// // If the later is the case then reset the last time.
// // This is in preparation for the next pass through this
// // method.
// if (msgids.size() < _gcBlockSize) {
// _lastTime = 0;
// } else {
// _lastTime = max;
// }
// // now iterate through the message list and delete the
// // messages that do not have corresponding handles.
// Vector hdlids = MessageHandles.instance().getMessageIds(connection, min, max);
// iter = msgids.keySet().iterator();
// while (iter.hasNext()) {
// String id = (String) iter.next();
// if (!hdlids.contains(id)) {
// // this message is not referenced by anyone so we can
// // delete it
// Messages.instance().remove(connection, id);
// deleted++;
// }
// }
// connection.commit();
// } else {
// // reset the lastTime
// _lastTime = 0;
// }
// _log.debug("DBGC Deleted " + deleted + " messages and took "
// + (System.currentTimeMillis() - stime) +
// "ms to complete.");
// } catch (Exception exception) {
// try {
// connection.rollback();
// } catch (Exception nested) {
// // ignore this exception
// }
// _log.error("Exception in purgeMessages", exception);
// deleted = 0;
// } finally {
// try {
// connection.close();
// } catch (Exception nested) {
// // ignore
// }
// }
//
// return deleted;
}
/**
* Get the schema version
*
* @param connection the connection to use
* @return the schema version, or null, if no version has been initialised
* @throws PersistenceException for any related persistence exception
*/
private String getSchemaVersion(Connection connection)
throws PersistenceException {
String version = null;
PreparedStatement query = null;
ResultSet result = null;
try {
query = connection.prepareStatement(
"select * from system_data where id = 1");
result = query.executeQuery();
if (result.next()) {
version = result.getString("version");
}
} catch (SQLException exception) {
throw new PersistenceException(
"Failed to get the schema version", exception);
} finally {
SQLHelper.close(result);
SQLHelper.close(query);
}
return version;
}
/**
* Initialise the schema version
*
* @param connection the connection to use
*/
private void initSchemaVersion(Connection connection)
throws PersistenceException {
_log.info("Initialising schema version " + SCHEMA_VERSION);
PreparedStatement insert = null;
try {
insert = connection.prepareStatement(
"insert into system_data values (?,?,?)");
insert.setInt(1, 1);
insert.setString(2, SCHEMA_VERSION);
insert.setDate(3, new Date(System.currentTimeMillis()));
insert.executeUpdate();
} catch (SQLException exception) {
throw new PersistenceException(
"Failed to initialise schema version", exception);
} finally{
SQLHelper.close(insert);
}
}
/**
* Register an event to collect and remove processed messages with the
* {@link BasicEventManager}
*/
private void registerEvent() {
// try {
// disabled, as per bug 816895 - Exception in purgeMessages
// BasicEventManager.instance().registerEventRelative(
// new Event(COLLECT_DATABASE_GARBAGE_EVENT, this, null),
// _gcInterval);
// } catch (IllegalEventDefinedException exception) {
// _log.error("registerEvent failed", exception);
// }
}
/**
* Creates a {@link DBConnectionManager} using its fully qualified class
* name
*
* @param className the fully qualified class name
* @throws PersistenceException if it cannot be created
*/
private DBConnectionManager getConnectionManager(String className)
throws PersistenceException {
DBConnectionManager result = null;
Class clazz = null;
ClassLoader loader = Thread.currentThread().getContextClassLoader();
try {
if (loader != null) {
clazz = loader.loadClass(className);
}
} catch (ClassNotFoundException ignore) {
}
try {
if (clazz == null) {
clazz = Class.forName(className);
}
} catch (ClassNotFoundException exception) {
throw new PersistenceException(
"Failed to locate connection manager implementation: "
+ className, exception);
}
try {
result = (DBConnectionManager) clazz.newInstance();
} catch (Exception exception) {
throw new PersistenceException(
"Failed to create connection manager", exception);
}
return result;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -