transaction.java

来自「RESIN 3.2 最新源码」· Java 代码 · 共 680 行

JAVA
680
字号
/* * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved * * This file is part of Resin(R) Open Source * * Each copy or derived work must preserve the copyright notice and this * notice unmodified. * * Resin Open Source is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * Resin Open Source is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty * of NON-INFRINGEMENT.  See the GNU General Public License for more * details. * * You should have received a copy of the GNU General Public License * along with Resin Open Source; if not, write to the * *   Free Software Foundation, Inc. *   59 Temple Place, Suite 330 *   Boston, MA 02111-1307  USA * * @author Scott Ferguson */package com.caucho.db.store;import com.caucho.db.jdbc.ConnectionImpl;import com.caucho.log.Log;import com.caucho.sql.SQLExceptionWrapper;import com.caucho.util.L10N;import com.caucho.util.LongKeyHashMap;import java.io.IOException;import java.sql.SQLException;import java.util.ArrayList;import java.util.Iterator;import java.util.logging.Level;import java.util.logging.Logger;/** * Represents a single transaction. */public class Transaction extends StoreTransaction {  private static final Logger log = Log.open(Transaction.class);  private static final L10N L = new L10N(Transaction.class);  private static long AUTO_COMMIT_TIMEOUT = 30000L;  private boolean _isAutoCommit = true;  private ConnectionImpl _conn;    private ArrayList<Lock> _readLocks;  private ArrayList<Lock> _writeLocks;    private LongKeyHashMap<WriteBlock> _writeBlocks;    private ArrayList<Block> _updateBlocks;  // inodes that need to be deleted on a commit  private ArrayList<Inode> _deleteInodes;    // inodes that need to be deleted on a rollback  private ArrayList<Inode> _addInodes;    // blocks that need deallocating on a commit  private ArrayList<Block> _deallocateBlocks;  private boolean _isRollbackOnly;  private SQLException _rollbackExn;  private long _timeout = AUTO_COMMIT_TIMEOUT;  private Transaction()  {  }  public static Transaction create(ConnectionImpl conn)  {    Transaction xa = new Transaction();        xa.init(conn);    return xa;  }  public static Transaction create()  {    Transaction xa = new Transaction();    return xa;  }  private void init(ConnectionImpl conn)  {    _conn = conn;    _timeout = AUTO_COMMIT_TIMEOUT;    _isRollbackOnly = false;    _rollbackExn = null;  }  /**   * Sets the transaction timeout.   */  public void setTimeout(long timeout)  {    _timeout = timeout;  }    /**   * Acquires a new read lock.   */  /*  public void addReadLock(Lock lock)  {    _readLocks.add(lock);  }  */    /**   * Acquires a new read lock.   */  public boolean hasReadLock(Lock lock)  {    return _readLocks.contains(lock);  }  /**   * Returns true for an auto-commit transaction.   */  public boolean isAutoCommit()  {    return _isAutoCommit;  }  /**   * Returns true for an auto-commit transaction.   */  public void setAutoCommit(boolean autoCommit)  {    _isAutoCommit = autoCommit;  }    /**   * Acquires a new write lock.   */  public void lockRead(Lock lock)    throws SQLException  {    if (_isRollbackOnly) {      if (_rollbackExn != null)	throw _rollbackExn;      else	throw new SQLException(L.l("can't get lock with rollback transaction"));    }    try {      if (_readLocks == null)	_readLocks = new ArrayList<Lock>();            if (_readLocks.contains(lock))	throw new SQLException(L.l("lockRead must not already have a read lock"));            lock.lockRead(this, _timeout);      _readLocks.add(lock);    } catch (SQLException e) {      setRollbackOnly(e);            throw e;    }  }  /**   * Acquires a new write lock.   */  public void lockReadAndWrite(Lock lock)    throws SQLException  {    if (_isRollbackOnly) {      if (_rollbackExn != null)	throw _rollbackExn;      else	throw new SQLException(L.l("can't get lock with rollback transaction"));    }    try {      if (_readLocks == null)	_readLocks = new ArrayList<Lock>();      if (_writeLocks == null)	_writeLocks = new ArrayList<Lock>();      if (_readLocks.contains(lock))	throw new SQLException(L.l("lockReadAndWrite cannot already have a read lock"));      if (_writeLocks.contains(lock))	throw new SQLException(L.l("lockReadAndWrite cannot already have a write lock"));            lock.lockReadAndWrite(this, _timeout);      _readLocks.add(lock);      _writeLocks.add(lock);    } catch (SQLException e) {      setRollbackOnly(e);            throw e;    }  }  /**   * Conditionally a new write lock, if no contention exists.   */  public boolean lockReadAndWriteNoWait(Lock lock)    throws SQLException  {    if (_isRollbackOnly) {      if (_rollbackExn != null)	throw _rollbackExn;      else	throw new SQLException(L.l("can't get lock with rollback transaction"));    }    try {      if (_readLocks == null)	_readLocks = new ArrayList<Lock>();      if (_writeLocks == null)	_writeLocks = new ArrayList<Lock>();      if (_readLocks.contains(lock))	throw new SQLException(L.l("lockReadAndWrite cannot already have a read lock"));      if (_writeLocks.contains(lock))	throw new SQLException(L.l("lockReadAndWrite cannot already have a write lock"));            if (lock.lockReadAndWriteNoWait()) {	_readLocks.add(lock);	_writeLocks.add(lock);	return true;      }    } catch (SQLException e) {      setRollbackOnly(e);            throw e;    }    return false;  }  /**   * Acquires a new write lock.   */  public void lockWrite(Lock lock)    throws SQLException  {    if (_isRollbackOnly) {      if (_rollbackExn != null)	throw _rollbackExn;      else	throw new SQLException(L.l("can't get lock with rollback transaction"));    }    try {      if (_readLocks == null)	_readLocks = new ArrayList<Lock>();      if (_writeLocks == null)	_writeLocks = new ArrayList<Lock>();      if (! _readLocks.contains(lock)) {	Thread.dumpStack();	throw new SQLException(L.l("lockWrite must already have a read lock"));      }      if (_writeLocks.contains(lock))	throw new SQLException(L.l("lockWrite cannot already have a write lock"));            lock.lockWrite(this, _timeout);      _writeLocks.add(lock);    } catch (SQLException e) {      setRollbackOnly(e);            throw e;    }  }  /**   * Adds a block for update.   */  public void addUpdateBlock(Block block)  {    if (block == null)      return;        if (_updateBlocks == null)      _updateBlocks = new ArrayList<Block>();    if (_updateBlocks.size() == 0	|| _updateBlocks.get(_updateBlocks.size() - 1) != block)      _updateBlocks.add(block);  }    /**   * If auto-commit, commit the read   */  public void autoCommitRead(Lock lock)    throws SQLException  {    unlockRead(lock);  }    public void unlockRead(Lock lock)    throws SQLException  {    if (_readLocks.remove(lock))      lock.unlockRead();  }    /**   * If auto-commit, commit the write   */  public void autoCommitWrite(Lock lock)    throws SQLException  {    _readLocks.remove(lock);    if (_writeLocks.remove(lock)) {      try {	commit();      } finally {	lock.unlockWrite();      }    }  }    public void unlockReadAndWrite(Lock lock)    throws SQLException  {    _readLocks.remove(lock);        if (_writeLocks.remove(lock)) {      lock.unlockReadAndWrite();    }  }    public void unlockWrite(Lock lock)    throws SQLException  {    if (_writeLocks.remove(lock)) {      lock.unlockWrite();    }  }  /**   * Returns a read block.   */  public Block readBlock(Store store, long blockAddress)    throws IOException  {    long blockId = store.addressToBlockId(blockAddress);          Block block;        if (_writeBlocks != null)      block = _writeBlocks.get(blockId);    else      block = null;    if (block != null)      block.allocate();    else      block = store.readBlock(blockId);    return block;  }  /**   * Returns a modified block.   */  public WriteBlock getWriteBlock(long blockId)  {    if (_writeBlocks == null)      return null;    return _writeBlocks.get(blockId);  }  /**   * Returns a modified block.   */  public WriteBlock createWriteBlock(Block block)    throws IOException  {    if (block instanceof WriteBlock)      return (WriteBlock) block;    WriteBlock writeBlock = getWriteBlock(block.getBlockId());    if (writeBlock != null) {      block.free();      writeBlock.allocate();      return writeBlock;    }        if (isAutoCommit())      writeBlock = new AutoCommitWriteBlock(block);    else {      // XXX: locking      writeBlock = new XAWriteBlock(block);      setBlock(writeBlock);    }    return writeBlock;  }  /**   * Returns a modified block.   */  public Block createAutoCommitWriteBlock(Block block)    throws IOException  {    if (block instanceof WriteBlock) {      return block;    }    else {      WriteBlock writeBlock = getWriteBlock(block.getBlockId());      if (writeBlock != null) {	block.free();	writeBlock.allocate();	return writeBlock;      }            writeBlock = new AutoCommitWriteBlock(block);      // setBlock(writeBlock);      return writeBlock;    }  }  /**   * Returns a modified block.   */  public Block allocateRow(Store store)    throws IOException  {    return store.allocateRow();  }  /**   * Returns a modified block.   */  public void deallocateBlock(Block block)    throws IOException  {    if (isAutoCommit())      block.getStore().freeBlock(block.getBlockId());    else {      if (_deallocateBlocks == null)	_deallocateBlocks = new ArrayList<Block>();            _deallocateBlocks.add(block);    }  }  /**   * Returns a modified block.   */  public Block createWriteBlock(Store store, long blockAddress)    throws IOException  {    Block block = readBlock(store, blockAddress);    return createWriteBlock(block);  }  /**   * Returns a modified block.   */  private void setBlock(WriteBlock block)  {    // block.setDirty();    if (_writeBlocks == null)      _writeBlocks = new LongKeyHashMap<WriteBlock>(8);    _writeBlocks.put(block.getBlockId(), block);  }  /**   * Adds inode which should be deleted on a commit.   */  public void addDeleteInode(Inode inode)  {    if (_deleteInodes == null)      _deleteInodes = new ArrayList<Inode>();        _deleteInodes.add(inode);  }  /**   * Adds inode which should be deleted on a rollback.   */  public void addAddInode(Inode inode)  {    if (_addInodes == null)      _addInodes = new ArrayList<Inode>();        _addInodes.add(inode);  }  public void autoCommit()    throws SQLException  {    if (_isAutoCommit) {      ConnectionImpl conn = _conn;      _conn = null;            if (conn != null) {	conn.setTransaction(null);      }    }  }  public void setRollbackOnly(SQLException e)  {    if (_rollbackExn == null)      _rollbackExn = e;        _isRollbackOnly = true;    releaseLocks();    // XXX: release write blocks    _writeBlocks = null;  }  public void setRollbackOnly()  {    setRollbackOnly(null);  }  public void commit()    throws SQLException  {    try {      writeData();    } finally {      releaseLocks();      close();    }  }  public void writeData()    throws SQLException  {    LongKeyHashMap<WriteBlock> writeBlocks = _writeBlocks;    if (_deleteInodes != null) {      while (_deleteInodes.size() > 0) {	Inode inode = _deleteInodes.remove(0);	// XXX: should be allocating based on auto-commit	inode.remove();      }    }    ArrayList<Block> updateBlocks = _updateBlocks;    _updateBlocks = null;        if (updateBlocks != null) {      while (updateBlocks.size() > 0) {	Block block = updateBlocks.remove(updateBlocks.size() - 1);	try {	  block.commit();	} catch (IOException e) {	  log.log(Level.WARNING, e.toString(), e);	}      }    }    if (writeBlocks != null) {      Iterator<WriteBlock> blockIter = writeBlocks.valueIterator();      while (blockIter.hasNext()) {	WriteBlock block = blockIter.next();	try {	  block.commit();	} catch (IOException e) {	  log.log(Level.WARNING, e.toString(), e);	}      }            // writeBlocks.clear();    }    if (_deallocateBlocks != null) {      while (_deallocateBlocks.size() > 0) {	Block block = _deallocateBlocks.remove(0);	try {	  block.getStore().freeBlock(block.getBlockId());	} catch (IOException e) {	  throw new SQLExceptionWrapper(e);	}      }    }  }  public void rollback()    throws SQLException  {    releaseLocks();    close();  }  private void releaseLocks()  {    // need to unlock write before upgrade to block other threads    if (_writeLocks != null) {      for (int i = 0; i < _writeLocks.size(); i++) {	Lock lock = _writeLocks.get(i);	if (_readLocks != null)	  _readLocks.remove(lock);	try {	  lock.unlockReadAndWrite();	} catch (Throwable e) {	  log.log(Level.WARNING, e.toString(), e);	}      }      _writeLocks.clear();    }        if (_readLocks != null) {      for (int i = 0; i < _readLocks.size(); i++) {	Lock lock = _readLocks.get(i);	try {	  lock.unlockRead();	} catch (Throwable e) {	  log.log(Level.WARNING, e.toString(), e);	}      }      _readLocks.clear();    }  }  void close()  {    LongKeyHashMap<WriteBlock> writeBlocks = _writeBlocks;    _writeBlocks = null;    if (writeBlocks != null) {      Iterator<WriteBlock> blockIter = writeBlocks.valueIterator();      while (blockIter.hasNext()) {	WriteBlock block = blockIter.next();	block.destroy();      }            // writeBlocks.clear();    }    _isRollbackOnly = false;    _rollbackExn = null;  }}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?