📄 cluster.java
字号:
// read TransactionID
                tid = new TransactionID( fi.readLong() );
                // check whether this object belongs to the broken transaction
                if (rollBackTid != null && rollBackTid.equals( tid )) {
                    rollBack = true;
                }
                if (rollBack || isLeak || (whatToRead & TRANS) == 0) {
                    tid = null;
                }

                // read object state, if necessary
                Chunk stateChunk = readChunk( fi, (whatToRead & STATE) == 0 || rollBack || isLeak );
                if (stateChunk.data != null) {
                    ObjectInputStream in = new ObjectInputStream( new ByteArrayInputStream( stateChunk.data ) );
                    os = new ClassicObjectContainer();
                    os.loadExternal( in );
                    in.close();
                }

                // create a new deathobj and read the data
                Chunk dataChunk = readChunk( fi, (whatToRead & DATA) == 0 || rollBack || isLeak );
                if (dataChunk.data != null) {
                    dobj = new DeathObject( oid );
                    dobj.stateSize = stateChunk.dataLength;
                    dobj.setData( dataChunk.data );
                    clusterSize += dobj.stateSize;
                    clusterSize += dobj.size();
                }
            } catch (Exception e) {
                env.fatalError( this, "exception in readObjects() of cluster " + cid, e );
                break;
            }

            // if everything was fine, add all required information;
            // we have to do this last, because only complete objects
            // should be added
            if (tid != null) {
                objects.add( tid );
            }
            if (os != null) {
                objects.add( os );
            }
            if (dobj != null) {
                objects.add( dobj );
            }

            result &= !rollBack;
        }

        fi.close();
        return result;
    }


    /**
     * Returns the accumulated leak size of this cluster.
     * <p>
     * If the {@code leakSize} field already holds a value other than -1 it is
     * returned directly. Otherwise the last 8 bytes of the leak file are read
     * as a long, stored in the field and returned.
     *
     * @return the leak size, or 0 if the leak file does not exist or cannot
     *         be read (in which case the field is left unchanged)
     */
    public long leakSize() {
        if (leakSize != -1) {
            return leakSize;
        }

        File file = new File( env.dir + Env.DATA_DIR, cid + Cluster.LEAK_FILE_SUFF );
        if (!file.exists()) {
            return 0;
        }

        DataInputStream leakStream = null;
        try {
            leakStream = new DataInputStream( new FileInputStream( file ) );
            // the running total is the last long of the most recent entry
            leakStream.skip( leakStream.available() - 8 );
            leakSize = leakStream.readLong();
            return leakSize;
        } catch (IOException e) {
            return 0;
        } finally {
            // always release the file handle, also when reading failed
            if (leakStream != null) {
                try {
                    leakStream.close();
                } catch (IOException ignored) {
                    // nothing sensible can be done if close fails
                }
            }
        }
    }


    /**
     * Writes a leak entry for the given object, using {@code entrySize( dobj )}
     * as the size of the leak.
     *
     * @param dobj the object whose entry is recorded as a leak
     * @param tid the transaction the leak belongs to
     * @throws Exception if the leak file cannot be written
     */
    public void writeLeak( DeathObject dobj, TransactionID tid ) throws Exception {
        writeLeak( dobj.objID(), tid, entrySize( dobj ) );
    }


    /**
     * Appends a leak entry (object id, transaction id, new total leak size)
     * to the leak file of this cluster. A header consisting of
     * {@code Cluster.VERSION} and the cluster id is written first if the file
     * did not exist before.
     *
     * @param oid id of the leaked object
     * @param tid the transaction the leak belongs to
     * @param objSize amount by which the leak size grows
     * @throws Exception if the leak file cannot be written
     */
    public void writeLeak( ObjectID oid, TransactionID tid, long objSize ) throws Exception {
        //env.logWriter.newEntry ("Cluster " + cid + " writeLeak: " + oid + ", " + tid + ", " + objSize, LogWriter.DEBUG);
        File file = leakFileHandle();
        boolean newFile = !file.exists();
        DataOutputStream leakStream =
                new DataOutputStream( new BufferedOutputStream( new FileOutputStream( file.toString(), true ) ) );
        try {
            // write header, if necessary
            if (newFile) {
                leakStream.writeInt( Cluster.VERSION );
                leakStream.writeLong( cid.value() );
            }

            // Update and increase the leak size. The returned value is used
            // instead of the raw field, because leakSize() returns 0 without
            // touching the field when nothing can be read; adding to the field
            // directly would then start from its -1 "unknown" marker.
            leakSize = leakSize() + objSize;

            // write leak entry
            leakStream.writeLong( oid.value() );
            leakStream.writeLong( tid.value() );
            leakStream.writeLong( leakSize );
        } finally {
            // flushes the buffered entry and releases the file handle
            leakStream.close();
        }
    }


    /**
     * Reads all entries of the leak file, leaving out those that belong to
     * the given transaction.
     *
     * @param rollBackTid entries of this transaction are skipped; null keeps
     *        all entries
     * @param ordered if true the result is a DxMultiMap with the object ids
     *        added under their transaction id; if false the result is a
     *        DxListDeque onto which object id, transaction id and leak size
     *        (as Long) are pushed for each entry, in this order
     * @return the collection described above; empty if there is no leak file
     * @throws Exception if the leak file cannot be read
     */
    public DxCollection readLeaks( TransactionID rollBackTid, boolean ordered ) throws Exception {
        File file = leakFileHandle();

        DxCollection result;
        if (ordered) {
            result = new DxMultiMap( new DxHashMap(), new DxListDeque() );
        } else {
            result = new DxListDeque();
        }

        if (!file.exists()) {
            return result;
        }

        DataInputStream leakStream = new DataInputStream( new FileInputStream( file ) );
        try {
            // skip the header (version and cluster id)
            leakStream.readInt();
            leakStream.readLong();

            while (leakStream.available() != 0) {
                // read object id
                ObjectID oid = new ObjectID( leakStream.readLong() );
                // read transaction id
                TransactionID tid = new TransactionID( leakStream.readLong() );
                // read leak size
                Long leakSize = new Long( leakStream.readLong() );

                if (rollBackTid == null || !rollBackTid.equals( tid )) {
                    if (ordered) {
                        ((DxMultiMap)result).addForKey( tid, oid );
                    } else {
                        ((DxDeque)result).pushTop( oid );
                        ((DxDeque)result).pushTop( tid );
                        ((DxDeque)result).pushTop( leakSize );
                    }
                }
            }
        } finally {
            // release the file handle even if an entry could not be read
            leakStream.close();
        }

        return result;
    }


    /**
     * Deletes the cluster file and the leak file of this cluster, if they
     * exist.
     *
     * @throws IOException declared for callers; not thrown by the visible code
     */
    public void removeFromDisk() throws IOException {
        //env.logWriter.newEntry ("Cluster " + cid + " removeFromDisk", LogWriter.DEBUG);
        File f = fileHandle();
        if (f.exists()) {
            f.delete();
        }
        f = leakFileHandle();
        if (f.exists()) {
            f.delete();
        }
    }


    /**
     * Writes a chunk as id byte, length int and data bytes. If the chunk
     * carries an object, the object is serialized first and the result
     * replaces {@code chunk.data} and {@code chunk.dataLength}.
     *
     * @param out the stream to write to
     * @param chunk the chunk to write
     * @throws IOException if serializing or writing fails
     */
    private void writeChunk( DataOutputStream out, Chunk chunk ) throws IOException {
        if (chunk.object != null) {
            ByteArrayOutputStream bs = new ByteArrayOutputStream();
            ObjectOutputStream os = new ObjectOutputStream( bs );
            try {
                if (chunk.object instanceof ClassicObjectContainer) {
                    ((ClassicObjectContainer)chunk.object).storeExternal( os );
                } else {
                    os.writeObject( chunk.object );
                }
                // ObjectOutputStream buffers internally; flush so that all
                // bytes have reached bs before it is copied below
                os.flush();
            } finally {
                os.close();
            }
            chunk.data = bs.toByteArray();
            chunk.dataLength = chunk.data.length;
        }

        env.logWriter.newEntry( this, "Cluster " + cid + " writeChunk: " + chunk.id + ", " + chunk.dataLength, LogWriter.DEBUG3 );
        out.writeByte( chunk.id );
        out.writeInt( chunk.dataLength );
        out.write( chunk.data );
    }


    /**
     * Reads the header of a chunk (id byte and length int) and then either
     * reads or skips its data.
     *
     * @param in the stream to read from
     * @param skip if true the data is skipped and {@code chunk.data} stays
     *        unset; if false the data is read completely
     * @return the chunk that was read
     * @throws IOException if the stream ends before the whole chunk was
     *         consumed or another I/O error occurs
     */
    private Chunk readChunk( DataInputStream in, boolean skip ) throws IOException {
        Chunk chunk = new Chunk();
        chunk.id = in.readByte();

        chunk.dataLength = in.readInt();
        //env.logWriter.newEntry ("Cluster " + cid + " readChunk: " + chunk.id + ", " + chunk.dataLength, LogWriter.DEBUG);
        if (skip) {
            // a single skip() may skip fewer bytes than requested, which
            // would leave the stream positioned inside this chunk
            int remaining = chunk.dataLength;
            while (remaining > 0) {
                int skipped = in.skipBytes( remaining );
                if (skipped > 0) {
                    remaining -= skipped;
                } else {
                    // no progress: consume one byte, this fails with an
                    // EOFException at the end of the stream
                    in.readByte();
                    remaining--;
                }
            }
        } else {
            chunk.data = new byte[chunk.dataLength];
            // read() may return before the array is filled; readFully() does not
            in.readFully( chunk.data );
        }

        return chunk;
    }


    /**
     * Removes everything the given transaction has written from this cluster.
     * <p>
     * The leak file is rolled back first. Then the cluster is read without
     * the objects of the transaction; if readObjects() does not report the
     * cluster as clean, the remaining objects are rewritten between
     * beginRecovery( OBJECTS ) and endRecovery( OBJECTS ), or the cluster is
     * removed from disk if no objects remain.
     *
     * @param rollBackTid the transaction to roll back
     * @throws Exception if rolling back the leaks or rewriting the cluster fails
     */
    public void rollBack( TransactionID rollBackTid ) throws Exception {
        //env.logWriter.newEntry ("Cluster " + cid + " rollback: " + rollBackTid, LogWriter.DEBUG);
        rollBackLeaks( rollBackTid );

        boolean clusterIsClean = false;

        try {
            clusterIsClean = readObjects( Cluster.STATE | Cluster.TRANS | Cluster.DATA, rollBackTid );
        } catch (Exception e) {
            // deliberately ignored: clusterIsClean stays false, so the
            // objects read so far are rewritten below
            //env.logWriter.newEntry (this, "rollBack: cluster " + cid + " corrupted", LogWriter.WARN);
        }

        if (!clusterIsClean) {
            if (objects().count() > 0) {
                // switch into recovery mode
                beginRecovery( OBJECTS );
                open();

                // rewrite all valid objects in the shadow cluster;
                // the entries come in groups of transaction id, object
                // container and death object
                DxIterator it = objects().iterator();
                while (it.next() != null) {
                    TransactionID tid = (TransactionID)it.object();
                    ObjectContainer os = (ObjectContainer)it.next();
                    DeathObject dobj = (DeathObject)it.next();
                    // swap the object state, because it may have changed
                    // while the transaction rollBackTid
                    classicStore.objectSpace.deleteObject( os );
                    classicStore.objectSpace.addObject( os );
                    appendObject( dobj, tid, false, false );
                }

                close();
                // switch back to normal mode
                endRecovery( OBJECTS );
            } else {
                // if all objects of the cluster are invalid simply delete it
                removeFromDisk();
            }
        }
    }


    /**
     * Removes the leak entries of the given transaction from the leak file.
     * <p>
     * The remaining entries are rewritten between beginRecovery( LEAKS ) and
     * endRecovery( LEAKS ); if no entries remain, the leak file is deleted.
     *
     * @param rollBackTid the transaction whose leak entries are dropped
     * @throws Exception if the leak file cannot be read or rewritten
     */
    public void rollBackLeaks( TransactionID rollBackTid ) throws Exception {
        // A failure to read the leak file is propagated as it is. Swallowing
        // it would leave nothing to work on and only end in a
        // NullPointerException that hides the real cause.
        DxDeque leaks = (DxDeque)readLeaks( rollBackTid, false );

        if (leaks.count() > 0) {
            beginRecovery( LEAKS );

            while (leaks.count() > 0) {
                // arguments are evaluated left to right, which matches the
                // order in which readLeaks() pushed them: oid, tid, size
                // NOTE(review): the stored value is the leak size total
                // written with the entry, but it is passed as the size
                // increment here - confirm that this is intended
                writeLeak( (ObjectID)leaks.popBottom(), (TransactionID)leaks.popBottom(),
                        ((Long)leaks.popBottom()).longValue() );
            }

            endRecovery( LEAKS );
        } else {
            if (leakFileHandle().exists()) {
                leakFileHandle().delete();
            }
        }
    }


    /**
     * Checks, if the specified cluster needs to be compressed.
     *
     * @return true if the ratio of leak size to cluster file size is greater
     *         than {@code Cluster.LEAK_WEIGHT}; false otherwise, also if the
     *         cluster file is empty or missing
     */
    protected boolean needsCompressing() {
        boolean result = false;

        // take the cluster size simply from its file size;
        // this is much faster than reading the whole cluster
        long clSize = fileHandle().length();
        if (clSize > 0) {
            result = (double)leakSize() / clSize > Cluster.LEAK_WEIGHT;
        }

        return result;
    }
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -