⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 replicationmasterfile.cs

📁 Perst开源实时数据库
💻 CS
字号:
namespace Perst.Impl    
{
    using System;
    using System.Net.Sockets;
    using System.Net;
    using System.Threading;
    using Perst;
	
    /// <summary>
    /// File performing replication of changed pages to specified slave nodes.
    /// Every page written through this file is first sent to all connected slave
    /// sockets and then written to the wrapped local file. When a listening port
    /// is configured, additional slaves may connect at run time; each of them is
    /// sent the whole database by a dedicated synchronization thread.
    /// The <c>sockets</c>, <c>hosts</c> and <c>nHosts</c> members are updated under
    /// <c>mutex</c>; sends to one slave are serialized by locking its socket.
    /// </summary>
    public class ReplicationMasterFile : IFile 
    { 
        /// <summary>
        /// Constructor of replication master file
        /// </summary>
        /// <param name="storage">replication storage; supplies the listening port, the slave
        /// host list and the acknowledgment flag</param>
        /// <param name="file">local file used to store data locally</param>
        public ReplicationMasterFile(ReplicationMasterStorageImpl storage, IFile file)  
            : this(storage, file, storage.port, storage.hosts, storage.replicationAck)
        {
        }

        /// <summary>
        /// Constructor of replication master file which is not attached to a storage
        /// and does not listen for dynamically connecting slaves
        /// </summary>
        /// <param name="file">local file used to store data locally</param>
        /// <param name="hosts">slave node hosts ("host:port") to which replication will be performed</param>
        /// <param name="ack">whether master should wait for acknowledgment from slave node during transaction commit</param>
        public ReplicationMasterFile(IFile file, string[] hosts, bool ack) 
            : this(null, file, -1, hosts, ack)
        {
        }
        
        /// <summary>
        /// Common constructor: connects to every configured slave and, if
        /// <paramref name="port"/> is not negative, starts the thread accepting new slaves.
        /// </summary>
        private ReplicationMasterFile(ReplicationMasterStorageImpl storage, IFile file, int port, string[] hosts, bool ack) 
        {         
            this.storage = storage;
            this.file = file;
            // A missing host list means "no statically configured slaves".
            this.hosts = (hosts != null) ? hosts : new string[0];
            this.ack = ack;
            this.port = port;
            mutex = new object();
            sockets = new Socket[this.hosts.Length];
            rcBuf = new byte[1];
            txBuf = new byte[8 + Page.pageSize];
            nHosts = 0;
            for (int i = 0; i < this.hosts.Length; i++) 
            { 
                connect(i);
            }
            if (port >= 0) 
            {
                listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                listenSocket.Bind(new IPEndPoint(IPAddress.Any, port));          
                listenSocket.Listen(ListenQueueSize);
                listening = true;
                listenThread = new Thread(new ThreadStart(run));
                listenThread.Start();
            }
        }

        /// <summary>
        /// Body of the listener thread: accepts slave connections until
        /// <c>listening</c> is cleared by <see cref="Close"/>.
        /// </summary>
        public void run() 
        { 
            while (true) 
            { 
                Socket s = null;
                try 
                { 
                    s = listenSocket.Accept();
                    s.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, 1);
                    s.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Linger,
                                      new System.Net.Sockets.LingerOption(true, LingerTime));
                } 
                catch (SocketException) 
                {
                    // Accept failed or was interrupted: the shutdown flag is checked below.
                }
                catch (ObjectDisposedException) 
                {
                    // The listen socket has been closed: nothing more can be accepted.
                    closeQuietly(s);
                    return;
                }
                bool stop;
                lock (mutex) 
                { 
                    stop = !listening;
                }
                if (stop) 
                { 
                    // The connection accepted during shutdown is only a wake-up call
                    // from Close(); release it instead of leaking the socket.
                    closeQuietly(s);
                    return;
                }
                if (s != null)  
                { 
                    addConnection(s);
                }
            }
        }
         
        /// <summary>
        /// Registers a newly accepted slave connection in a new slot at the end of
        /// <c>hosts</c>/<c>sockets</c> and starts a thread sending the database to it.
        /// </summary>
        private void addConnection(Socket s) 
        {
            lock (mutex) 
            { 
                int n = hosts.Length;
                string[] newHosts = new string[n+1];
                Array.Copy(hosts, 0, newHosts, 0, n);
                newHosts[n] = s.RemoteEndPoint.ToString();
                hosts = newHosts;
                Socket[] newSockets = new Socket[n+1];
                Array.Copy(sockets, 0, newSockets, 0, n);
                newSockets[n] = s;
                sockets = newSockets;
                nHosts += 1;
    
                SynchronizeThread syncThread = new SynchronizeThread(this, n);
                Thread t = new Thread(new ThreadStart(syncThread.run));           
                t.Start();
            }
        }
    
        /// <summary>
        /// Sends every page of the database to the slave in slot <paramref name="i"/>,
        /// followed by a REPL_SYNC message. On failure the slave is reported through
        /// <see cref="HandleError"/> and removed from the list of available hosts.
        /// </summary>
        void synchronizeNewSlaveNode(int i) 
        {
            long size = storage.DatabaseSize;
            Socket s;
            lock (mutex) 
            { 
                s = sockets[i];
            }
            if (s == null) 
            { 
                return;
            }
            // Private buffers: txBuf/rcBuf are used by Write() under a different lock,
            // so sharing them here could corrupt the data sent to other slaves.
            byte[] pageBuf = new byte[8 + Page.pageSize];
            byte[] ackBuf = new byte[1];
            bool ok = true;
            try 
            { 
                for (long pos = 0; ok && pos < size; pos += Page.pageSize) 
                { 
                    Page pg = storage.pool.getPage(pos);
                    try 
                    { 
                        Bytes.pack8(pageBuf, 0, pos);
                        Array.Copy(pg.data, 0, pageBuf, 8, Page.pageSize);
                    } 
                    finally 
                    { 
                        storage.pool.unfix(pg);
                    }
                    lock (s) 
                    {
                        s.Send(pageBuf);
                        // Only the page at position 0 is acknowledged by the slave.
                        if (ack && pos == 0 && s.Receive(ackBuf) != 1) 
                        { 
                            ok = false;
                        }
                    }
                }
                if (ok) 
                { 
                    Bytes.pack8(pageBuf, 0, ReplicationSlaveStorageImpl.REPL_SYNC);
                    lock (s) 
                    {
                        s.Send(pageBuf);
                    }
                }
            } 
            catch (SocketException) 
            { 
                ok = false;
            }
            catch (ObjectDisposedException) 
            { 
                // Socket was closed concurrently by Write() or Close().
                ok = false;
            }
            if (ok) 
            { 
                return;
            }
            lock (mutex) 
            { 
                // Drop the slot only if it still holds our socket: Write() may already
                // have replaced it with a fresh connection to the same host.
                if (sockets[i] == s) 
                { 
                    HandleError(hosts[i]);
                    sockets[i] = null;
                    nHosts -= 1;
                }
            }
            closeQuietly(s);
        }

        /// <summary>
        /// Thread body adapter binding a slave slot index to
        /// <see cref="synchronizeNewSlaveNode"/>.
        /// </summary>
        class SynchronizeThread 
        { 
            int i;
            ReplicationMasterFile master;
    
            public SynchronizeThread(ReplicationMasterFile master, int i) 
            { 
                this.i = i;
                this.master = master;
            }
    
            public void run() 
            { 
                master.synchronizeNewSlaveNode(i);
            }
        }          
     
        /// <summary>
        /// Get number of slave nodes to which connection is currently established
        /// </summary>
        /// <returns>current value of the connected hosts counter</returns>
        public int GetNumberOfAvailableHosts() 
        { 
            return nHosts;
        }

        /// <summary>
        /// Tries to establish connection with the slave described by <c>hosts[i]</c>
        /// ("host:port"). On success the socket is stored in <c>sockets[i]</c> and
        /// <c>nHosts</c> is incremented; if all attempts fail <see cref="HandleError"/>
        /// is invoked for this host.
        /// </summary>
        /// <param name="i">index of the slave in <c>hosts</c>/<c>sockets</c></param>
        protected void connect(int i)
        {
            String host = hosts[i];
            int colon = host.IndexOf(':');
            int slavePort = int.Parse(host.Substring(colon+1));
            host = host.Substring(0, colon);
            // With a storage attached the attempt limit is taken from its
            // slaveConnectionTimeout setting.
            int maxAttempts = storage != null 
                ? storage.slaveConnectionTimeout : MaxConnectionAttempts;
            for (int j = 0; j < maxAttempts; j++)  
            { 
                foreach (IPAddress ip in resolveHost(host)) 
                { 
                    if (ip.AddressFamily != AddressFamily.InterNetwork) 
                    { 
                        // Only IPv4 sockets are created here.
                        continue;
                    }
                    // A socket whose Connect failed must not be reused, so each
                    // attempt gets a fresh one which is closed on failure.
                    Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                    try 
                    {
                        socket.Connect(new IPEndPoint(ip, slavePort));
                        if (socket.Connected)
                        {	
                            socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, 1);
                            socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Linger,
                                new System.Net.Sockets.LingerOption(true, LingerTime));
                            // Register the socket only when it is fully configured.
                            sockets[i] = socket;
                            nHosts += 1;
                            return;
                        }
                    } 
                    catch (SocketException) {}
                    closeQuietly(socket);
                }
                if (j + 1 < maxAttempts) 
                { 
                    Thread.Sleep(ConnectionTimeout);
                }
            }
            HandleError(hosts[i]);
        }

        /// <summary>
        /// Resolves host name to the list of its addresses; a resolution failure is
        /// reported as an empty list so that the caller can retry.
        /// </summary>
        private static IPAddress[] resolveHost(string host) 
        { 
            try 
            { 
#if NET_FRAMEWORK_20
                return Dns.GetHostEntry(host).AddressList;
#else
                return Dns.Resolve(host).AddressList;
#endif
            } 
            catch (SocketException) 
            { 
                return new IPAddress[0];
            }
        }

        /// <summary>
        /// Closes the socket (if any) ignoring errors raised while closing.
        /// </summary>
        private static void closeQuietly(Socket s) 
        { 
            if (s == null) 
            { 
                return;
            }
            try 
            { 
                s.Close();
            } 
            catch (SocketException) {}
            catch (ObjectDisposedException) {}
        }

        /// <summary>
        /// Reports a socket error for the specified host to the storage listener, if
        /// a storage with a listener is attached.
        /// </summary>     
        /// <param name="host">address of the slave node with which communication failed</param>
        /// <returns><c>true</c> if host should be reconnected and attempt to send data to it should be 
        /// repeated, <c>false</c> if no more attempts to communicate with this host should be performed 
        /// (also returned when there is no storage or no listener)
        /// </returns>
        public bool HandleError(string host) 
        {
            return (storage != null && storage.listener != null) 
                ? storage.listener.ReplicationError(host) 
                : false;
        }


        /// <summary>
        /// Sends the data to all connected slave nodes and then writes it to the local file.
        /// A slave which can not be reached is dropped unless <see cref="HandleError"/>
        /// requests reconnection.
        /// </summary>
        /// <param name="pos">position in the file</param>
        /// <param name="buf">data to be written</param>
        public virtual void Write(long pos, byte[] buf) 
        {
            lock (mutex) 
            { 
                // The message is the same for all slaves, so build it once.
                Bytes.pack8(txBuf, 0, pos);
                Array.Copy(buf, 0, txBuf, 8, buf.Length);
                for (int i = 0; i < sockets.Length; i++) 
                { 
                    while (sockets[i] != null) 
                    {                 
                        Socket s = sockets[i];
                        try 
                        { 
                            lock (s) 
                            { 
                                s.Send(txBuf);
                                // Only the page at position 0 is acknowledged by the slave.
                                if (!ack || pos != 0 || s.Receive(rcBuf) == 1) 
                                { 
                                    break;
                                }
                            }
                        } 
                        catch (SocketException) {} 
                        catch (ObjectDisposedException) {} 
    
                        sockets[i] = null;
                        nHosts -= 1;
                        // Release the failed connection instead of leaking it.
                        closeQuietly(s);
                        if (HandleError(hosts[i])) 
                        { 
                            connect(i);
                        } 
                        else 
                        { 
                            break;
                        }
                    }
                }
            }
            file.Write(pos, buf);
        }
         
        /// <summary>
        /// Reads data from the local file
        /// </summary>
        public int Read(long pos, byte[] buf) 
        {
            return file.Read(pos, buf);
        }

        /// <summary>
        /// Forwards the request to the local file
        /// </summary>
        public void Sync() 
        {
            file.Sync();
        }

        /// <summary>
        /// Forwards the request to the local file
        /// </summary>
        public void Lock() 
        { 
            file.Lock();
        }

        /// <summary>
        /// NoFlush property of the local file
        /// </summary>
        public bool NoFlush 
        {
            get { return file.NoFlush; }
            set { file.NoFlush =  value; }
        }

        /// <summary>
        /// Stops accepting new slaves, closes the local file, sends REPL_CLOSE to all
        /// connected slaves and closes connections with them.
        /// </summary>
        public virtual void Close() 
        {
            if (listenThread != null) 
            { 
                lock (mutex) 
                { 
                    listening = false;
                }
                // Wake up the listener thread blocked in Accept() by connecting to it.
                // The IPv4 loopback address is used explicitly: "localhost" may resolve
                // to an IPv6 address which an IPv4 socket can not connect to.
                bool woken = false;
                Socket wake = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                try 
                { 
                    wake.Connect(new IPEndPoint(IPAddress.Loopback, port));
                    woken = true;
                } 
                catch (SocketException) {}
                finally 
                { 
                    closeQuietly(wake);
                }
                if (!woken) 
                { 
                    // Fall back to closing the listen socket to interrupt Accept(),
                    // otherwise Join() below would never return.
                    closeQuietly(listenSocket);
                }
                listenThread.Join();
                closeQuietly(listenSocket);
            }                
            try 
            { 
                file.Close();
            } 
            finally 
            { 
                lock (mutex) 
                { 
                    Bytes.pack8(txBuf, 0, ReplicationSlaveStorageImpl.REPL_CLOSE);
                    for (int i = 0; i < sockets.Length; i++) 
                    { 
                        Socket s = sockets[i];
                        if (s == null) 
                        { 
                            continue;
                        }
                        try 
                        {  
                            lock (s) 
                            { 
                                s.Send(txBuf);
                            }
                        } 
                        catch (SocketException) {}
                        catch (ObjectDisposedException) {}
                        finally 
                        { 
                            closeQuietly(s);
                        }
                        // Forget the closed socket so that a repeated Close() is harmless.
                        sockets[i] = null;
                        nHosts -= 1;
                    }
                }
            }
        }

        /// <summary>
        /// Length of the local file
        /// </summary>
        public long Length 
        {
            get { return file.Length; }
        }

        public static int ListenQueueSize = 10; // backlog passed to Listen for the listening socket
        public static int LingerTime = 10; // linger parameter for the socket
        public static int MaxConnectionAttempts = 10; // attempts to establish connection with slave node
        public static int ConnectionTimeout = 1000; // timeout between attempts to connect to the slave

        protected Socket[]       sockets;      // connection with slave i or null if there is no connection
        protected byte[]         txBuf;        // message buffer: 8 bytes of position followed by one page
        protected byte[]         rcBuf;        // buffer for one byte acknowledgment from slave
        protected IFile          file;         // local file
        protected string[]       hosts;        // addresses of slave nodes, parallel to sockets
        protected int            nHosts;       // number of currently connected slaves
        protected bool           ack;          // wait for acknowledgment of the write at position 0
        protected bool           listening;    // cleared to stop the listener thread
        protected Thread         listenThread; // thread accepting new slaves (null if port is negative)
        protected Socket         listenSocket; // socket accepting new slaves (null if port is negative)
        protected int            port;         // listening port, negative if master does not listen
        protected object         mutex;        // guards sockets, hosts, nHosts and listening
    
        protected ReplicationMasterStorageImpl storage;
    }
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -