⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 clusterstore.java

📁 Java的面向对象数据库系统的源代码
💻 JAVA
📖 第 1 页 / 共 3 页
字号:
        synchronized (this) { // We synchronize on this ClusterStore so that a freshly returned cluster within the ClusterStore lock may not be deactivated during the lock time.        }    }    /**     * Store the specified cluster on disk. Write new files first. If this     * write fails, the original are still valid. The cluster may has been     * written to the disk already, if is was deactivated while transaction.     * But in case the cluster (and its changes) are only in memory, we have to     * write now to check if this is possible without errors.     *     * Note: This only writes all currently commited transaction results to the     * disk. This is different from the deactivation behaviour.     *     *     * @param cid MagicCluster to be prepare-commited.     * @exception IOException None of the clusters are written to disk.     */    public synchronized void prepareCommitCluster(Transaction ta, ClusterID cid) throws IOException, ClassNotFoundException {        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {            env.logWriter.newEntry(this, "prepareCommitCluster(): " + cid, LogWriter.DEBUG3);        }        Cluster cluster = loadCluster(cid, (MagicTransaction) ta);        cluster.prepareCommit(ta);        String basename = basename(cid);        File clusterFile = new File(basename + POSTFIX_CLUSTER);        File oldFile = new File(basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_OLD);        if (cluster.lock().level(null) >= Lock.LEVEL_WRITE) {            // need to check if the cluster file exists, it does not when the cluster            // is created during transaction ta            if (clusterFile.exists() && !clusterFile.renameTo(oldFile)) {                throw new IOException("Unable to rename cluster file " + clusterFile + " to " + oldFile);            }            String tempFilename = basename(cid) + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_NEW;            storeDataImmediately(cluster, tempFilename);        }    }    /**     * @param cid MagicCluster to be commited.     */    public synchronized void commitCluster(Transaction ta, ClusterID cid) throws IOException, ClassNotFoundException {        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {            env.logWriter.newEntry(this, "commitCluster(): " + cid, LogWriter.DEBUG3);        }        String basename = basename(cid);        File clusterFile = new File(basename + POSTFIX_CLUSTER);        File oldFile = new File(basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_OLD);        File newFile = new File(basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_NEW);        File uncommittedFile = new File(basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_CLUSTER);        // newFile only exists if it has been written to disk prior to commit        if (newFile.exists() && !newFile.renameTo(clusterFile)) {            throw new IOException("Unable to rename cluster file " + newFile + " to " + clusterFile);        }        // oldFile does not exist if cluster was created during transaction ta        if (oldFile.exists() && !oldFile.delete()) {            throw new IOException("Unable to delete old cluster file " + oldFile);        }        Cluster cluster = loadCluster(cid, (MagicTransaction) ta);        cluster.commit(ta);        if (uncommittedFile.exists() && !uncommittedFile.delete()) {            throw new IOException("Unable to delete uncommitted cluster file " + uncommittedFile);        }        // after the cluster is commited its lock is released and has to be        // updated on disk; if no lock file exists, the lock is newly created        // when loading        updateLockOnDisk(cluster, ta);    }    /**     * Actually abort the specified cluster. This deletes t     * @param cid MagicCluster to be aborted.     */    public synchronized void abortCluster(Transaction ta, ClusterID cid) throws IOException, ClassNotFoundException {        String basename = basename(cid);        File newFile = new File(basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_NEW);        File clusterFile = new File(basename + POSTFIX_CLUSTER);        File oldFile = new File(basename + POSTFIX_SEPARATOR + ta.taID().value() + POSTFIX_OLD);        if (newFile.exists() && !newFile.delete()) {            throw new IOException("Unable to delete new cluster file " + newFile);        }        if (oldFile.exists()) {            if (clusterFile.exists() && !clusterFile.delete()) {                throw new IOException("Unable to delete cluster file " + clusterFile);            }            if (!oldFile.renameTo(clusterFile)) {                throw new IOException("Unable to rename old cluster file " + oldFile + " to " + clusterFile);            }        }        // FIXME: if transaction size exceeds cache size, this loads the        // cluster again altough it's not really needed//      MagicCluster cluster = loadCluster(cid,false);        Cluster cluster = loadCluster(cid, (MagicTransaction) ta);        cluster.abort(ta);        // the above abort() call does not change the cluster in memory, so        // we have to reload the cluster next time        unloadCluster(cid, false);        // after the cluster is aborted its lock is released and has to be        // updated on disk; if no lock file exists, the lock is newly created        // when loading        updateLockOnDisk(cluster, ta);    }    protected void updateLockOnDisk(Cluster cluster, Transaction ta) throws IOException {        //        System.out.println ("commit " + cid + ": " + ((DefaultLock)cluster.lock).lockers.count());        ClusterID cid = cluster.clusterID();        if (cluster.lock().level(ta) == Lock.LEVEL_NONE) {            File lockFile = new File(basename(cid) + POSTFIX_LOCK);            if (lockFile.exists() && !lockFile.delete()) {                throw new IOException("Unable to delete lock file.");            }        } else {            storeDataImmediately(cluster.lock(), basename(cid) + POSTFIX_LOCK);        }    }    /*     * Since we dropped the whole pinning idea we can do all writing in a     * separate thread, as long as there are hard references to a cluster, that     * cluster can be found in the clusterCache.     */    private class StoreThread extends Thread {        private class Entry {            public String key;            public Object value;            public Entry(String key, Object value) {                this.value = value;                this.key = key;            }        }        private LinkedList storeList = new LinkedList();        private volatile boolean stopRunning;        public void stopRunning() {            stopRunning = true;            synchronized (storeList) {                storeList.notifyAll();            }        }        public void storeData(Object obj, String key) {            if (stopRunning) {                // TODO: replace with proper exception                throw new RuntimeException("cannot call storeData() after stopRunning()");            }            synchronized (storeList) {                // It might happen that a tx is creating objects like crazy,                // thereby creating clusters like crazy. When there are more                // clusters being created than written, we run out of memory. So                // we need to keep the number of clusters that is to be written                // to disk not too high                // TODO: make some parameter out of this                while (storeList.size() >= 10) {                    try {                        storeList.wait();                    } catch (InterruptedException ignore) {                    }                }                storeList.addLast(new Entry(key, obj));                storeList.notifyAll();            }        }        public void run() {            for (Entry entry = null; !stopRunning && entry == null; ) {                synchronized (storeList) {                    if (storeList.size() > 0) {                        entry = (Entry) storeList.removeFirst();                        storeList.notifyAll();                    } else {                        try {                            storeList.wait();                        } catch (InterruptedException ignore) {                        }                    }                }                if (entry != null) {                    try {                        storeDataImmediately(entry.value, entry.key);                    } catch (IOException e) {                        env.logWriter.newEntry(this, "could not write: " + entry.value + ", filename: " + entry.key, e, LogWriter.ERROR);                    }                }            }        }    }    private StoreThread storeThread = new StoreThread();    /**     * Serialize and store the specified object for the specified key. This     * current implementation uses the file system as back end store.     */    protected void storeData(Object obj, String key) throws IOException {        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {            env.logWriter.newEntry(this, "storeData(): " + key, LogWriter.DEBUG3);        }        storeThread.storeData(obj, key);    }    protected void storeDataImmediately(Object obj, String key) throws IOException {        OutputStream out = new FileOutputStream(key);        if (compressClusters) {            out = new GZIPOutputStream(out, 3 * 4096);        } else {            out = new BufferedOutputStream(out, 3 * 4096);        }        ObjectOutputStream oout = new ObjectOutputStream(out);        try {            oout.writeObject(obj);        } finally {            oout.close();        }    }    /**     * Load the data that previously has been stored for the given key.     */    protected Object loadData(String key) throws IOException, ClassNotFoundException {        if (env.logWriter.hasTarget(LogWriter.DEBUG3)) {            env.logWriter.newEntry(this, "loadData(): " + key, LogWriter.DEBUG3);        }        InputStream in = new FileInputStream(key);        if (compressClusters) {            in = new GZIPInputStream(in, 3 * 4096);        } else {            in = new BufferedInputStream(in, 3 * 4096);        }        ObjectInputStream oin = new ResolvingObjectInputStream(in);        try {            Object result = oin.readObject();            return result;        } finally {            oin.close();        }    }    void abortTransaction(MagicTransaction ta) throws IOException, ClassNotFoundException {        ta.commitClusterIDs = new DxHashSet(64);        DxIterator it = ta.idTable.iterator();        ClusterID cid;        while ((cid = (ClusterID) it.next()) != null) {            if (!ta.commitClusterIDs.contains(cid)) {                // We MUST NOT abort read locked clusters (because they may be read locked from other transactions, too)                Cluster cluster = loadCluster(cid, ta);                if (cluster.lock().level(ta) > Lock.LEVEL_READ) {                    if (env.logWriter.hasTarget(LogWriter.DEBUG2)) {                        env.logWriter.newEntry(this, "abort cluster: " + cid, LogWriter.DEBUG2);                    }                    abortCluster(ta, cid);                } else {                    // Do a plain unlock, and we are fine.                    cluster.lock().release(ta);                }                ta.commitClusterIDs.add(cid);            }        }    }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -