⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 replicationslavestorageimpl.java

📁 这个是perst-269.zip下面的SOURCECODE,和大家分享了。
💻 JAVA
字号:
package org.garret.perst.impl;

import java.io.*;
import java.net.*;

import org.garret.perst.*;


public abstract class ReplicationSlaveStorageImpl extends StorageImpl implements ReplicationSlaveStorage, Runnable
{ 
    static final int REPL_CLOSE = -1;
    static final int REPL_SYNC  = -2;
    
    public void open(IFile file, int pagePoolSize) {
        if (opened) {
            throw new StorageError(StorageError.STORAGE_ALREADY_OPENED);
        }
        initialize(file, pagePoolSize);
        lock = new PersistentResource();
        init = new Object();
        sync = new Object();
        done = new Object();
        commit = new Object();
        listening = true;
        connect();        
        thread = new Thread(this);
        thread.start();
        waitSynchronizationCompletion();
        waitInitializationCompletion(); 
        opened = true;
        beginThreadTransaction(REPLICATION_SLAVE_TRANSACTION);
        reloadScheme();
        endThreadTransaction();
    }


    /**
     * Check if socket is connected to the master host
     * @return <code>true</code> if connection between slave and master is sucessfully established
     */
    public boolean isConnected() {
        return socket != null;
    }
    
    public void beginThreadTransaction(int mode)
    {
        if (mode != REPLICATION_SLAVE_TRANSACTION) {
            throw new IllegalArgumentException("Illegal transaction mode");
        }
        lock.sharedLock();
        Page pg = pool.getPage(0);
        header.unpack(pg.data);
        pool.unfix(pg);
        currIndex = 1-header.curr;
        currIndexSize = header.root[1-currIndex].indexUsed;
        committedIndexSize = currIndexSize;
        usedSize = header.root[currIndex].size;
        objectCache.clear();
    }
     
    public void endThreadTransaction(int maxDelay)
    {
        lock.unlock();
    }

    protected void waitSynchronizationCompletion() {
        try { 
            synchronized (sync) { 
                while (outOfSync) { 
                    sync.wait();
                }
            }
        } catch (InterruptedException x) { 
        }
    }

    protected void waitInitializationCompletion() {
        try { 
            synchronized (init) { 
                while (!initialized) { 
                    init.wait();
                }
            }
        } catch (InterruptedException x) { 
        }
    }

    /**
     * Wait until database is modified by master
     * This method blocks current thread until master node commits trasanction and
     * this transanction is completely delivered to this slave node
     */
    public void waitForModification() { 
        try { 
            synchronized (commit) { 
                if (socket != null) { 
                    commit.wait();
                }
            }
        } catch (InterruptedException x) { 
        }
    }

    protected static final int DB_HDR_CURR_INDEX_OFFSET  = 0;
    protected static final int DB_HDR_DIRTY_OFFSET       = 1;
    protected static final int DB_HDR_INITIALIZED_OFFSET = 2;
    protected static final int PAGE_DATA_OFFSET          = 8;
    
    public static int LINGER_TIME = 10; // linger parameter for the socket

    /**
     * When overriden by base class this method perfroms socket error handling
     * @return <code>true</code> if host should be reconnected and attempt to send data to it should be 
     * repeated, <code>false</code> if no more attmpts to communicate with this host should be performed 
     */     
    public boolean handleError() 
    {
        return (listener != null) ? listener.replicationError(null) : false;
    }

    void connect()
    {
        try { 
            socket = getSocket();
            try {
                socket.setSoLinger(true, LINGER_TIME);
            } catch (NoSuchMethodError er) {}
            try { 
                socket.setTcpNoDelay(true);
            } catch (Exception x) {}
            in = socket.getInputStream();
            if (replicationAck) { 
                out = socket.getOutputStream();
            }
        } catch (IOException x) { 
            socket = null;
            in = null;
        }
    }

    abstract Socket getSocket() throws IOException;

    void cancelIO() {}

    public void run() { 
        byte[] buf = new byte[Page.pageSize+PAGE_DATA_OFFSET];

        while (listening) { 
            int offs = 0;
            do {
                int rc;
                try { 
                    rc = in.read(buf, offs, buf.length - offs);
                } catch (IOException x) { 
                    rc = -1;
                }
                synchronized(done) { 
                    if (!listening) { 
                        return;
                    }
                }
                if (rc < 0) { 
                    if (handleError()) { 
                        connect();
                    } else { 
                        return;
                    }
                } else { 
                    offs += rc;
                }
            } while (offs < buf.length);
            
            long pos = Bytes.unpack8(buf, 0);
            boolean transactionCommit = false;
            if (pos == 0) { 
                if (replicationAck) { 
                    try { 
                        out.write(buf, 0, 1);
                    } catch (IOException x) {
                        handleError();
                    }
                }
                if (buf[PAGE_DATA_OFFSET + DB_HDR_CURR_INDEX_OFFSET] != prevIndex) { 
                    prevIndex = buf[PAGE_DATA_OFFSET + DB_HDR_CURR_INDEX_OFFSET];
                    lock.exclusiveLock();
                    transactionCommit = true;
                }
            } else if (pos == REPL_SYNC) { 
                synchronized(sync) { 
                    outOfSync = false;
                    sync.notify();
                }
                continue;
            } else if (pos == REPL_CLOSE) { 
                synchronized(commit) { 
                    hangup();
                    commit.notifyAll();
                }     
                return;
            }
            
            Page pg = pool.putPage(pos);
            System.arraycopy(buf, PAGE_DATA_OFFSET, pg.data, 0, Page.pageSize);
            pool.unfix(pg);
            
            if (pos == 0) { 
                if (!initialized && buf[PAGE_DATA_OFFSET + DB_HDR_INITIALIZED_OFFSET] != 0) { 
                    synchronized(init) { 
                        initialized = true;
                        init.notify();
                    }
                }
                if (transactionCommit) { 
                    lock.unlock();
                    synchronized(commit) { 
                        commit.notifyAll();
                    }
                    pool.flush();
                }
            }
        }            
    }

    public void close() {
        synchronized (done) {
            listening = false;
        }
        cancelIO();
        try { 
            thread.interrupt();
            thread.join();
        } catch (InterruptedException x) {}

        hangup();

        pool.flush();
        super.close();
    }

    protected void hangup() { 
        if (socket != null) { 
            try { 
                in.close();
                if (out != null) { 
                    out.close();
                }
                socket.close();
            } catch (IOException x) {}
            in = null;
            socket = null;
        }
    }

    protected boolean isDirty() { 
        return false;
    }

    protected InputStream  in;
    protected OutputStream out;
    protected Socket       socket;
    protected boolean      outOfSync;
    protected boolean      initialized;
    protected boolean      listening;
    protected Object       sync;
    protected Object       init;
    protected Object       done;
    protected Object       commit;
    protected int          prevIndex;
    protected IResource    lock;
    protected Thread       thread;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -