📄 persistencespace.java
字号:
// You can redistribute this software and/or modify it under the terms of
// the Ozone Core License version 1 published by ozone-db.org.
//
// The original code and portions created by SMB are
// Copyright (C) 1997-@year@ by SMB GmbH. All rights reserved.
//
// $Id$
package org.ozoneDB.core.storage.classicStore;
import org.ozoneDB.core.*;
import org.ozoneDB.core.storage.classicStore.ClassicStore;
import org.ozoneDB.core.storage.classicStore.Cluster;
import org.ozoneDB.core.storage.classicStore.ClusterID;
import org.ozoneDB.core.storage.classicStore.DeathObject;
import org.ozoneDB.util.*;
import org.ozoneDB.DxLib.*;
import org.ozoneDB.OzoneInternalException;
import java.io.*;
/**
 * Manages the cluster files of the classic store inside the database's data
 * directory and keeps track of the clusters that are touched while a
 * transaction is being committed.
 * <p>
 * While a commit is in progress a flag file named {@link #TRANSACTION_FLAG}
 * exists in the data directory. As written by {@link #startTransaction} and
 * {@link #registerCluster} it contains, in this order: an <code>int</code>
 * format version, the transaction id as <code>long</code>, the id of the
 * current cluster as <code>long</code> and then one <code>long</code> for
 * every cluster registered with the transaction. If the flag file is found
 * by {@link #startup}, the recorded clusters are rolled back.
 *
 * @author <a href="http://www.softwarebuero.de/">SMB</a>
 */
public class PersistenceSpace extends Object {

    /** Name of the flag file that marks a commit in progress. */
    final static String TRANSACTION_FLAG = "transaction";

    /** Version number written as first value into the flag file. */
    final static int TRANSACTION_FLAG_VERSION = 1;

    final static int PROPS_FILE_VERSION = 1;

    /** Key of the state property that holds the id of the current cluster. */
    final static String CID = "ozoneDB.classicStore.clusterID";

    Env env;

    ClassicStore classicStore;

    /** The cluster new objects are appended to. */
    Cluster currentCluster;

    /** The transaction passed to the last call of {@link #startTransaction}. */
    TransactionID currentTransaction;

    /** Ids of the clusters already written to the flag file. */
    DxSet touchedClusters;

    /** Ids of the clusters to compress in {@link #prepareCommitTransaction}. */
    DxSet clustersToCompress;


    public PersistenceSpace( Env _env ) {
        env = _env;
        classicStore = (ClassicStore)env.getStoreManager();
    }


    /**
     * @return The data directory of the database.
     */
    private File dataDir() {
        return new File( env.getDatabaseDir() + Env.DATA_DIR );
    }


    /**
     * @return The transaction flag file inside the data directory.
     */
    private File transactionFlagFile() {
        return new File( env.getDatabaseDir() + Env.DATA_DIR, TRANSACTION_FLAG );
    }


    /**
     * Rolls back a transaction that was interrupted while committing (if the
     * flag file exists) and then determines the current cluster: either from
     * the stored properties or, if the data directory is empty, by creating
     * a new cluster.
     *
     * @return Always <code>true</code>.
     * @throws IOException If the data directory cannot be listed.
     */
    protected boolean startup() throws Exception {
        //env.logWriter.newEntry (this, "PersistenceSpace.open", LogWriter.DEBUG);
        File transFile = transactionFlagFile();
        if (transFile.exists()) {
            // we had a crash (transaction abort while committing):
            // rollback the transaction to get a consistent database
            rollBackTransaction( transFile );
        }

        if (!readProperties()) {
            // check, if the datadir is empty, i.e. we start the first time
            File dir = dataDir();
            String[] list = dir.list();
            if (list == null) {
                // File.list() signals "not a directory" or an I/O error with null
                throw new IOException( "Unable to list data directory: " + dir );
            }
            if (list.length != 0) {
                recover();
            } else {
                newCluster();
            }
        }
        return true;
    }


    /**
     * Stores the id of the current cluster in the properties, closes the
     * current cluster and drops all in-memory state.
     *
     * @return Always <code>true</code>.
     */
    protected boolean shutdown() throws Exception {
        //env.logWriter.newEntry (this, "PersistenceSpace.close", LogWriter.DEBUG);
        try {
            if (currentCluster != null) {
                writeProperties();
                currentCluster.close();
            }
        } finally {
            // forget the state even if closing the cluster failed
            currentCluster = null;
            touchedClusters = null;
            clustersToCompress = null;
        }
        return true;
    }


    /**
     * Restores the current cluster from the {@link #CID} state property.
     *
     * @return <code>false</code> if the property is not set,
     *         <code>true</code> otherwise.
     */
    protected boolean readProperties() {
        ClusterID cid = (ClusterID)env.state.property( CID, null );
        if (cid == null) {
            return false;
        }

        currentCluster = new Cluster( env, classicStore, cid );
        return true;
    }


    /**
     * Stores the id of the current cluster in the {@link #CID} state
     * property. The current cluster must not be <code>null</code>.
     */
    protected void writeProperties() {
        env.state.setProperty( CID, currentCluster.cluID() );
    }


    /**
     * Begins a transaction commit by writing the transaction flag file.
     *
     * @param tid The transaction that is going to be committed.
     * @throws OzoneInternalException If the flag file cannot be written.
     */
    protected void startTransaction( TransactionID tid ) throws OzoneInternalException {
        //env.logWriter.newEntry ("PersistenceSpace.beginTransaction: " + tid, LogWriter.DEBUG);
        currentTransaction = tid;
        touchedClusters = new DxHashSet();
        clustersToCompress = new DxHashSet();

        try {
            // write the transaction flag to harddisk
            DataOutputStream out = new DataOutputStream( new FileOutputStream( transactionFlagFile() ) );
            try {
                out.writeInt( TRANSACTION_FLAG_VERSION );
                out.writeLong( currentTransaction.value() );
                // rescue the current cluster id
                out.writeLong( currentCluster.cluID().value() );
            } finally {
                // release the file handle even if one of the writes failed
                out.close();
            }
        } catch (IOException e) {
            throw new OzoneInternalException("Failed to start transaction", e);
        }
    }


    /**
     * Closes the current cluster, compresses all clusters collected in
     * {@link #clustersToCompress} and afterwards removes their files.
     *
     * @param tid Not used by this method.
     * @throws OzoneInternalException If an I/O error occurs.
     */
    protected void prepareCommitTransaction( TransactionID tid ) throws OzoneInternalException {
        try {
            // close the current cluster stream
            currentCluster.close();

            // NOTE(review): compressCluster() goes through writeObject(), which
            // may call newCluster() and add to clustersToCompress while this
            // iterator is active - confirm that DxHashSet tolerates this
            DxIterator it = clustersToCompress.iterator();

            // 1 : compress all clusters
            while (it.next() != null) {
                compressCluster( (ClusterID)it.object() );
            }

            // 2 : if everything was fine, remove the cluster files
            it.reset();
            while (it.next() != null) {
                new Cluster( env, classicStore, (ClusterID)it.object() ).removeFromDisk();
            }
        } catch (IOException e) {
            throw new OzoneInternalException("Failed to prepare to commit the transaction", e);
        }
    }


    /**
     * Ends the transaction commit by removing the transaction flag file and
     * dropping the per-transaction state.
     *
     * @param tid Not used by this method.
     */
    protected void commitTransaction( TransactionID tid ) {
        // remove the transaction label
        File f = transactionFlagFile();
        if (f.exists() && !f.delete()) {
            // a flag file that survives the commit triggers a rollback of
            // this transaction on the next startup, so at least report it
            env.logWriter.newEntry( this, "commit transaction: unable to delete flag file " + f, LogWriter.WARN );
        }

        //env.logWriter.newEntry ("PersistenceSpace.endTransaction: " + currentTransaction, LogWriter.DEBUG);
        touchedClusters = null;
        clustersToCompress = null;
        currentTransaction = null;
    }


    /**
     * Does nothing.
     *
     * @param tid Not used by this method.
     */
    protected void abortTransaction( TransactionID tid ) {
    }


    /**
     * Appends the given cluster id to the transaction flag file, unless the
     * cluster is already registered with the current transaction.
     *
     * @param cid The id of the cluster that is going to be changed.
     * @throws OzoneInternalException If the flag file cannot be written.
     */
    private void registerCluster( ClusterID cid ) throws OzoneInternalException {
        if (touchedClusters.contains( cid )) {
            return;
        }

        try {
            // write the cluster id
            FileOutputStream fo = new FileOutputStream( transactionFlagFile().toString(), true );
            DataOutputStream out = new DataOutputStream( fo );
            try {
                out.writeLong( cid.value() );
            } finally {
                // release the file handle even if the write failed
                out.close();
            }
        } catch (IOException e) {
            throw new OzoneInternalException("Failed to register cluster", e);
        }

        // remember the cluster only after it has reached the flag file;
        // otherwise a failed write would never be retried
        touchedClusters.add( cid );
    }


    /**
     * Closes the current cluster (if any), makes a newly created cluster the
     * current one and stores its id in the properties. If the old cluster
     * needs compressing, it is added to {@link #clustersToCompress}.
     *
     * @return The new current cluster.
     */
    private Cluster newCluster() throws IOException {
        // close the old cluster stream before creating a new one
        Cluster oldCluster = currentCluster;
        if (oldCluster != null) {
            oldCluster.close();
        }

        // retrieve a new cluster id and create a cluster
        currentCluster = new Cluster( env, classicStore, new ClusterID( env.keyGenerator.nextID() ) );

        // check, if the last cluster has to be compressed;
        // this can't be done in writeLeak() while the cluster is open
        if (oldCluster != null && oldCluster.needsCompressing()) {
            clustersToCompress.add( oldCluster.cluID() );
        }

        // save the current cluster
        writeProperties();
        return currentCluster;
    }


    /**
     * Reads the objects of the given cluster.
     *
     * @param cid The id of the cluster to read.
     * @param whatToRead Passed through to <code>Cluster.readObjects()</code>.
     * @return The cluster with its objects read.
     * @throws OzoneInternalException If anything goes wrong while reading.
     */
    protected Cluster readCluster( ClusterID cid, int whatToRead ) throws OzoneInternalException {
        //env.logWriter.newEntry ("PersistenceSpace.readCluster: " + cid, LogWriter.DEBUG);
        Cluster cl = null;
        try {
            // opening the same file for writing _and_ reading causes trouble
            boolean isCurrent = cid.equals( currentCluster.cluID() );
            if (isCurrent) {
                currentCluster.close();
            }

            try {
                cl = new Cluster( env, classicStore, cid );
                cl.readObjects( whatToRead, null );
            } finally {
                // reopen, if necessary; also when reading failed, so that the
                // current cluster is left in the state it was found in
                if (isCurrent) {
                    currentCluster.open();
                }
            }
        } catch (Exception e) {
            throw new OzoneInternalException("Failed to read cluster", e);
        }
        return cl;
    }


    /**
     * Reads the objects of the given cluster and writes each of them again
     * via {@link #writeObject}. The file of the given cluster is not removed
     * here.
     *
     * @param cid The id of the cluster to compress.
     */
    protected void compressCluster( ClusterID cid ) throws IOException {
        //env.logWriter.newEntry ("PersistanceSpace.compressCluster: " + cid, LogWriter.DEBUG);
        Cluster cl = new Cluster( env, classicStore, cid );
        cl.readObjects( Cluster.DATA, null );

        DeathObject dobj;
        DxIterator it = cl.objects().iterator();
        while ((dobj = (DeathObject)it.next()) != null) {
            writeObject( dobj, false, false );
        }
    }


    /**
     * Determines the ids of all clusters from the names of the cluster files
     * in the data directory.
     *
     * @return The ids of all cluster files; an empty array if the data
     *         directory cannot be listed.
     * @throws NumberFormatException If the name of a cluster file (without
     *         the suffix) is not a valid <code>long</code>.
     */
    protected ClusterID[] allClusters() {
        String[] fileList = dataDir().list( new FilenameFilter() {

            public boolean accept( File dir, String name ) {
                return name.endsWith( Cluster.CLUSTER_FILE_SUFF );
            }
        } );
        if (fileList == null) {
            // File.list() signals "not a directory" or an I/O error with null
            return new ClusterID[0];
        }

        int suffixLength = Cluster.CLUSTER_FILE_SUFF.length();
        ClusterID[] result = new ClusterID[fileList.length];
        for (int i = 0; i < fileList.length; i++) {
            String name = fileList[i];
            result[i] = new ClusterID( Long.parseLong( name.substring( 0, name.length() - suffixLength ) ) );
        }
        return result;
    }


    /**
     * Appends the given object to the current cluster. A new cluster is
     * created first, if the size of the current one exceeds
     * <code>Cluster.MAX_SIZE</code>.
     *
     * @param dobj The object to write.
     * @param serialize Passed through to <code>Cluster.appendObject()</code>.
     * @param useClone Passed through to <code>Cluster.appendObject()</code>.
     * @return The id of the cluster the object has been written to.
     */
    protected ClusterID writeObject( DeathObject dobj, boolean serialize, boolean useClone ) throws IOException {
        //env.logWriter.newEntry ("PersistenceSpace.writeObject: " + dobj.objID(), LogWriter.DEBUG);

        // create new cluster if necessary
        if (currentCluster.size() > Cluster.MAX_SIZE) {
            newCluster();
        }

        // assign the current cluster to the current transaction;
        // we have to do that _before_ writing the object, because if something
        // goes wrong while registering, the operation isn't performed
        // and the database stays consistent
        registerCluster( currentCluster.cluID() );

        // at first set the object's clusterId and then write the object
        dobj.container().setClusterID( currentCluster.cluID() );
        currentCluster.appendObject( dobj, currentTransaction, serialize, useClone );
        return currentCluster.cluID();
    }


    /**
     * Writes a leak for the given object to the given cluster. Unless the
     * cluster is the current one, it is added to {@link #clustersToCompress}
     * if the ratio of its leak size to its file size exceeds
     * <code>Cluster.LEAK_WEIGHT</code>.
     *
     * @param cid The id of the cluster the leak belongs to.
     * @param dobj The object to write the leak for.
     * @throws OzoneInternalException If an I/O error occurs.
     */
    protected void writeLeak( ClusterID cid, DeathObject dobj ) throws OzoneInternalException {
        //env.logWriter.newEntry ("PersistenceSpace.writeLeak: " + cid + " : " + dobj.objID(), LogWriter.DEBUG);

        // assign the touched cluster to the current transaction;
        // we have to do that _before_ writing the leak, because if something
        // goes wrong while registering, the operation isn't performed
        // and the database stays consistent
        registerCluster( cid );

        // write the leak
        Cluster cl = new Cluster( env, classicStore, cid );
        try {
            cl.writeLeak( dobj, currentTransaction );
        } catch (IOException e) {
            throw new OzoneInternalException("Failed to write leak", e);
        }

        // we must not compress the current cluster ! This is technically
        // (we can't open the same file for read _and_ write at the same time)
        // and logically (we can't append the objects of the cluster to the
        // cluster itself) not possible. The current cluster will be
        // compressed in newCluster()
        if (currentCluster.cluID().equals( cid )) {
            return;
        }

        // retrieve the cluster size simply from its file size;
        // this is much faster than reading the whole cluster
        long clSize = cl.fileHandle().length();
        if (clSize > 0) {
            //env.logWriter.newEntry ("LEAK_WEIGHT = " + cl.leakSize() + " / " + clSize, LogWriter.DEBUG);
            if ((double)cl.leakSize() / clSize > Cluster.LEAK_WEIGHT) {
                clustersToCompress.add( cid );
            }
        }
    }


    /**
     * Reads the object containers of all clusters and (re-)adds each of them
     * to the object space of the store. Any exception is reported via
     * <code>env.fatalError()</code>.
     */
    protected void fillObjectSpace() {
        env.logWriter.newEntry( this, "ObjectSpace recovery ...", LogWriter.INFO );
        int count = 0;
        ClusterID[] clusters = allClusters();
        for (int i = 0; i < clusters.length; i++) {
            try {
                ObjectContainer os;
                Cluster cl = new Cluster( env, classicStore, clusters[i] );
                cl.readObjects( Cluster.STATE, null );
                DxIterator it = cl.objects().iterator();
                while ((os = (ObjectContainer)it.next()) != null) {
                    // delete first, then add
                    ((ClassicStore)env.getStoreManager()).objectSpace.deleteObject( os );
                    ((ClassicStore)env.getStoreManager()).objectSpace.addObject( os );
                    count++;
                    //env.logWriter.newEntry ("adding: " + os.id(), LogWriter.DEBUG);
                }
            } catch (Exception e) {
                env.fatalError( this, "fillObjectSpace: " + e.toString(), e );
            }
        }
        env.logWriter.newEntry( this, count + " objects found.", LogWriter.INFO );
    }


    /**
     * Do some recover stuff besides transaction rollback. Currently does
     * nothing.
     */
    protected void recover() {
    }


    /**
     * Rolls back all clusters that are recorded in the given transaction
     * flag file, restores the current cluster recorded there and finally
     * deletes the flag file. A flag file that cannot be read completely is
     * reported with a warning; the part that could be read is still used.
     *
     * @param transFlag The transaction flag file.
     */
    protected void rollBackTransaction( File transFlag ) throws Exception {
        TransactionID rollBackTid = null;
        DxBag clusters = new DxArrayBag();

        try {
            // open the flag file
            FileInputStream fi = new FileInputStream( transFlag );
            DataInputStream in = new DataInputStream( fi );
            try {
                // skip the format version
                in.readInt();
                rollBackTid = new TransactionID( in.readLong() );
                // recover the current cluster
                currentCluster = new Cluster( env, classicStore, new ClusterID( in.readLong() ) );
                // get all assigned clusters
                while (fi.available() != 0) {
                    clusters.add( new ClusterID( in.readLong() ) );
                }
            } finally {
                // an open handle may prevent deleting the flag file below
                in.close();
            }
        } catch (IOException e) {
            env.logWriter.newEntry( this, "rollback transaction: flag file corrupted: " + e, LogWriter.WARN );
        }

        //env.logWriter.newEntry ("rollback transaction: " + rollBackTid + " with " + clusters.count() + " clusters", LogWriter.DEBUG);

        // rollback the clusters
        ClusterID cid;
        DxIterator it = clusters.iterator();
        while ((cid = (ClusterID)it.next()) != null) {
            Cluster cl = new Cluster( env, classicStore, cid );
            //env.logWriter.newEntry ("rollback : " + cid, LogWriter.DEBUG);
            cl.rollBack( rollBackTid );
        }

        // save the recovered properties; there is nothing to save, if the
        // flag file ended before the current cluster id
        if (currentCluster != null) {
            writeProperties();
        }

        if (!transFlag.delete()) {
            env.logWriter.newEntry( this, "rollback transaction: unable to delete flag file " + transFlag, LogWriter.WARN );
        }
        touchedClusters = null;
        clustersToCompress = null;
        currentTransaction = null;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -