⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 network-mysqld-proxy.c

📁 Mysql Proxy本身是个很好的mysql负载均衡工具,但是其本身有bug:当多个mysql 做slave的时候,如果一个slave死掉,会影响别的slave也死掉!这个文件修复了这个bug!
💻 C
📖 第 1 页 / 共 5 页
字号:
/* Copyright (C) 2007 MySQL AB   This program is free software; you can redistribute it and/or modify   it under the terms of the GNU General Public License as published by   the Free Software Foundation; version 2 of the License.   This program is distributed in the hope that it will be useful,   but WITHOUT ANY WARRANTY; without even the implied warranty of   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the   GNU General Public License for more details.   You should have received a copy of the GNU General Public License   along with this program; if not, write to the Free Software   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */ #ifdef HAVE_CONFIG_H#include "config.h"#endif/**  * @page proxy_states The internal states of the Proxy * * The MySQL Proxy implements the MySQL Protocol in its own way.  * *   -# connect @msc *   client, proxy, backend; *   --- [ label = "connect to backend" ]; *   client->proxy  [ label = "INIT" ]; *   proxy->backend [ label = "CONNECT_SERVER", URL="\ref proxy_connect_server" ]; * @endmsc *   -# auth @msc *   client, proxy, backend; *   --- [ label = "authenticate" ]; *   backend->proxy [ label = "READ_HANDSHAKE", URL="\ref proxy_read_handshake" ]; *   proxy->client  [ label = "SEND_HANDSHAKE" ]; *   client->proxy  [ label = "READ_AUTH", URL="\ref proxy_read_auth" ]; *   proxy->backend [ label = "SEND_AUTH" ]; *   backend->proxy [ label = "READ_AUTH_RESULT", URL="\ref proxy_read_auth_result" ]; *   proxy->client  [ label = "SEND_AUTH_RESULT" ]; * @endmsc *   -# query @msc *   client, proxy, backend; *   --- [ label = "query result phase" ]; *   client->proxy  [ label = "READ_QUERY", URL="\ref proxy_read_query" ]; *   proxy->backend [ label = "SEND_QUERY" ]; *   backend->proxy [ label = "READ_QUERY_RESULT", URL="\ref proxy_read_query_result" ]; *   proxy->client  [ label = "SEND_QUERY_RESULT", URL="\ref proxy_send_query_result" ]; * @endmsc * *   - network_mysqld_proxy_connection_init() *     -# registers the callbacks  *   - proxy_connect_server() (CON_STATE_CONNECT_SERVER) *     -# calls the connect_server() function in the lua script which might decide to *       -# send a handshake packet without contacting the backend server (CON_STATE_SEND_HANDSHAKE) *       -# closing the connection (CON_STATE_ERROR) *       -# picking a active connection from the connection pool *       -# pick a backend to authenticate against *       -# do nothing  *     -# by default, pick a backend from the backend list on the backend with the least active connctions *     -# opens the connection to the backend with connect() *     -# when done CON_STATE_READ_HANDSHAKE  *   - proxy_read_handshake() (CON_STATE_READ_HANDSHAKE) *     -# reads the handshake packet from the server  *   - proxy_read_auth() (CON_STATE_READ_AUTH) *     -# reads the auth packet from the client  *   - proxy_read_auth_result() (CON_STATE_READ_AUTH_RESULT) *     -# reads the auth-result packet from the server  *   - proxy_send_auth_result() (CON_STATE_SEND_AUTH_RESULT) *   - proxy_read_query() (CON_STATE_READ_QUERY) *     -# reads the query from the client  *   - proxy_read_query_result() (CON_STATE_READ_QUERY_RESULT) *     -# reads the query-result from the server  *   - proxy_send_query_result() (CON_STATE_SEND_QUERY_RESULT) *     -# called after the data is written to the client *     -# if scripts wants to close connections, goes to CON_STATE_ERROR *     -# if queries are in the injection queue, goes to CON_STATE_SEND_QUERY *     -# otherwise goes to CON_STATE_READ_QUERY *     -# does special handling for COM_BINLOG_DUMP (go to CON_STATE_READ_QUERY_RESULT)  */#ifdef HAVE_SYS_FILIO_H/** * required for FIONREAD on solaris */#include <sys/filio.h>#endif#ifndef _WIN32#include <sys/ioctl.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#define ioctlsocket ioctl#endif#include <string.h>#include <stdlib.h>#include <fcntl.h>#include <time.h>#include <stdio.h>#include <errno.h>#include <glib.h>#ifdef HAVE_LUA_H/** * embedded lua support */#include <lua.h>#include <lauxlib.h>#include <lualib.h>#endif/* for solaris 2.5 and NetBSD 1.3.x */#ifndef HAVE_SOCKLEN_Ttypedef int socklen_t;#endif#include <mysqld_error.h> /** for ER_UNKNOWN_ERROR */#include "network-mysqld.h"#include "network-mysqld-proto.h"#include "network-conn-pool.h"#include "sys-pedantic.h"#include "sql-tokenizer.h"#ifdef _WIN32#define E_NET_CONNRESET WSAECONNRESET#define E_NET_CONNABORTED WSAECONNABORTED#define E_NET_WOULDBLOCK WSAEWOULDBLOCK#define E_NET_INPROGRESS WSAEINPROGRESS#else#define E_NET_CONNRESET ECONNRESET#define E_NET_CONNABORTED ECONNABORTED#define E_NET_INPROGRESS EINPROGRESS#if EWOULDBLOCK == EAGAIN/** * some system make EAGAIN == EWOULDBLOCK which would lead to a  * error in the case handling * * set it to -1 as this error should never happen */#define E_NET_WOULDBLOCK -1#else#define E_NET_WOULDBLOCK EWOULDBLOCK#endif#endif#define TIME_DIFF_US(t2, t1) \	        ((t2.tv_sec - t1.tv_sec) * 1000000.0 + (t2.tv_usec - t1.tv_usec))#define C(x) x, sizeof(x) - 1#define HASH_INSERT(hash, key, expr) \		do { \			GString *hash_value; \			if ((hash_value = g_hash_table_lookup(hash, key))) { \				expr; \			} else { \				hash_value = g_string_new(NULL); \				expr; \				g_hash_table_insert(hash, g_strdup(key), hash_value); \			} \		} while(0);#define CRASHME() do { char *_crashme = NULL; *_crashme = 0; } while(0);typedef enum {	PROXY_NO_DECISION,	PROXY_SEND_QUERY,	PROXY_SEND_RESULT,	PROXY_SEND_INJECTION,	PROXY_IGNORE_RESULT       /** for read_query_result */} proxy_stmt_ret;typedef enum { 	BACKEND_STATE_UNKNOWN, 	BACKEND_STATE_UP, 	BACKEND_STATE_DOWN} backend_state_t;typedef enum { 	BACKEND_TYPE_UNKNOWN, 	BACKEND_TYPE_RW, 	BACKEND_TYPE_RO} backend_type_t;typedef struct {	network_address addr;	backend_state_t state;   /**< UP or DOWN */	backend_type_t type;     /**< ReadWrite or ReadOnly */	GTimeVal state_since;    /**< timestamp of the last state-change */	network_connection_pool *pool; /**< the pool of open connections */	guint connected_clients; /**< number of open connections to this backend for SQF */} backend_t;/** * the shared information across all connections  * */typedef struct {	/**	 * our pool if backends	 *	 * GPtrArray<backend_t>	 */	GPtrArray *backend_pool; 	GTimeVal backend_last_check;#ifdef HAVE_LUA_H	lua_State *L;            /**< the global lua_State */#endif} plugin_srv_state;typedef struct {	/**	 * the content of the OK packet 	 */	int server_status;	int warning_count;	guint64 affected_rows;	guint64 insert_id;	int was_resultset; /** if set, affected_rows and insert_id are ignored */	/**	 * MYSQLD_PACKET_OK or MYSQLD_PACKET_ERR	 */		int query_status;} query_status;typedef struct {	struct {		GQueue *queries;       /** queries we want to executed */		query_status qstat;#ifdef HAVE_LUA_H		lua_State *L;		int L_ref;#endif		int sent_resultset;    /** make sure we send only one result back to the client */	} injected;	plugin_srv_state *global_state;	backend_t *backend;	int backend_ndx;} plugin_con_state;typedef struct {	GString *query;	int id; /* a unique id set by the scripts to map the query to a handler */	/* the userdata's need them */	GQueue *result_queue; /* the data to parse */	query_status qstat;	GTimeVal ts_read_query;          /* timestamp when we added this query to the queues */	GTimeVal ts_read_query_result_first;   /* when we first finished it */	GTimeVal ts_read_query_result_last;     /* when we first finished it */} injection;static injection *injection_init(int id, GString *query) {	injection *i;	i = g_new0(injection, 1);	i->id = id;	i->query = query;	/**	 * we have to assume that injection_init() is only used by the read_query call	 * which should be fine	 */	g_get_current_time(&(i->ts_read_query));	return i;}static void injection_free(injection *i) {	if (!i) return;	if (i->query) g_string_free(i->query, TRUE);	g_free(i);}/** * reset the script context of the connection  */static void proxy_lua_free_script(plugin_con_state *st) {#ifdef HAVE_LUA_H	lua_State *L = st->injected.L;	plugin_srv_state *g  = st->global_state;	if (!st->injected.L) return;	g_assert(lua_isfunction(L, -1));	lua_pop(L, 1); /* function */	g_assert(lua_gettop(L) == 0);	luaL_unref(g->L, LUA_REGISTRYINDEX, st->injected.L_ref);			/**	 * clean up our object 	 */	lua_gc(g->L, LUA_GCCOLLECT, 0);	st->injected.L = NULL;#endif}static plugin_con_state *plugin_con_state_init() {	plugin_con_state *st;	st = g_new0(plugin_con_state, 1);	st->injected.queries = g_queue_new();		return st;}static void plugin_con_state_free(plugin_con_state *st) {	injection *inj;	if (!st) return;	proxy_lua_free_script(st);		while ((inj = g_queue_pop_head(st->injected.queries))) injection_free(inj);	g_queue_free(st->injected.queries);	g_free(st);}#ifdef HAVE_LUA_H/** * init the global proxy object  */static void proxy_lua_init_global_fenv(lua_State *L) {		lua_newtable(L); /* my empty environment aka {}              (sp += 1) */#define DEF(x) \	lua_pushinteger(L, x); \	lua_setfield(L, -2, #x);		DEF(PROXY_SEND_QUERY);	DEF(PROXY_SEND_RESULT);	DEF(PROXY_IGNORE_RESULT);	DEF(MYSQLD_PACKET_OK);	DEF(MYSQLD_PACKET_ERR);	DEF(MYSQLD_PACKET_RAW);	DEF(BACKEND_STATE_UNKNOWN);	DEF(BACKEND_STATE_UP);	DEF(BACKEND_STATE_DOWN);	DEF(BACKEND_TYPE_UNKNOWN);	DEF(BACKEND_TYPE_RW);	DEF(BACKEND_TYPE_RO);	DEF(COM_SLEEP);	DEF(COM_QUIT);	DEF(COM_INIT_DB);	DEF(COM_QUERY);	DEF(COM_FIELD_LIST);	DEF(COM_CREATE_DB);	DEF(COM_DROP_DB);	DEF(COM_REFRESH);	DEF(COM_SHUTDOWN);	DEF(COM_STATISTICS);	DEF(COM_PROCESS_INFO);	DEF(COM_CONNECT);	DEF(COM_PROCESS_KILL);	DEF(COM_DEBUG);	DEF(COM_PING);	DEF(COM_TIME);	DEF(COM_DELAYED_INSERT);	DEF(COM_CHANGE_USER);	DEF(COM_BINLOG_DUMP);	DEF(COM_TABLE_DUMP);	DEF(COM_CONNECT_OUT);	DEF(COM_REGISTER_SLAVE);	DEF(COM_STMT_PREPARE);	DEF(COM_STMT_EXECUTE);	DEF(COM_STMT_SEND_LONG_DATA);	DEF(COM_STMT_CLOSE);	DEF(COM_STMT_RESET);	DEF(COM_SET_OPTION);#if MYSQL_VERSION_ID >= 50000	DEF(COM_STMT_FETCH);#if MYSQL_VERSION_ID >= 50100	DEF(COM_DAEMON);#endif#endif	DEF(MYSQL_TYPE_DECIMAL);#if MYSQL_VERSION_ID >= 50000	DEF(MYSQL_TYPE_NEWDECIMAL);#endif	DEF(MYSQL_TYPE_TINY);	DEF(MYSQL_TYPE_SHORT);	DEF(MYSQL_TYPE_LONG);	DEF(MYSQL_TYPE_FLOAT);	DEF(MYSQL_TYPE_DOUBLE);	DEF(MYSQL_TYPE_NULL);	DEF(MYSQL_TYPE_TIMESTAMP);	DEF(MYSQL_TYPE_LONGLONG);	DEF(MYSQL_TYPE_INT24);	DEF(MYSQL_TYPE_DATE);	DEF(MYSQL_TYPE_TIME);	DEF(MYSQL_TYPE_DATETIME);	DEF(MYSQL_TYPE_YEAR);	DEF(MYSQL_TYPE_NEWDATE);	DEF(MYSQL_TYPE_ENUM);	DEF(MYSQL_TYPE_SET);	DEF(MYSQL_TYPE_TINY_BLOB);	DEF(MYSQL_TYPE_MEDIUM_BLOB);	DEF(MYSQL_TYPE_LONG_BLOB);	DEF(MYSQL_TYPE_BLOB);	DEF(MYSQL_TYPE_VAR_STRING);	DEF(MYSQL_TYPE_STRING);	DEF(MYSQL_TYPE_GEOMETRY);#if MYSQL_VERSION_ID >= 50000	DEF(MYSQL_TYPE_BIT);#endif	/* cheat with DEF() a bit :) */#define PROXY_VERSION PACKAGE_VERSION_ID	DEF(PROXY_VERSION);#undef DEF	/**	 * create 	 * - proxy.global 	 * - proxy.global.config	 */	lua_newtable(L);	lua_newtable(L);	lua_setfield(L, -2, "config");	lua_setfield(L, -2, "global");	lua_setglobal(L, "proxy");}#endifstatic backend_t *backend_init() {	backend_t *b;	b = g_new0(backend_t, 1);	b->pool = network_connection_pool_init();	return b;}void backend_free(backend_t *b) {	if (!b) return;	network_connection_pool_free(b->pool);	if (b->addr.str) g_free(b->addr.str);	g_free(b);}static plugin_srv_state *plugin_srv_state_init() {	plugin_srv_state *g;	g = g_new0(plugin_srv_state, 1);	g->backend_pool = g_ptr_array_new();#ifdef HAVE_LUA_H

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -