⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 replicationslavestorageimpl.cs

📁 Perst开源实时数据库
💻 CS
字号:
namespace Perst.Impl
{
    using System;
    using System.Net;
    using System.Net.Sockets;
    using System.Threading;
    using Perst;
    
    /// <summary>
    /// Slave side of storage replication. A background receiver thread reads
    /// fixed-size frames from a socket obtained from <see cref="GetSocket"/>;
    /// each frame is an 8-byte position followed by one page of data, which is
    /// stored into the page pool. Readers access the replicated data inside
    /// <c>TransactionMode.ReplicationSlave</c> transactions.
    /// </summary>
    public abstract class ReplicationSlaveStorageImpl : StorageImpl, ReplicationSlaveStorage
    { 
        /// <summary>
        /// Open the storage, obtain the replication socket, start the receiver
        /// thread and block until synchronization and initialization have been
        /// signalled by that thread. The scheme is then reloaded inside a
        /// replication-slave transaction.
        /// </summary>
        /// <param name="file">storage file passed to <c>initialize</c></param>
        /// <param name="pagePoolSize">page pool size passed to <c>initialize</c></param>
        /// <exception cref="StorageError">STORAGE_ALREADY_OPENED if the storage is already opened</exception>
        public override void Open(IFile file, int pagePoolSize) 
        {
            if (opened) 
            {
                throw new StorageError(StorageError.ErrorCode.STORAGE_ALREADY_OPENED);
            }
            initialize(file, pagePoolSize);

            lck = new PersistentResource();
            init = new object();
            sync = new object();
            done = new object();
            commit = new object();
            listening = true;
            connect();
            thread = new Thread(new ThreadStart(run));
            thread.Start();
            // NOTE(review): if the receiver thread stops before the header page
            // is flagged as initialized, the waits below never return.
            WaitSynchronizationCompletion();
            WaitInitializationCompletion();
            opened = true;
            BeginThreadTransaction(TransactionMode.ReplicationSlave);
            reloadScheme();
            EndThreadTransaction();
        }


        /// <summary>
        /// Check if socket is connected to the master host
        /// </summary>
        /// <returns><code>true</code> if this storage currently holds a socket,
        /// i.e. the connection between slave and master was successfully
        /// established and has not been hung up</returns>
        public bool IsConnected() 
        {
            return socket != null;
        }
    
        /// <summary>
        /// Start a read-only replication-slave transaction: take the shared lock,
        /// re-read the database header from page 0 and reset the cached index
        /// information and the object cache.
        /// </summary>
        /// <param name="mode">must be <c>TransactionMode.ReplicationSlave</c></param>
        /// <exception cref="ArgumentException">if any other mode is requested</exception>
        public override void BeginThreadTransaction(TransactionMode mode)
        {
            if (mode != TransactionMode.ReplicationSlave) 
            {
                throw new ArgumentException("Illegal transaction mode");
            }
            lck.SharedLock();
            try 
            {
                Page pg = pool.getPage(0);
                header.unpack(pg.data);
                pool.unfix(pg);
                currIndex = 1-header.curr;
                currIndexSize = header.root[1-currIndex].indexUsed;
                committedIndexSize = currIndexSize;
                usedSize = header.root[currIndex].size;
                objectCache.clear();
            } 
            catch 
            {
                // The caller will not call EndThreadTransaction for a transaction
                // that failed to start, so release the shared lock here.
                lck.Unlock();
                throw;
            }
        }
     
        /// <summary>
        /// Finish the transaction started by <see cref="BeginThreadTransaction"/>
        /// by releasing the lock.
        /// </summary>
        /// <param name="maxDelay">not used by this implementation</param>
        public override void EndThreadTransaction(int maxDelay)
        {
            lck.Unlock();
        }

        /// <summary>
        /// Block the calling thread while <c>outOfSync</c> is set; the receiver
        /// thread clears the flag when it receives a REPL_SYNC frame.
        /// </summary>
        protected void WaitSynchronizationCompletion() 
        {
            lock(sync) 
            { 
                while (outOfSync) 
                { 
                    Monitor.Wait(sync);
                }
            }
        }

        /// <summary>
        /// Block the calling thread until the receiver thread has stored a header
        /// page whose "initialized" byte is non-zero.
        /// </summary>
        protected void WaitInitializationCompletion() 
        {
            lock(init) 
            { 
                while (!initialized) 
                { 
                    Monitor.Wait(init);
                }
            }
        }

        /// <summary>
        /// Wait until database is modified by master.
        /// This method blocks current thread until master node commits transaction and
        /// this transaction is completely delivered to this slave node.
        /// It returns immediately when there is no socket, and is also woken up
        /// when the connection is hung up or the storage is closed.
        /// </summary>
        public void WaitForModification() 
        { 
            lock(commit) 
            { 
                if (socket != null) 
                { 
                    Monitor.Wait(commit);
                }
            }
        }

        // Byte offsets inside the data of page 0 (the database header).
        internal const int DB_HDR_CURR_INDEX_OFFSET  = 0;
        internal const int DB_HDR_DIRTY_OFFSET       = 1;
        internal const int DB_HDR_INITIALIZED_OFFSET = 2;
        // Offset of the page data inside a frame; the first 8 bytes hold the position.
        internal const int PAGE_DATA_OFFSET          = 8;
    
        // Special values of the frame position.
        internal const int REPL_CLOSE = -1;
        internal const int REPL_SYNC  = -2;

        // Not referenced in this class; presumably used by derived classes when
        // they create the socket -- TODO confirm.
        public static int ListenQueueSize = 10;
        public static int LingerTime = 10; // linger parameter for the socket

        /// <summary>
        /// Obtain the socket from which replicated pages are read.
        /// </summary>
        protected abstract Socket GetSocket();
        
        /// <summary>
        /// Hook called by <see cref="Close"/> after <c>listening</c> was cleared and
        /// before the receiver thread is interrupted. The default does nothing.
        /// </summary>
        protected virtual void cancelIO() {}

        /// <summary>
        /// Ask <see cref="GetSocket"/> for a socket; on SocketException the storage
        /// is left without a socket.
        /// </summary>
        void connect()
        {
            try 
            { 
                socket = GetSocket();
            } 
            catch (SocketException) 
            { 
                socket = null;
            }
        }

        /// <summary>
        /// Close the current (failed) socket, if any, and obtain a new one.
        /// The <c>socket</c> field is not cleared in between, so threads blocked in
        /// <see cref="WaitForModification"/> keep waiting during the reconnect.
        /// </summary>
        private void reconnect() 
        {
            Socket stale = socket;
            if (stale != null) 
            { 
                try 
                { 
                    stale.Close();
                } 
                catch (SocketException) {}
            }
            connect();
        }

        /// <summary>
        /// When overridden in a derived class this method performs socket error handling.
        /// The default implementation delegates to the storage listener, if one is set.
        /// </summary>     
        /// <returns><code>true</code> if host should be reconnected and attempt to receive data from it
        /// should be repeated, <code>false</code> if no more attempts to communicate with this host
        /// should be performed</returns>
        public virtual bool HandleError() 
        {
            return (listener != null) ? listener.ReplicationError(null) : false;
        }

        /// <summary>
        /// Body of the receiver thread: read frames until the storage is closed,
        /// the master sends REPL_CLOSE, or a socket error cannot be recovered.
        /// </summary>
        public void run() 
        { 
            byte[] buf = new byte[Page.pageSize+PAGE_DATA_OFFSET];

            try 
            {
                while (listening) 
                { 
                    if (!receiveFrame(buf)) 
                    { 
                        return;
                    }
                    long pos = Bytes.unpack8(buf, 0);
                    if (pos == REPL_SYNC)     
                    { 
                        lock(sync) 
                        { 
                            outOfSync = false;
                            Monitor.Pulse(sync);
                        }
                    }
                    else if (pos == REPL_CLOSE)
                    { 
                        notifyDisconnected();
                        return;
                    }
                    else 
                    { 
                        applyPage(pos, buf);
                    }
                }            
            } 
            catch (ThreadInterruptedException) 
            { 
                // Close() interrupts this thread; if it was blocked (for example
                // while waiting for the exclusive lock) just stop receiving.
            }
        }

        /// <summary>
        /// Fill <paramref name="buf"/> with exactly one frame read from the socket.
        /// </summary>
        /// <returns><code>true</code> if a complete frame was received,
        /// <code>false</code> if the receiver thread has to stop</returns>
        private bool receiveFrame(byte[] buf) 
        { 
            int offs = 0;
            while (offs < buf.Length) 
            { 
                int rc;
                try 
                { 
                    Socket s = socket;
                    // No socket (failed connect) is handled like a receive error.
                    rc = (s != null) 
                        ? s.Receive(buf, offs, buf.Length - offs, SocketFlags.None) 
                        : -1;
                } 
                catch (SocketException) 
                { 
                    rc = -1;
                }
                catch (ObjectDisposedException) 
                { 
                    // Socket was closed while this thread was using it.
                    rc = -1;
                }
                lock(done) 
                { 
                    if (!listening) 
                    { 
                        return false;
                    }
                }
                if (rc > 0) 
                { 
                    offs += rc;
                } 
                else if (HandleError()) 
                { 
                    // rc == 0 means the peer closed the connection; without this
                    // branch the loop would spin forever adding zero bytes.
                    reconnect();
                    // A partial frame of the old connection must not be merged
                    // with data arriving on the new one.
                    offs = 0;
                } 
                else 
                { 
                    // Give up: drop the socket and release WaitForModification() callers.
                    notifyDisconnected();
                    return false;
                }
            }
            return true;
        }

        /// <summary>
        /// Store the page carried by a frame into the page pool. For page 0 (the
        /// header) this also sends the optional acknowledgement, detects a change
        /// of the current index (a committed transaction) and signals
        /// initialization and commit to waiting threads.
        /// </summary>
        private void applyPage(long pos, byte[] buf) 
        { 
            bool headerPage = (pos == 0);
            bool exclusive = false; // true while this thread holds the exclusive lock
            try 
            { 
                if (headerPage) 
                { 
                    if (replicationAck) 
                    { 
                        try 
                        { 
                            socket.Send(buf, 0, 1, SocketFlags.None);
                        } 
                        catch (SocketException) 
                        {
                            HandleError();
                        }
                    }
                    if (buf[PAGE_DATA_OFFSET + DB_HDR_CURR_INDEX_OFFSET] != prevIndex) 
                    { 
                        prevIndex = buf[PAGE_DATA_OFFSET + DB_HDR_CURR_INDEX_OFFSET];
                        // Keep readers out while the new header is installed.
                        lck.ExclusiveLock();
                        exclusive = true;
                    }
                } 

                Page pg = pool.putPage(pos);
                Array.Copy(buf, PAGE_DATA_OFFSET, pg.data, 0, Page.pageSize);
                pool.unfix(pg);

                if (headerPage) 
                { 
                    if (!initialized && buf[PAGE_DATA_OFFSET + DB_HDR_INITIALIZED_OFFSET] != 0) 
                    { 
                        lock(init) 
                        { 
                            initialized = true;
                            Monitor.Pulse(init);
                        }
                    }
                    if (exclusive) 
                    { 
                        exclusive = false;
                        lck.Unlock();
                        lock(commit) 
                        { 
                            Monitor.PulseAll(commit);
                        }
                        pool.flush();
                    }
                }
            } 
            finally 
            { 
                // Only reached with the lock still held when storing the page failed.
                if (exclusive) 
                { 
                    lck.Unlock();
                }
            }
        }

        /// <summary>
        /// Stop the receiver thread, close the socket, wake up threads blocked in
        /// <see cref="WaitForModification"/>, flush the page pool and close the
        /// underlying storage.
        /// </summary>
        public override void Close() 
        {
            lock(done) 
            {
                listening = false;
            }
            cancelIO();
            if (thread != null) 
            { 
                thread.Interrupt();
                thread.Join();
            }
            notifyDisconnected();

            pool.flush();
            base.Close();
        }

        /// <summary>
        /// Hang up the socket and wake up every thread blocked in
        /// <see cref="WaitForModification"/>.
        /// </summary>
        private void notifyDisconnected() 
        { 
            lock(commit) 
            { 
                hangup();
                Monitor.PulseAll(commit);
            }     
        }

        /// <summary>
        /// Close the socket, if any, and forget it.
        /// </summary>
        private void hangup() 
        { 
            if (socket != null) 
            { 
                try 
                { 
                    socket.Close();
                } 
                catch (SocketException) {}
                socket = null;
            }
        }

        /// <summary>
        /// Always reports the storage as not dirty.
        /// </summary>
        protected override bool isDirty() 
        { 
            return false;
        }

        protected Socket       socket;      // connection to the master, null when not connected
        protected IFile        file;        // not assigned in this class
        protected bool         outOfSync;   // cleared by the receiver thread on REPL_SYNC
        protected bool         initialized; // set once an initialized header page was received
        protected bool         listening;   // false once Close() was called
        protected object       sync;        // monitor guarding outOfSync
        protected object       init;        // monitor guarding initialized
        protected object       done;        // monitor guarding listening
        protected object       commit;      // monitor pulsed on commit, hang-up and close
        protected int          prevIndex;   // current-index byte of the last header page seen
        protected IResource    lck;         // shared by readers, exclusive while a commit is applied
        protected Thread       thread;      // receiver thread executing run()
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -