📄 clusterstore.java
字号:
/** Associates the specified container with a cluster. Iff this method returns normally (without exception), the container (and thus the cluster of the container) is write locked @param container Container to be registered with one cluster. */ public void registerContainerAndLock(StorageObjectContainer container, Permissions perms, Transaction locker, int lockLevel) throws Exception { if (env.logWriter.hasTarget(LogWriter.DEBUG3)) { env.logWriter.newEntry(this, "registerContainer()", LogWriter.DEBUG3); } Cluster cluster = null; boolean locked = false; boolean alright = false; try { synchronized (this) { MagicTransaction ta = (MagicTransaction) env.transactionManager.currentTA(); cluster = growingCluster(perms, ta); Lock clusterLock = cluster.lock(); int prevLevel = clusterLock.tryAcquire(locker, lockLevel); if (prevLevel == Lock.NOT_ACQUIRED) { // The cluster we are trying to lock is already locked, so we take another cluster cluster = giveMeAnUnlockedCluster(perms); clusterLock = cluster.lock(); prevLevel = clusterLock.tryAcquire(locker, lockLevel); if (prevLevel == Lock.NOT_ACQUIRED) { throw new Error("BUG! We could not acquire a lock for an unlocked cluster."); } } locked = true; cluster.registerContainer(container); } cluster.updateLockLevel(locker); if (env.logWriter.hasTarget(LogWriter.DEBUG3)) { env.logWriter.newEntry(this, " cluster: " + cluster.clusterID(), LogWriter.DEBUG3); } alright = true; } finally { if (!alright) { if (locked) { cluster.lock().release(locker); } } } } public void invalidateContainer(StorageObjectContainer container) { synchronized (container) { container.getCluster().removeContainer(container); container.setCluster(null); } } protected Cluster restoreCluster(final ClusterID cid, Set uncommittedTaIDs) throws Exception { String basename = basename(cid); Cluster cluster; new File(basename + POSTFIX_LOCK).delete(); File dir = new File(env.getDatabaseDir() + Env.DATA_DIR); File[] oldFileList = dir.listFiles(new FilenameFilter() { public boolean accept(File dir, String name) { return name.startsWith(cid.value() + POSTFIX_SEPARATOR) && name.endsWith(POSTFIX_OLD); } }); File[] newFileList = dir.listFiles(new FilenameFilter() { public boolean accept(File dir, String name) { return name.startsWith(cid.value() + POSTFIX_SEPARATOR) && name.endsWith(POSTFIX_OLD); } }); File clusterFile = new File(basename + POSTFIX_CLUSTER); if (oldFileList.length == 0) { if (newFileList.length == 1) { // there is only a new file and no old file, so the new file // is simply uncommitted data and we delete it newFileList[0].delete(); } } else if (oldFileList.length == 1) { long num = Long.parseLong(splitClusterName(oldFileList[0].getName())[1]); TransactionID taID = new TransactionID(num); if (uncommittedTaIDs.contains(taID)) { if (newFileList.length == 1) { // we have both an old and a new file, but the tx that has // written these files has not fully committed, so we can // delete the new file here and rename the old file later newFileList[0].delete(); } else { // there is no new file but we do have an old file and the // tx has not fully committed but has been able to rename // new into cluster, so we delete the cluster and rename the // old file later clusterFile.delete(); } if (!oldFileList[0].renameTo(clusterFile)) { throw new IOException("Unable to rename old cluster file " + oldFileList[0] + " to " + clusterFile); } } else { // we need not bother checking newFileList at this point, since // uncommittedTaIDs.contains(taID) can only be false when there // is no new cluster whatsoever (not only the cluster we // currently look at) on disk with the taID in its filename if (oldFileList.length == 1) { // this might happen, since a tx that has renamed all new // files to cluster files can be considered to have // committed; note that a client is only notified of a // commit when all old files have been deleted as well -> // this is an evil that may exist, since a crash can happen // at any time (also between finishing the commit and // informing any client the commit has taken place) oldFileList[0].delete(); } } } cluster = (Cluster) loadData(basename + POSTFIX_CLUSTER); activateCluster(cluster, 0); return cluster; } /** * Make sure the corresponding cluster is in the cache. While loading * clusters, we may have to throw away (and maybe store) some currently * cached clusters. * * * @param cid ClusterID of the cluster to load. */// public Cluster loadCluster(ClusterID cid) throws IOException, ClassNotFoundException {// return loadCluster(cid, null);// } public Cluster loadCluster(ClusterID cid, MagicTransaction ta) throws IOException, ClassNotFoundException { Cluster cluster = (Cluster) clusterCache.get(cid); if (cluster == null) { if (env.logWriter.hasTarget(LogWriter.DEBUG)) { env.logWriter.newEntry(this, "loadCluster(): load cluster from disk: " + cid.toString(), LogWriter.DEBUG); } final String basename = basename(cid); String newClusterName = ta == null ? null : basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_NEW; String uncommittedClusterName = ta == null ? null : basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_CLUSTER; String currentClusterName = basename + POSTFIX_CLUSTER; String lockName = basename + POSTFIX_LOCK; String clusterName = null; if (new File(uncommittedClusterName).exists()) { clusterName = uncommittedClusterName; } else if (new File(lockName).exists()) { MROWLock lock = (MROWLock) loadData(lockName); TransactionID lockerID = lock.getWriteLockingTransactionID(); clusterName = basename + POSTFIX_SEPARATOR + lockerID.value() + POSTFIX_CLUSTER; } else if (new File(currentClusterName).exists()) { clusterName = currentClusterName; } else if (new File(newClusterName).exists()) { clusterName = newClusterName; } cluster = (Cluster) loadData(clusterName); int clusterByteSize = (int) new File(clusterName).length(); if (compressClusters) { clusterByteSize *= compressionFactor; } env.logWriter.newEntry(this, "loaded data = " + cluster.getClass().getName(), LogWriter.DEBUG); synchronized (this) { // now we have to check the cachedClusters table inside the // synchronized block to see if someone did register this // cluster while we loaded it Cluster interimCluster = (Cluster) clusterCache.get(cid); if (interimCluster != null) { env.logWriter.newEntry(this, "loadCluster(): cluster was loaded by another thread too; droping my copy", LogWriter.DEBUG); cluster = interimCluster; } else { // we are going to mess with the cluster; it seems that the cluster // is not visible to other thread until it is added to cachedClusters, // however, IBM jdk throws an exception in cluster.updateLockLevel, which // seems to be related to the initialization in the following block synchronized (cluster) { // locks are only there if the lock level is >= READ File lockFile = new File(lockName); if (lockFile.exists()) { cluster.setLock((Lock) loadData(lockName)); if (!lockFile.delete()) { env.logWriter.newEntry(this, "could not delete lock file " + lockFile, LogWriter.ERROR); } } else { if (env.logWriter.hasTarget(LogWriter.DEBUG3)) { env.logWriter.newEntry(this, "no lock on disk for " + cid + ", creating a new lock.", LogWriter.DEBUG3); } cluster.setLock(env.transactionManager.newLock()); } ((MROWLock) cluster.lock()).setDebugInfo("clusterID=" + cluster.clusterID()); activateCluster(cluster, clusterByteSize); } if (clusterByteSize > maxClusterSize * 2) { splitCluster(cluster); } clusterCache.put(cluster.clusterID(), cluster); } } } if (env.logWriter.hasTarget(LogWriter.DEBUG3)) { env.logWriter.newEntry(this, "returning MagicCluster: " + cluster, LogWriter.DEBUG3); } return cluster; } public void splitCluster(Cluster cluster) { // todo: implement? } /** * Remove cluster from the cluster cache. * @param cid */ public void unloadCluster(ClusterID cid, boolean deactivate) throws IOException { if (env.logWriter.hasTarget(LogWriter.DEBUG)) { env.logWriter.newEntry(this, "unloadCluster(" + cid + "," + deactivate + ").", LogWriter.DEBUG); } Cluster cluster = (Cluster) clusterCache.remove(cid); if (deactivate) { deactivateCluster(cluster); } } /** * This method is called right after the specified MagicCluster was loaded from * disk. */ protected void activateCluster(Cluster cluster, int size) { if (env.logWriter.hasTarget(LogWriter.DEBUG3)) { env.logWriter.newEntry(this, "activateCluster(): " + cluster.clusterID(), LogWriter.DEBUG3); } cluster.setEnv(env); cluster.setClusterStore(this); cluster.touch(); cluster.setCurrentSize(size); } /** * Deactivate the specified cluster before it is written to disk. The * specified cluster will be removed from the cluster cache. If it currently * has shadows, they are written to disk. If any of the containers are * currently invoked (should normally never happen), the shadows must stay * in memory. */ protected void deactivateCluster(Cluster cluster) throws IOException { if (env.logWriter.hasTarget(LogWriter.DEBUG)) { env.logWriter.newEntry(this, "deactivateCluster(): " + cluster.clusterID() + " priority: " + cluster.cachePriority(), LogWriter.DEBUG); env.logWriter.newEntry(this, " lock: " + cluster.lock().level(null), LogWriter.DEBUG); } String basename = basename(cluster.clusterID());
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -