📄 clusterstore.java
字号:
// You can redistribute this software and/or modify it under the terms of// the Ozone Core License version 1 published by ozone-db.org.//// The original code and portions created by SMB are// Copyright (C) 1997-@year@ by SMB GmbH. All rights reserved.//// $Id: ClusterStore.java,v 1.3 2004/01/10 21:40:24 per_nyfelt Exp $package org.ozoneDB.core.storage.wizardStore;import java.io.*;import java.util.zip.GZIPInputStream;import java.util.zip.GZIPOutputStream;import org.ozoneDB.DxLib.*;import org.ozoneDB.Setup;import org.ozoneDB.io.stream.ResolvingObjectInputStream;import org.ozoneDB.core.*;import org.ozoneDB.core.storage.*;import org.ozoneDB.core.storage.ClusterID;import org.ozoneDB.core.storage.Cluster;import org.ozoneDB.util.LogWriter;/** * The ClusterStore is the back-end store of the wizardStore. It maintains the * cluster cache, activation/passivation and the actual persistent commits. * * * @author <a href="http://www.softwarebuero.de/">SMB</a> * @author <a href="http://www.medium.net/">Medium.net</a> * @version $Revision: 1.3 $Date: 2004/01/10 21:40:24 $ */public final class ClusterStore extends AbstractClusterStore { public final static String POSTFIX_SHADOW = ".sh"; protected final static int compressionFactor = 3; protected DxMap cachedClusters; protected int maxClusterSize = 64 * 1024; /** * Table that maps Permissions to ClusterIDs. */ protected DxMap growingClusterIDs; private boolean compressClusters; ClusterStore(Env _env) { super(_env); maxClusterSize = env.config.intProperty(Setup.WS_CLUSTER_SIZE, -1); cachedClusters = new DxHashMap(64); compressClusters = env.config.booleanProperty(Setup.WS_COMPRESS_CLUSTERS, true); } public void startup() throws Exception { growingClusterIDs = new DxHashMap(32); } public void shutdown() { } /** * Check if the ClusterStore was cleanly shutted down. 
*/ public boolean isCleanShutdown() { File file = new File(env.getDatabaseDir() + Env.DATA_DIR); String[] fileList = file.list(); for (int i = 0; i < fileList.length; i++) { if (fileList[i].endsWith(POSTFIX_SHADOW) || fileList[i].endsWith(POSTFIX_TEMP)) { return false; } } return true; } /** * Search the DATA dir and recover all ClusterIDs. */ public DxSet recoverClusterIDs() { File file = new File(env.getDatabaseDir() + Env.DATA_DIR); String[] fileList = file.list(); DxSet result = new DxHashSet(); for (int i = 0; i < fileList.length; i++) { if (fileList[i].endsWith(POSTFIX_CLUSTER) || fileList[i].endsWith(POSTFIX_SHADOW)) { String cidString = fileList[i].substring(0, fileList[i].indexOf('.')); long cid = Long.parseLong(cidString); result.add(new ClusterID(cid)); } } return result; } public long currentCacheSize() { long result = 0; DxIterator it = cachedClusters.iterator(); Cluster cluster; while ((cluster = (Cluster) it.next()) != null) { result += cluster.size(); } return result; } public int currentBytesPerContainer() { int result = env.config.intProperty(Setup.WS_CLUSTER_SIZE_RATIO, 256);// env.logWriter.newEntry( this, "currentBytesPerContainer(): setup:" + result, LogWriter.DEBUG ); return result; // if (cachedClusters.count() < 3) { // int result = env.config.intProperty (Setup.WS_CLUSTER_SIZE_RATIO, 256); // env.logWriter.newEntry (this, "currentBytesPerContainer(): config:" + result, LogWriter.DEBUG); // return result; // } // else { // int bpc = 0; // int count = 0; // DxIterator it = cachedClusters.iterator(); // WizardCluster cluster; // while ((cluster=(WizardCluster)it.next()) != null) { // count ++; // bpc += cluster.bytesPerContainer; // } // int result = bpc / count; // env.logWriter.newEntry (this, "currentBytesPerContainer(): new:" + result, LogWriter.DEBUG); // return result; // } } // public WizardCluster lruCluster() { // search the LRU cluster to speed things up; since this is not // synchronized, checking and accessing currentCluster must 
    // be done in one line to avoid other thread to change the variable in between
    // container = (currentCluster != null && currentCluster.lock != null) ? currentCluster.containerForID (id) : null;
    // if (container != null) {
    //     // System.out.print ("+");
    //     return container.isDeleted() ? null : container;
    // }

    /**
     * Returns the "growing" cluster for the given permissions: a cluster that
     * is good to store a new container in. Tries, in order: the registered
     * growing cluster, any cached cluster with free space, and finally a
     * freshly created cluster.
     *
     * @param perms Permissions of the cluster to search.
     * @return WizardCluster with the specified permissions that is good to store a
     *         new container in it.
     */
    protected synchronized Cluster growingCluster(Permissions perms) throws Exception {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "growingCluster() ", LogWriter.DEBUG3);
        }
        Cluster cluster = null;
        ClusterID cid = (ClusterID) growingClusterIDs.elementForKey(perms);

        // load the current growing cluster and check space
        if (cid != null) {
            cluster = (Cluster) cachedClusters.elementForKey(cid);
            if (cluster == null) {
                // Load pinned: loadCluster only guarantees a not-unloaded
                // cluster when pinned; unpin once we hold a reference.
                cluster = loadCluster(cid, true);
                if (cluster instanceof WizardCluster) {
                    ((WizardCluster)cluster).unpin();
                }
            }
            // check cluster size and if it was deactivated by the trimCache();
            // use this cluster only if it isn't used by another ta
            // NOTE(review): cluster.lock() is dereferenced for level(null) when
            // the first two || operands are false -- relies on && binding
            // tighter than ||; also assumes loadCluster never returns null.
            if (cluster.lock() == null || cluster.size() >= maxClusterSize
                    || cluster.lock().level(null) > Lock.LEVEL_NONE
                    && !cluster.lock().isAcquiredBy(env.transactionManager.currentTA())) {
                if (env.logWriter.hasTarget(LogWriter.DEBUG1)) {
                    env.logWriter.newEntry(this,
                            "growingCluster(): growing cluster not usable: cid=" + cluster.clusterID()
                            + " size=" + cluster.size()
                            + " lockLevel=" + (cluster.lock() != null ? String.valueOf(cluster.lock().level(null)) : "null"),
                            LogWriter.DEBUG1);
                }
                // Forget the unusable growing cluster and fall through to search.
                growingClusterIDs.removeForKey(perms);
                cluster = null;
            }
        }

        // search all currently loaded clusters
        if (cluster == null) {
            DxIterator it = cachedClusters.iterator();
            Cluster cursor;
            while ((cursor = (Cluster) it.next()) != null) {
                // System.out.println (cursor.size());
                if (cursor.size() < maxClusterSize && cursor.permissions().equals(perms)) {
                    cluster = cursor;
                    // make sure that there is enough space for the clusters to be
                    // able to grow to the max size
                    // ensureCacheSpace (maxClusterSize - cluster.size());
                    trimCache();
                    // check if the cluster was deactivated by the trimCache() call above
                    if (cluster.lock() == null) {
                        env.logWriter.newEntry(this,
                                "growingCluster(): loaded cluster was deactivated: " + cluster.clusterID(),
                                LogWriter.DEBUG);
                        cluster = null;
                    } else if (cluster.lock().level(null) > Lock.LEVEL_NONE
                            && !cluster.lock().isAcquiredBy(env.transactionManager.currentTA())) {
                        // use this cluster only if it isn't used by another ta
                        if (env.logWriter.hasTarget(LogWriter.DEBUG1)) {
                            env.logWriter.newEntry(this,
                                    "growingCluster(): loaded cluster is locked by another transaction: " + cluster.clusterID(),
                                    LogWriter.DEBUG1);
                        }
                        cluster = null;
                    } else {
                        // Found a usable cluster: register it as the growing
                        // cluster for these permissions and stop searching.
                        growingClusterIDs.addForKey(cluster.clusterID(), perms);
                        if (env.logWriter.hasTarget(LogWriter.DEBUG1)) {
                            env.logWriter.newEntry(this,
                                    "growingCluster(): loaded cluster is now growing cluster: " + cluster.clusterID()
                                    + " size:" + cluster.size(),
                                    LogWriter.DEBUG1);
                        }
                        break;
                    }
                }
            }
        }

        // write a new, empty cluster and load it just after, to ensure
        // that the new cluster is "regularly" loaded
        if (cluster == null) {
            cluster = createANewEmptyAndUsableCluster(perms);
        }
        return cluster;
    }

    /**
     * Creates a cluster which is
     * <UL>
     * <LI>new</LI>
     * <LI>empty</LI>
     * <LI>usable and</LI>
     * <LI>not locked</LI>
     * </UL>
     */
    protected synchronized Cluster createANewEmptyAndUsableCluster(Permissions perms)
            throws IOException, ClassNotFoundException {
        // env.logWriter.newEntry( this, "growingCluster(): creating new cluster...", LogWriter.DEBUG );
        Cluster cluster = new WizardCluster(new ClusterID(env.keyGenerator.nextID()), perms,
                env.transactionManager.newLock(), 256);

        // the new cluster has to be written to disk in order to make
        // saveShadow() and things work;
        storeData(cluster, basename(cluster.clusterID()) + POSTFIX_CLUSTER);

        /*
        // Old
        // If we do not pin, the freshly created cluster may be deactivated and thus its lock may be null
        cluster.pin();
        try {
            // since we don't check the cache size after registering a cont
            // we have to make sure that there is enough space for this cluster
            // to grow to the max size
            // ensureCacheSpace (maxClusterSize);
            trimCache();
            cluster = loadCluster(cluster.clusterID(), false);
        } finally {
            cluster.unpin();
        }
        */

        // since we don't check the cache size after registering a cont
        // we have to make sure that there is enough space for this cluster
        // to grow to the max size
        // ensureCacheSpace (maxClusterSize);
        trimCache();

        // We need to load the cluster pinned because loadCluster guarantees
        // only to return a not-unloaded cluster if it is pinned.
        cluster = loadCluster(cluster.clusterID(), true);
        if (cluster instanceof WizardCluster) {
            ((WizardCluster)cluster).unpin();
        }

        // Register the new cluster as the growing cluster for these permissions.
        growingClusterIDs.addForKey(cluster.clusterID(), perms);
        // env.logWriter.newEntry( this, "growingCluster(): new cluster created: " + cluster.clusterID(), LogWriter.DEBUG );
        return cluster;
    }

    /**
     * Returns or creates a cluster which is not locked so that locking it will
     * succeed. The returned cluster is only guaranteed to be not locked by any
     * other thread as long as this method is called during synchronization to
     * this ClusterStore.
     */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -