📄 externaldatabase.java
字号:
synchronized (txTable) {
// disassociate all threads that have joined the transaction
DxIterator it = txTable.iterator();
AbstractTransaction cursorTX;
while ((cursorTX = (AbstractTransaction) it.next()) != null) {
if (cursorTX == tx) {
it.removeObject();
}
}
}
}
/**
* This method is never directly called from the client code.
*/
public void rollbackTX(AbstractTransaction tx) throws TransactionException, IOException {
synchronized (tx) {
if (tx.connection == null) {
// don't throw an exception to allow subsequent calls
return;
}
commandTX(tx, new DbTransaction(DbTransaction.MODE_ABORT));
releasePooledConnection(tx.connection);
tx.connection = null;
}
synchronized (txTable) {
// disassociate all threads that have joined the transaction
DxIterator it = txTable.iterator();
AbstractTransaction cursorTX;
while ((cursorTX = (AbstractTransaction) it.next()) != null) {
if (cursorTX == tx) {
it.removeObject();
}
}
}
}
/**
* Obtain the _internal_ server status of this transaction.
* @return Status of the transaction. Defined in
* {@link org.ozoneDB.core.Transaction}.<p>
*
* This method is never directly called from the client code.
*/
public final int getStatusTX(AbstractTransaction tx) throws TransactionException, IOException {
if (tx.connection == null) {
return Transaction.STATUS_NONE;
} else {
DbTransaction command = new DbTransaction(DbTransaction.MODE_STATUS);
commandTX(tx, command);
return ((Integer) command.result).intValue();
}
}
/**
* This method is never directly called from the client code.
*/
protected final Object commandTX(AbstractTransaction tx, DbTransaction command) throws TransactionException, IOException {
if (tx.connection == null) {
throw new TransactionException("Thread has not yet joined a transaction.", TransactionException.STATE);
}
try {
// exceptions are catched and re-thrown be sendCommand();
// use the connection of the transaction to allow non-joined
// transaction to complete the transaction
return sendCommand(command, true, tx.connection);
} catch (IOException e) {
throw e;
} catch (TransactionException e) {
throw e;
} catch (Exception e) {
throw new TransactionException(e.toString(), TransactionException.UNEXPECTED);
}
}
// connection pooling *********************************
/**
* The way for the actual database types to create a corresponding
* connection.
*/
protected abstract DbClient newConnection() throws Exception;
/**
* Get a connection from the connection pool.<p>
*
* Earlier versions kept one connection for each client thread. That is,
* client threads were directly mapped to server threads. Especially for
* servlet environment this produces a lot of connections (server threads),
* which in fact is not needed.<p>
*
* So we are using a pool of connections now. Connections are actually
* created and added to the pool when the pool is empty. Currently connections
* are never closed once they are added to the pool.
*
*/
protected final DbClient acquirePooledConnection() throws Exception {
// keep both pools stable while we are messing with them
synchronized (apool) {
synchronized (upool) {
if (apool.isEmpty()) {
DbClient connection = newConnection();
apool.push(connection);
}
DbClient connection = (DbClient) apool.pop();
if (upool.add(connection) == false) {
throw new IllegalStateException("Connection is already in use.");
}
return connection;
}
}
}
/**
* Release a formerly acquired pooled connection so that it may used by
* another request.
*
* @param connection The pooled connection to be released
* @see #acquirePooledConnection
*/
protected final void releasePooledConnection(DbClient connection) {
// keep both pools stable while we are messing with it
synchronized (apool) {
synchronized (upool) {
if (upool.remove(connection) == false) {
throw new IllegalStateException("Given connection is not element of the pool of used connections.");
}
apool.push(connection);
}
}
}
/**
* Return the server connection for the specified thread. If the thread is
* not yet joined to a connection create a new one and associate with the
* thread.
*/
// protected final DbClient connectionForThread( Thread thread ) throws Exception {
// if (!isOpen()) {
// throw new RuntimeException( "Database not open" );
// }
//
// // is there a connection associated to this thread?
// DbClient connection = (DbClient)connectionTable.elementForKey( thread );
//
// if (connection == null) {
// synchronized (connectionTable) {
// // close all connections that are no longer used by a thread;
// // don't re-use connections because they are not really stateless
// DxIterator it = connectionTable.iterator();
// while (it.next() != null) {
// if (!((Thread)it.key()).isAlive()) {
// // System.out.println ("closing connection...");
// connection = (DbClient)it.removeObject();
// connection.send( new DbCloseConn() );
// connection.close();
// }
// }
// // make a new connection
// connection = newConnection();
// connectionTable.addForKey( connection, thread );
// }
// }
// return connection;
// }
/**
* Send the specified command to the server. If there is a global
* transaction, its connection is used. Else, if the current thread is joined
* to a transaction the connection of the transaction is used. Otherwise a
* connection from the pool is used.
*/
protected final Object sendCommand(DbCommand command, boolean waitForResult) throws Exception,IOException,TransactionException,ClassNotFoundException,org.ozoneDB.OzoneObjectException {
if (!isOpen()) {
throw new IllegalStateException("Database is not open.");
}
Thread thread = Thread.currentThread();
AbstractTransaction txOfThread = (AbstractTransaction) txTable.elementForKey(thread);
if (txOfThread != null) {
DbClient connection = txOfThread.connection;
return sendCommand(command, waitForResult, connection);
} else {
DbClient connection = null;
try {
connection = acquirePooledConnection();
return sendCommand(command, waitForResult, connection);
} finally {
releasePooledConnection(connection);
}
}
}
/**
* Send the specified command to the server. Use the specified connection.
* While working the connection is synchronized to allow multiple threads
* to use this connection.
*
* @param waitForResult
* true: read the result from the external database and return it
* false: do not read the result from the external database and return null.
* This is dangerous if not properly used. In this case, the result
* is not read from the stream and left to be read from the next reader.
* As this is not desireable, only supply false if the command does not return any result.
*/
protected Object sendCommand(DbCommand command,boolean waitForResult,DbClient connection) throws Exception,IOException,TransactionException,ClassNotFoundException,OzoneObjectException {
Object result = null;
// this allows multiple thread to use one connection; this happens,
// if a thread joines a thransaction instead of beginning it
synchronized (connection) {
connection.send(command);
if (waitForResult) {
//read the result and set proxy links
result = connection.receive();
if (result != null) {
// FIXME: It is not wise to overload the result with both a normal result and an exception.
if (result instanceof Throwable) {
if (result instanceof RuntimeException) {
if (result instanceof OzoneObjectException) {
throw (OzoneObjectException) result;
} else {
((Exception)result).fillInStackTrace();
throw (RuntimeException) result;
}
} else if (result instanceof Exception) {
((Exception)result).fillInStackTrace();
throw (Exception) result;
} else {
if (result instanceof Error) {
((Error) result).fillInStackTrace();
throw (Error) result;
}
}
}
}
}
}
return result;
}
protected ExternalDatabase linkForProxy(OzoneProxy proxy) {
ExternalDatabase link = wrapper != null ? wrapper : this;
// System.out.println ("*** linkForProxy(): " + link.getClass().getName());
return link;
}
protected synchronized void setWrapper(ExternalDatabase _wrapper) {
wrapper = _wrapper;
}
// OzoneInterface methods *****************************
public boolean isOpen() throws Exception {
return txTable!=null;
}
/**
* Open this database connection according to the specified properties.
*/
protected void open(Hashtable _props) throws Exception {
txTable = new DxHashMap();
apool = new DxArrayDeque(32);
upool = new DxHashSet(32);
databases.add(this);
}
/**
* Factory method that creates a new database object. The actual type of the
* database object ({@link ExternalDatabase} or {@link LocalDatabase})
* depends on the specified database URL. The returned database connection
* is already open.
*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -