// You can redistribute this software and/or modify it under the terms of
// the Ozone Core License version 1 published by ozone-db.org.
//
// The original code and portions created by SMB are
// Copyright (C) 1997-@year@ by SMB GmbH. All rights reserved.
//
// $Id$
package org.ozoneDB.core.storage.classicStore;
import java.io.*;
import org.ozoneDB.core.*;
import org.ozoneDB.core.storage.classicStore.ClassicObjectContainer;
import org.ozoneDB.core.storage.classicStore.ClassicStore;
import org.ozoneDB.util.*;
import org.ozoneDB.DxLib.*;
import org.ozoneDB.io.stream.ResolvingObjectInputStream;
/**
* @author <a href="http://www.softwarebuero.de/">SMB</a>
*/
public class Cluster extends Object {
    public final static int VERSION = 1;
    public static int MAX_SIZE = 64 * 1024;
    public final static String CLUSTER_FILE_SUFF = ".cl";
    public final static String LEAK_FILE_SUFF = ".lk";
    public final static String RECOVERY_SUFF = ".rec";
    public final static double LEAK_WEIGHT = 0.5;
    public final static int DATA = 1;
    public final static int STATE = 2;
    public final static int TRANS = 4;

    // chunk IDs
    public final static byte CL_HEADER_CHUNK = 1;
    public final static byte DATA_OID_CHUNK = 2;
    public final static byte DATA_HEADER_CHUNK = 3;
    public final static byte DATA_CHUNK = 4;

    // recovery modes
    public final static byte NONE = 0;
    public final static byte OBJECTS = 1;
    public final static byte LEAKS = 2;

    /**
     * A chunk is the basic unit of the cluster file format: a one-byte id
     * plus either raw bytes (as read back from disk) or a live object
     * (serialized on the way out).
     */
    class Chunk extends Object {
        public byte id;
        public byte[] data;
        public int dataLength;
        public Object object;

        public Chunk() {
        }

        public Chunk( byte _id, byte[] _data ) {
            id = _id;
            data = _data;
            dataLength = data.length;
        }

        public Chunk( byte _id, Object obj ) {
            id = _id;
            object = obj;
        }
    }
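
    // Illustrative only (not in the original source): the two Chunk flavors.
    // On the write path a chunk wraps a live object that writeChunk()
    // serializes; on the read path it wraps the raw bytes read from disk.
    //
    //     Chunk header = new Chunk( DATA_HEADER_CHUNK, dobj.container() );  // object flavor
    //     Chunk raw    = new Chunk( DATA_CHUNK, rawBytesFromDisk );         // byte[] flavor; name hypothetical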
    /** */
    ClusterID cid;
    /** */
    byte recoveryMode = NONE;
    /** */
    long clusterSize = 0;
    /** */
    long leakSize = -1;
    /** */
    DataOutputStream stream;
    /** */
    DxCollection objects;
    /** */
    Env env;
    ClassicStore classicStore;

    /**
     */
    public Cluster( Env _env, ClassicStore _classicStore ) {
        env = _env;
        classicStore = _classicStore;
    }
    /**
     */
    public Cluster( Env _env, ClassicStore _classicStore, ClusterID _cid ) {
        env = _env;
        classicStore = _classicStore;
        cid = _cid;
    }

    protected void finalize() throws Throwable {
        super.finalize();
        close();
    }
    /**
     */
    public final ClusterID cluID() {
        return cid;
    }

    /**
     */
    public final DxCollection objects() {
        return objects;
    }

    /**
     */
    public final void beginRecovery( byte mode ) {
        recoveryMode = mode;
    }

    /**
     */
    public final void endRecovery( byte mode ) {
        // while recoveryMode is still set, fileHandle()/leakFileHandle()
        // point at the ".rec" file ...
        File file = mode == OBJECTS ? fileHandle() : leakFileHandle();
        recoveryMode = NONE;
        // ... and after the reset they yield the regular name, so the
        // recovery file replaces the original
        file.renameTo( mode == OBJECTS ? fileHandle() : leakFileHandle() );
    }
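
    // Sketch of the recovery protocol these hooks imply (illustrative,
    // not in the original source):
    //
    //     cluster.beginRecovery( Cluster.OBJECTS );  // writes now target "<cid>.cl.rec"
    //     ... re-append the surviving objects ...
    //     cluster.endRecovery( Cluster.OBJECTS );    // "<cid>.cl.rec" is renamed over "<cid>.cl"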
    /**
     */
    public final File fileHandle() {
        return new File( env.getDatabaseDir() + Env.DATA_DIR,
                cid.toString() + Cluster.CLUSTER_FILE_SUFF
                + (recoveryMode == OBJECTS ? Cluster.RECOVERY_SUFF : "") );
    }

    /**
     */
    public final File leakFileHandle() {
        return new File( env.getDatabaseDir() + Env.DATA_DIR,
                cid.toString() + Cluster.LEAK_FILE_SUFF
                + (recoveryMode == LEAKS ? Cluster.RECOVERY_SUFF : "") );
    }
    /**
     * The cluster size has two different meanings:
     * - while writing, it is the size of the cluster file, so that we can
     *   keep the file as close as possible to the user-defined maximum
     *   cluster size
     * - while reading, it is the sum of the sizes of all the cluster's
     *   death objects, so that we can determine exactly the space we will
     *   need in the object buffer of the cluster space
     */
    public final long size() {
        return clusterSize;
    }
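
    // Concretely (illustrative, based on the javadoc above): after open()
    // and appendObject(), clusterSize mirrors fileHandle().length(); after
    // a read it stands for the summed sizes of the cluster's death objects.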
    /** */
    public void open() throws IOException {
        //env.logWriter.newEntry ("Cluster.open: " + cid, LogWriter.DEBUG);
        if (stream != null) {
            return;
        }
        File file = fileHandle();
        boolean newCluster = !file.exists();
        stream = new DataOutputStream( new BufferedOutputStream(
                new FileOutputStream( file.toString(), true ), 4 * 1024 ) );
        if (newCluster) {
            writeHeader();
        }
        clusterSize = file.length();
    }

    /** */
    public void close() throws IOException {
        //env.logWriter.newEntry ("Cluster.close: " + cid, LogWriter.DEBUG);
        if (stream != null) {
            stream.close();
            stream = null;
        }
    }
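
    // A minimal write-path sketch (illustrative, not in the original
    // source; appendObject() below calls open() itself, so the explicit
    // open() is optional):
    //
    //     Cluster cl = new Cluster( env, classicStore, cid );
    //     cl.open();                                  // opens the ".cl" file in append mode
    //     cl.appendObject( dobj, tid, true, false );  // serialize the live target
    //     cl.close();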
    /** */
    public void writeHeader() throws IOException {
        //env.logWriter.newEntry ("Cluster.writeHeader: " + cid, LogWriter.DEBUG);
        // write cluster version and id: a fixed 12-byte header (4 + 8 bytes)
        stream.writeInt( Cluster.VERSION );
        stream.writeLong( cid.value() );
    }
    /**
     * Size of an object entry:
     *   ObjectID                 : 8 bytes
     *   TransactionID            : 8 bytes
     *   streamed ObjectContainer : comes from DeathObject.stateSize
     *   ChunkID                  : 1 byte
     *   data length              : 4 bytes
     *   data itself              : n bytes
     * -> fixed overhead: 8 + 8 + 1 + 4 = 21 bytes, plus the object state
     */
    private final long entrySize( DeathObject dobj ) {
        return dobj.size() + dobj.stateSize + 21;
    }
    /**
     */
    public void appendObject( DeathObject dobj, TransactionID tid, boolean serialize, boolean useClone )
            throws IOException {
        env.logWriter.newEntry( this,
                "Cluster " + cid + " appendObject: " + dobj.objID() + ", " + tid + ", " + serialize + ", " + useClone,
                LogWriter.DEBUG3 );
        open();
        // write the object id
        stream.writeLong( dobj.objID().value() );
        // write the transaction id
        stream.writeLong( tid.value() );
        // write the object's container
        Chunk chunk = new Chunk( DATA_HEADER_CHUNK, dobj.container() );
        writeChunk( stream, chunk );
        dobj.stateSize = chunk.dataLength;
        // write the object itself; if we are in recovery mode, we use
        // dobj.data() directly because it is already set
        if (serialize) {
            chunk = new Chunk( DATA_CHUNK, useClone ? dobj.container().targetShadow() : dobj.container().target() );
        } else {
            chunk = new Chunk( DATA_CHUNK, dobj.data() );
        }
        writeChunk( stream, chunk );
        dobj.setSize( chunk.data.length );
        clusterSize = fileHandle().length();
    }
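
    // On-disk layout of one appended entry, summarizing the writes above
    // (illustrative; see entrySize() for the byte accounting):
    //
    //     long objID                   8 bytes
    //     long tid                     8 bytes
    //     chunk( DATA_HEADER_CHUNK )   streamed ObjectContainer
    //     chunk( DATA_CHUNK )          serialized target or raw data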
    /**
     * Reads all objects from the cluster, dropping leaks in normal mode or
     * broken-transaction objects in recovery mode.
     * Returns false if any objects were dropped.
     */
    public boolean readObjects( int whatToRead, TransactionID rollBackTid ) throws IOException {
        //env.logWriter.newEntry ("Cluster " + cid + " readObjects: " + whatToRead + ", " + rollBackTid, LogWriter.DEBUG);
        boolean result = true;
        DataInputStream fi = new DataInputStream( new FileInputStream( fileHandle() ) );
        int version = fi.readInt();
        cid = new ClusterID( fi.readLong() );
        DxMultiMap leaks = (DxMultiMap)readLeaks( rollBackTid, true );
        //env.logWriter.newEntry ("leaks for " + cid + ": " + leaks.count(), LogWriter.DEBUG);
        objects = new DxArrayBag();
        while (fi.available() != 0) {
            TransactionID tid = null;
            ClassicObjectContainer os = null;
            DeathObject dobj = null;
            boolean isLeak = false;
            boolean rollBack = false;
            try {
                // read the object id
                ObjectID oid = new ObjectID( fi.readLong() );
                //env.logWriter.newEntry ("\tnext object: " + oid, LogWriter.DEBUG);
                DxDeque oidLeaks = (DxDeque)leaks.elementsForKey( oid );
                if (oidLeaks != null) {
                    isLeak = true;
                    //env.logWriter.newEntry ("\t" + oid + " is a leak", LogWriter.DEBUG);
                    // drop one recorded leak for this oid
                    if (oidLeaks.count() == 1) {
                        leaks.removeForKey( oid );
                    } else {
                        oidLeaks.popBottom();
                    }