⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 datatodatabasesink.java

📁 OBPM是一个开源
💻 JAVA
📖 第 1 页 / 共 2 页
字号:
package org.apache.ddlutils.io;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;

import org.apache.commons.beanutils.DynaBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ddlutils.DatabaseOperationException;
import org.apache.ddlutils.Platform;
import org.apache.ddlutils.dynabean.SqlDynaClass;
import org.apache.ddlutils.model.Column;
import org.apache.ddlutils.model.Database;
import org.apache.ddlutils.model.ForeignKey;
import org.apache.ddlutils.model.Reference;
import org.apache.ddlutils.model.Table;

/**
 * Data sink that directly inserts the beans into the database. If configured, it will make
 * sure that the beans are inserted in the correct order according to the foreign keys. Note
 * that this will only work if the foreign keys do not form cycles.
 * 
 * @version $Revision: 289996 $
 */
public class DataToDatabaseSink implements DataSink
{
    /** Our log; used for the debug output about deferred and still unwritten rows. */
    private final Log _log = LogFactory.getLog(DataToDatabaseSink.class);
 
    /** Generates the sql and writes it to the database. Also hands out the connection in {@link #start()}. */
    private Platform _platform;
    /** The database model; used to look up the table that a bean belongs to. */
    private Database _model;
    /** The connection to the database; borrowed from the platform in {@link #start()} and closed in {@link #end()}. */
    private Connection _connection;
    /** Whether to stop when an error has occurred while inserting a bean into the database (default: <code>true</code>). */
    private boolean _haltOnErrors = true;
    /** Whether to delay the insertion of beans so that the beans referenced by it via foreign keys, are already inserted into the database (default: <code>true</code>). */
    private boolean _ensureFkOrder = true;
    /** Whether to use batch mode inserts (default: <code>false</code>). */
    private boolean _useBatchMode = false;
    /** The queued objects for batch insertion. */
    private ArrayList _batchQueue = new ArrayList();
    /** The (maximum) number of beans to insert in one batch (default: 1024). */
    private int _batchSize = 1024;
    /** Stores the tables that are target of a foreign key; rebuilt in {@link #start()} when foreign key ordering is enabled. */
    private HashSet _fkTables = new HashSet();
    /** Contains the tables that have a self-referencing foreign key and at least one auto-increment primary key column; determined in the constructor. */
    private HashSet _tablesWithSelfIdentityReference = new HashSet();
    /** Contains the tables that have a self-referencing foreign key with at least one required local column; determined in the constructor. */
    private HashSet _tablesWithRequiredSelfReference = new HashSet();
    /** Maps original to processed identities; looked up with the identity built from a bean's foreign key. */
    private HashMap _identityMap = new HashMap();
    /** Stores the <code>WaitingObject</code>s, i.e. the objects that are waiting for other objects to be inserted; cleared in {@link #start()}. */
    private ArrayList _waitingObjects = new ArrayList();

    /**
     * Creates a new sink for the given platform and model.
     * <p>
     * The model is scanned once for tables that have a self-referencing foreign key. Such a
     * table is remembered if at least one of its primary key columns is auto-increment, and
     * (independently of that) if at least one local column of the self-reference is required.
     * 
     * @param platform The database platform
     * @param model    The database model
     */
    public DataToDatabaseSink(Platform platform, Database model)
    {
        _platform = platform;
        _model    = model;

        for (int tableNo = 0; tableNo < model.getTableCount(); tableNo++)
        {
            Table      candidate = model.getTable(tableNo);
            ForeignKey selfFk    = candidate.getSelfReferencingForeignKey();

            if (selfFk == null)
            {
                // Only tables that reference themselves are of interest here
                continue;
            }

            // Is any part of the primary key defined by the database ?
            Column[] primaryKey    = candidate.getPrimaryKeyColumns();
            boolean  hasIdentityPk = false;

            for (int colNo = 0; !hasIdentityPk && (colNo < primaryKey.length); colNo++)
            {
                hasIdentityPk = primaryKey[colNo].isAutoIncrement();
            }
            if (hasIdentityPk)
            {
                _tablesWithSelfIdentityReference.add(candidate);
            }

            // Is any local column of the self-reference mandatory ?
            boolean hasRequiredRef = false;

            for (int refNo = 0; !hasRequiredRef && (refNo < selfFk.getReferenceCount()); refNo++)
            {
                hasRequiredRef = selfFk.getReference(refNo).getLocalColumn().isRequired();
            }
            if (hasRequiredRef)
            {
                _tablesWithRequiredSelfReference.add(candidate);
            }
        }
    }

    /**
     * Tells whether the sink stops processing as soon as inserting a bean into the database
     * has failed. This is enabled (<code>true</code>) unless changed via
     * {@link #setHaltOnErrors(boolean)}.
     *
     * @return <code>true</code> if the sink stops when an error occurred
     */
    public boolean isHaltOnErrors()
    {
        return this._haltOnErrors;
    }

    /**
     * Configures whether the sink stops processing as soon as inserting a bean into the
     * database has failed.
     *
     * @param haltOnErrors <code>true</code> if the sink shall stop when an error occurred
     */
    public void setHaltOnErrors(boolean haltOnErrors)
    {
        this._haltOnErrors = haltOnErrors;
    }

    /**
     * Tells whether the sink defers the insertion of a bean until the beans that it references
     * via foreign keys have been inserted into the database. This is enabled
     * (<code>true</code>) unless changed via {@link #setEnsureForeignKeyOrder(boolean)}.
     *
     * @return <code>true</code> if beans are inserted after the beans referenced by their foreign keys
     */
    public boolean isEnsureFkOrder()
    {
        return this._ensureFkOrder;
    }

    /**
     * Configures whether the sink shall defer the insertion of a bean until the beans that it
     * references via foreign keys have been inserted into the database.<br/>
     * Note that you should be careful with setting <code>haltOnErrors</code> to false in
     * combination with this, as it might result in beans not being inserted at all. The sink
     * will then throw an appropriate exception at the end of the insertion process
     * (method {@link #end()}).
     *
     * @param ensureFkOrder <code>true</code> if beans shall be inserted after the beans
     *                      referenced by their foreign keys
     */
    public void setEnsureForeignKeyOrder(boolean ensureFkOrder)
    {
        this._ensureFkOrder = ensureFkOrder;
    }

    /**
     * Tells whether the beans are inserted using batch mode. Batch mode is turned off
     * unless enabled via {@link #setUseBatchMode(boolean)}.
     *
     * @return <code>true</code> if batch mode is used (<code>false</code> per default)
     */
    public boolean isUseBatchMode()
    {
        return this._useBatchMode;
    }

    /**
     * Configures whether the beans are inserted using batch mode. Note that batch mode
     * requires that the primary key values are not defined by the database.
     *
     * @param useBatchMode <code>true</code> if batch mode shall be used
     */
    public void setUseBatchMode(boolean useBatchMode)
    {
        this._useBatchMode = useBatchMode;
    }

    /**
     * Returns the upper limit for the number of beans that are inserted in one batch.
     * The initial value is 1024.
     *
     * @return The number of beans
     */
    public int getBatchSize()
    {
        return this._batchSize;
    }

    /**
     * Sets the upper limit for the number of beans that are inserted in one batch.
     * The value is stored as given, without any validation.
     *
     * @param batchSize The number of beans
     */
    public void setBatchSize(int batchSize)
    {
        this._batchSize = batchSize;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Purges the batch queue, closes the database connection and finally checks whether there
     * are rows left that could not be written because rows referenced by them are missing.
     * The connection is closed even if purging the batch queue fails, and the sink does not
     * fail if there is no connection (e.g. because {@link #start()} did not succeed).
     *
     * @throws DataSinkException If purging the batch queue failed, if the connection could not
     *                           be closed, or if there are rows still waiting for referenced rows
     */
    public void end() throws DataSinkException
    {
        boolean purged = false;

        try
        {
            purgeBatchQueue();
            purged = true;
        }
        finally
        {
            // Detach the connection before closing it so that a repeated call of this
            // method does not try to close the same connection again
            Connection connection = _connection;

            _connection = null;
            if (connection != null)
            {
                try
                {
                    connection.close();
                }
                catch (SQLException ex)
                {
                    if (purged)
                    {
                        throw new DataSinkException(ex);
                    }
                    // The purge has failed and its exception is on its way to the caller;
                    // throwing from here would replace it, so the close problem is only logged
                    _log.warn("Could not close the database connection after purging the batch queue failed", ex);
                }
            }
        }
        if (!_waitingObjects.isEmpty())
        {
            if (_log.isDebugEnabled())
            {
                // List every unwritten row together with the rows that it is still waiting for
                for (Iterator it = _waitingObjects.iterator(); it.hasNext();)
                {
                    WaitingObject obj   = (WaitingObject)it.next();
                    Table         table = _model.getDynaClassFor(obj.getObject()).getTable();
                    Identity      objId = buildIdentityFromPKs(table, obj.getObject());

                    _log.debug("Row " + objId + " is still not written because it depends on these yet unwritten rows");
                    for (Iterator fkIt = obj.getPendingFKs(); fkIt.hasNext();)
                    {
                        Identity pendingFkId = (Identity)fkIt.next();

                        _log.debug("  " + pendingFkId);
                    }
                }
            }
            if (_waitingObjects.size() == 1)
            {
                throw new DataSinkException("There is one row still not written because of missing referenced rows");
            }
            else
            {
                throw new DataSinkException("There are " + _waitingObjects.size() + " rows still not written because of missing referenced rows");
            }
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * Resets the set of foreign key target tables and the list of waiting objects. If the
     * foreign key order is to be ensured, the set is then refilled with the target table of
     * every foreign key in the model. Finally a connection is borrowed from the platform.
     *
     * @throws DataSinkException If the platform could not provide a connection
     */
    public void start() throws DataSinkException
    {
        _fkTables.clear();
        _waitingObjects.clear();

        if (_ensureFkOrder)
        {
            int tableNo = 0;

            while (tableNo < _model.getTableCount())
            {
                Table owner = _model.getTable(tableNo);
                int   fkNo  = 0;

                // Every table that is referenced by a foreign key is remembered
                while (fkNo < owner.getForeignKeyCount())
                {
                    _fkTables.add(owner.getForeignKey(fkNo).getForeignTable());
                    fkNo++;
                }
                tableNo++;
            }
        }

        try
        {
            _connection = _platform.borrowConnection();
        }
        catch (DatabaseOperationException ex)
        {
            throw new DataSinkException(ex);
        }
    }

    /**
     * {@inheritDoc}
     */
    public void addBean(DynaBean bean) throws DataSinkException
    {
        Table    table        = _model.getDynaClassFor(bean).getTable();
        Identity origIdentity = buildIdentityFromPKs(table, bean);

        if (_ensureFkOrder && (table.getForeignKeyCount() > 0))
        {
            WaitingObject waitingObj = new WaitingObject(bean, origIdentity);

            for (int idx = 0; idx < table.getForeignKeyCount(); idx++)
            {
                ForeignKey fk         = table.getForeignKey(idx);
                Identity   fkIdentity = buildIdentityFromFK(table, fk, bean);

                if ((fkIdentity != null) && !fkIdentity.equals(origIdentity))
                {
                    Identity processedIdentity = (Identity)_identityMap.get(fkIdentity);

                    if (processedIdentity != null)
                    {
                        updateFKColumns(bean, fkIdentity.getForeignKeyName(), processedIdentity);
                    }
                    else
                    {
                        waitingObj.addPendingFK(fkIdentity);
                    }
                }
            }
            if (waitingObj.hasPendingFKs())
            {
                if (_log.isDebugEnabled())
                {
                    StringBuffer msg = new StringBuffer();

                    msg.append("Defering insertion of row ");
                    msg.append(buildIdentityFromPKs(table, bean).toString());
                    msg.append(" because it is waiting for:");
                    for (Iterator it = waitingObj.getPendingFKs(); it.hasNext();)
                    {
                        msg.append("\n  ");
                        msg.append(it.next().toString());
                    }
                    _log.debug(msg.toString());
                }
                _waitingObjects.add(waitingObj);
                return;
            }
        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -