📄 objectadapter.java
字号:
/**
 * Implementation of PersistenceAdapter.checkDestination.
 * <p>
 * Opens a transaction, locks the destination table and scans it for an
 * entry whose text, after the first TOPIC.length() characters have been
 * stripped, equals the supplied name. The lookup never modifies anything,
 * so the transaction is aborted (not committed) when the scan finishes.
 *
 * @param connection not referenced by this implementation
 * @param name       the destination name to look for
 * @return true if a matching entry exists, false otherwise
 * @throws PersistenceException if the destination table cannot be
 *         obtained or any error occurs while scanning it
 */
public boolean checkDestination(Connection connection, String name)
    throws PersistenceException {
    boolean success = false;
    SessionIfc session = null;
    boolean completed = false;
    try {
        PMDVector vector;
        session = getSession();
        session.getCurrentTransaction().begin();
        if ((vector = destinationTable(session)) != null) {
            try {
                session.acquireLock(vector, MAX_WAIT_TIME);
                Enumeration entries = vector.elements();
                while (entries.hasMoreElements()) {
                    PersistentString entry =
                        (PersistentString) entries.nextElement();
                    // NOTE(review): TOPIC.length() characters are stripped
                    // from every entry regardless of its actual prefix;
                    // confirm this is also intended for QUEUE entries.
                    if (entry.toString().substring(
                            TOPIC.length()).equals(name)) {
                        success = true;
                        break;
                    }
                }
            } catch (Exception err) {
                throw new PersistenceException("Error in checkDestination " +
                    err.toString());
            }
        } else {
            throw new PersistenceException("Error in checkDestination " +
                "Failed to get access to destination table.");
        }
        // read-only operation: discard the transaction
        session.getCurrentTransaction().abort();
        PMDSessionManager.instance().destroySession();
        completed = true;
    } catch (PersistenceException pe) {
        throw pe;
    } catch (Exception err) {
        throw new PersistenceException("Error in checkDestination " +
            err.toString());
    } finally {
        // Failure path only: do not leave the transaction begun above
        // open. Best effort - the original error is the one reported.
        if (!completed && session != null) {
            try {
                session.getCurrentTransaction().abort();
            } catch (Exception ignored) {
                // nothing more can be done here
            }
        }
    }
    return success;
}
/**
 * Implementation of PersistenceAdapter.getAllDestinations.
 * <p>
 * Opens a transaction, locks the destination table and builds a list of
 * destinations from its entries. Entries containing an "@" are skipped;
 * of the rest, those starting with QUEUE become a JmsQueue and those
 * starting with TOPIC become a JmsTopic, each named by the text following
 * the prefix. Entries with neither prefix are ignored. Nothing is
 * modified, so the transaction is aborted when the scan finishes.
 *
 * @param connection not referenced by this implementation
 * @return an enumeration over the JmsQueue and JmsTopic objects found
 * @throws PersistenceException if the destination table cannot be
 *         obtained or any error occurs while scanning it
 */
public synchronized Enumeration getAllDestinations(Connection connection)
    throws PersistenceException {
    Vector destinations = new Vector();
    SessionIfc session = null;
    boolean completed = false;
    try {
        PMDVector vector;
        session = getSession();
        session.getCurrentTransaction().begin();
        if ((vector = destinationTable(session)) != null) {
            try {
                session.acquireLock(vector, MAX_WAIT_TIME);
                Enumeration entries = vector.elements();
                while (entries.hasMoreElements()) {
                    PersistentString entry =
                        (PersistentString) entries.nextElement();
                    String dest = entry.toString();
                    // entries containing '@' are not reported as
                    // destinations
                    if (dest.indexOf("@") == -1) {
                        if (dest.startsWith(QUEUE)) {
                            destinations.addElement(
                                new JmsQueue(dest.substring(QUEUE.length())));
                        } else if (dest.startsWith(TOPIC)) {
                            destinations.addElement(
                                new JmsTopic(dest.substring(TOPIC.length())));
                        }
                    }
                }
            } catch (Exception err) {
                throw new PersistenceException("Error in getAllDestinations " +
                    err.toString());
            }
        } else {
            throw new PersistenceException("Error in getAllDestinations " +
                "Failed to get access to the destination table.");
        }
        // read-only operation: discard the transaction
        session.getCurrentTransaction().abort();
        PMDSessionManager.instance().destroySession();
        completed = true;
    } catch (PersistenceException pe) {
        throw pe;
    } catch (Exception err) {
        throw new PersistenceException("Error in getAllDestinations " +
            err.toString());
    } finally {
        // Failure path only: do not leave the transaction begun above
        // open. Best effort - the original error is the one reported.
        if (!completed && session != null) {
            try {
                session.getCurrentTransaction().abort();
            } catch (Exception ignored) {
                // nothing more can be done here
            }
        }
    }
    return destinations.elements();
}
/**
 * Implementation of PersistenceAdapter.getQueueMessageCount.
 * <p>
 * Looks up the handle table whose root name is derived from the queue
 * name via getHandlesRootName, locks it and returns its size. Nothing is
 * modified, so the transaction is aborted once the size has been read.
 *
 * @param connection not referenced by this implementation
 * @param queue      the name of the queue
 * @return the number of entries in the queue's handle table
 * @throws PersistenceException if the handle table cannot be obtained or
 *         any error occurs while reading it
 */
public synchronized int getQueueMessageCount(Connection connection,
                                             String queue)
    throws PersistenceException {
    // every path that does not assign count throws, so -1 is never
    // actually returned
    int count = -1;
    SessionIfc session = null;
    boolean completed = false;
    try {
        PMDVector vector;
        session = getSession();
        String key = getHandlesRootName(queue);
        session.getCurrentTransaction().begin();
        if ((vector = handleTable(key, session)) != null) {
            try {
                session.acquireLock(vector, MAX_WAIT_TIME);
                count = vector.size();
            } catch (Exception err) {
                throw new PersistenceException("Error in getQueueMessageCount " +
                    err.toString());
            }
        } else {
            throw new PersistenceException("Error in getQueueMessageCount " +
                "Failed to get access to queue " + queue);
        }
        // read-only operation: discard the transaction
        session.getCurrentTransaction().abort();
        PMDSessionManager.instance().destroySession();
        completed = true;
    } catch (PersistenceException pe) {
        throw pe;
    } catch (Exception err) {
        throw new PersistenceException("Error in getQueueMessageCount " +
            err.toString());
    } finally {
        // Failure path only: do not leave the transaction begun above
        // open. Best effort - the original error is the one reported.
        if (!completed && session != null) {
            try {
                session.getCurrentTransaction().abort();
            } catch (Exception ignored) {
                // nothing more can be done here
            }
        }
    }
    return count;
}
/**
 * Implementation of PersistenceAdapter.getDurableConsumerMessageCount.
 * <p>
 * Looks up the handle table whose root name is derived from the consumer
 * name via getHandlesRootName, locks it and returns its size. The topic
 * argument does not take part in the lookup; it only appears in the
 * error message raised when the table is missing. Nothing is modified,
 * so the transaction is aborted once the size has been read.
 *
 * @param connection not referenced by this implementation
 * @param topic      the topic name, used for error reporting only
 * @param name       the durable consumer name used to locate the table
 * @return the number of entries in the consumer's handle table
 * @throws PersistenceException if the handle table cannot be obtained or
 *         any error occurs while reading it
 */
public synchronized int getDurableConsumerMessageCount(Connection connection,
                                                       String topic, String name)
    throws PersistenceException {
    // every path that does not assign count throws, so -1 is never
    // actually returned
    int count = -1;
    SessionIfc session = null;
    boolean completed = false;
    try {
        PMDVector vector;
        session = getSession();
        String key = getHandlesRootName(name);
        session.getCurrentTransaction().begin();
        if ((vector = handleTable(key, session)) != null) {
            try {
                session.acquireLock(vector, MAX_WAIT_TIME);
                count = vector.size();
            } catch (Exception err) {
                throw new PersistenceException("Error in getDurableConsumerMessageCount " +
                    err.toString());
            }
        } else {
            throw new PersistenceException("Error in getDurableConsumerMessageCount " +
                "Cannot access table for " + topic + " : " + name);
        }
        // read-only operation: discard the transaction
        session.getCurrentTransaction().abort();
        PMDSessionManager.instance().destroySession();
        completed = true;
    } catch (PersistenceException pe) {
        throw pe;
    } catch (Exception err) {
        throw new PersistenceException("Error in getDurableConsumerMessageCount " +
            err.toString());
    } finally {
        // Failure path only: do not leave the transaction begun above
        // open. Best effort - the original error is the one reported.
        if (!completed && session != null) {
            try {
                session.getCurrentTransaction().abort();
            } catch (Exception ignored) {
                // nothing more can be done here
            }
        }
    }
    return count;
}
/**
 * Implementation of PersistenceAdapter.removeExpiredMessages.
 * <p>
 * Runs in a single transaction and works in two passes:
 * <ol>
 * <li>every message in the message table whose expiry time is non-zero
 *     and not later than the current time is deleted from the session
 *     and its entry (keyed by JMS message id) is removed from the
 *     table;</li>
 * <li>for every destination table entry that maps to a handle table,
 *     the handles whose expiry time is non-zero and not later than the
 *     current time are removed from that handle table.</li>
 * </ol>
 * The transaction is committed on success and aborted (best effort) on
 * any failure.
 *
 * @param connection not referenced by this implementation
 * @throws PersistenceException if the destination table cannot be
 *         obtained or any error occurs while purging
 */
public synchronized void removeExpiredMessages(Connection connection)
    throws PersistenceException {
    SessionIfc session = null;
    boolean completed = false;
    try {
        PMDVector vector;
        PMDHashMap map;
        session = getSession();
        session.getCurrentTransaction().begin();
        long now = System.currentTimeMillis();
        // A missing message table is not treated as an error: nothing is
        // purged and the transaction is still committed below.
        if ((map = messageTable(session)) != null) {
            try {
                session.acquireLock(map, MAX_WAIT_TIME);
                Enumeration iter = map.elements();
                Vector to_remove = new Vector();
                // collect the ids of expired messages first so that the
                // table is not modified while it is being enumerated
                while (iter.hasMoreElements()) {
                    PMDHandle handle = (PMDHandle) iter.nextElement();
                    PersistentMessage msg = (PersistentMessage) handle.resolve();
                    if ((msg.getExpiryTime() != 0) &&
                        (msg.getExpiryTime() <= now)) {
                        session.deleteObject(msg);
                        to_remove.add(msg.getMessage().getJMSMessageID());
                    }
                }
                // drop the table entries of the expired messages
                Enumeration expired = to_remove.elements();
                while (expired.hasMoreElements()) {
                    map.remove(expired.nextElement());
                }
                session.updateObject(map);
            } catch (Exception err) {
                throw new PersistenceException("Error in removeExpiredMessages " +
                    err.toString());
            }
            // now go through all the handle tables and remove the
            // handles of expired messages
            if ((vector = destinationTable(session)) != null) {
                try {
                    session.acquireLock(vector, MAX_WAIT_TIME);
                    Enumeration entries = vector.elements();
                    Vector to_remove = new Vector();
                    while (entries.hasMoreElements()) {
                        // start each destination with an empty work list
                        to_remove.clear();
                        PersistentString entry = (PersistentString) entries.nextElement();
                        String name = getHandlesRootNameFromDestination(
                            entry.toString());
                        if (name == null) {
                            continue;
                        }
                        // retrieve the handle table based on the name
                        PMDVector handles_vector = handleTable(name, session);
                        if (handles_vector == null) {
                            continue;
                        }
                        // collect the expired handles of this table
                        Enumeration handles = handles_vector.elements();
                        while (handles.hasMoreElements()) {
                            PersistentMessageHandle handle =
                                (PersistentMessageHandle) handles.nextElement();
                            if ((handle.getExpiryTime() != 0) &&
                                (handle.getExpiryTime() <= now)) {
                                to_remove.add(handle);
                            }
                        }
                        // now remove the collected handles
                        Enumeration stale = to_remove.elements();
                        while (stale.hasMoreElements()) {
                            handles_vector.remove(
                                (PersistentMessageHandle) stale.nextElement());
                        }
                        session.updateObject(handles_vector);
                    }
                } catch (Exception err) {
                    throw new PersistenceException("Error in removeExpiredMessages " +
                        err.toString());
                }
            } else {
                // this branch is reached when the destination table, not
                // the message table, is unavailable
                throw new PersistenceException("Error in removeExpiredMessages " +
                    "Failed to get the destination table");
            }
        }
        session.getCurrentTransaction().commit();
        PMDSessionManager.instance().destroySession();
        completed = true;
    } catch (PersistenceException pe) {
        throw pe;
    } catch (Exception err) {
        throw new PersistenceException("Error in removeExpiredMessages " +
            err.toString());
    } finally {
        // Failure path only: roll back whatever was changed and do not
        // leave the transaction open. Best effort - the original error
        // is the one reported.
        if (!completed && session != null) {
            try {
                session.getCurrentTransaction().abort();
            } catch (Exception ignored) {
                // nothing more can be done here
            }
        }
    }
}
/**
 * Implementation of PersistenceAdapter.removeExpiredMessageHandles.
 * <p>
 * This adapter does nothing here: the method body is empty and both
 * arguments are ignored.
 *
 * @param connection not referenced by this implementation
 * @param consumer   not referenced by this implementation
 * @throws PersistenceException declared by the signature; never thrown
 *         by this implementation
 */
public void removeExpiredMessageHandles(Connection connection,
String consumer)
throws PersistenceException {
// no operation
}
// implementation of PersistenceAdapter.getQueueMessageCount
public synchronized Vector getNonExpiredMessages(Connection connection,
JmsDestination destination)
throws PersistenceException {
Vector result = new Vector();
SessionIfc session = null;
try {
PMDVector vector;
session = getSession();
session.getCurrentTransaction().begin();
long now = System.currentTimeMillis();
// now we need to go through all the handle tables and
// retrieve messages which have not expired for the specified
// destination
if ((vector = destinationTable(session)) != null) {
try {
session.acquireLock(vector, MAX_WAIT_TIME);
Enumeration entries = vector.elements();
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -