// clusterstore.java
        synchronized (this) {
            // We synchronize on this ClusterStore so that a freshly returned
            // cluster within the ClusterStore lock cannot be deactivated
            // while the lock is held.
        }
    }

    /**
     * Store the specified cluster on disk. Write new files first: if this
     * write fails, the originals are still valid. The cluster may have been
     * written to disk already, if it was deactivated during the transaction.
     * But in case the cluster (and its changes) exists only in memory, we
     * have to write now to check whether this is possible without errors.
     *
     * Note: This only writes all currently committed transaction results to
     * the disk. This is different from the deactivation behaviour.
     *
     * @param cid MagicCluster to be prepare-committed.
     * @exception IOException None of the clusters are written to disk.
     */
    public synchronized void prepareCommitCluster(Transaction ta, ClusterID cid)
            throws IOException, ClassNotFoundException {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "prepareCommitCluster(): " + cid, LogWriter.DEBUG3);
        }

        Cluster cluster = loadCluster(cid, (MagicTransaction) ta);
        cluster.prepareCommit(ta);

        String basename = basename(cid);
        File clusterFile = new File(basename + POSTFIX_CLUSTER);
        File oldFile = new File(basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_OLD);

        if (cluster.lock().level(null) >= Lock.LEVEL_WRITE) {
            // We need to check if the cluster file exists; it does not when
            // the cluster was created during transaction ta.
            if (clusterFile.exists() && !clusterFile.renameTo(oldFile)) {
                throw new IOException("Unable to rename cluster file " + clusterFile + " to " + oldFile);
            }
            String tempFilename = basename(cid) + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_NEW;
            storeDataImmediately(cluster, tempFilename);
        }
    }

    /**
     * @param cid MagicCluster to be committed.
     */
    public synchronized void commitCluster(Transaction ta, ClusterID cid)
            throws IOException, ClassNotFoundException {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "commitCluster(): " + cid, LogWriter.DEBUG3);
        }

        String basename = basename(cid);
        File clusterFile = new File(basename + POSTFIX_CLUSTER);
        File oldFile = new File(basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_OLD);
        File newFile = new File(basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_NEW);
        File uncommittedFile = new File(basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_CLUSTER);

        // newFile only exists if it has been written to disk prior to commit.
        if (newFile.exists() && !newFile.renameTo(clusterFile)) {
            throw new IOException("Unable to rename cluster file " + newFile + " to " + clusterFile);
        }

        // oldFile does not exist if the cluster was created during transaction ta.
        if (oldFile.exists() && !oldFile.delete()) {
            throw new IOException("Unable to delete old cluster file " + oldFile);
        }

        Cluster cluster = loadCluster(cid, (MagicTransaction) ta);
        cluster.commit(ta);

        if (uncommittedFile.exists() && !uncommittedFile.delete()) {
            throw new IOException("Unable to delete uncommitted cluster file " + uncommittedFile);
        }

        // After the cluster is committed, its lock is released and has to be
        // updated on disk; if no lock file exists, the lock is newly created
        // when loading.
        updateLockOnDisk(cluster, ta);
    }
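    // --- Illustration (not part of the original class) ----------------------
    // A hedged sketch of the on-disk naming scheme that prepareCommitCluster(),
    // commitCluster() and abortCluster() rely on. It merely recombines
    // basename(cid) and the POSTFIX_* constants exactly as those methods do;
    // the constants themselves are defined elsewhere in this class and are
    // not shown in this excerpt. While transaction ta is between prepare and
    // commit/abort, up to three files for cluster cid coexist:
    //   basename(cid) + POSTFIX_CLUSTER                    last committed state
    //   ... + POSTFIX_SEPARATOR + taID + POSTFIX_OLD       backup made by prepare
    //   ... + POSTFIX_SEPARATOR + taID + POSTFIX_NEW       prepared state, renamed
    //                                                      over the committed file on commit
    private String[] transactionFileNames(Transaction ta, ClusterID cid) {
        String taBase = basename(cid) + POSTFIX_SEPARATOR + ta.taID().value();
        return new String[] {
                basename(cid) + POSTFIX_CLUSTER, // committed
                taBase + POSTFIX_OLD,            // backup (exists from prepare to commit/abort)
                taBase + POSTFIX_NEW             // prepared (exists from prepare to commit)
        };
    }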
    /**
     * Actually abort the specified cluster. This deletes the new file
     * written during prepare and restores the original cluster file from
     * the backup.
     *
     * @param cid MagicCluster to be aborted.
     */
    public synchronized void abortCluster(Transaction ta, ClusterID cid)
            throws IOException, ClassNotFoundException {
        String basename = basename(cid);
        File newFile = new File(basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_NEW);
        File clusterFile = new File(basename + POSTFIX_CLUSTER);
        File oldFile = new File(basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_OLD);

        if (newFile.exists() && !newFile.delete()) {
            throw new IOException("Unable to delete new cluster file " + newFile);
        }
        if (oldFile.exists()) {
            if (clusterFile.exists() && !clusterFile.delete()) {
                throw new IOException("Unable to delete cluster file " + clusterFile);
            }
            if (!oldFile.renameTo(clusterFile)) {
                throw new IOException("Unable to rename old cluster file " + oldFile + " to " + clusterFile);
            }
        }

        // FIXME: if the transaction size exceeds the cache size, this loads
        // the cluster again although it's not really needed.
        // MagicCluster cluster = loadCluster(cid, false);
        Cluster cluster = loadCluster(cid, (MagicTransaction) ta);
        cluster.abort(ta);

        // The above abort() call does not change the cluster in memory, so
        // we have to reload the cluster next time.
        unloadCluster(cid, false);

        // After the cluster is aborted, its lock is released and has to be
        // updated on disk; if no lock file exists, the lock is newly created
        // when loading.
        updateLockOnDisk(cluster, ta);
    }

    protected void updateLockOnDisk(Cluster cluster, Transaction ta) throws IOException {
        // System.out.println("commit " + cid + ": " + ((DefaultLock) cluster.lock).lockers.count());
        ClusterID cid = cluster.clusterID();
        if (cluster.lock().level(ta) == Lock.LEVEL_NONE) {
            File lockFile = new File(basename(cid) + POSTFIX_LOCK);
            if (lockFile.exists() && !lockFile.delete()) {
                throw new IOException("Unable to delete lock file.");
            }
        } else {
            storeDataImmediately(cluster.lock(), basename(cid) + POSTFIX_LOCK);
        }
    }
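    // --- Illustration (not part of the original class) ----------------------
    // The read-side counterpart implied by updateLockOnDisk(), as a hedged
    // sketch: a surviving lock file means the cluster was still locked when
    // it was last stored; a missing file means the lock was fully released,
    // in which case (per the comments above) a fresh lock is created when
    // the cluster is loaded. The method name is hypothetical; basename(),
    // POSTFIX_LOCK and loadData() are used exactly as elsewhere in this class.
    private Lock readLockFromDisk(ClusterID cid) throws IOException, ClassNotFoundException {
        File lockFile = new File(basename(cid) + POSTFIX_LOCK);
        if (!lockFile.exists()) {
            return null; // caller creates a fresh lock, as described above
        }
        return (Lock) loadData(lockFile.getPath());
    }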
    /*
     * Since we dropped the whole pinning idea, we can do all writing in a
     * separate thread: as long as there are hard references to a cluster,
     * that cluster can be found in the clusterCache.
     */
    private class StoreThread extends Thread {

        private class Entry {
            public String key;
            public Object value;

            public Entry(String key, Object value) {
                this.value = value;
                this.key = key;
            }
        }

        private LinkedList storeList = new LinkedList();

        private volatile boolean stopRunning;

        public void stopRunning() {
            stopRunning = true;
            synchronized (storeList) {
                storeList.notifyAll();
            }
        }

        public void storeData(Object obj, String key) {
            if (stopRunning) {
                // TODO: replace with a proper exception
                throw new RuntimeException("cannot call storeData() after stopRunning()");
            }
            synchronized (storeList) {
                // It might happen that a tx is creating objects like crazy,
                // thereby creating clusters like crazy. When more clusters
                // are being created than written, we run out of memory. So
                // we need to keep the number of clusters waiting to be
                // written to disk reasonably low.
                // TODO: make a parameter out of this
                while (storeList.size() >= 10) {
                    try {
                        storeList.wait();
                    } catch (InterruptedException ignore) {
                    }
                }
                storeList.addLast(new Entry(key, obj));
                storeList.notifyAll();
            }
        }

        public void run() {
            while (!stopRunning) {
                Entry entry = null;
                synchronized (storeList) {
                    if (storeList.size() > 0) {
                        entry = (Entry) storeList.removeFirst();
                        storeList.notifyAll();
                    } else {
                        try {
                            storeList.wait();
                        } catch (InterruptedException ignore) {
                        }
                    }
                }
                if (entry != null) {
                    try {
                        storeDataImmediately(entry.value, entry.key);
                    } catch (IOException e) {
                        env.logWriter.newEntry(this, "could not write: " + entry.value
                                + ", filename: " + entry.key, e, LogWriter.ERROR);
                    }
                }
            }
        }
    }

    private StoreThread storeThread = new StoreThread();

    /**
     * Serialize and store the specified object for the specified key. The
     * current implementation uses the file system as back-end store.
     */
    protected void storeData(Object obj, String key) throws IOException {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "storeData(): " + key, LogWriter.DEBUG3);
        }
        storeThread.storeData(obj, key);
    }

    protected void storeDataImmediately(Object obj, String key) throws IOException {
        OutputStream out = new FileOutputStream(key);
        if (compressClusters) {
            out = new GZIPOutputStream(out, 3 * 4096);
        } else {
            out = new BufferedOutputStream(out, 3 * 4096);
        }
        ObjectOutputStream oout = new ObjectOutputStream(out);
        try {
            oout.writeObject(obj);
        } finally {
            oout.close();
        }
    }

    /**
     * Load the data that has previously been stored for the given key.
     */
    protected Object loadData(String key) throws IOException, ClassNotFoundException {
        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {
            env.logWriter.newEntry(this, "loadData(): " + key, LogWriter.DEBUG3);
        }
        InputStream in = new FileInputStream(key);
        if (compressClusters) {
            in = new GZIPInputStream(in, 3 * 4096);
        } else {
            in = new BufferedInputStream(in, 3 * 4096);
        }
        ObjectInputStream oin = new ResolvingObjectInputStream(in);
        try {
            return oin.readObject();
        } finally {
            oin.close();
        }
    }

    void abortTransaction(MagicTransaction ta) throws IOException, ClassNotFoundException {
        ta.commitClusterIDs = new DxHashSet(64);
        DxIterator it = ta.idTable.iterator();
        ClusterID cid;
        while ((cid = (ClusterID) it.next()) != null) {
            if (!ta.commitClusterIDs.contains(cid)) {
                // We MUST NOT abort read-locked clusters, because they may
                // be read-locked by other transactions, too.
                Cluster cluster = loadCluster(cid, ta);
                if (cluster.lock().level(ta) > Lock.LEVEL_READ) {
                    if (env.logWriter.hasTarget(LogWriter.DEBUG2)) {
                        env.logWriter.newEntry(this, "abort cluster: " + cid, LogWriter.DEBUG2);
                    }
                    abortCluster(ta, cid);
                } else {
                    // Do a plain unlock, and we are fine.
                    cluster.lock().release(ta);
                }
                ta.commitClusterIDs.add(cid);
            }
        }
    }
}
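// --- Illustration (not part of the original file) ---------------------------
// A minimal, self-contained sketch of the bounded hand-off pattern that
// StoreThread uses above: producers block on a hard cap, the consumer blocks
// on an empty list, and both sides signal via notifyAll() on the shared list.
// The class and method names are hypothetical; the cap of 10 mirrors the
// hard-coded limit in StoreThread.storeData(). Assumes java.util.LinkedList
// is imported, as the original class already uses it. Unlike the original,
// interruption is propagated to the caller rather than swallowed.
class BoundedWriteQueue {
    private static final int CAP = 10;
    private final LinkedList queue = new LinkedList();

    // Producer side: corresponds to StoreThread.storeData().
    public void put(Object item) throws InterruptedException {
        synchronized (queue) {
            while (queue.size() >= CAP) {
                queue.wait(); // blocks until the consumer drains an entry
            }
            queue.addLast(item);
            queue.notifyAll(); // wake a waiting consumer
        }
    }

    // Consumer side: corresponds to the dequeue step in StoreThread.run().
    public Object take() throws InterruptedException {
        synchronized (queue) {
            while (queue.isEmpty()) {
                queue.wait(); // blocks until a producer enqueues an entry
            }
            Object item = queue.removeFirst();
            queue.notifyAll(); // wake producers blocked on the cap
            return item;
        }
    }
}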
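// --- Illustration (not part of the original file) ---------------------------
// A hedged, standalone round trip through the same stream stack that
// storeDataImmediately() and loadData() build above: a FileOutputStream
// wrapped in either GZIPOutputStream or BufferedOutputStream (3 * 4096 byte
// buffer), then ObjectOutputStream, and the mirror image for reading. A plain
// ObjectInputStream stands in for the ResolvingObjectInputStream used above,
// whose definition is outside this excerpt. Assumes java.io.* and
// java.util.zip.* are imported, as the original class already uses them.
class SerializationRoundTrip {

    static void write(Object obj, String file, boolean compress) throws IOException {
        OutputStream out = new FileOutputStream(file);
        out = compress ? new GZIPOutputStream(out, 3 * 4096)
                       : new BufferedOutputStream(out, 3 * 4096);
        ObjectOutputStream oout = new ObjectOutputStream(out);
        try {
            oout.writeObject(obj);
        } finally {
            oout.close(); // also flushes and, for GZIP, finishes the stream
        }
    }

    static Object read(String file, boolean compress) throws IOException, ClassNotFoundException {
        InputStream in = new FileInputStream(file);
        in = compress ? new GZIPInputStream(in, 3 * 4096)
                      : new BufferedInputStream(in, 3 * 4096);
        ObjectInputStream oin = new ObjectInputStream(in);
        try {
            return oin.readObject();
        } finally {
            oin.close();
        }
    }
}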