transproxy.cpp

来自「MySQL数据库开发源码 值得一看哦」· C++ 代码 · 共 362 行

CPP
362
字号
/* Copyright (C) 2003 MySQL AB   This program is free software; you can redistribute it and/or modify   it under the terms of the GNU General Public License as published by   the Free Software Foundation; either version 2 of the License, or   (at your option) any later version.   This program is distributed in the hope that it will be useful,   but WITHOUT ANY WARRANTY; without even the implied warranty of   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the   GNU General Public License for more details.   You should have received a copy of the GNU General Public License   along with this program; if not, write to the Free Software   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */#include <ndb_global.h>#include <NdbTCP.h>#include <NdbOut.hpp>#include <NdbThread.h>#include <NdbSleep.h>#include <Properties.hpp>#include <LocalConfig.hpp>#include <Config.hpp>#include <InitConfigFileParser.hpp>#include <IPCConfig.hpp>static voidfatal(char const* fmt, ...){    va_list ap;    char buf[200];    va_start(ap, fmt);    BaseString::vsnprintf(buf, sizeof(buf), fmt, ap);    va_end(ap);    ndbout << "FATAL: " << buf << endl;    sleep(1);    exit(1);}static voiddebug(char const* fmt, ...){    va_list ap;    char buf[200];    va_start(ap, fmt);    BaseString::vsnprintf(buf, sizeof(buf), fmt, ap);    va_end(ap);    ndbout << buf << endl;}// nodestruct Node {    enum Type { MGM = 1, DB = 2, API = 3 };    Type type;    unsigned id;		// node id    static Node* list;    static unsigned count;    static Node* find(unsigned n) {	for (unsigned i = 0; i < count; i++) {	    if (list[i].id == n)		return &list[i];	}	return 0;    }};unsigned Node::count = 0;Node* Node::list = 0;struct Copy {    int rfd;			// read from    int wfd;			// write to    unsigned char* buf;    unsigned bufsiz;    NdbThread* thread;    void run();    char info[20];};// connection between nodes 0-server side 1-client side// we are client to 0 and server to 1struct Conn {    Node* node[2];		// the nodes    unsigned port;		// server port    unsigned proxy;		// proxy port    static unsigned count;    static unsigned proxycount;    static Conn* list;    NdbThread* thread;		// thread handling this connection    void run();			// run the connection    int sockfd[2];		// socket 0-on server side 1-client side    void conn0();		// connect to side 0    void conn1();		// connect to side 0    char info[20];    Copy copy[2];		// 0-to-1 and 1-to-0};unsigned Conn::count = 0;unsigned Conn::proxycount = 0;Conn* Conn::list = 0;// global datastatic char* hostname = 0;static struct sockaddr_in hostaddr;static char* localcfgfile = 0;static char* initcfgfile = 0;static unsigned ownnodeid = 0;static voidproperr(const Properties* props, const char* name, int i = -1){    if (i < 0) {	fatal("get %s failed: errno = %d", name, props->getPropertiesErrno());    } else {	fatal("get %s_%d failed: errno = %d", name, i, props->getPropertiesErrno());    }}// read config and load it into our structsstatic voidgetcfg(){    LocalConfig lcfg;    if (! lcfg.read(localcfgfile)) {	fatal("read %s failed", localcfgfile);    }    ownnodeid = lcfg._ownNodeId;    debug("ownnodeid = %d", ownnodeid);    InitConfigFileParser pars(initcfgfile);    Config icfg;    if (! pars.getConfig(icfg)) {	fatal("parse %s failed", initcfgfile);    }    Properties* ccfg = icfg.getConfig(ownnodeid);    if (ccfg == 0) {	const char* err = "unknown error";	fatal("getConfig: %s", err);    }    ccfg->put("NodeId", ownnodeid);    ccfg->put("NodeType", "MGM");    if (! ccfg->get("NoOfNodes", &Node::count)) {	properr(ccfg, "NoOfNodes", -1);    }    debug("Node::count = %d", Node::count);    Node::list = new Node[Node::count];    for (unsigned i = 0; i < Node::count; i++) {	Node& node = Node::list[i];	const Properties* nodecfg;	if (! ccfg->get("Node", 1+i, &nodecfg)) {	    properr(ccfg, "Node", 1+i);	}	const char* type;	if (! nodecfg->get("Type", &type)) {	    properr(nodecfg, "Type");	}	if (strcmp(type, "MGM") == 0) {	    node.type = Node::MGM;	} else if (strcmp(type, "DB") == 0) {	    node.type = Node::DB;	} else if (strcmp(type, "API") == 0) {	    node.type = Node::API;	} else {	    fatal("prop %s_%d bad Type = %s", "Node", 1+i, type);	}	if (! nodecfg->get("NodeId", &node.id)) {	    properr(nodecfg, "NodeId");	}	debug("node id=%d type=%d", node.id, node.type);    }    IPCConfig ipccfg(ccfg);    if (ipccfg.init() != 0) {	fatal("ipccfg init failed");    }    if (! ccfg->get("NoOfConnections", &Conn::count)) {	properr(ccfg, "NoOfConnections");    }    debug("Conn::count = %d", Conn::count);    Conn::list = new Conn[Conn::count];    for (unsigned i = 0; i < Conn::count; i++) {	Conn& conn = Conn::list[i];	const Properties* conncfg;	if (! ccfg->get("Connection", i, &conncfg)) {	    properr(ccfg, "Connection", i);	}	unsigned n;	if (! conncfg->get("NodeId1", &n)) {	    properr(conncfg, "NodeId1");	}	if ((conn.node[0] = Node::find(n)) == 0) {	    fatal("node %d not found", n);	}	if (! conncfg->get("NodeId2", &n)) {	    properr(conncfg, "NodeId2");	}	if ((conn.node[1] = Node::find(n)) == 0) {	    fatal("node %d not found", n);	}	if (! conncfg->get("PortNumber", &conn.port)) {	    properr(conncfg, "PortNumber");	}	conn.proxy = 0;	const char* proxy;	if (conncfg->get("Proxy", &proxy)) {	    conn.proxy = atoi(proxy);	    if (conn.proxy > 0) {		Conn::proxycount++;	    }	}	sprintf(conn.info, "conn %d-%d", conn.node[0]->id, conn.node[1]->id);    }}voidConn::conn0(){    int fd;    while (1) {	if ((fd = socket(PF_INET, SOCK_STREAM, 0)) == -1) {	    fatal("%s: create client socket failed: %s", info, strerror(errno));	}	struct sockaddr_in servaddr;	memset(&servaddr, 0, sizeof(servaddr));	servaddr.sin_family = AF_INET;	servaddr.sin_port = htons(port);	servaddr.sin_addr = hostaddr.sin_addr;#if 0	// coredump	if (Ndb_getInAddr(&servaddr.sin_addr, hostname) != 0) {	    fatal("%s: hostname %s lookup failed", info, hostname);	}#endif	if (connect(fd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == 0)	    break;	if (errno != ECONNREFUSED) {	    fatal("%s: connect failed: %s", info, strerror(errno));	}	close(fd);	NdbSleep_MilliSleep(100);    }    sockfd[0] = fd;    debug("%s: side 0 connected", info);}voidConn::conn1(){    int servfd;    if ((servfd = socket(PF_INET, SOCK_STREAM, 0)) == -1) {	fatal("%s: create server socket failed: %s", info, strerror(errno));    }    struct sockaddr_in servaddr;    memset(&servaddr, 0, sizeof(servaddr));    servaddr.sin_family = AF_INET;    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);    servaddr.sin_port = htons(proxy);    const int on = 1;    setsockopt(servfd, SOL_SOCKET, SO_REUSEADDR, (const char*)&on, sizeof(on));    if (bind(servfd, (struct sockaddr*) &servaddr, sizeof(servaddr)) == -1) {	fatal("%s: bind %d failed: %s", info, proxy, strerror(errno));    }    if (listen(servfd, 1) == -1) {	fatal("%s: listen %d failed: %s", info, proxy, strerror(errno));    }    int fd;    if ((fd = accept(servfd, 0, 0)) == -1) {	fatal("%s: accept failed: %s", info, strerror(errno));    }    sockfd[1] = fd;    close(servfd);    debug("%s: side 1 connected", info);}voidCopy::run(){    debug("%s: start", info);    int n, m;    while (1) {	n = read(rfd, buf, sizeof(buf));	if (n < 0)	    fatal("read error: %s", strerror(errno));	m = write(wfd, buf, n);	if (m != n)	    fatal("write error: %s", strerror(errno));    }    debug("%s: stop", info);}extern "C" void*copyrun_C(void* copy){    ((Copy*) copy)->run();    return 0;}voidConn::run(){    debug("%s: start", info);    conn1();    conn0();    const unsigned siz = 32 * 1024;    for (int i = 0; i < 2; i++) {	Copy& copy = this->copy[i];	copy.rfd = sockfd[i];	copy.wfd = sockfd[1-i];	copy.buf = new unsigned char[siz];	copy.bufsiz = siz;	sprintf(copy.info, "copy %d-%d", this->node[i]->id, this->node[1-i]->id);	copy.thread = NdbThread_Create(copyrun_C, (void**)&copy,	    8192, "copyrun", NDB_THREAD_PRIO_LOW);	if (copy.thread == 0) {	    fatal("%s: create thread %d failed errno=%d", i, errno);	}    }    debug("%s: stop", info);}extern "C" void*connrun_C(void* conn){    ((Conn*) conn)->run();    return 0;}static voidstart(){    NdbThread_SetConcurrencyLevel(3 * Conn::proxycount + 2);    for (unsigned i = 0; i < Conn::count; i++) {	Conn& conn = Conn::list[i];	if (! conn.proxy)	    continue;	conn.thread = NdbThread_Create(connrun_C, (void**)&conn,	    8192, "connrun", NDB_THREAD_PRIO_LOW);	if (conn.thread == 0) {	    fatal("create thread %d failed errno=%d", i, errno);	}    }    sleep(3600);}intmain(int av, char** ac){    ndb_init();    debug("start");    hostname = "ndb-srv7";    if (Ndb_getInAddr(&hostaddr.sin_addr, hostname) != 0) {	fatal("hostname %s lookup failed", hostname);    }    localcfgfile = "Ndb.cfg";    initcfgfile = "config.txt";    getcfg();    start();    debug("done");    return 0;}// vim: set sw=4 noet:

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?