📄 batchingrdbmsadapter.java
字号:
/**
 * Return the non-expired messages for a destination.
 * <p>
 * Any work still sitting in the batch is flushed first, then the request
 * is handed to the wrapped adapter unchanged.
 *
 * @param connection - the connection passed through to the wrapped adapter
 * @param destination - the destination passed through to the wrapped adapter
 * @return Vector - whatever the wrapped adapter returns
 * @throws PersistenceException - if flush() or the wrapped adapter throws it
 */
public Vector getNonExpiredMessages(Connection connection,
                                    JmsDestination destination)
    throws PersistenceException {
    // pending batch entries are pushed out before the query is executed
    flush();

    Vector messages = _rdbms.getNonExpiredMessages(connection, destination);
    return messages;
}
/**
 * Return a connection to the database by delegating to the wrapped
 * adapter. According to the existing documentation the connection comes
 * from a pool that listens for connection events, so the client should
 * simply close the connection when it is done with it.
 *
 * @return Connection - the connection supplied by the wrapped adapter
 * @exception PersistenceException - if it cannot retrieve a connection
 */
public Connection getConnection() throws PersistenceException {
    Connection connection = _rdbms.getConnection();
    return connection;
}
/**
 * Purge all processed messages from the database.
 * <p>
 * The current batch is flushed first. A failure to flush is logged and
 * does not prevent the purge from being attempted.
 *
 * @return int - the number of messages deleted, as reported by the
 *               wrapped adapter
 */
public synchronized int purgeMessages() {
    try {
        flush();
    } catch (PersistenceException flushFailure) {
        // best effort: report the problem and carry on with the purge
        _log.error("Error in purgeMessages " + flushFailure);
    }

    int purged = _rdbms.purgeMessages();
    return purged;
}
/**
 * Implementation of EventHandler.handleEvent. The event is not processed
 * here; it is forwarded untouched to the wrapped adapter.
 *
 * @param event - the event identifier
 * @param callback - the callback object supplied with the event
 * @param time - the time value supplied with the event
 */
public void handleEvent(int event, Object callback, long time) {
    // pure delegation, no batching involved
    _rdbms.handleEvent(event, callback, time);
}
/**
 * Implementation of EventHandler.getHandle.
 *
 * @return HandleIfc - always null; this adapter does not supply a handle
 */
public HandleIfc getHandle() {
    HandleIfc none = null;
    return none;
}
/**
 * Close the current piece of work and commit it to the database. This is
 * called before a query or fetch is executed on the data.
 * <p>
 * Every entry in the batch is replayed, in order, against the wrapped
 * adapter on a single connection and then committed. The batch and the
 * message and handle caches are cleared only after the commit succeeds.
 * If anything goes wrong the transaction is rolled back, the batch is left
 * untouched, the problem is logged and a PersistenceException is thrown to
 * the caller.
 * <p>
 * The database work runs on a helper thread and the calling thread blocks
 * until that thread has finished. If the calling thread is interrupted
 * while waiting it keeps waiting, so that the batch is never modified
 * while the helper is still reading it, and the interrupt status is
 * restored before this method returns.
 *
 * @throws PersistenceException - if the batch could not be committed
 */
private synchronized void flush() throws PersistenceException {
    if (_batch.size() == 0) {
        return;
    }

    // one-slot holder used by the helper thread to hand its failure back
    final Exception[] failure = new Exception[1];

    // need to do this in a separate thread since the current thread is
    // already associated with a Connection object
    Thread thread = new Thread(new Runnable() {

        public void run() {
            Connection connection = null;
            try {
                connection = _rdbms.getConnection();

                Iterator iter = _batch.iterator();
                while (iter.hasNext()) {
                    TransactionalObjectWrapper wrapper =
                        (TransactionalObjectWrapper) iter.next();
                    switch (wrapper._action) {
                    case TransactionalObjectWrapper.ADD_MESSAGE:
                        _rdbms.addMessage(connection,
                            (MessageImpl) wrapper._object);
                        break;
                    case TransactionalObjectWrapper.UPDATE_MESSAGE:
                        _rdbms.updateMessage(connection,
                            (MessageImpl) wrapper._object);
                        break;
                    case TransactionalObjectWrapper.DELETE_MESSAGE:
                        // NOTE(review): addToBatch() casts the payload of a
                        // DELETE_MESSAGE entry to MessageImpl, while this
                        // cast expects a String. Confirm which type the
                        // callers really enqueue.
                        _rdbms.removeMessage(connection,
                            (String) wrapper._object);
                        break;
                    case TransactionalObjectWrapper.ADD_HANDLE:
                        _rdbms.addMessageHandle(connection,
                            (PersistentMessageHandle) wrapper._object);
                        break;
                    case TransactionalObjectWrapper.UPDATE_HANDLE:
                        _rdbms.updateMessageHandle(connection,
                            (PersistentMessageHandle) wrapper._object);
                        break;
                    case TransactionalObjectWrapper.DELETE_HANDLE:
                        _rdbms.removeMessageHandle(connection,
                            (PersistentMessageHandle) wrapper._object);
                        break;
                    }
                }
                connection.commit();

                // if the commit has worked then flush the batch list
                _batch.clear();
                _messages.clear();
                _handles.clear();
            } catch (Exception exception) {
                // undo partial work for every kind of failure, not only
                // for PersistenceException, so that a pooled connection is
                // not handed back with an open transaction
                if (connection != null) {
                    SQLHelper.rollback(connection);
                }
                _log.error("Failure in flush()", exception);
                failure[0] = exception;
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Exception nested) {
                        _log.error("Failure in flush()", nested);
                    }
                }
            }
        }
    });

    // start the thread.
    thread.start();

    // wait for the thread to finish...sort of defeat the purpose here.
    // An interrupt must not cut the wait short: the helper is iterating
    // over _batch and the caller will modify it as soon as we return.
    boolean interrupted = false;
    while (true) {
        try {
            thread.join();
            break;
        } catch (InterruptedException exception) {
            interrupted = true;
        }
    }
    if (interrupted) {
        // hand the interrupt back to the caller instead of swallowing it
        Thread.currentThread().interrupt();
    }

    // join() guarantees the helper's write to the holder is visible here
    if (failure[0] instanceof PersistenceException) {
        throw (PersistenceException) failure[0];
    } else if (failure[0] != null) {
        throw new PersistenceException("Failure in flush(): " + failure[0]);
    }
}
/**
 * Add this transactional object along with the associated action to the
 * current batch job.
 * <p>
 * If the batch already holds _maxStatementsToBatch or more entries,
 * flush() is called before the new entry is looked at. The new entry is
 * then merged with any entry already cached for the same message id or
 * handle:
 * <ul>
 * <li>add: rejected if an entry is already cached for the key</li>
 * <li>update after add: replaced by one add carrying the new version</li>
 * <li>update after update: replaced by the new update</li>
 * <li>update after delete: rejected, the batch is left untouched</li>
 * <li>delete after add: the add is dropped and nothing is queued</li>
 * <li>delete after update: the update is replaced by the delete</li>
 * <li>delete after delete: ignored, the cached entry is left as it is</li>
 * </ul>
 * Replacement entries are appended to the end of the batch. An action
 * that matches none of the TransactionalObjectWrapper constants is
 * silently ignored.
 *
 * @param action - the action to take
 * @param object - the transactional object; cast to MessageImpl for the
 *                 message actions and to PersistentMessageHandle for the
 *                 handle actions
 * @throws PersistenceException - if flush() fails or the request is
 *                 inconsistent with what is already cached
 */
private synchronized void addToBatch(int action, Object object)
    throws PersistenceException {
    if (_batch.size() >= _maxStatementsToBatch) {
        flush();
    }

    switch (action) {
    case TransactionalObjectWrapper.ADD_MESSAGE:
        {
            TransactionalObjectWrapper txobj =
                new TransactionalObjectWrapper(action, object);
            MessageImpl message = (MessageImpl) object;
            MessageId id = message.getMessageId();
            if (_messages.containsKey(id)) {
                throw new PersistenceException("Inconsistency in cache " +
                    id + " is present when it shouldn't be.");
            }
            _messages.put(id, txobj);
            _batch.addLast(txobj);
            break;
        }
    case TransactionalObjectWrapper.UPDATE_MESSAGE:
        {
            MessageImpl message = (MessageImpl) object;
            MessageId id = message.getMessageId();
            TransactionalObjectWrapper txobj =
                (TransactionalObjectWrapper) _messages.get(id);
            TransactionalObjectWrapper newtxobj =
                new TransactionalObjectWrapper(action, object);
            if (txobj != null) {
                // validate before touching the batch so that a rejected
                // request does not remove the queued delete
                if (txobj._action != TransactionalObjectWrapper.ADD_MESSAGE
                    && txobj._action != TransactionalObjectWrapper.UPDATE_MESSAGE) {
                    // could only be a delete and should never happen
                    throw new PersistenceException("Inconsistency in cache." +
                        " Cannot update a deleted message.");
                }
                // the batch already contains an entry for this message:
                // drop it and queue the new version in its place
                _batch.remove(txobj);
                if (txobj._action == TransactionalObjectWrapper.ADD_MESSAGE) {
                    // the row does not exist yet, so it must still be
                    // inserted, only with the newer content
                    newtxobj._action = TransactionalObjectWrapper.ADD_MESSAGE;
                }
            }
            _batch.addLast(newtxobj);
            _messages.put(id, newtxobj);
            break;
        }
    case TransactionalObjectWrapper.DELETE_MESSAGE:
        {
            // NOTE(review): flush() casts the payload of a DELETE_MESSAGE
            // entry to String, while this branch requires a MessageImpl.
            // Confirm which type the callers really pass in.
            MessageImpl message = (MessageImpl) object;
            MessageId id = message.getMessageId();
            TransactionalObjectWrapper txobj =
                (TransactionalObjectWrapper) _messages.get(id);
            TransactionalObjectWrapper newtxobj =
                new TransactionalObjectWrapper(action, object);
            if (txobj == null) {
                _batch.addLast(newtxobj);
                _messages.put(id, newtxobj);
            } else if (txobj._action == TransactionalObjectWrapper.ADD_MESSAGE) {
                // if an add and the delete happened in the same batch then
                // we don't have to commit either of the transactions. The
                // delete is still cached so later requests can see it.
                _batch.remove(txobj);
                _messages.put(id, newtxobj);
            } else if (txobj._action == TransactionalObjectWrapper.UPDATE_MESSAGE) {
                // if the update and the delete happened in the same batch
                // then only the delete needs to be committed
                _batch.remove(txobj);
                _batch.addLast(newtxobj);
                _messages.put(id, newtxobj);
            } else {
                // a delete is already recorded, so simply ignore this one.
                // The recorded entry must stay where it is: removing it
                // from the batch here would lose the delete altogether.
            }
            break;
        }
    case TransactionalObjectWrapper.ADD_HANDLE:
        {
            TransactionalObjectWrapper txobj =
                new TransactionalObjectWrapper(action, object);
            PersistentMessageHandle handle = (PersistentMessageHandle) object;
            if (_handles.containsKey(handle)) {
                throw new PersistenceException("Inconsistency in cache " +
                    handle + " is present when it shouldn't be.");
            }
            _handles.put(handle, txobj);
            _batch.addLast(txobj);
            break;
        }
    case TransactionalObjectWrapper.UPDATE_HANDLE:
        {
            PersistentMessageHandle handle = (PersistentMessageHandle) object;
            TransactionalObjectWrapper txobj =
                (TransactionalObjectWrapper) _handles.get(handle);
            TransactionalObjectWrapper newtxobj =
                new TransactionalObjectWrapper(action, object);
            if (txobj != null) {
                // validate before touching the batch so that a rejected
                // request does not remove the queued delete
                if (txobj._action != TransactionalObjectWrapper.ADD_HANDLE
                    && txobj._action != TransactionalObjectWrapper.UPDATE_HANDLE) {
                    // could only be a delete and should never happen
                    throw new PersistenceException("Inconsistency in cache." +
                        " Cannot update a deleted handle.");
                }
                // the batch already contains an entry for this handle:
                // drop it and queue the new version in its place
                _batch.remove(txobj);
                if (txobj._action == TransactionalObjectWrapper.ADD_HANDLE) {
                    // the row does not exist yet, so it must still be
                    // inserted, only with the newer content
                    newtxobj._action = TransactionalObjectWrapper.ADD_HANDLE;
                }
            }
            _batch.addLast(newtxobj);
            _handles.put(handle, newtxobj);
            break;
        }
    case TransactionalObjectWrapper.DELETE_HANDLE:
        {
            PersistentMessageHandle handle = (PersistentMessageHandle) object;
            TransactionalObjectWrapper txobj =
                (TransactionalObjectWrapper) _handles.get(handle);
            TransactionalObjectWrapper newtxobj =
                new TransactionalObjectWrapper(action, object);
            if (txobj == null) {
                _batch.addLast(newtxobj);
                _handles.put(handle, newtxobj);
            } else if (txobj._action == TransactionalObjectWrapper.ADD_HANDLE) {
                // if an add and the delete happened in the same batch then
                // we don't have to commit either of the transactions. The
                // delete is still cached so later requests can see it.
                _batch.remove(txobj);
                _handles.put(handle, newtxobj);
            } else if (txobj._action == TransactionalObjectWrapper.UPDATE_HANDLE) {
                // if the update and the delete happened in the same batch
                // then only the delete needs to be committed
                _batch.remove(txobj);
                _batch.addLast(newtxobj);
                _handles.put(handle, newtxobj);
            } else {
                // a delete is already recorded, so simply ignore this one.
                // The recorded entry must stay where it is: removing it
                // from the batch here would lose the delete altogether.
            }
            break;
        }
    }
}
/**
 * Adding a user is a no-op in this adapter: the method body is empty and
 * nothing is batched or written.
 *
 * @param connection - unused
 * @param user - unused
 * @throws PersistenceException - declared but never thrown here
 */
public void addUser(Connection connection, User user) throws PersistenceException {
    // intentionally empty
}
/**
 * Listing users is not implemented in this adapter.
 *
 * @param connection - unused
 * @return Enumeration - always null
 * @throws PersistenceException - declared but never thrown here
 */
public Enumeration getAllUsers(Connection connection) throws PersistenceException {
    Enumeration users = null;
    return users;
}
/**
 * Looking up a user is not implemented in this adapter.
 *
 * @param connection - unused
 * @param user - unused
 * @return User - always null
 * @throws PersistenceException - declared but never thrown here
 */
public User getUser(Connection connection, User user) throws PersistenceException {
    User found = null;
    return found;
}
/**
 * Removing a user is a no-op in this adapter: the method body is empty and
 * nothing is batched or written.
 *
 * @param connection - unused
 * @param user - unused
 * @throws PersistenceException - declared but never thrown here
 */
public void removeUser(Connection connection, User user) throws PersistenceException {
    // intentionally empty
}
/**
 * Updating a user is a no-op in this adapter: the method body is empty and
 * nothing is batched or written.
 *
 * @param connection - unused
 * @param user - unused
 * @throws PersistenceException - declared but never thrown here
 */
public void updateUser(Connection connection, User user) throws PersistenceException {
    // intentionally empty
}
/**
 * Inner class that pairs a transactional object with the action that is to
 * be applied to it, so that the pair can be queued and later processed as
 * part of a batch transaction.
 */
private class TransactionalObjectWrapper {

    // actions that apply to messages
    public static final int ADD_MESSAGE = 1;
    public static final int UPDATE_MESSAGE = 2;
    public static final int DELETE_MESSAGE = 3;

    // actions that apply to message handles
    public static final int ADD_HANDLE = 4;
    public static final int UPDATE_HANDLE = 5;
    public static final int DELETE_HANDLE = 6;

    /**
     * Action associated with the transactional object; one of the
     * constants above. It is not final because the batching code rewrites
     * it when it merges entries.
     */
    public int _action;

    /**
     * The transactional object the action applies to
     */
    public Object _object;

    /**
     * Create a wrapper for the given action and object.
     *
     * @param action - the action to associate with the object
     * @param object - the transactional object
     */
    public TransactionalObjectWrapper(int action, Object object) {
        this._action = action;
        this._object = object;
    }
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -