persistencespace.java

Source code from an object-oriented database system written in Java (Ozone).
// You can redistribute this software and/or modify it under the terms of
// the Ozone Core License version 1 published by ozone-db.org.
//
// The original code and portions created by SMB are
// Copyright (C) 1997-@year@ by SMB GmbH. All rights reserved.
//
// $Id$

package org.ozoneDB.core.storage.classicStore;

import org.ozoneDB.core.*;
import org.ozoneDB.core.storage.classicStore.ClassicStore;
import org.ozoneDB.core.storage.classicStore.Cluster;
import org.ozoneDB.core.storage.classicStore.ClusterID;
import org.ozoneDB.core.storage.classicStore.DeathObject;
import org.ozoneDB.util.*;
import org.ozoneDB.DxLib.*;
import org.ozoneDB.OzoneInternalException;

import java.io.*;


/**
 * Manages the on-disk persistence of the classic store: objects are appended
 * to cluster files, transaction commits are made crash-safe via an on-disk
 * transaction flag, and sparse clusters are compressed and removed on commit.
 *
 * @author <a href="http://www.softwarebuero.de/">SMB</a>
 */
public class PersistenceSpace {
    final static String TRANSACTION_FLAG = "transaction";
    final static int TRANSACTION_FLAG_VERSION = 1;
    final static int PROPS_FILE_VERSION = 1;

    final static String CID = "ozoneDB.classicStore.clusterID";

    Env env;
    ClassicStore classicStore;

    Cluster currentCluster;
    TransactionID currentTransaction;
    DxSet touchedClusters;
    DxSet clustersToCompress;


    public PersistenceSpace( Env _env ) {
        env = _env;
        classicStore = (ClassicStore)env.getStoreManager();
    }


    /**
     * Opens the persistence space. If a transaction flag file was left behind
     * by a crash during commit, the interrupted transaction is rolled back
     * first to restore a consistent database.
     */
    protected boolean startup() throws Exception {
        //env.logWriter.newEntry (this, "PersistenceSpace.open", LogWriter.DEBUG);
        File transFile = new File( env.getDatabaseDir() + Env.DATA_DIR, TRANSACTION_FLAG );
        if (transFile.exists()) {
            // we had a crash (transaction abort while committing):
            // roll back the transaction to get a consistent database
            rollBackTransaction( transFile );
        }

        if (!readProperties()) {
            // check whether the data dir is empty, i.e. we are starting for
            // the first time; list() returns null if the directory is missing
            String[] list = new File( env.getDatabaseDir() + Env.DATA_DIR ).list();
            if (list != null && list.length != 0) {
                recover();
            } else {
                newCluster();
            }
        }
        return true;
    }


    /**
     * Closes the current cluster and persists its id so that it can be
     * reopened on the next startup.
     */
    protected boolean shutdown() throws Exception {
        //env.logWriter.newEntry (this, "PersistenceSpace.close", LogWriter.DEBUG);
        if (currentCluster != null) {
            writeProperties();
            currentCluster.close();
        }

        currentCluster = null;
        touchedClusters = null;
        clustersToCompress = null;

        return true;
    }


    /**
     * Restores the current cluster from the persisted cluster id.
     *
     * @return false if no cluster id has been stored yet
     */
    protected boolean readProperties() {
        ClusterID cid = (ClusterID)env.state.property( CID, null );
        if (cid == null) {
            return false;
        }

        currentCluster = new Cluster( env, classicStore, cid );
        return true;
    }


    /**
     * Persists the id of the current cluster in the environment state.
     */
    protected void writeProperties() {
        env.state.setProperty( CID, currentCluster.cluID() );
    }


    /**
     * Begins a transaction commit by writing the transaction flag (label)
     * to disk; the flag is removed again in commitTransaction().
     */
    protected void startTransaction( TransactionID tid ) throws OzoneInternalException {
        //env.logWriter.newEntry ("PersistenceSpace.beginTransaction: " + tid, LogWriter.DEBUG);
        currentTransaction = tid;
        touchedClusters = new DxHashSet();
        clustersToCompress = new DxHashSet();
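
        // on-disk layout of the transaction flag file (read back in rollBackTransaction()):
        //   int   format version (TRANSACTION_FLAG_VERSION)
        //   long  id of this transaction
        //   long  id of the current cluster
        //   long* ids of all touched clusters, appended later by registerCluster()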

        try {
            // write the transaction flag to hard disk
            FileOutputStream fo = new FileOutputStream( new File( env.getDatabaseDir() + Env.DATA_DIR, TRANSACTION_FLAG ) );
            DataOutputStream out = new DataOutputStream( fo );
            out.writeInt( TRANSACTION_FLAG_VERSION );
            out.writeLong( currentTransaction.value() );
            // rescue the current cluster id
            out.writeLong( currentCluster.cluID().value() );
            out.close();
        } catch (IOException e) {
            throw new OzoneInternalException("Failed to start transaction", e);
        }
    }


    /**
     * Prepares the commit: compresses all clusters whose leak weight grew too
     * large during this transaction and then removes their old files from disk.
     */
    protected void prepareCommitTransaction( TransactionID tid ) throws OzoneInternalException {
        try {
            // close the current cluster stream
            currentCluster.close();

            // compress the obsolete clusters, then remove their files from disk
            DxIterator it = clustersToCompress.iterator();
            // 1 : compress all clusters
            while (it.next() != null) {
                compressCluster( (ClusterID)it.object() );
            }
            // 2 : if everything was fine, remove the cluster files
            it.reset();
            while (it.next() != null) {
                new Cluster( env, classicStore, (ClusterID)it.object() ).removeFromDisk();
            }
        } catch (IOException e) {
            throw new OzoneInternalException("Failed to prepare to commit the transaction", e);
        }
    }


    /**
     * Commits the transaction by deleting the transaction flag; once the flag
     * is gone, the changes are durable and will not be rolled back on startup.
     */
    protected void commitTransaction( TransactionID tid ) {
        // remove the transaction label
        File f = new File( env.getDatabaseDir() + Env.DATA_DIR, TRANSACTION_FLAG );
        if (f.exists()) {
            f.delete();
        }

        //env.logWriter.newEntry ("PersistenceSpace.endTransaction: " + currentTransaction, LogWriter.DEBUG);
        touchedClusters = null;
        clustersToCompress = null;
        currentTransaction = null;
    }


    /**
     * Left empty here; an interrupted transaction is rolled back from the
     * on-disk transaction flag at the next startup (see startup()).
     */
    protected void abortTransaction( TransactionID tid ) {
    }


    /**
     * Assigns a cluster to the current transaction by appending its id to the
     * transaction flag file, so a crashed commit can roll the cluster back.
     */
    private void registerCluster( ClusterID cid ) throws OzoneInternalException {
        if (!touchedClusters.contains( cid )) {
            touchedClusters.add( cid );

            try {
                // write the cluster id
                FileOutputStream fo =
                        new FileOutputStream( new File( env.getDatabaseDir() + Env.DATA_DIR, TRANSACTION_FLAG ).toString(), true );
                DataOutputStream out = new DataOutputStream( fo );
                out.writeLong( cid.value() );
                out.close();
            } catch (IOException e) {
                throw new OzoneInternalException("Failed to register cluster", e);
            }
        }
    }


    /**
     * Closes the current cluster and makes a freshly created cluster current;
     * the old cluster is scheduled for compression if it needs it.
     */
    private Cluster newCluster() throws IOException {
        // close the old cluster stream before creating a new one
        Cluster oldCluster = null;
        if (currentCluster != null) {
            oldCluster = currentCluster;
            currentCluster.close();
        }

        // retrieve a new cluster id and create a cluster
        currentCluster = new Cluster( env, classicStore, new ClusterID( env.keyGenerator.nextID() ) );

        // check, if the last cluster has to be compressed;
        // this can't be done in writeLeak() while the cluster is open
        if (oldCluster != null && oldCluster.needsCompressing()) {
            clustersToCompress.add( oldCluster.cluID() );
        }

        // save the current cluster
        writeProperties();

        return currentCluster;
    }


    /**
     * Reads the given cluster from disk; the current cluster is temporarily
     * closed, because the same file must not be open for reading and writing
     * at the same time.
     */
    protected Cluster readCluster( ClusterID cid, int whatToRead ) throws OzoneInternalException {
        //env.logWriter.newEntry ("PersistenceSpace.readCluster: " + cid, LogWriter.DEBUG);
        // opening the same file for writing _and_ reading causes trouble
        Cluster cl = null;
        try {
            if (cid.equals( currentCluster.cluID() )) {
                currentCluster.close();
            }

            cl = new Cluster( env, classicStore, cid );
            cl.readObjects( whatToRead, null );

            // reopen, if necessary
            if (cid.equals( currentCluster.cluID() )) {
                currentCluster.open();
            }
        } catch (Exception e) {
            throw new OzoneInternalException("Failed to read cluster", e);
        }

        return cl;
    }


    /**
     * Compresses a cluster by copying its remaining live objects into the
     * current cluster; leaked space is left behind, so the old file can be
     * removed afterwards.
     */
    protected void compressCluster( ClusterID cid ) throws IOException {
        //env.logWriter.newEntry ("PersistenceSpace.compressCluster: " + cid, LogWriter.DEBUG);
        Cluster cl = new Cluster( env, classicStore, cid );
        cl.readObjects( Cluster.DATA, null );

        DeathObject dobj;
        DxIterator it = cl.objects().iterator();
        while ((dobj = (DeathObject)it.next()) != null) {
            writeObject( dobj, false, false );
        }
    }


    /**
     * Scans the data directory and returns the ids of all cluster files found.
     */
    protected ClusterID[] allClusters() {
        File path = new File( env.getDatabaseDir() + Env.DATA_DIR );
        String[] fileList = path.list( new FilenameFilter() {


            public boolean accept( File dir, String name ) {
                return name.endsWith( Cluster.CLUSTER_FILE_SUFF );
            }
        } );
        ClusterID[] result = new ClusterID[fileList.length];

        for (int i = 0; i < fileList.length; i++) {
            result[i] = new ClusterID( Long.parseLong( fileList[i].substring( 0,
                    fileList[i].length() - Cluster.CLUSTER_FILE_SUFF.length() ) ) );
        }

        return result;
    }


    /**
     * Appends an object to the current cluster, opening a new cluster first
     * if the current one has reached its maximum size.
     *
     * @return the id of the cluster the object was written to
     */
    protected ClusterID writeObject( DeathObject dobj, boolean serialize, boolean useClone ) throws IOException {
        //env.logWriter.newEntry ("PersistenceSpace.writeObject: " + dobj.objID(), LogWriter.DEBUG);
        // create new cluster if necessary
        if (currentCluster.size() > Cluster.MAX_SIZE) {
            newCluster();
        }

        // assign the current cluster to the current transaction;
        // we have to do that _before_ writing the object: if registering fails,
        // the write is never performed and the database stays consistent
        registerCluster( currentCluster.cluID() );

        // at first set the object's clusterId and then write the object
        dobj.container().setClusterID( currentCluster.cluID() );
        currentCluster.appendObject( dobj, currentTransaction, serialize, useClone );
        return currentCluster.cluID();
    }


    /**
     * Marks an object in the given cluster as deleted (a "leak") and schedules
     * the cluster for compression once its leak weight grows too large.
     */
    protected void writeLeak( ClusterID cid, DeathObject dobj ) throws OzoneInternalException {
        //env.logWriter.newEntry ("PersistenceSpace.writeLeak: " + cid + " : " + dobj.objID(), LogWriter.DEBUG);

        // assign the touched cluster to the current transaction;
        // we have to do that _before_ writing the leak: if registering fails,
        // the leak is never written and the database stays consistent
        registerCluster( cid );

        // write the leak
        Cluster cl = new Cluster( env, classicStore, cid );
        try {
            cl.writeLeak( dobj, currentTransaction );
        } catch (IOException e) {
            throw new OzoneInternalException("Failed to write leak", e);
        }

        // we must not compress the current cluster! This is impossible both
        // technically (we can't open the same file for reading _and_ writing
        // at the same time) and logically (we can't append the objects of a
        // cluster to the cluster itself). The current cluster will be
        // compressed in newCluster()
        if (currentCluster.cluID().equals( cid )) {
            return;
        }

        // derive the cluster size from its file size;
        // this is much faster than reading the whole cluster
        long clSize = cl.fileHandle().length();
        if (clSize > 0) {
            //env.logWriter.newEntry ("LEAK_WEIGHT = " + cl.leakSize() + " / " + clSize, LogWriter.DEBUG);
            if ((double)cl.leakSize() / clSize > Cluster.LEAK_WEIGHT) {
                clustersToCompress.add( cid );
            }
        }
    }


    /**
     * Rebuilds the object space by re-reading the state of every object from
     * all clusters on disk.
     */
    protected void fillObjectSpace() {
        env.logWriter.newEntry( this, "ObjectSpace recovery ...", LogWriter.INFO );
        int count = 0;
        ClusterID[] clusters = allClusters();
        for (int i = 0; i < clusters.length; i++) {
            try {
                ObjectContainer os;
                Cluster cl = new Cluster( env, classicStore, clusters[i] );
                cl.readObjects( Cluster.STATE, null );
                DxIterator it = cl.objects().iterator();
                while ((os = (ObjectContainer)it.next()) != null) {
                    // drop a possibly stale entry before re-adding the container
                    ((ClassicStore)env.getStoreManager()).objectSpace.deleteObject( os );
                    ((ClassicStore)env.getStoreManager()).objectSpace.addObject( os );
                    count++;
                //env.logWriter.newEntry ("adding: " + os.id(), LogWriter.DEBUG);
                }
            } catch (Exception e) {
                env.fatalError( this, "fillObjectSpace: " + e.toString(), e );
            }
        }
        env.logWriter.newEntry( this, count + " objects found.", LogWriter.INFO );
    }


    /**
     * Hook for additional recovery work besides the transaction rollback;
     * currently empty.
     */
    protected void recover() {
    }


    /**
     * Rolls back an interrupted transaction: reads the transaction flag file
     * and undoes the changes in every cluster it lists.
     */
    protected void rollBackTransaction( File transFlag ) throws Exception {
        TransactionID rollBackTid = null;
        DxBag clusters = new DxArrayBag();
        try {
            // open the flag file
            FileInputStream fi = new FileInputStream( transFlag );
            DataInputStream in = new DataInputStream( fi );

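            // the flag layout mirrors what startTransaction() and registerCluster() wrote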
            in.readInt();   // skip the format version (TRANSACTION_FLAG_VERSION)
            rollBackTid = new TransactionID( in.readLong() );
            // recover the current cluster
            currentCluster = new Cluster( env, classicStore, new ClusterID( in.readLong() ) );
            // get all assigned clusters
            while (fi.available() != 0) {
                clusters.add( new ClusterID( in.readLong() ) );
            }

            in.close();
        } catch (IOException e) {
            env.logWriter.newEntry( this, "rollback transaction: flag file corrupted", LogWriter.WARN );
        }

        //env.logWriter.newEntry ("rollback transaction: " + rollBackTid + " with " + clusters.count() + " clusters", LogWriter.DEBUG);

        // rollback the clusters
        ClusterID cid;
        DxIterator it = clusters.iterator();
        while ((cid = (ClusterID)it.next()) != null) {
            Cluster cl = new Cluster( env, classicStore, cid );
            //env.logWriter.newEntry ("rollback : " + cid, LogWriter.DEBUG);
            cl.rollBack( rollBackTid );
        }

        // save the recovered properties
        writeProperties();

        transFlag.delete();
        touchedClusters = null;
        clustersToCompress = null;
        currentTransaction = null;
    }
}
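
// A minimal usage sketch of the lifecycle above. This is hypothetical driver
// code, not part of the original file: the Env instance, the TransactionID and
// the DeathObject are assumed to be supplied by the surrounding Ozone server,
// and the caller must live in this package, since the methods are protected.
//
//     PersistenceSpace space = new PersistenceSpace( env );
//     space.startup();                          // rolls back a crashed commit, if any
//     space.startTransaction( tid );            // writes the transaction flag to disk
//     ClusterID cid = space.writeObject( dobj, true, false );
//     space.prepareCommitTransaction( tid );    // compress and drop obsolete clusters
//     space.commitTransaction( tid );           // delete the flag: changes are durable
//     space.shutdown();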
