📄 clusterstore.java
字号:
} cachedClusters.addForKey( cluster, cluster.clusterID() ); trimCache(); } } } return cluster; } public void splitCluster( Cluster cluster ) { } /** * Remove cluster from the cluster cache. * @param cid */ public void unloadCluster( ClusterID cid, boolean deactivate ) throws Exception { Cluster cluster = (Cluster)cachedClusters.removeForKey( cid ); if (deactivate) { deactivateCluster( cluster ); } } /** * Ensure that there is at least the specified size of free space in the * cluster cache. Under some circumstances clusters (currently invoked) * cannot be deactivated. Therefore this method cannot guarantee that the * needed space is free afterwards. * <p> * This is the central method of the deactivation of containers that are * currently in use. This is different from the commit behaviour. * * * @param neededSpace Size that must be available after this method. */ protected void trimCache() throws Exception { long freeSpace = env.freeMemory(); if (env.logWriter.hasTarget( LogWriter.DEBUG )) { env.logWriter.newEntry( this, "trimCache(): free:" + freeSpace, LogWriter.DEBUG ); } if (freeSpace <= 0) { synchronized (this) { long cacheSize = 0; // build priority queue for all currently loaded clusters DxMap priorityQueue = new DxTreeMap(); DxIterator it = cachedClusters.iterator(); Cluster cluster; while ((cluster = (Cluster)it.next()) != null) { priorityQueue.addForKey( cluster, cluster.cachePriority() ); cacheSize += cluster.size(); } // free at least 20% of the cache long neededSpace = Math.max( maxClusterSize * 2, cacheSize / 20 ); if (env.logWriter.hasTarget( LogWriter.DEBUG )) { env.logWriter.newEntry( this, " cache: " + cacheSize + " to be freed:" + neededSpace, LogWriter.DEBUG ); } // throw away (deactivate) clusters, lowest priority first it = priorityQueue.iterator(); while (freeSpace < neededSpace && (cluster = (Cluster)it.next()) != null) { // if any of the containers is currently invoked, the cluster // must not be written and must stay in memory if (!cluster.isInvoked()) { if (env.logWriter.hasTarget( LogWriter.DEBUG )) { env.logWriter.newEntry( this, "DEACTIVATE cluster: " + cluster.clusterID(), LogWriter.DEBUG ); } cluster = (Cluster)it.removeObject(); unloadCluster( cluster.clusterID(), true ); // try to free the memory of the unloaded cluster System.gc(); freeSpace = env.freeMemory(); if (env.logWriter.hasTarget( LogWriter.DEBUG )) { env.logWriter.newEntry( this, " free:" + freeSpace, LogWriter.DEBUG ); } } else { env.logWriter.newEntry( this, "trying to DEACTIVATE 'invoked' cluster: " + cluster.clusterID(), LogWriter.WARN ); } } } } } /** * This method us calles just after the specified Cluster was loaded from * disk. */ protected void activateCluster( Cluster cluster, int size ) throws Exception { if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) { env.logWriter.newEntry( this, "activateCluster(): " + cluster.clusterID, LogWriter.DEBUG3 ); } cluster.env = env; cluster.clusterStore = this; cluster.touch(); cluster.setCurrentSize( size ); } /** * Deactivate the specified cluster before it is written to disk. The * specified cluster will be removed from the cluster cache. If it currently * has shadows, they are written to disk. If any of the containers are * currently invoked (should normally never happen), the shadows must stay * in memory. */ protected void deactivateCluster( Cluster cluster ) throws Exception { if (env.logWriter.hasTarget( LogWriter.DEBUG )) { env.logWriter.newEntry( this, "deactivateCluster(): " + cluster.clusterID + " priority: " + cluster.cachePriority(), LogWriter.DEBUG ); env.logWriter.newEntry( this, " lock: " + cluster.lock.level( null ), LogWriter.DEBUG ); } String basename = basename( cluster.clusterID() ); synchronized (cluster) { // any lock levels >= READ has to be persistent if (cluster.lock.level( null ) >= Lock.LEVEL_READ) { if (env.logWriter.hasTarget( LogWriter.DEBUG )) { env.logWriter.newEntry( this, " write lock to disk: " + cluster.clusterID, LogWriter.DEBUG ); } storeData( cluster.lock, basename + POSTFIX_LOCK ); } else { File lockFile = new File( basename + POSTFIX_LOCK ); if (lockFile.exists()) { lockFile.delete(); } } // clusters with WRITE lock are supposed to be dirty if (cluster.lock.level( null ) > Lock.LEVEL_UPGRADE) { if (env.logWriter.hasTarget( LogWriter.DEBUG )) { env.logWriter.newEntry( this, " write cluster: " + cluster.clusterID, LogWriter.DEBUG ); } storeData( cluster, basename + POSTFIX_CLUSTER ); } // mark the cluster to be not valid cluster.lock = null; } } /** * Store the specified cluster on disk. Write temp files first. If this * write fails, the original are still valid. The cluster may has been * written to the disk already, if is was deactivated while transaction. * But in case the cluster (and its changes) are only in memory, we have to * write now to check if this is possible without errors. * * Note: This only writes all currently commited transaction results to the * disk. This is different from the deactivation behaviour. * * * @param cid Cluster to be prepare-commited. * @exception Exception None of the clusters are written to disk. */ public synchronized void prepareCommitCluster( Transaction ta, ClusterID cid ) throws Exception { if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) { env.logWriter.newEntry( this, "prepareCommitCluster(): " + cid, LogWriter.DEBUG3 ); } Cluster cluster = loadCluster( cid ); cluster.prepareCommit( ta ); if (cluster.lock.level( null ) >= Lock.LEVEL_WRITE) { String tempFilename = basename( cid ) + POSTFIX_TEMP; // write changed cluster in temp file; the lock is written in // commit() and abort() storeData( cluster, tempFilename ); long fileSize = new File( tempFilename ).length(); if (fileSize == 0L) { throw new Exception( "Unable to determine cluster file size." ); } if (compressClusters) { fileSize *= compressionFactor; } cluster.setCurrentSize( (int)fileSize ); } } /** * Actually commit the specified cluster. This simply renames the temp file * to be the new "original" ones. The rename operation MUST NOT fail. * * * @param cid Cluster to be commited. */ public synchronized void commitCluster( Transaction ta, ClusterID cid ) throws Exception { if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) { env.logWriter.newEntry( this, "commitCluster(): " + cid, LogWriter.DEBUG3 ); } String basename = basename( cid ); File clusterFile = new File( basename + POSTFIX_CLUSTER ); File tempFile = new File( basename + POSTFIX_TEMP ); if (tempFile.exists()) { clusterFile.delete(); if (!tempFile.renameTo( clusterFile )) { throw new IOException( "Unable to rename temp cluster." ); } } // FIXME: if transaction size exceeds cache size, this loads the // cluster again altough it's not really needed Cluster cluster = loadCluster( cid ); cluster.commit( ta ); // after the cluster is commited its lock is released and has to be // updated on disk; if no lock file exists, the lock is newly created // when loading updateLockOnDisk( cluster, ta ); } /** * Actually abort the specified cluster. This deletes t * @param cid Cluster to be aborted. */ public synchronized void abortCluster( Transaction ta, ClusterID cid ) throws Exception { File tempFile = new File( basename( cid ) + POSTFIX_TEMP ); if (tempFile.exists()) { if (!tempFile.delete()) { throw new IOException( "Unable to delete temp cluster." ); } } // FIXME: if transaction size exceeds cache size, this loads the // cluster again altough it's not really needed Cluster cluster = loadCluster( cid ); cluster.abort( ta ); // the above abort() call does not change the cluster in memory, so // we have to reload the cluster next time unloadCluster( cid, false ); // after the cluster is aborted its lock is released and has to be // updated on disk; if no lock file exists, the lock is newly created // when loading updateLockOnDisk( cluster, ta ); } protected void updateLockOnDisk( Cluster cluster, Transaction ta ) throws Exception { // System.out.println ("commit " + cid + ": " + ((DefaultLock)cluster.lock).lockers.count()); ClusterID cid = cluster.clusterID(); if (cluster.lock.level( ta ) == Lock.LEVEL_NONE) { File lockFile = new File( basename( cid ) + POSTFIX_LOCK ); if (lockFile.exists() && !lockFile.delete()) { throw new IOException( "Unable to delete lock file." ); } } else { storeData( cluster.lock, basename( cid ) + POSTFIX_LOCK ); } } /** * Serialize and store the specified object for the specified key. This * current implementation uses the file system as back end store. */ protected void storeData( Object obj, String key ) throws Exception { if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) { env.logWriter.newEntry( this, "storeData(): " + key, LogWriter.DEBUG3 ); } OutputStream out = new FileOutputStream( key ); if (compressClusters) { out = new GZIPOutputStream( out ); } out = new BufferedOutputStream( out, 3 * 4096 ); ObjectOutputStream oout = new ObjectOutputStream( out ); try { oout.writeObject( obj ); } finally { oout.close(); } } /** * Load the data that previously has been stored for the given key. */ protected Object loadData( String key ) throws Exception { if (env.logWriter.hasTarget( LogWriter.DEBUG3 )) { env.logWriter.newEntry( this, "loadData(): " + key, LogWriter.DEBUG3 ); } InputStream in = new FileInputStream( key ); if (compressClusters) { in = new GZIPInputStream( in ); } in = new BufferedInputStream( in, 3 * 4096 ); ObjectInputStream oin = new ObjectInputStream( in ); try { Object result = oin.readObject(); return result; } finally { oin.close(); } } protected String basename( ClusterID cid ) { StringBuffer filename = new StringBuffer( env.dir ); filename.append( Env.DATA_DIR ); // filename.append (File.separator); filename.append( cid.value() ); return filename.toString(); } }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -