⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 network-mysqld-proxy.c

📁 MySQL Proxy 本身是个很好的 MySQL 负载均衡工具,但它有一个 bug:当多个 MySQL 实例作为 slave 时,如果其中一个 slave 死掉,会导致其他 slave 也跟着死掉。这个文件修复了这个 bug。
💻 C
📖 第 1 页 / 共 5 页
字号:
			lua_pushlstring(L, sock->scramble_buf->str, sock->scramble_buf->len);		} else {			lua_pushnil(L);		}	} else {		lua_pushnil(L);	}	return 1;}/** * get the connection information * * note: might be called in connect_server() before con->server is set  */static int proxy_connection_get(lua_State *L) {	network_mysqld_con *con = *(network_mysqld_con **)luaL_checkudata(L, 1, "proxy.connection"); 	plugin_con_state *st;	const char *key = luaL_checkstring(L, 2);	st = con->plugin_con_state;	/**	 * we to split it in .client and .server here	 */	if (0 == strcmp(key, "default_db")) {		return luaL_error(L, "proxy.connection.default_db is deprecated, use proxy.connection.client.default_db or proxy.connection.server.default_db instead");	} else if (0 == strcmp(key, "thread_id")) {		return luaL_error(L, "proxy.connection.thread_id is deprecated, use proxy.connection.server.thread_id instead");	} else if (0 == strcmp(key, "mysqld_version")) {		return luaL_error(L, "proxy.connection.mysqld_version is deprecated, use proxy.connection.server.mysqld_version instead");	} else if (0 == strcmp(key, "backend_ndx")) {		lua_pushinteger(L, st->backend_ndx + 1);	} else if ((con->server && (0 == strcmp(key, "server"))) ||	           (con->client && (0 == strcmp(key, "client")))) {		network_socket **socket_p;		socket_p = lua_newuserdata(L, sizeof(network_socket)); /* the table underneat proxy.socket */		if (key[0] == 's') {			*socket_p = con->server;		} else {			*socket_p = con->client;		}		/* if the meta-table is new, add __index to it */		if (1 == luaL_newmetatable(L, "proxy.socket")) {			lua_pushcfunction(L, proxy_socket_get);                   /* (sp += 1) */			lua_setfield(L, -2, "__index");                           /* (sp -= 1) */		}		lua_setmetatable(L, -2); /* tie the metatable to the table   (sp -= 1) */	} else {		lua_pushnil(L);	}	return 1;}/** * set the connection information * * note: might be called in connect_server() before con->server is set  */static int 
proxy_connection_set(lua_State *L) {	network_mysqld_con *con = *(network_mysqld_con **)luaL_checkudata(L, 1, "proxy.connection"); 	plugin_con_state *st;	const char *key = luaL_checkstring(L, 2);	st = con->plugin_con_state;	if (0 == strcmp(key, "backend_ndx")) {		/**		 * in lua-land the ndx is based on 1, in C-land on 0 */		int backend_ndx = luaL_checkinteger(L, 3) - 1;		network_socket *send_sock;					if (backend_ndx == -1) {			/** drop the backend for now			 */			proxy_connection_pool_add_connection(con);		} else if (NULL != (send_sock = proxy_connection_pool_swap(con, backend_ndx))) {			con->server = send_sock;		} else {			st->backend_ndx = backend_ndx;		}	} else {		return luaL_error(L, "proxy.connection.%s is not writable", key);	}	return 0;}static int proxy_queue_append(lua_State *L) {	/* we expect 2 parameters */	GQueue *q = *(GQueue **)luaL_checkudata(L, 1, "proxy.queue");	int resp_type = luaL_checkinteger(L, 2);	size_t str_len;	const char *str = luaL_checklstring(L, 3, &str_len);	GString *query = g_string_sized_new(str_len);	g_string_append_len(query, str, str_len);	g_queue_push_tail(q, injection_init(resp_type, query));	return 0;}static int proxy_queue_prepend(lua_State *L) {	/* we expect 2 parameters */	GQueue *q = *(GQueue **)luaL_checkudata(L, 1, "proxy.queue");	int resp_type = luaL_checkinteger(L, 2);	size_t str_len;	const char *str = luaL_checklstring(L, 3, &str_len);	GString *query = g_string_sized_new(str_len);	g_string_append_len(query, str, str_len);	g_queue_push_head(q, injection_init(resp_type, query));	return 0;}static int proxy_queue_reset(lua_State *L) {	/* we expect 2 parameters */	GQueue *q = *(GQueue **)luaL_checkudata(L, 1, "proxy.queue");	injection *inj;	while ((inj = g_queue_pop_head(q))) injection_free(inj);	return 0;}static int proxy_queue_len(lua_State *L) {	/* we expect 2 parameters */	GQueue *q = *(GQueue **)luaL_checkudata(L, 1, "proxy.queue");	lua_pushinteger(L, q->length);	return 1;}/** * split the SQL query into a stream of 
tokens */
/*
 * exported to the script as proxy.tokenize(sql)
 *
 * Lua stack on entry:
 *   1: the SQL text (string, used with its explicit length)
 *
 * pushes one table: an array with one entry per token, each entry being
 *   { text = <string>, token_id = <int>, token_name = <string> }
 */
static int proxy_tokenize(lua_State *L) {
	size_t sql_len;
	const char *sql = luaL_checklstring(L, 1, &sql_len);
	GPtrArray *tokens = sql_tokens_new(); /* created only after the argument check passed */
	gsize pos;

	sql_tokenizer(tokens, sql, sql_len);

	/**
	 * export the data into a table
	 */
	lua_newtable(L);

	/* pos is the 1-based lua index, the C array is 0-based */
	for (pos = 1; pos <= tokens->len; pos++) {
		sql_token *tok = tokens->pdata[pos - 1];

		lua_newtable(L);

		lua_pushlstring(L, tok->text->str, tok->text->len);
		lua_setfield(L, -2, "text");

		lua_pushinteger(L, tok->token_id);
		lua_setfield(L, -2, "token_id");

		lua_pushstring(L, sql_token_get_name(tok->token_id));
		lua_setfield(L, -2, "token_name");

		lua_rawseti(L, -2, pos);
	}

	sql_tokens_free(tokens);

	return 1;
}

/**
 * set up the script before a hook function is executed
 *
 * has to be called before any lua_pcall() is called to start a hook function
 *
 * - we use a global lua_State which is split into child-states with lua_newthread()
 * - luaL_ref() moves the state into the registry and cleans up the global stack
 * - on connection close we call luaL_unref() to hand the thread to the GC
 *
 * On the first call for a connection the script is loaded, gets its own
 * environment ({} with __index = _G) and is run once. On every call the
 * proxy.* table found in that environment is refreshed with:
 *   proxy.queries, proxy.tokenize, proxy.connection, proxy.backends and an
 *   empty proxy.response
 *
 * @return 0 in all cases: no script configured, script failed to load or run,
 *         or the proxy.* table was set up
 *
 * @see proxy_lua_free_script
 */
static int lua_register_callback(network_mysqld_con *con) {
	plugin_con_state *st = con->plugin_con_state;
	plugin_srv_state *srv = st->global_state;
	lua_State *L = NULL;
	GQueue **queue_ud;
	network_mysqld_con **con_ud;

	/* no script configured, nothing to register */
	if (!con->config.proxy.lua_script) {
		return 0;
	}

	if (st->injected.L != NULL) {
		/* the thread for this connection was created by an earlier call */
		L = st->injected.L;
	} else {
		/**
		 * create a side thread for this connection
		 *
		 * (this is not pre-emptive, it is just a new stack in the global env)
		 */
		L = lua_newthread(srv->L);

		/**
		 * move the thread into the registry to clean up the global stack
		 */
		st->injected.L_ref = luaL_ref(srv->L, LUA_REGISTRYINDEX);

		lua_load_script(L, con->config.proxy.lua_script);

		if (lua_isstring(L, -1)) {
			/* a string on top of the stack is treated as the load error */
			g_warning("lua_load_file(%s) failed: %s", con->config.proxy.lua_script, lua_tostring(L, -1));

			lua_pop(L, 1); /* remove the error-msg from the stack */

			proxy_lua_free_script(st);

			L = NULL;
		} else if (lua_isfunction(L, -1)) {
			/**
			 * set the function env */
			lua_newtable(L); /* my empty environment aka {}              (sp += 1) */

			lua_newtable(L); /* the meta-table for the new env           (sp += 1) */

			lua_pushvalue(L, LUA_GLOBALSINDEX);                       /* (sp += 1) */
			lua_setfield(L, -2, "__index"); /* { __index = _G }          (sp -= 1) */
			lua_setmetatable(L, -2); /* setmetatable({}, {__index = _G}) (sp -= 1) */

			lua_setfenv(L, -2); /* on the stack should be a modified env (sp -= 1) */

			/* cache the script: the copy is consumed by lua_pcall(), the original stays */
			g_assert(lua_isfunction(L, -1));
			lua_pushvalue(L, -1);

			/* run the script once so that it defines its functions */
			if (0 != lua_pcall(L, 0, 0, 0)) {
				g_critical("(lua-error) [%s]\n%s", con->config.proxy.lua_script, lua_tostring(L, -1));

				lua_pop(L, 1); /* errmsg */

				proxy_lua_free_script(st);

				L = NULL;
			}
			st->injected.L = L; /* NULL if running the script failed */

			/* on the stack should be the script now, keep it there */
		} else {
			g_error("lua_load_file(%s): returned a %s", con->config.proxy.lua_script, lua_typename(L, lua_type(L, -1)));
		}
	}

	/* loading or running the script failed */
	if (L == NULL) {
		return 0;
	}

	g_assert(lua_isfunction(L, -1));

	lua_getfenv(L, -1);
	g_assert(lua_istable(L, -1));

	lua_getfield(L, -1, "proxy");
	if (!lua_istable(L, -1)) {
		g_error("fenv.proxy should be a table, but is %s", lua_typename(L, lua_type(L, -1)));
	}
	g_assert(lua_istable(L, -1));

	/* the userdata only carries the pointer to the connection's injection queue */
	queue_ud = lua_newuserdata(L, sizeof(*queue_ud));
	*queue_ud = st->injected.queries;

	/**
	 * proxy.queries
	 *
	 * implement a queue
	 *
	 * - append(type, query)
	 * - prepend(type, query)
	 * - reset()
	 * - len() and #proxy.queue
	 *
	 */
	if (luaL_newmetatable(L, "proxy.queue") == 1) {
		lua_pushcfunction(L, proxy_queue_append);
		lua_setfield(L, -2, "append");
		lua_pushcfunction(L, proxy_queue_prepend);
		lua_setfield(L, -2, "prepend");
		lua_pushcfunction(L, proxy_queue_reset);
		lua_setfield(L, -2, "reset");
		lua_pushcfunction(L, proxy_queue_len);
		lua_setfield(L, -2, "len");   /* DEPRECATED: */
		lua_pushcfunction(L, proxy_queue_len);
		lua_setfield(L, -2, "__len"); /* support #proxy.queue too */

		lua_pushvalue(L, -1); /* meta.__index = meta */
		lua_setfield(L, -2, "__index");
	}

	lua_setmetatable(L, -2);

	lua_setfield(L, -2, "queries");

	/**
	 * export internal functions
	 *
	 * @note: might be moved into a lua-c-lib instead
	 */
	lua_pushcfunction(L, proxy_tokenize);
	lua_setfield(L, -2, "tokenize");

	/**
	 * proxy.connection is (mostly) read-only
	 *
	 * reads go through proxy_connection_get(), writes through proxy_connection_set()
	 */
	con_ud = lua_newuserdata(L, sizeof(*con_ud));
	*con_ud = con;

	/* if the meta-table is new, add __index and __newindex to it */
	if (luaL_newmetatable(L, "proxy.connection") == 1) {
		lua_pushcfunction(L, proxy_connection_get);               /* (sp += 1) */
		lua_setfield(L, -2, "__index");                           /* (sp -= 1) */
		lua_pushcfunction(L, proxy_connection_set);               /* (sp += 1) */
		lua_setfield(L, -2, "__newindex");                        /* (sp -= 1) */
	}

	lua_setmetatable(L, -2);          /* tie the metatable to the userdata (sp -= 1) */

	lua_setfield(L, -2, "connection");

	/**
	 * register proxy.backends[]
	 *
	 * @see proxy_backends_get()
	 */
	con_ud = lua_newuserdata(L, sizeof(*con_ud));
	*con_ud = con;

	/* if the meta-table is new, add __index and __len to it */
	if (luaL_newmetatable(L, "proxy.backends") == 1) {
		lua_pushcfunction(L, proxy_backends_get);                 /* (sp += 1) */
		lua_setfield(L, -2, "__index");                           /* (sp -= 1) */
		lua_pushcfunction(L, proxy_backends_len);                 /* (sp += 1) */
		lua_setfield(L, -2, "__len");                             /* (sp -= 1) */
	}

	lua_setmetatable(L, -2);          /* tie the metatable to the userdata (sp -= 1) */

	lua_setfield(L, -2, "backends");

	/**
	 * proxy.response knows 3 fields with strict types:
	 *
	 * .type = <int>
	 * .errmsg = <string>
	 * .resultset = {
	 *   fields = {
	 *     { type = <int>, name = <string > },
	 *     { ... } },
	 *   rows = {
	 *     { ..., ... },
	 *     { ..., ... } }
	 * }
	 */
	lua_newtable(L);
#if 0
	lua_newtable(L); /* the meta-table for the response-table    (sp += 1) */
	lua_pushcfunction(L, response_get);                       /* (sp += 1) */
	lua_setfield(L, -2, "__index");                           /* (sp -= 1) */
	lua_pushcfunction(L, response_set);                       /* (sp += 1) */
	lua_setfield(L, -2, "__newindex");                        /* (sp -= 1) */
	lua_setmetatable(L, -2); /* tie the metatable to response    (sp -= 1) */
#endif
	lua_setfield(L, -2, "response");

	lua_pop(L, 2); /* fenv + proxy, the script function stays on the stack */

	return 0;
}

/**
 * handle the proxy.response.* table from the lua script
 *
 * proxy.response
 *   .type can be either ERR, OK or RAW
 *   .resultset (in case of OK)
 *     .fields
 *     .rows
 *   .errmsg (in case of ERR)
 *   .packet (in case of nil)
 *
 */
static int proxy_lua_handle_proxy_response(network_mysqld_con *con) {
	plugin_con_state *st = con->plugin_con_state;
	int resp_type = 1;
	const char *str;
	size_t str_len;
	gsize i;
	lua_State *L = st->injected.L;

	/**
	 * on the stack should be the fenv of our function */
	g_assert(lua_istable(L, -1));

	lua_getfield(L, -1, "proxy"); /* proxy.* from the env  */
	g_assert(lua_istable(L, -1));

	lua_getfield(L, -1, "response"); /* proxy.response */
	if (lua_isnil(L, -1)) {
		g_message("%s.%d: proxy.response isn't set in %s", __FILE__, __LINE__, 
				con->config.proxy.lua_script);

		lua_pop(L, 2); /* proxy + nil */

		return -1;
	} else if (!lua_istable(L, -1)) {
		g_message("%s.%d: proxy.response has to be a table, is %s in %s", __FILE__, __LINE__,
				lua_typename(L, lua_type(L, -1)),
				con->config.proxy.lua_script);

		lua_pop(L, 2); /* proxy + response */
		return -1;
	}

	lua_getfield(L, -1, "type"); /* proxy.response.type */
	if (lua_isnil(L, -1)) {
		/**
		 * nil is fine, we expect to get a raw packet in that case
		 */
		g_message("%s.%d: proxy.response.type isn't set in %s", __FILE__, __LINE__, 
				con->config.proxy.lua_script);

		lua_pop(L, 3); /* proxy + nil */

		return -1;
	} else if (!lua_isnumber(L, -1)) 
{		g_message("%s.%d: proxy.response.type has to be a number, is %s in %s", __FILE__, __LINE__,				lua_typename(L, lua_type(L, -1)),				con->config.proxy.lua_script);				lua_pop(L, 3); /* proxy + response + type */		return -1;	} else {		resp_type = lua_tonumber(L, -1);	}	lua_pop(L, 1);	switch(resp_type) {	case MYSQLD_PACKET_OK: {		GPtrArray *fields = NULL;		GPtrArray *rows = NULL;		gsize field_count = 0;		lua_getfield(L, -1, "resultset"); /* proxy.response.resultset */		if (lua_istable(L, -1)) {			lua_getfield(L, -1, "fields"); /* proxy.response.resultset.fields */			g_assert(lua_istable(L, -1));			fields = g_ptr_array_new();					for (i = 1, field_count = 0; ; i++, field_count++) {				lua_rawgeti(L, -1, i);								if (lua_istable(L, -1)) { /** proxy.response.resultset.fields[i] */					MYSQL_FIELD *field;						field = network_mysqld_proto_field_init();						lua_getfield(L, -1, "name"); /* proxy.response.resultset.fields[].name */						if (!lua_isstring(L, -1)) {						field->name = g_strdup("no-field-name");							g_warning("%s.%d: proxy.response.type = OK, "								"but proxy.response.resultset.fields[" F_SIZE_T "].name is not a string (is %s), "								"using default", 

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -