📄 replicationmasterfile.cs
字号:
namespace Perst.Impl
{
using System;
using System.Net.Sockets;
using System.Net;
using System.Threading;
using Perst;
/// <summary>
/// File performing replication of changed pages to specified slave nodes.
/// </summary>
public class ReplicationMasterFile : IFile
{
/// <summary>
/// Constructor of replication master file
/// </summary>
/// <param name="storage">replication storage</param>
/// <param name="file">local file used to store data locally</param>
public ReplicationMasterFile(ReplicationMasterStorageImpl storage, IFile file)
: this(storage, file, storage.port, storage.hosts, storage.replicationAck)
{
}
/// <summary>
/// Constructor of replication master file
/// </summary>
/// <param name="file">local file used to store data locally</param>
/// <param name="hosts">slave node hosts to which replicastion will be performed</param>
/// <param name="ack">whether master should wait acknowledgment from slave node during trasanction commit</param>
public ReplicationMasterFile(IFile file, string[] hosts, bool ack)
: this(null, file, -1, hosts, ack)
{
}
private ReplicationMasterFile(ReplicationMasterStorageImpl storage, IFile file, int port, string[] hosts, bool ack)
{
this.storage = storage;
this.file = file;
this.hosts = hosts;
this.ack = ack;
this.port = port;
mutex = new object();
sockets = new Socket[hosts.Length];
rcBuf = new byte[1];
txBuf = new byte[8 + Page.pageSize];
nHosts = 0;
for (int i = 0; i < hosts.Length; i++)
{
connect(i);
}
if (port >= 0)
{
listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
listenSocket.Bind(new IPEndPoint(IPAddress.Any, port));
listenSocket.Listen(ListenQueueSize);
listening = true;
listenThread = new Thread(new ThreadStart(run));
listenThread.Start();
}
}
public void run()
{
while (true)
{
Socket s = null;
try
{
s = listenSocket.Accept();
s.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, 1);
s.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Linger,
new System.Net.Sockets.LingerOption(true, LingerTime));
}
catch (SocketException)
{
}
lock (mutex)
{
if (!listening)
{
return;
}
}
if (s != null)
{
addConnection(s);
}
}
}
private void addConnection(Socket s)
{
lock (mutex)
{
int n = hosts.Length;
string[] newHosts = new string[n+1];
Array.Copy(hosts, 0, newHosts, 0, n);
newHosts[n] = s.RemoteEndPoint.ToString();
hosts = newHosts;
Socket[] newSockets = new Socket[n+1];
Array.Copy(sockets, 0, newSockets, 0, n);
newSockets[n] = s;
sockets = newSockets;
nHosts += 1;
SynchronizeThread syncThread = new SynchronizeThread(this, n);
Thread t = new Thread(new ThreadStart(syncThread.run));
t.Start();
}
}
void synchronizeNewSlaveNode(int i)
{
long size = storage.DatabaseSize;
Socket s;
lock (mutex)
{
s = sockets[i];
if (s == null)
{
return;
}
}
for (long pos = 0; pos < size; pos += Page.pageSize)
{
Page pg = storage.pool.getPage(pos);
try
{
lock (s)
{
Bytes.pack8(txBuf, 0, pos);
Array.Copy(pg.data, 0, txBuf, 8, Page.pageSize);
storage.pool.unfix(pg);
s.Send(txBuf);
if (!ack || pos != 0 || s.Receive(rcBuf) == 1)
{
continue;
}
}
}
catch (SocketException)
{
}
lock (mutex)
{
if (sockets[i] != null)
{
HandleError(hosts[i]);
sockets[i] = null;
nHosts -= 1;
}
}
return;
}
lock (s)
{
Bytes.pack8(txBuf, 0, ReplicationSlaveStorageImpl.REPL_SYNC);
s.Send(txBuf);
}
}
class SynchronizeThread
{
int i;
ReplicationMasterFile master;
public SynchronizeThread(ReplicationMasterFile master, int i)
{
this.i = i;
this.master = master;
}
public void run()
{
master.synchronizeNewSlaveNode(i);
}
}
public int GetNumberOfAvailableHosts()
{
return nHosts;
}
protected void connect(int i)
{
String host = hosts[i];
int colon = host.IndexOf(':');
int port = int.Parse(host.Substring(colon+1));
host = host.Substring(0, colon);
Socket socket = null;
socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
int maxAttempts = storage != null
? storage.slaveConnectionTimeout : MaxConnectionAttempts;
for (int j = 0; j < maxAttempts; j++)
{
#if NET_FRAMEWORK_20
foreach (IPAddress ip in Dns.GetHostEntry(host).AddressList)
#else
foreach (IPAddress ip in Dns.Resolve(host).AddressList)
#endif
{
try
{
socket.Connect(new IPEndPoint(ip, port));
if (socket.Connected)
{
sockets[i] = socket;
nHosts += 1;
socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, 1);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Linger,
new System.Net.Sockets.LingerOption(true, LingerTime));
return;
}
}
catch (SocketException) {}
}
Thread.Sleep(ConnectionTimeout);
}
HandleError(hosts[i]);
}
/// <summary>
/// When overriden by base class this method perfroms socket error handling
/// </summary>
/// <returns><code>true</code> if host should be reconnected and attempt to send data to it should be
/// repeated, <code>false</code> if no more attmpts to communicate with this host should be performed
/// </returns>
public bool HandleError(string host)
{
return (storage != null && storage.listener != null)
? storage.listener.ReplicationError(host)
: false;
}
public virtual void Write(long pos, byte[] buf)
{
lock (mutex)
{
for (int i = 0; i < sockets.Length; i++)
{
while (sockets[i] != null)
{
try
{
lock(sockets[i])
{
Bytes.pack8(txBuf, 0, pos);
Array.Copy(buf, 0, txBuf, 8, buf.Length);
sockets[i].Send(txBuf);
if (!ack || pos != 0 || sockets[i].Receive(rcBuf) == 1)
{
break;
}
}
}
catch (SocketException) {}
sockets[i] = null;
nHosts -= 1;
if (HandleError(hosts[i]))
{
connect(i);
}
else
{
break;
}
}
}
}
file.Write(pos, buf);
}
public int Read(long pos, byte[] buf)
{
return file.Read(pos, buf);
}
public void Sync()
{
file.Sync();
}
public void Lock()
{
file.Lock();
}
public bool NoFlush
{
get { return file.NoFlush; }
set { file.NoFlush = value; }
}
public virtual void Close()
{
if (listenThread != null)
{
lock (mutex)
{
listening = false;
}
try
{
Socket s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
#if NET_FRAMEWORK_20
s.Connect(new IPEndPoint(Dns.GetHostEntry("localhost").AddressList[0], port));
#else
s.Connect(new IPEndPoint(Dns.Resolve("localhost").AddressList[0], port));
#endif
s.Close();
}
catch (SocketException) {}
listenThread.Join();
try
{
listenSocket.Close();
}
catch (SocketException) {}
}
file.Close();
Bytes.pack8(txBuf, 0, ReplicationSlaveStorageImpl.REPL_CLOSE);
for (int i = 0; i < sockets.Length; i++)
{
if (sockets[i] != null)
{
try
{
sockets[i].Send(txBuf);
sockets[i].Close();
}
catch (SocketException) {}
}
}
}
public long Length
{
get { return file.Length; }
}
public static int ListenQueueSize = 10;
public static int LingerTime = 10; // linger parameter for the socket
public static int MaxConnectionAttempts = 10; // attempts to establish connection with slave node
public static int ConnectionTimeout = 1000; // timeout between attempts to conbbect to the slave
protected Socket[] sockets;
protected byte[] txBuf;
protected byte[] rcBuf;
protected IFile file;
protected string[] hosts;
protected int nHosts;
protected bool ack;
protected bool listening;
protected Thread listenThread;
protected Socket listenSocket;
protected int port;
protected object mutex;
protected ReplicationMasterStorageImpl storage;
}
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -