📄 consumers.java
字号:
}
/**
 * Looks up the identity of a durable consumer in the cache.
 *
 * @param name - consumer name
 * @return the consumer identity, or 0 if no consumer is cached under
 *         the given name
 */
public synchronized long getConsumerId(String name) {
    Consumer entry = (Consumer) _consumers.get(name);
    if (entry == null) {
        return 0;
    }
    return entry.consumerId;
}
/**
 * Determines whether a consumer is cached under the given name.
 *
 * @param name - the consumer name
 * @return true if the cache holds a non-null entry for the name,
 *         otherwise false
 */
public synchronized boolean exists(String name) {
    Object entry = _consumers.get(name);
    return entry != null;
}
/**
 * Collects the names of all cached consumers that are attached to the
 * named destination.
 *
 * @param destination - the name of the destination to query
 * @return the matching consumer names; empty if the destination
 *         resolves to id 0 or no cached consumer refers to it
 */
public synchronized Vector getDurableConsumers(String destination) {
    Vector names = new Vector(); // vector for legacy reasons
    long destinationId = Destinations.instance().getId(destination);
    if (destinationId == 0) {
        // an id of 0 is treated as "no such destination"
        return names;
    }
    for (Iterator iter = _consumers.values().iterator(); iter.hasNext();) {
        Consumer entry = (Consumer) iter.next();
        if (entry.destinationId == destinationId) {
            names.add(entry.name);
        }
    }
    return names;
}
/**
 * Builds a map of consumer name to destination name for every cached
 * consumer whose destination resolves to a {@link JmsTopic}.
 * Consumers whose destination is anything else (including one that
 * cannot be resolved) are left out.
 *
 * @return HashMap - consumer names mapped to their topic names
 */
public synchronized HashMap getAllDurableConsumers() {
    HashMap topics = new HashMap();
    for (Iterator iter = _consumers.values().iterator(); iter.hasNext();) {
        Consumer entry = (Consumer) iter.next();
        JmsDestination destination =
            Destinations.instance().get(entry.destinationId);
        if (!(destination instanceof JmsTopic)) {
            continue;
        }
        topics.put(entry.name, destination.getName());
    }
    return topics;
}
/**
 * Finds the name of the cached consumer that has the specified
 * identity by scanning the cache.
 *
 * @param id - the consumer identity
 * @return the name of the first consumer found with that identity,
 *         or null if there is none
 */
public synchronized String getConsumerName(long id) {
    for (Iterator iter = _consumers.values().iterator(); iter.hasNext();) {
        Consumer entry = (Consumer) iter.next();
        if (entry.consumerId == id) {
            return entry.name;
        }
    }
    return null;
}
/**
* Deallocates resources owned or referenced by the instance.
* <p>
* Empties the consumer cache and then sets both <code>_consumers</code>
* and <code>_instance</code> to null.
* <p>
* NOTE(review): once this has run <code>_consumers</code> is null, so a
* second call (or any other cache access) would fail with a
* NullPointerException unless the field is reassigned somewhere outside
* this block - confirm callers never use the object after closing it.
*/
public synchronized void close() {
// the cache must be emptied while the reference is still non-null
_consumers.clear();
_consumers = null;
// presumably the singleton reference for this class - TODO confirm
_instance = null;
}
/**
 * Drops every cached consumer that refers to the given destination.
 *
 * @param destinationId the Id of the destination
 */
protected synchronized void removeCached(long destinationId) {
    // work on a snapshot of the values so that the map can be modified
    // while looping
    Object[] snapshot = _consumers.values().toArray();
    for (int index = 0; index < snapshot.length; ++index) {
        Consumer entry = (Consumer) snapshot[index];
        if (entry.destinationId != destinationId) {
            continue;
        }
        _consumers.remove(entry.name);
    }
}
/**
* Constructor. It is private, so instances can only be created from
* within this class. The consumer cache starts out empty.
*/
private Consumers() {
_consumers = new HashMap();
}
/**
 * Populates the cache from the database. Every row of the
 * <code>consumers</code> table is read and stored in memory, keyed on
 * the consumer name.
 * <p>
 * The statement and result set are handed to <code>SQLHelper.close</code>
 * whether or not the query succeeds.
 *
 * @param connection - the connection to use
 * @throws PersistenceException - if the consumers cannot be read; the
 *         underlying SQLException is attached as the cause
 */
private void load(Connection connection)
    throws PersistenceException {
    PreparedStatement query = null;
    ResultSet rows = null;
    try {
        query = connection.prepareStatement("select * from consumers");
        rows = query.executeQuery();
        while (rows.next()) {
            String name = rows.getString("name");
            long consumerId = rows.getLong("consumerId");
            long destinationId = rows.getLong("destinationId");
            long created = rows.getLong("created");
            _consumers.put(name,
                new Consumer(name, consumerId, destinationId, created));
        }
    } catch (SQLException exception) {
        throw new PersistenceException("Failed to retrieve consumers",
            exception);
    } finally {
        SQLHelper.close(rows);
        SQLHelper.close(query);
    }
}
/**
 * Deletes all rows of the specified table that carry the given
 * consumer identity. The table itself is left in place.
 * <p>
 * The table name is concatenated into the SQL text, so it must come
 * from trusted code; only the consumer identity is bound as a
 * parameter.
 *
 * @param table - the table to delete rows from
 * @param consumerId - the target consumerId
 * @param connection - the database connection to use
 * @throws SQLException - thrown on any error
 */
private void remove(String table, long consumerId, Connection connection)
    throws SQLException {
    String sql = "delete from " + table + " where consumerId=?";
    PreparedStatement statement = null;
    try {
        statement = connection.prepareStatement(sql);
        statement.setLong(1, consumerId);
        statement.executeUpdate();
    } finally {
        SQLHelper.close(statement);
    }
}
/**
 * Always throws a PersistenceException whose message names the failed
 * operation, the consumer, the destination and the reason.
 *
 * @param operation - operation that failed
 * @param name - corresponding consumer name
 * @param destination - corresponding destination
 * @param reason - the reason for the exception
 * @throws PersistenceException - on every invocation
 */
private void raise(String operation, String name, String destination,
                   String reason)
    throws PersistenceException {
    String message = "Cannot " + operation + " consumer=" + name
        + ", destination=" + destination + ": " + reason;
    throw new PersistenceException(message);
}
/**
 * Always throws a PersistenceException whose message names the failed
 * operation, the consumer and the reason.
 *
 * @param operation - operation that failed
 * @param name - corresponding consumer name
 * @param reason - the reason for the exception
 * @throws PersistenceException - on every invocation
 */
private void raise(String operation, String name, String reason)
    throws PersistenceException {
    String message = "Cannot " + operation + " consumer=" + name
        + ": " + reason;
    throw new PersistenceException(message);
}
/**
* Internal value holder for a single cached consumer entry. All fields
* are public and are read directly by the enclosing class.
* <p>
* NOTE(review): nothing in this class touches the enclosing instance, so
* it could probably be declared static - confirm against the rest of the
* file before changing it.
*/
private class Consumer {
/**
* The name of the consumer
*/
public String name;
/**
* The unique consumer identity
*/
public long consumerId;
/**
* The identity of the destination that this durable consumer is
* subscribed to
*/
public long destinationId;
/**
* The time that this durable consumer was created (unit and epoch are
* not visible from this class - TODO confirm)
*/
public long created;
/**
* Construct an entry from its four attributes; each argument is stored
* unchanged in the field of the same name.
*
* @param name - the consumer name
* @param consumerId - the consumer identity
* @param destinationId - the identity of the subscribed destination
* @param created - the creation time of the consumer
*/
public Consumer(String name, long consumerId, long destinationId,
long created) {
this.name = name;
this.consumerId = consumerId;
this.destinationId = destinationId;
this.created = created;
}
/**
* Return the key of this entry, which is the consumer name.
*
* @return the consumer name
*/
public String getKey() {
return name;
}
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -