// cluster.java
}
// read TransactionID
tid = new TransactionID( fi.readLong() );
// check if this is an object of the broken transaction
if (rollBackTid != null && rollBackTid.equals( tid )) {
rollBack = true;
}
if (rollBack || isLeak || (whatToRead & TRANS) == 0) {
tid = null;
}
// read object state, if necessary
Chunk stateChunk = readChunk( fi, (whatToRead & STATE) == 0 || rollBack || isLeak );
if (stateChunk.data != null) {
ObjectInputStream in = new ResolvingObjectInputStream( new ByteArrayInputStream( stateChunk.data ) );
os = new ClassicObjectContainer();
os.loadExternal( in );
in.close();
}
// create a new deathobj and read the data
Chunk dataChunk = readChunk( fi, (whatToRead & DATA) == 0 || rollBack || isLeak );
if (dataChunk.data != null) {
dobj = new DeathObject( oid );
dobj.stateSize = stateChunk.dataLength;
dobj.setData( dataChunk.data );
clusterSize += dobj.stateSize;
clusterSize += dobj.size();
}
} catch (Exception e) {
env.fatalError( this, "exception in readObjects() of cluster " + cid, e );
break;
}
// if everything was fine, add all required information;
// this must be done last, because only complete
// objects should be added
if (tid != null) {
objects.add( tid );
}
if (os != null) {
objects.add( os );
}
if (dobj != null) {
objects.add( dobj );
}
result &= !rollBack;
}
fi.close();
return result;
}
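/*
 * For reference (inferred from the reads above, not from original
 * documentation): each object record in a cluster file appears to be
 * laid out as
 *
 *   long  transaction id
 *   chunk object state  (byte id, int length, payload)
 *   chunk object data   (byte id, int length, payload)
 *
 * readChunk()/writeChunk() below handle the chunk framing.
 */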
/** Returns the accumulated leak size of this cluster, reading it lazily from the end of the leak file. */
public long leakSize() {
if (leakSize != -1) {
return leakSize;
}
File file = new File( env.getDatabaseDir() + Env.DATA_DIR, cid + Cluster.LEAK_FILE_SUFF );
if (!file.exists()) {
return 0;
}
try {
DataInputStream leakStream = new DataInputStream( new FileInputStream( file ) );
// every entry ends with the cumulative leak size, so the last
// 8 bytes of the file hold the current total; skip() may skip
// fewer bytes than requested, so loop until done
long toSkip = leakStream.available() - 8;
while (toSkip > 0) {
toSkip -= leakStream.skip( toSkip );
}
leakSize = leakStream.readLong();
leakStream.close();
return leakSize;
} catch (IOException e) {
return 0;
}
}
/** Records a leak entry for the given death object under the given transaction. */
public void writeLeak( DeathObject dobj, TransactionID tid ) throws IOException {
writeLeak( dobj.objID(), tid, entrySize( dobj ) );
}
/** Appends a leak entry (object id, transaction id, new cumulative leak size) to the leak file, writing a header first if the file is new. */
public void writeLeak( ObjectID oid, TransactionID tid, long objSize ) throws IOException {
//env.logWriter.newEntry ("Cluster " + cid + " writeLeak: " + oid + ", " + tid + ", " + objSize, LogWriter.DEBUG);
File file = leakFileHandle();
boolean newFile = !file.exists();
DataOutputStream leakStream =
new DataOutputStream( new BufferedOutputStream( new FileOutputStream( file.toString(), true ) ) );
// write header, if necessary
if (newFile) {
leakStream.writeInt( Cluster.VERSION );
leakStream.writeLong( cid.value() );
}
// make sure the cached leakSize is loaded
leakSize();
// increase the leak size
leakSize += objSize;
// write leak entry
leakStream.writeLong( oid.value() );
leakStream.writeLong( tid.value() );
leakStream.writeLong( leakSize );
leakStream.close();
}
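/*
 * Leak file layout, as written above (a sketch inferred from this
 * method, not authoritative documentation):
 *
 *   header : int  Cluster.VERSION
 *            long cluster id
 *   entry* : long object id
 *            long transaction id
 *            long cumulative leak size
 *
 * Because every entry carries the running total, leakSize() only has
 * to read the last 8 bytes of the file.
 */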
/** Reads all leak entries except those of rollBackTid: a DxMultiMap from transaction id to object ids if ordered, otherwise a deque of (oid, tid, cumulative size) triples. */
public DxCollection readLeaks( TransactionID rollBackTid, boolean ordered ) throws IOException {
File file = leakFileHandle();
DxCollection result;
if (ordered) {
result = new DxMultiMap( new DxHashMap(), new DxListDeque() );
} else {
result = new DxListDeque();
}
if (!file.exists()) {
return result;
}
DataInputStream leakStream = new DataInputStream( new FileInputStream( file ) );
// skip the file header: version and cluster id
leakStream.readInt();
leakStream.readLong();
while (leakStream.available() != 0) {
// read object id
ObjectID oid = new ObjectID( leakStream.readLong() );
// read transaction id
TransactionID tid = new TransactionID( leakStream.readLong() );
// read leak size (note: this local shadows the leakSize field)
Long leakSize = Long.valueOf( leakStream.readLong() );
if (rollBackTid == null || !rollBackTid.equals( tid )) {
if (ordered) {
((DxMultiMap)result).addForKey( tid, oid );
} else {
((DxDeque)result).pushTop( oid );
((DxDeque)result).pushTop( tid );
((DxDeque)result).pushTop( leakSize );
}
}
}
leakStream.close();
return result;
}
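/*
 * Usage note (a sketch, assuming the Dx* collection semantics shown
 * here): in the unordered case the deque holds the triples from
 * bottom to top in file order, so a caller such as rollBackLeaks()
 * consumes one entry with three popBottom() calls: object id first,
 * then transaction id, then the cumulative leak size.
 */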
/** Deletes the cluster file and its leak file from disk. */
public void removeFromDisk() throws IOException {
//env.logWriter.newEntry ("Cluster " + cid + " removeFromDisk", LogWriter.DEBUG);
File f = fileHandle();
if (f.exists()) {
f.delete();
}
f = leakFileHandle();
if (f.exists()) {
f.delete();
}
}
/** Serializes the chunk's object into its data buffer if necessary, then writes id, length and payload to the stream. */
private void writeChunk( DataOutputStream out, Chunk chunk ) throws IOException {
if (chunk.object != null) {
ByteArrayOutputStream bs = new ByteArrayOutputStream();
ObjectOutputStream os = new ObjectOutputStream( bs );
if (chunk.object instanceof ClassicObjectContainer) {
((ClassicObjectContainer)chunk.object).storeExternal( os );
} else {
os.writeObject( chunk.object );
}
chunk.data = bs.toByteArray();
chunk.dataLength = chunk.data.length;
os.close();
}
env.logWriter.newEntry( this, "Cluster " + cid + " writeChunk: " + chunk.id + ", " + chunk.dataLength,
LogWriter.DEBUG3 );
out.writeByte( chunk.id );
out.writeInt( chunk.dataLength );
out.write( chunk.data );
}
/** Reads one chunk from the stream; if skip is set, the payload is skipped instead of loaded into memory. */
private Chunk readChunk( DataInputStream in, boolean skip ) throws IOException {
Chunk chunk = new Chunk();
chunk.id = in.readByte();
chunk.dataLength = in.readInt();
//env.logWriter.newEntry ("Cluster " + cid + " readChunk: " + chunk.id + ", " + chunk.dataLength, LogWriter.DEBUG);
if (skip) {
// skipBytes() loops internally, unlike InputStream.skip()
in.skipBytes( chunk.dataLength );
} else {
chunk.data = new byte[chunk.dataLength];
// read() may return fewer bytes than requested; readFully() blocks
// until the whole chunk is loaded
in.readFully( chunk.data );
}
}
return chunk;
}
/** Rolls back all effects of the given transaction on this cluster. */
public void rollBack( TransactionID rollBackTid ) throws Exception {
//env.logWriter.newEntry ("Cluster " + cid + " rollback: " + rollBackTid, LogWriter.DEBUG);
rollBackLeaks( rollBackTid );
boolean clusterIsClean = false;
try {
clusterIsClean = readObjects( Cluster.STATE | Cluster.TRANS | Cluster.DATA, rollBackTid );
} catch (Exception e) {
//env.logWriter.newEntry (this, "rollBack: cluster " + cid + " corrupted", LogWriter.WARN);
}
if (!clusterIsClean) {
if (objects().count() > 0) {
// switch into recovery mode
beginRecovery( OBJECTS );
open();
// rewrite all valid objects in the shadow cluster
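// objects() holds (TransactionID, ObjectContainer, DeathObject)
// triples in sequence, so each loop turn consumes three entries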
DxIterator it = objects().iterator();
while (it.next() != null) {
TransactionID tid = (TransactionID)it.object();
ObjectContainer os = (ObjectContainer)it.next();
DeathObject dobj = (DeathObject)it.next();
// re-register the object container, because its state may have
// changed during the rolled-back transaction
classicStore.objectSpace.deleteObject( os );
classicStore.objectSpace.addObject( os );
appendObject( dobj, tid, false, false );
}
close();
// switch back to normal mode
endRecovery( OBJECTS );
} else {
// if all objects of the cluster are invalid, simply delete it
removeFromDisk();
}
}
}
/** Rewrites the leak file without the entries of the given transaction. */
public void rollBackLeaks( TransactionID rollBackTid ) throws Exception {
DxDeque leaks = null;
try {
leaks = (DxDeque)readLeaks( rollBackTid, false );
} catch (Exception e) {
//env.logWriter.newEntry (this, "rollBackLeaks: leaks " + cid + " corrupted", LogWriter.WARN);
}
// guard against a corrupted leak file: readLeaks() may have thrown
// before assigning leaks
if (leaks != null && leaks.count() > 0) {
beginRecovery( LEAKS );
while (leaks.count() > 0) {
writeLeak( (ObjectID)leaks.popBottom(), (TransactionID)leaks.popBottom(),
((Long)leaks.popBottom()).longValue() );
}
endRecovery( LEAKS );
} else {
if (leakFileHandle().exists()) {
leakFileHandle().delete();
}
}
}
/**
* Checks whether this cluster needs to be compressed.
*/
protected boolean needsCompressing() {
boolean result = false;
// derive the cluster size simply from its file size;
// this is much faster than reading the whole cluster
long clSize = fileHandle().length();
if (clSize > 0) {
result = (double)leakSize() / clSize > Cluster.LEAK_WEIGHT;
}
return result;
}
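/*
 * Example (with a hypothetical Cluster.LEAK_WEIGHT of 0.25): a cluster
 * file of 100000 bytes carrying 30000 bytes of leaks yields a ratio of
 * 0.3 > 0.25, so the cluster would be scheduled for compression.
 */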
}
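
/*
 * Illustrative sketch, not part of the original source: a tiny
 * standalone dumper for the leak file format inferred from
 * writeLeak()/readLeaks() above. The class name is hypothetical;
 * fully qualified java.io names are used so it does not depend on
 * this file's imports.
 */
class LeakFileDump {
    public static void main( String[] args ) throws java.io.IOException {
        java.io.DataInputStream in = new java.io.DataInputStream(
                new java.io.BufferedInputStream( new java.io.FileInputStream( args[0] ) ) );
        // header: version and cluster id
        System.out.println( "version: " + in.readInt() );
        System.out.println( "cluster: " + in.readLong() );
        // entries: object id, transaction id, cumulative leak size
        while (in.available() > 0) {
            long oid = in.readLong();
            long tid = in.readLong();
            long cumulativeLeakSize = in.readLong();
            System.out.println( oid + "\t" + tid + "\t" + cumulativeLeakSize );
        }
        in.close();
    }
}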