clusterstore.java
    // End of a memory-reclaiming loop: clusters that cannot be deactivated here are
    // only logged; afterwards memory is re-checked.
//                            }
                        } else {
                            if (false) {
                                env.logWriter.newEntry(this, "trying to DEACTIVATE 'invoked' cluster: " + cluster.clusterID(), LogWriter.WARN);
                            }
                        }
                    } else {
                        env.logWriter.newEntry(this, "the cluster is not a WizardCluster, not sure what to do", LogWriter.WARN);
                    }
                }
                System.gc();
                freeSpace = env.freeMemory();
            }
        }
    }

    /**
     * This method is called right after the specified WizardCluster was loaded
     * from disk.
     */
    protected void activateCluster(Cluster cluster, int size) {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "activateCluster(): " + cluster.clusterID(), LogWriter.DEBUG3);
        }
        cluster.setEnv(env);
        cluster.setClusterStore(this);
        cluster.touch();
        cluster.setCurrentSize(size);
    }

    /**
     * Deactivate the specified cluster before it is written to disk. The
     * specified cluster will be removed from the cluster cache. If it currently
     * has shadows, they are written to disk. If any of the containers are
     * currently invoked (which should normally never happen), the shadows must
     * stay in memory.
     */
    protected void deactivateCluster(Cluster cluster) throws IOException {
        if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
            env.logWriter.newEntry(this, "deactivateCluster(): " + cluster.clusterID() + " priority: " + cluster.cachePriority(), LogWriter.DEBUG);
            env.logWriter.newEntry(this, " lock: " + cluster.lock().level(null), LogWriter.DEBUG);
        }

        String basename = basename(cluster.clusterID());

        // Synchronize on this ClusterStore so that a cluster that has just been
        // handed out under the ClusterStore lock cannot be deactivated while
        // that lock is held.
        synchronized (this) {
            synchronized (cluster) {
                // any lock level >= READ has to be persistent
                if (cluster.lock().level(null) >= Lock.LEVEL_READ) {
                    if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
                        env.logWriter.newEntry(this, " write lock to disk: " + cluster.clusterID(), LogWriter.DEBUG);
                    }
                    storeData(cluster.lock(), basename + POSTFIX_LOCK);
                } else {
                    File lockFile = new File(basename + POSTFIX_LOCK);
                    if (lockFile.exists()) {
                        lockFile.delete();
                    }
                }

                // clusters with a WRITE lock are supposed to be dirty
                if (cluster.lock().level(null) > Lock.LEVEL_UPGRADE) {
                    if (env.logWriter.hasTarget(LogWriter.DEBUG)) {
                        env.logWriter.newEntry(this, " write cluster: " + cluster.clusterID(), LogWriter.DEBUG);
                    }
                    storeData(cluster, basename + POSTFIX_CLUSTER);
                }

                // mark the cluster as no longer valid
                cluster.setLock(null);
            }
        }
    }

    /**
     * Store the specified cluster on disk. Temp files are written first, so if
     * this write fails, the originals are still valid. The cluster may have
     * been written to disk already if it was deactivated during the
     * transaction. But in case the cluster (and its changes) exist only in
     * memory, we have to write it now to check that this is possible without
     * errors.
     *
     * Note: This only writes the currently committed transaction results to
     * disk. This is different from the deactivation behaviour.
     *
     * @param cid WizardCluster to be prepare-committed.
     * @exception java.io.IOException None of the clusters were written to disk.
     */
    public synchronized void prepareCommitCluster(Transaction ta, ClusterID cid) throws IOException, ClassNotFoundException {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "prepareCommitCluster(): " + cid, LogWriter.DEBUG3);
        }

//      WizardCluster cluster = loadCluster(cid, false);
        // If we do not pin, loadCluster may just lose the loaded cluster due to trim() after load.
        Cluster cluster = loadCluster(cid, true);
        if (cluster instanceof WizardCluster) {
            ((WizardCluster) cluster).unpin();
        }

        cluster.prepareCommit(ta);

        if (cluster.lock().level(null) >= Lock.LEVEL_WRITE) {
            String tempFilename = basename(cid) + POSTFIX_TEMP;

            // write the changed cluster to a temp file; the lock is written in
            // commit() and abort()
            storeData(cluster, tempFilename);

            long fileSize = new File(tempFilename).length();
            if (fileSize == 0L) {
                throw new IOException("Unable to determine cluster file size.");
            }
            if (compressClusters) {
                fileSize *= compressionFactor;
            }
            cluster.setCurrentSize((int) fileSize);
        }
    }

    /**
     * Actually commit the specified cluster. This simply renames the temp file
     * to become the new "original" one. The rename operation MUST NOT fail.
     *
     * @param cid WizardCluster to be committed.
     */
    public synchronized void commitCluster(Transaction ta, ClusterID cid) throws IOException, ClassNotFoundException {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "commitCluster(): " + cid, LogWriter.DEBUG3);
        }

        String basename = basename(cid);
        File clusterFile = new File(basename + POSTFIX_CLUSTER);
        File tempFile = new File(basename + POSTFIX_TEMP);

        if (tempFile.exists()) {
            clusterFile.delete();
            if (!tempFile.renameTo(clusterFile)) {
                throw new IOException("Unable to rename temp cluster.");
            }
        }

        // FIXME: if the transaction size exceeds the cache size, this loads the
        // cluster again although it is not really needed
//      WizardCluster cluster = loadCluster(cid, false);
        // If we do not pin, loadCluster may just lose the loaded cluster due to trim() after load.
        Cluster cluster = loadCluster(cid, true);
        if (cluster instanceof WizardCluster) {
            ((WizardCluster) cluster).unpin();
        }

        cluster.commit(ta);

        // after the cluster is committed its lock is released and has to be
        // updated on disk; if no lock file exists, the lock is newly created
        // when loading
        updateLockOnDisk(cluster, ta);
    }

    /**
     * Actually abort the specified cluster. This deletes the temp file, if one
     * exists.
     *
     * @param cid WizardCluster to be aborted.
     */
    public synchronized void abortCluster(Transaction ta, ClusterID cid) throws IOException, ClassNotFoundException {
        File tempFile = new File(basename(cid) + POSTFIX_TEMP);
        if (tempFile.exists()) {
            if (!tempFile.delete()) {
                throw new IOException("Unable to delete temp cluster.");
            }
        }

        // FIXME: if the transaction size exceeds the cache size, this loads the
        // cluster again although it is not really needed
//      WizardCluster cluster = loadCluster(cid, false);
        // If we do not pin, loadCluster may just lose the loaded cluster due to trim() after load.
        Cluster cluster = loadCluster(cid, true);
        boolean isWizardCluster = cluster instanceof WizardCluster;
        if (isWizardCluster) {
            ((WizardCluster) cluster).unpin();
        }

        cluster.abort(ta);

        if (isWizardCluster && ((WizardCluster) cluster).isPinned()) {
            // If the cluster is pinned, it should be reloaded immediately.
            /* To other ozone developers: What do we do if the cluster is pinned
               and thus may not be removed from memory? Is this only the case if
               another transaction is waiting for this cluster to be unlocked?
               If so, should the transaction simply reload the cluster in this
               case? */
            env.logWriter.newEntry(this, "abortCluster(): Unloading pinned cluster " + cid + ". Should we really do that?", LogWriter.DEBUG);

            // the above abort() call does not change the cluster in memory, so
            // we have to reload the cluster immediately
            unloadCluster(cid, false);
            loadCluster(cid, true);
        } else {
            // the above abort() call does not change the cluster in memory, so
            // we have to reload the cluster next time
            unloadCluster(cid, false);
        }

        // after the cluster is aborted its lock is released and has to be
        // updated on disk; if no lock file exists, the lock is newly created
        // when loading
        updateLockOnDisk(cluster, ta);
    }

    protected void updateLockOnDisk(Cluster cluster, Transaction ta) throws IOException {
//      System.out.println("commit " + cid + ": " + ((DefaultLock) cluster.lock).lockers.count());
        ClusterID cid = cluster.clusterID();

        if (cluster.lock().level(ta) == Lock.LEVEL_NONE) {
            File lockFile = new File(basename(cid) + POSTFIX_LOCK);
            if (lockFile.exists() && !lockFile.delete()) {
                throw new IOException("Unable to delete lock file.");
            }
        } else {
            storeData(cluster.lock(), basename(cid) + POSTFIX_LOCK);
        }
    }

    /**
     * Serialize and store the specified object for the specified key. The
     * current implementation uses the file system as the backing store.
     */
    protected void storeData(Object obj, String key) throws IOException {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "storeData(): " + key, LogWriter.DEBUG3);
        }

        OutputStream out = new FileOutputStream(key);
        if (compressClusters) {
            out = new GZIPOutputStream(out, 3 * 4096);
        } else {
            out = new BufferedOutputStream(out, 3 * 4096);
        }
        ObjectOutputStream oout = new ObjectOutputStream(out);
        try {
            oout.writeObject(obj);
        } finally {
            oout.close();
        }
    }

    /**
     * Load the data that has previously been stored for the given key.
     */
    protected Object loadData(String key) throws IOException, ClassNotFoundException {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "loadData(): " + key, LogWriter.DEBUG3);
        }

        InputStream in = new FileInputStream(key);
        if (compressClusters) {
            in = new GZIPInputStream(in, 3 * 4096);
        } else {
            in = new BufferedInputStream(in, 3 * 4096);
        }
        ObjectInputStream oin = new ResolvingObjectInputStream(in);
        try {
            return oin.readObject();
        } finally {
            oin.close();
        }
    }
}
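
The prepare/commit/abort methods above protect the on-disk cluster file with a write-temp-then-rename scheme: prepareCommitCluster() serializes the changed cluster into a temp file, commitCluster() replaces the original with it, and abortCluster() simply discards it. The following standalone sketch illustrates that pattern with plain java.io; the class, method, and file names here are illustrative only and are not part of ozone.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class TempFileCommitSketch {

    // "prepare": write the new content to a temp file next to the original,
    // so a failed write leaves the original untouched
    static File prepare(File original, byte[] newContent) throws IOException {
        File temp = new File(original.getPath() + ".tmp");   // plays the role of POSTFIX_TEMP
        try (OutputStream out = new FileOutputStream(temp)) {
            out.write(newContent);
        }
        return temp;
    }

    // "commit": replace the original with the temp file; the rename must not fail
    static void commit(File original, File temp) throws IOException {
        if (temp.exists()) {
            original.delete();
            if (!temp.renameTo(original)) {
                throw new IOException("Unable to rename temp file " + temp);
            }
        }
    }

    // "abort": drop the temp file and keep the last committed original
    static void abort(File temp) throws IOException {
        if (temp.exists() && !temp.delete()) {
            throw new IOException("Unable to delete temp file " + temp);
        }
    }

    public static void main(String[] args) throws IOException {
        File data = new File("cluster-0.cl");                 // stands in for basename + POSTFIX_CLUSTER
        File temp = prepare(data, "new cluster state".getBytes());
        commit(data, temp);                                   // on failure one would call abort(temp) instead
    }
}

The key property is that the original file is never touched until the temp file has been written completely, so a failure during prepare leaves the last committed state intact.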
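
storeData() and loadData() round-trip serializable objects through either a GZIP stream or a buffered stream, depending on the compressClusters flag. Below is a minimal sketch of that scheme, using a plain ObjectInputStream in place of ozone's ResolvingObjectInputStream; the compress flag and demo file name are illustrative.

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class SerializationSketch {

    static final int BUFFER_SIZE = 3 * 4096;   // same buffer size as the listing

    static void store(Serializable obj, String key, boolean compress) throws IOException {
        OutputStream out = new FileOutputStream(key);
        out = compress ? new GZIPOutputStream(out, BUFFER_SIZE)
                       : new BufferedOutputStream(out, BUFFER_SIZE);
        try (ObjectOutputStream oout = new ObjectOutputStream(out)) {
            oout.writeObject(obj);
        }
    }

    static Object load(String key, boolean compress) throws IOException, ClassNotFoundException {
        InputStream in = new FileInputStream(key);
        in = compress ? new GZIPInputStream(in, BUFFER_SIZE)
                      : new BufferedInputStream(in, BUFFER_SIZE);
        try (ObjectInputStream oin = new ObjectInputStream(in)) {
            return oin.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        store("hello, cluster", "demo.ser", true);
        System.out.println(load("demo.ser", true));   // prints: hello, cluster
    }
}

Note that prepareCommitCluster() compensates for compression when estimating the cluster's in-memory size: it multiplies the compressed temp-file size by compressionFactor before calling setCurrentSize().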