⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 transactionmanager.java

📁 Java的面向对象数据库系统的源代码
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
// You can redistribute this software and/or modify it under the terms of
// the Ozone Core License version 1 published by ozone-db.org.
//
// The original code and portions created by SMB are
// Copyright (C) 1997-@year@ by SMB GmbH. All rights reserved.
//
// $Id: TransactionManager.java,v 1.11.2.2 2003/12/21 16:01:23 per_nyfelt Exp $

package org.ozoneDB.core;

import org.ozoneDB.*;
import org.ozoneDB.DxLib.*;
import org.ozoneDB.core.DbRemote.*;
import org.ozoneDB.core.dr.DeadlockRecognition;
import org.ozoneDB.util.LogWriter;

import java.util.Random;


/**
 * The transaction manager is the factory for transactions and locks.
 *
 *
 * @author <a href="http://www.softwarebuero.de/">SMB</a>
 * @author <a href="http://www.medium.net/">Medium.net</a>
 * @version $Revision: 1.11.2.2 $Date: 2003/12/21 16:01:23 $
 */
public final class TransactionManager extends ServerComponent {

    /**
     * All currently running transactions (taID -> ta).
     */
    protected DxMap taTable;

    /**
     * In local mode this maps threads to transactions.
     */
    protected DxMap threadTable;

    /**
     * Used to signal a deadlock and which Transaction should abort.
     */
    protected Transaction deadlockTA;

    /**
     * True if one thread runs exclusively.
     */
    protected Thread exclusiveThread;

    protected long acquireCount;

    public TransactionManager( Env _env ) {
        super( _env );
//      _env.getLogWriter().newEntry(this,"<init>(): _env="+_env+",env="+env+".",LogWriter.INFO);
        taTable = new DxHashMap( 32 );
        threadTable = new DxHashMap( 32 );
    }


    public void startup() throws Exception {
        env.logWriter.newEntry( this, "startup...", LogWriter.INFO );
    }


    public void shutdown() throws Exception {
        env.logWriter.newEntry( this, "shutdown...", LogWriter.INFO );

        env.logWriter.newEntry( this, "    there are " + taTable.count() + " pending transaction(s)", LogWriter.INFO );
        if (!taTable.isEmpty()) {
            env.logWriter.newEntry( this, "    aborting pending transactions...", LogWriter.INFO );
            DxIterator it = taTable.iterator();
            while (it.next() != null) {
                ((Transaction)it.object()).stop();
            }

            env.logWriter.newEntry( this, "    waiting for transactions to end...", LogWriter.INFO );
            for (int sec = 10; !taTable.isEmpty(); sec--) {
                Thread.sleep( 1000 );
            }
        }

      	env.logWriter.newEntry( this, "acquire count total:" + acquireCount, LogWriter.INFO );
    }


    public void save() throws Exception {
    }


    /**
     * Factory method to generate proper Lock objects. These Lock objects are
     * used in the store backend but they have to be generated in the core to
     * meet the requirements of the transaction implementation.
     */
    public Lock newLock() {
        return new MROWLock();
    }


    public int taTableCount() {
        return taTable.count();
    }


    public Transaction taForID( TransactionID taID ) {
        return (Transaction)taTable.elementForKey( taID );
    }


    /**
     * der aktuelle thread is auch die aktuelle transaktion (wenn der
     * thread ueberhaupt eine transaktion ist)
     */
    public Transaction currentTA() {
        Thread thread = Thread.currentThread();

        // see startTransaction()
        if (thread instanceof CommandThread) {
            return ((CommandThread) thread).ta;
        } else {
            Transaction result = (Transaction) threadTable.elementForKey(thread);
            return result;
        }
    }


    public Transaction newTransaction( User owner ) throws TransactionException {
        if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
            env.logWriter.newEntry( this, "newTransaction() *****************************", LogWriter.DEBUG3 );
        }

        if (currentTA() != null) {
            throw new TransactionException( "Thread is already joined to a transaction.", TransactionException.STATE );
        }

        Transaction ta = env.storeManager.createTransaction(env, owner);
        Thread thread = Thread.currentThread();

        // in the server threads are CommandThreads and the corresponding
        // transaction is a member of the CommandThread
        if (thread instanceof CommandThread) {
            CommandThread commandThread = (CommandThread)Thread.currentThread();
            commandThread.setTransaction( ta );
        }

        if (env.logWriter.hasTarget( LogWriter.DEBUG2 )) {
            env.logWriter.newEntry( this, "newTransaction(): ta="+ta+", thread="+thread+".", LogWriter.DEBUG2 );
        }

        synchronized (this) {
            // if we are local, threads can be of any type; we add them to the
            // threadTable to be able to found the corresponding transaction
            threadTable.addForKey( ta, thread );

            taTable.addForKey( ta, ta.taID() );
            
        }
        return ta;
    }


    /**
     * Delete the transaction that is associated with the current thread.
     */
    public void deleteTransaction() {
        if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
            env.logWriter.newEntry( this, "deleteTransaction() --------------------------", LogWriter.DEBUG3 );
        }

        Transaction ta = currentTA();
        if (ta == null) {
            env.logWriter.newEntry( this, "deleteTransaction(): thread is not joined to a transaction.",
                    LogWriter.WARN );
        } else {
            if (env.logWriter.hasTarget( LogWriter.DEBUG2 )) {
                env.logWriter.newEntry( this, "deleteTransaction() ta="+ta+".", LogWriter.DEBUG2 );
            }
            synchronized (this) {
                acquireCount += ta.acquireCount;
                if (false&&env.logWriter.hasTarget( LogWriter.DEBUG2 )) {
                    env.logWriter.newEntry( this, "    acquire count:" + ta.acquireCount, LogWriter.DEBUG2 );
                    env.logWriter.newEntry( this, "    total        :" + acquireCount, LogWriter.DEBUG2 );
                }

                Thread thread = Thread.currentThread();
                if (thread instanceof CommandThread) {
                    ((CommandThread)thread).setTransaction( null );
                }

                threadTable.removeForKey( thread );
                taTable.removeForKey( ta.taID() );

                env.getGarbageCollector().removeTransactionRequiredToComplete(ta);
            }
        }
    }

    /**
        If we notify all transactions, they not-blocked-ones may wake up from sleeping.
    */
    protected boolean alsoNotifySomeSleepingTransactions = true;

    /**
     * Notify each thread that is associated with a currently blocked
     * transaction by calling notifyAll() on the blocked transaction.
     */
    public void notifyWaitingTransactions() {
        if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
            env.logWriter.newEntry( this, "notifyWaitingTransactions()", LogWriter.DEBUG3 );
        }

        DxArrayBag tas = new DxArrayBag( taTable.count() );

        // search all blocked transactions in a synchronized block (to prevent
        // others from chnaging the taTable) but call notifyAll() later in a
        // non-synchronized block tp prevent us from deadlocks
        synchronized (this) {
            DxIterator it = taTable.iterator();
            Transaction ta;
            while ((ta = (Transaction)it.next()) != null) {
                if ((ta.blocker != null)) {
                    tas.add( ta );
                }
            }

            if (alsoNotifySomeSleepingTransactions) {
                if (tas.isEmpty()) {
                    it.reset();
                    while ((ta = (Transaction)it.next()) != null) {
                        if (ta.isSleeping()) { // A sleeping
                            tas.add(ta);
                            break;
                        }
                    }
                }
            }
        }

        DxIterator it = tas.iterator();
        Transaction ta;
        while ((ta = (Transaction)it.next()) != null) {
            if (false&&env.logWriter.hasTarget( LogWriter.DEBUG )) {
                env.logWriter.newEntry( this, "    notify: " + ta, LogWriter.DEBUG );
            }
            synchronized (ta) {
                ta.notifyAll();
            }
        }
    }


    /**
     * Handle the specified command on behalf of the current thread/transaction.
     * This method is called by the InvokeServer after it has handled
     * InvokeServer specific command.
     */
    public void handleCommand(DbCommand command,DbInvokeClient client) {
    	handleCommand(command,client.getUser());
    }

    /**
     * Handle the specified command on behalf of the current thread/transaction.
     * This method is called by the InvokeServer after it has handled
     * InvokeServer specific command.
     */
    public void handleCommand(DbCommand command,User user) {
        if (env.logWriter.hasTarget( LogWriter.DEBUG )) {
            env.logWriter.newEntry( this, "handleCommand(): " + command.toString(), LogWriter.DEBUG );
        }

        CurrentDatabase.register(env.getDatabase());
        
        try {
            command.result = null;
            try {
                // close this connection/thread/transaction
                if (command instanceof DbCloseConn) {
                    Transaction ta = currentTA();
                    if (ta != null) {
                        abortTransaction( ta, command );
                        deleteTransaction();
                    }
                } else if (!(command instanceof DbTransaction)) {
    
                    // perform all commands other then DbTransaction
                    Transaction ta = currentTA();
                    if (ta == null) {
                        completeTransaction( command, /*client*/ user );
                    } else {
                        // if something goes wrong while performing this command the client
                        // has to abort the transaction
                        performCommand( ta, command );
                    }
                } else {
                    // perform transaction demarcation command; test for error conditions
                    // and signal error via command.result to avoid exceptions that are
                    // converted to OzoneInternalException by the enclosing try/catch
                    switch (((DbTransaction)command).mode()) {
                        case DbTransaction.MODE_BEGIN: {
                            if (currentTA() != null) {
                                command.result = new TransactionException( "Thread is already joined to a transaction.",
                                        TransactionException.STATE );
                            } else {
                                Transaction ta = newTransaction(user /*client.getUser()*/);
                                command.result = ta.taID();
                            }
                        } break;
                        case DbTransaction.MODE_PREPARE: {
                            prepareTransaction( currentTA(), command );
                        } break;
                        case DbTransaction.MODE_COMMIT_TWOPHASE: {
                            commitTransaction( currentTA(), command );
                            deleteTransaction();
                        } break;
                        case DbTransaction.MODE_COMMIT_ONEPHASE: {
                            Transaction ta = currentTA();
                            if (prepareTransaction( ta, command )) {
                                commitTransaction( ta, command );
                            }
                            deleteTransaction();
                        } break;
                        case DbTransaction.MODE_ABORT: {
                            abortTransaction( currentTA(), command );
                            deleteTransaction();
                        } break;
                        case DbTransaction.MODE_CHECKPOINT: {
                            command.result = new OzoneInternalException( "External CHECKPOINT is not implemented yet." );
                        } break;
                        case DbTransaction.MODE_STATUS: {
                            command.result = new Integer( currentTA().status() );
                        } break;
                    }
                }
            } catch (Throwable e) {
                // all exception that are not handled by the underlying methods
                // are internal 'panic' errors
                env.logWriter.newEntry( this, "handleCommand(): " + e, e, LogWriter.ERROR );
                command.result = new OzoneInternalException(e.toString(),e);
                deleteTransaction();
            }
        } finally {
            CurrentDatabase.unregister();
        }
    }


    /**
     * Perform the specified command within a new transaction.
     */
    protected void completeTransaction( DbCommand command,User user /*DbInvokeClient client*/) throws Exception {
        if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) {
            env.logWriter.newEntry( this, "completeTransaction(): " + command, LogWriter.DEBUG3 );
        }
        if (/*client.getUser()*/user == null) {
            throw new TransactionException( "No owner set for current transaction." );
        }

        Transaction ta = newTransaction(/*client.getUser()*/ user);

        // perform->prepare->commit/abort
        boolean alright = false;

        try {
            if (alright = performCommand( ta, command )) {
                if (prepareTransaction( ta, command )) {
                    commitTransaction( ta, command );
                }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -