📄 transactionmanager.java
字号:
// You can redistribute this software and/or modify it under the terms of// the Ozone Core License version 1 published by ozone-db.org.//// The original code and portions created by SMB are// Copyright (C) 1997-2000 by SMB GmbH. All rights reserved.//// $Id: TransactionManager.java,v 1.50 2000/10/28 16:55:17 daniela Exp $package org.ozoneDB.core;import java.io.IOException;import java.util.Random;import org.ozoneDB.*;import org.ozoneDB.DxLib.*;import org.ozoneDB.core.DbRemote.*;import org.ozoneDB.core.dr.*;import org.ozoneDB.util.*;/** * The transaction manager is the factory for transactions and locks. * * * @author <a href="http://www.softwarebuero.de/">SMB</a> * @version $Revision: 1.50 $Date: 2000/10/28 16:55:17 $ */public final class TransactionManager { protected transient Env env; /** * All currently running transactions. taID -> ta */ protected DxMap taTable; /** * In local mode this maps threads to transactions. */ protected DxMap threadTable; /** * Used to signal a deadlock and which Transaction should abort. */ protected Transaction deadlockTA; /** * True if one thread runs exclusively. */ protected Thread exclusiveThread; protected long acquireCount; public TransactionManager( Env _env ) { env = _env; taTable = new DxHashMap( 32 ); threadTable = new DxHashMap( 32 ); } public void startup() throws Exception { env.logWriter.newEntry( this, "startup...", LogWriter.INFO ); } public void shutdown() throws Exception { env.logWriter.newEntry( this, "shutdown...", LogWriter.INFO ); env.logWriter.newEntry( this, " there are " + taTable.count() + " pending transaction(s)", LogWriter.INFO ); if (!taTable.isEmpty()) { env.logWriter.newEntry( this, " aborting pending transactions...", LogWriter.INFO ); DxIterator it = taTable.iterator(); while (it.next() != null) { ((Transaction)it.object()).stop(); } env.logWriter.newEntry( this, " waiting for transactions to end...", LogWriter.INFO ); for (int sec = 10; !taTable.isEmpty(); sec--) { Thread.sleep( 1000 ); } } env.logWriter.newEntry( this, "acquire count total:" + acquireCount, LogWriter.INFO ); } /** * Factory method to generate proper Lock objects. These Lock objects are * used in the store backend but they have to be generated in the core to * meet the requirements of the transaction implementation. */ public Lock newLock() { return new MROWLock(); } public int taTableCount() { return taTable.count(); } public Transaction taForID( TransactionID taID ) { return (Transaction)taTable.elementForKey( taID ); } /** * der aktuelle thread is auch die aktuelle transaktion (wenn der * thread ueberhaupt eine transaktion ist) */ public Transaction currentTA() { Thread thread = Thread.currentThread(); // see startTransaction() if (thread instanceof CommandThread) { return ((CommandThread)thread).ta; } else { Transaction result = (Transaction)threadTable.elementForKey( thread ); return result; } } public synchronized Transaction newTransaction( User owner ) throws Exception { if (env.logWriter.hasTarget( LogWriter.DEBUG )) { env.logWriter.newEntry( this, "newTransaction() *****************************", LogWriter.DEBUG ); } if (currentTA() != null) { throw new TransactionExc( "Thread is already joined to a transaction.", TransactionExc.STATE ); } Transaction ta = new Transaction( env, owner ); Thread thread = Thread.currentThread(); // in the server threads are CommandThreads and the corresponding // transaction is a member of the CommandThread if (thread instanceof CommandThread) { CommandThread commandThread = (CommandThread)Thread.currentThread(); commandThread.setTransaction( ta ); } // if we are local, threads can be of any type; we add them to the // threadTable to be able to found the corresponding transaction threadTable.addForKey( ta, thread ); taTable.addForKey( ta, ta.taID() ); return ta; } /** * Delete the transaction that is associated with the current thread. */ public synchronized void deleteTransaction() { if (env.logWriter.hasTarget( LogWriter.DEBUG )) { env.logWriter.newEntry( this, "deleteTransaction() --------------------------", LogWriter.DEBUG ); } Transaction ta = currentTA(); if (ta == null) { env.logWriter.newEntry( this, "deleteTransaction(): thread is not joined to a transaction.", LogWriter.WARN ); } else { acquireCount += ta.acquireCount; if (env.logWriter.hasTarget( LogWriter.DEBUG )) { env.logWriter.newEntry( this, " acquire count:" + ta.acquireCount, LogWriter.DEBUG ); env.logWriter.newEntry( this, " total :" + acquireCount, LogWriter.DEBUG ); } Thread thread = Thread.currentThread(); if (thread instanceof CommandThread) { ((CommandThread)thread).setTransaction( null ); } threadTable.removeForKey( thread ); taTable.removeForKey( ta.taID() ); } } /** * Notify each thread that is associated with acurrently blocked * transaction by calling notifyAll() on the blocked transaction. */ public void notifyWaitingTransactions() { if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) { env.logWriter.newEntry( this, "notifyWaitingTransactions()", LogWriter.DEBUG3 ); } DxIterator it = taTable.iterator(); Transaction ta; while ((ta = (Transaction)it.next()) != null) { if (ta.blocker != null) { if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) { env.logWriter.newEntry( this, " notify: " + ta, LogWriter.DEBUG3 ); } synchronized (ta) { ta.notifyAll(); } } } } /** * Handle the specified command on behalf of the current thread/transaction. * This method is called by the InvokeServer after it has handled * InvokeServer specific command. */ public void handleCommand( DbCommand command, User owner ) { if (env.logWriter.hasTarget( LogWriter.DEBUG )) { env.logWriter.newEntry( this, "handleCommand(): " + command.toString(), LogWriter.DEBUG ); } command.result = null; try { // close this connection/thread/transaction if (command instanceof DbCloseConn) { Transaction ta = currentTA(); if (ta != null) { abortTransaction( ta, command ); deleteTransaction(); } } else if (!(command instanceof DbTransaction)) { // perform all commands other then DbTransaction Transaction ta = currentTA(); if (ta == null) { completeTransaction( command, owner ); } else { // if something goes wrong while performing this command the client // has to abort the transaction performCommand( ta, command ); } } else { // perform transaction demarcation command; test for error conditions // and signal error via command.result to avoid exceptions that are // converted to OzoneInternalExc by the enclosing try/catch switch (((DbTransaction)command).mode()) { case DbTransaction.MODE_BEGIN: { if (currentTA() != null) { command.result = new TransactionExc( "Thread is already joined to a transaction.", TransactionExc.STATE ); } else { Transaction ta = newTransaction( owner ); command.result = ta.taID(); } break; } case DbTransaction.MODE_PREPARE: { prepareTransaction( currentTA(), command ); break; } case DbTransaction.MODE_COMMIT_TWOPHASE: { commitTransaction( currentTA(), command ); deleteTransaction(); break; } case DbTransaction.MODE_COMMIT_ONEPHASE: { Transaction ta = currentTA(); if (prepareTransaction( ta, command )) { commitTransaction( ta, command ); } deleteTransaction(); break; } case DbTransaction.MODE_ABORT: { abortTransaction( currentTA(), command ); deleteTransaction(); break; } case DbTransaction.MODE_CHECKPOINT: { command.result = new OzoneInternalExc( "External CHECKPOINT is not implemented yet." ); break; } case DbTransaction.MODE_STATUS: { command.result = new Integer( currentTA().status() ); break; } } } } catch (Throwable e) { // all exception that are not handled by the underlying methods // are internal 'panic' errors env.logWriter.newEntry( this, "handleCommand(): " + e, e, LogWriter.ERROR ); command.result = new OzoneInternalExc( e.toString() ); deleteTransaction(); } } /** * Perform the specified command within a new transaction. */ protected void completeTransaction( DbCommand command, User owner ) throws Exception { if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) { env.logWriter.newEntry( this, "completeTransaction(): " + command, LogWriter.DEBUG3 ); } if (owner == null) { throw new TransactionExc( "No owner set for current transaction." ); } Transaction ta = newTransaction( owner ); // perform->prepare->commit/abort if (performCommand( ta, command )) { if (prepareTransaction( ta, command )) { commitTransaction( ta, command ); }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -