📄 objectadapter.java
字号:
if (entry.toString().endsWith(target)) {
vector.remove(entry);
found = true;
break;
}
}
String key = null;
if (found) {
// add the target and also create a new table
// to hold handles for this consumer
vector.removeElement(entry);
session.updateObject(vector);
// unbind the table (i.e delete it)
key = getHandlesRootName(consumer);
session.unbind(key);
} else {
throw new PersistenceException("Error in removeDurableConsumer " +
"Cannot find consumer with name " + consumer);
}
} catch (Exception err) {
throw new PersistenceException("Error in removeDurableConsumer " +
err.toString());
}
} else {
throw new PersistenceException("Error in removeDurableConsumer " +
"Cannot get access to the destination table");
}
session.getCurrentTransaction().commit();
PMDSessionManager.instance().destroySession();
} catch (PersistenceException pe) {
throw pe;
} catch (Exception err) {
throw new PersistenceException("Error in removeDurableConsumer " +
err.toString());
}
}
// implementation of PersistenceAdapter.durableConsumerExists
public synchronized boolean durableConsumerExists(Connection connection,
String name)
throws PersistenceException {
boolean exists = false;
try {
PMDVector vector;
SessionIfc session = getSession();
session.getCurrentTransaction().begin();
if ((vector = destinationTable(session)) != null) {
try {
session.acquireLock(vector, MAX_WAIT_TIME);
String target = "@" + name;
Enumeration entries = vector.elements();
while (entries.hasMoreElements()) {
PersistentString entry = (PersistentString) entries.nextElement();
if (entry.toString().endsWith(target)) {
exists = true;
break;
}
}
} catch (Exception err) {
throw new PersistenceException("Error in durableConsumerExists " +
err.toString());
}
} else {
throw new PersistenceException("Error in durableConsumerExists " +
"Cannot get access to the destinationtable.");
}
session.getCurrentTransaction().commit();
PMDSessionManager.instance().destroySession();
} catch (PersistenceException pe) {
throw pe;
} catch (Exception err) {
throw new PersistenceException("Error in durableConsumerExists " +
err.toString());
}
return exists;
}
// implementation of PersistenceAdapter.getDurableConsumers
public synchronized Enumeration getDurableConsumers(Connection connection,
String topic)
throws PersistenceException {
Vector consumers = new Vector();
try {
PMDVector vector;
SessionIfc session = getSession();
session.getCurrentTransaction().begin();
if ((vector = destinationTable(session)) != null) {
try {
session.acquireLock(vector, MAX_WAIT_TIME);
String target = TOPIC + topic + "@";
Enumeration entries = vector.elements();
while (entries.hasMoreElements()) {
PersistentString entry =
(PersistentString) entries.nextElement();
if (entry.toString().startsWith(target)) {
consumers.addElement(
entry.toString().substring(target.length()));
}
}
} catch (Exception err) {
throw new PersistenceException("Error in getDurableConsumers " +
err.toString());
}
} else {
throw new PersistenceException("Error in getDurableConsumers " +
"Failed to get access to the destination table.");
}
session.getCurrentTransaction().commit();
PMDSessionManager.instance().destroySession();
} catch (PersistenceException pe) {
throw pe;
} catch (Exception err) {
throw new PersistenceException("Error in getDurableConsumers " +
err.toString());
}
return consumers.elements();
}
// implementation of PersistenceAdapter.getAllDurableConsumers
public HashMap getAllDurableConsumers(Connection connection)
throws PersistenceException {
HashMap consumers = new HashMap();
try {
PMDVector vector;
SessionIfc session = getSession();
session.getCurrentTransaction().begin();
if ((vector = destinationTable(session)) != null) {
try {
session.acquireLock(vector, MAX_WAIT_TIME);
Enumeration entries = vector.elements();
while (entries.hasMoreElements()) {
PersistentString entry =
(PersistentString) entries.nextElement();
String temp = entry.toString().substring(TOPIC.length());
int index = temp.indexOf("@");
if (!(entry.toString().startsWith(TOPIC)) ||
(index == -1)) {
// this is not a durable consumer so continue
continue;
}
// we have a durable consumer
consumers.put(temp.substring(index + 1),
temp.substring(0, index));
}
} catch (Exception err) {
throw new PersistenceException("Error in getAllDurableConsumers " +
err.toString());
}
} else {
throw new PersistenceException("Error in getAllDurableConsumers " +
"Failed to get access to the destination table");
}
session.getCurrentTransaction().commit();
PMDSessionManager.instance().destroySession();
} catch (PersistenceException pe) {
throw pe;
} catch (Exception err) {
throw new PersistenceException("Error in getAllDurableConsumers " +
err.toString());
}
return consumers;
}
// implementation of PersistenceAdapter.addDestination
public synchronized void addDestination(Connection connection,
String name, boolean queue)
throws PersistenceException {
try {
PMDVector vector;
SessionIfc session = getSession();
session.getCurrentTransaction().begin();
if ((vector = destinationTable(session)) != null) {
try {
session.acquireLock(vector, MAX_WAIT_TIME);
String target;
if (queue) {
target = QUEUE + name;
} else {
target = TOPIC + name;
}
boolean found = false;
Enumeration entries = vector.elements();
while (entries.hasMoreElements()) {
PersistentString entry = (PersistentString) entries.nextElement();
if (entry.toString().equals(target)) {
found = true;
break;
}
}
String key = null;
if (!found) {
// add the target and also create a new table
// to hold handles for this consumer
vector.addElement(new PersistentString(target));
session.updateObject(vector);
// if it is a queue then we also need to construct
// a handles table
if (queue) {
key = getHandlesRootName(name);
PMDVector handles =
(PMDVector) session.getCollectionManager().createVector();
session.createObject(handles);
session.bind(key, handles);
}
}
} catch (Exception err) {
throw new PersistenceException("Error in addDestination " +
err.toString());
}
} else {
throw new PersistenceException("Error in addDestination " +
"Failed to get access to destination table");
}
session.getCurrentTransaction().commit();
PMDSessionManager.instance().destroySession();
} catch (PersistenceException pe) {
throw pe;
} catch (Exception err) {
throw new PersistenceException("Error in addDestination " +
err.toString());
}
}
// implementation of PersistenceAdapter.removeDestination
public synchronized void removeDestination(Connection connection,
String name)
throws PersistenceException {
try {
PMDVector vector;
SessionIfc session = getSession();
session.getCurrentTransaction().begin();
if ((vector = destinationTable(session)) != null) {
try {
session.acquireLock(vector, MAX_WAIT_TIME);
// this will remove the destination and all registered
// consumers.
Vector to_delete = new Vector();
Enumeration entries = vector.elements();
while (entries.hasMoreElements()) {
PersistentString wrapped_entry =
(PersistentString) entries.nextElement();
String entry = wrapped_entry.toString();
// now the destination name must appear before
// the '.' separator
int name_idx = entry.indexOf("@");
int dest_idx = entry.indexOf(name);
if (entry.substring(TOPIC.length()).equals(name)) {
// we have found the actual destination entry
// Firstly mark it for deletion and then if it is a queue
// unbind the consumer state
to_delete.addElement(wrapped_entry);
if (entry.startsWith(QUEUE)) {
session.unbind(getHandlesRootName(name));
}
} else if ((name_idx >= 0) &&
(dest_idx >= 0) &&
(dest_idx < name_idx)) {
// we have located an entry which describes a
// durable consumer for a topic
String consumer = entry.substring(name_idx + 1);
to_delete.addElement(wrapped_entry);
session.unbind(getHandlesRootName(consumer));
}
}
// now we need to go through and process the to_delete
// entries
Enumeration elements = to_delete.elements();
while (elements.hasMoreElements()) {
vector.remove((PersistentString) elements.nextElement());
}
// update the vector object
session.updateObject(vector);
} catch (Exception err) {
throw new PersistenceException("Error in removeDestination " +
err.toString());
}
} else {
throw new PersistenceException("Error in removeDestination " +
"Failed to get access to the destination table.");
}
session.getCurrentTransaction().commit();
PMDSessionManager.instance().destroySession();
} catch (PersistenceException pe) {
throw pe;
} catch (Exception err) {
throw new PersistenceException("Error in addDestination " +
err.toString());
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -