📄 transactionmanager.java
字号:
// You can redistribute this software and/or modify it under the terms of
// the Ozone Core License version 1 published by ozone-db.org.
//
// The original code and portions created by SMB are
// Copyright (C) 1997-@year@ by SMB GmbH. All rights reserved.
//
// $Id: TransactionManager.java,v 1.11.2.2 2003/12/21 16:01:23 per_nyfelt Exp $
package org.ozoneDB.core;
import org.ozoneDB.*;
import org.ozoneDB.DxLib.*;
import org.ozoneDB.core.DbRemote.*;
import org.ozoneDB.core.dr.DeadlockRecognition;
import org.ozoneDB.util.LogWriter;
import java.util.Random;
/**
* The transaction manager is the factory for transactions and locks.
*
*
* @author <a href="http://www.softwarebuero.de/">SMB</a>
* @author <a href="http://www.medium.net/">Medium.net</a>
* @version $Revision: 1.11.2.2 $Date: 2003/12/21 16:01:23 $
*/
public final class TransactionManager extends ServerComponent {
/**
* All currently running transactions (taID -> ta).
*/
protected DxMap taTable;
/**
* In local mode this maps threads to transactions.
*/
protected DxMap threadTable;
/**
* Used to signal a deadlock and which Transaction should abort.
*/
protected Transaction deadlockTA;
/**
* True if one thread runs exclusively.
*/
protected Thread exclusiveThread;
protected long acquireCount;
public TransactionManager( Env _env ) {
super( _env );
// _env.getLogWriter().newEntry(this,"<init>(): _env="+_env+",env="+env+".",LogWriter.INFO);
taTable = new DxHashMap( 32 );
threadTable = new DxHashMap( 32 );
}
public void startup() throws Exception {
env.logWriter.newEntry( this, "startup...", LogWriter.INFO );
}
public void shutdown() throws Exception {
env.logWriter.newEntry( this, "shutdown...", LogWriter.INFO );
env.logWriter.newEntry( this, " there are " + taTable.count() + " pending transaction(s)", LogWriter.INFO );
if (!taTable.isEmpty()) {
env.logWriter.newEntry( this, " aborting pending transactions...", LogWriter.INFO );
DxIterator it = taTable.iterator();
while (it.next() != null) {
((Transaction)it.object()).stop();
}
env.logWriter.newEntry( this, " waiting for transactions to end...", LogWriter.INFO );
for (int sec = 10; !taTable.isEmpty(); sec--) {
Thread.sleep( 1000 );
}
}
env.logWriter.newEntry( this, "acquire count total:" + acquireCount, LogWriter.INFO );
}
public void save() throws Exception {
}
/**
* Factory method to generate proper Lock objects. These Lock objects are
* used in the store backend but they have to be generated in the core to
* meet the requirements of the transaction implementation.
*/
public Lock newLock() {
return new MROWLock();
}
public int taTableCount() {
return taTable.count();
}
public Transaction taForID( TransactionID taID ) {
return (Transaction)taTable.elementForKey( taID );
}
/**
* der aktuelle thread is auch die aktuelle transaktion (wenn der
* thread ueberhaupt eine transaktion ist)
*/
public Transaction currentTA() {
Thread thread = Thread.currentThread();
// see startTransaction()
if (thread instanceof CommandThread) {
return ((CommandThread) thread).ta;
} else {
Transaction result = (Transaction) threadTable.elementForKey(thread);
return result;
}
}
public Transaction newTransaction( User owner ) throws TransactionException {
if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
env.logWriter.newEntry( this, "newTransaction() *****************************", LogWriter.DEBUG3 );
}
if (currentTA() != null) {
throw new TransactionException( "Thread is already joined to a transaction.", TransactionException.STATE );
}
Transaction ta = env.storeManager.createTransaction(env, owner);
Thread thread = Thread.currentThread();
// in the server threads are CommandThreads and the corresponding
// transaction is a member of the CommandThread
if (thread instanceof CommandThread) {
CommandThread commandThread = (CommandThread)Thread.currentThread();
commandThread.setTransaction( ta );
}
if (env.logWriter.hasTarget( LogWriter.DEBUG2 )) {
env.logWriter.newEntry( this, "newTransaction(): ta="+ta+", thread="+thread+".", LogWriter.DEBUG2 );
}
synchronized (this) {
// if we are local, threads can be of any type; we add them to the
// threadTable to be able to found the corresponding transaction
threadTable.addForKey( ta, thread );
taTable.addForKey( ta, ta.taID() );
}
return ta;
}
/**
* Delete the transaction that is associated with the current thread.
*/
public void deleteTransaction() {
if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
env.logWriter.newEntry( this, "deleteTransaction() --------------------------", LogWriter.DEBUG3 );
}
Transaction ta = currentTA();
if (ta == null) {
env.logWriter.newEntry( this, "deleteTransaction(): thread is not joined to a transaction.",
LogWriter.WARN );
} else {
if (env.logWriter.hasTarget( LogWriter.DEBUG2 )) {
env.logWriter.newEntry( this, "deleteTransaction() ta="+ta+".", LogWriter.DEBUG2 );
}
synchronized (this) {
acquireCount += ta.acquireCount;
if (false&&env.logWriter.hasTarget( LogWriter.DEBUG2 )) {
env.logWriter.newEntry( this, " acquire count:" + ta.acquireCount, LogWriter.DEBUG2 );
env.logWriter.newEntry( this, " total :" + acquireCount, LogWriter.DEBUG2 );
}
Thread thread = Thread.currentThread();
if (thread instanceof CommandThread) {
((CommandThread)thread).setTransaction( null );
}
threadTable.removeForKey( thread );
taTable.removeForKey( ta.taID() );
env.getGarbageCollector().removeTransactionRequiredToComplete(ta);
}
}
}
/**
If we notify all transactions, they not-blocked-ones may wake up from sleeping.
*/
protected boolean alsoNotifySomeSleepingTransactions = true;
/**
* Notify each thread that is associated with a currently blocked
* transaction by calling notifyAll() on the blocked transaction.
*/
public void notifyWaitingTransactions() {
if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
env.logWriter.newEntry( this, "notifyWaitingTransactions()", LogWriter.DEBUG3 );
}
DxArrayBag tas = new DxArrayBag( taTable.count() );
// search all blocked transactions in a synchronized block (to prevent
// others from chnaging the taTable) but call notifyAll() later in a
// non-synchronized block tp prevent us from deadlocks
synchronized (this) {
DxIterator it = taTable.iterator();
Transaction ta;
while ((ta = (Transaction)it.next()) != null) {
if ((ta.blocker != null)) {
tas.add( ta );
}
}
if (alsoNotifySomeSleepingTransactions) {
if (tas.isEmpty()) {
it.reset();
while ((ta = (Transaction)it.next()) != null) {
if (ta.isSleeping()) { // A sleeping
tas.add(ta);
break;
}
}
}
}
}
DxIterator it = tas.iterator();
Transaction ta;
while ((ta = (Transaction)it.next()) != null) {
if (false&&env.logWriter.hasTarget( LogWriter.DEBUG )) {
env.logWriter.newEntry( this, " notify: " + ta, LogWriter.DEBUG );
}
synchronized (ta) {
ta.notifyAll();
}
}
}
/**
* Handle the specified command on behalf of the current thread/transaction.
* This method is called by the InvokeServer after it has handled
* InvokeServer specific command.
*/
public void handleCommand(DbCommand command,DbInvokeClient client) {
handleCommand(command,client.getUser());
}
/**
* Handle the specified command on behalf of the current thread/transaction.
* This method is called by the InvokeServer after it has handled
* InvokeServer specific command.
*/
public void handleCommand(DbCommand command,User user) {
if (env.logWriter.hasTarget( LogWriter.DEBUG )) {
env.logWriter.newEntry( this, "handleCommand(): " + command.toString(), LogWriter.DEBUG );
}
CurrentDatabase.register(env.getDatabase());
try {
command.result = null;
try {
// close this connection/thread/transaction
if (command instanceof DbCloseConn) {
Transaction ta = currentTA();
if (ta != null) {
abortTransaction( ta, command );
deleteTransaction();
}
} else if (!(command instanceof DbTransaction)) {
// perform all commands other then DbTransaction
Transaction ta = currentTA();
if (ta == null) {
completeTransaction( command, /*client*/ user );
} else {
// if something goes wrong while performing this command the client
// has to abort the transaction
performCommand( ta, command );
}
} else {
// perform transaction demarcation command; test for error conditions
// and signal error via command.result to avoid exceptions that are
// converted to OzoneInternalException by the enclosing try/catch
switch (((DbTransaction)command).mode()) {
case DbTransaction.MODE_BEGIN: {
if (currentTA() != null) {
command.result = new TransactionException( "Thread is already joined to a transaction.",
TransactionException.STATE );
} else {
Transaction ta = newTransaction(user /*client.getUser()*/);
command.result = ta.taID();
}
} break;
case DbTransaction.MODE_PREPARE: {
prepareTransaction( currentTA(), command );
} break;
case DbTransaction.MODE_COMMIT_TWOPHASE: {
commitTransaction( currentTA(), command );
deleteTransaction();
} break;
case DbTransaction.MODE_COMMIT_ONEPHASE: {
Transaction ta = currentTA();
if (prepareTransaction( ta, command )) {
commitTransaction( ta, command );
}
deleteTransaction();
} break;
case DbTransaction.MODE_ABORT: {
abortTransaction( currentTA(), command );
deleteTransaction();
} break;
case DbTransaction.MODE_CHECKPOINT: {
command.result = new OzoneInternalException( "External CHECKPOINT is not implemented yet." );
} break;
case DbTransaction.MODE_STATUS: {
command.result = new Integer( currentTA().status() );
} break;
}
}
} catch (Throwable e) {
// all exception that are not handled by the underlying methods
// are internal 'panic' errors
env.logWriter.newEntry( this, "handleCommand(): " + e, e, LogWriter.ERROR );
command.result = new OzoneInternalException(e.toString(),e);
deleteTransaction();
}
} finally {
CurrentDatabase.unregister();
}
}
/**
* Perform the specified command within a new transaction.
*/
protected void completeTransaction( DbCommand command,User user /*DbInvokeClient client*/) throws Exception {
if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
env.logWriter.newEntry( this, "completeTransaction(): " + command, LogWriter.DEBUG3 );
}
if (/*client.getUser()*/user == null) {
throw new TransactionException( "No owner set for current transaction." );
}
Transaction ta = newTransaction(/*client.getUser()*/ user);
// perform->prepare->commit/abort
boolean alright = false;
try {
if (alright = performCommand( ta, command )) {
if (prepareTransaction( ta, command )) {
commitTransaction( ta, command );
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -