⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 network-mysqld-proxy.c

📁 MySQL Proxy 本身是个很好的 MySQL 负载均衡工具,但是其本身有 bug:当多个 MySQL 实例做 slave 的时候,如果其中一个 slave 死掉,会导致别的 slave 也跟着死掉!这个文件修复了这个 bug!
💻 C
📖 第 1 页 / 共 5 页
字号:
	g->L = luaL_newstate();	luaL_openlibs(g->L);	proxy_lua_init_global_fenv(g->L);#endif	return g;}void plugin_srv_state_free(plugin_srv_state *g) {	gsize i;	if (!g) return;#ifdef HAVE_LUA_H	lua_close(g->L);#endif	for (i = 0; i < g->backend_pool->len; i++) {		backend_t *backend = g->backend_pool->pdata[i];				backend_free(backend);	}	g_ptr_array_free(g->backend_pool, TRUE);	g_free(g);}/** * parse the result-set packet and extract the fields * * @param chunk  list of mysql packets  * @param fields empty array where the fields shall be stored in * * @return NULL if there is no resultset *         pointer to the chunk after the fields (to the EOF packet) */ static GList *network_mysqld_result_parse_fields(GList *chunk, GPtrArray *fields) {	GString *packet = chunk->data;	guint8 field_count;	guint i;	/*	 * read(6, "\1\0\0\1", 4)                  = 4	 * read(6, "\2", 1)                        = 1	 * read(6, "6\0\0\2", 4)                   = 4	 * read(6, "\3def\0\6STATUS\0\rVariable_name\rVariable_name\f\10\0P\0\0\0\375\1\0\0\0\0", 54) = 54	 * read(6, "&\0\0\3", 4)                   = 4	 * read(6, "\3def\0\6STATUS\0\5Value\5Value\f\10\0\0\2\0\0\375\1\0\0\0\0", 38) = 38	 * read(6, "\5\0\0\4", 4)                  = 4	 * read(6, "\376\0\0\"\0", 5)              = 5	 * read(6, "\23\0\0\5", 4)                 = 4	 * read(6, "\17Aborted_clients\00298", 19) = 19	 *	 */	g_assert(packet->len > NET_HEADER_SIZE);	/* the first chunk is the length	 *  */	if (packet->len != NET_HEADER_SIZE + 1) {		/*		 * looks like this isn't a result-set		 * 		 *    read(6, "\1\0\0\1", 4)                  = 4		 *    read(6, "\2", 1)                        = 1		 *		 * is expected. We might got called on a non-result, tell the user about it.		 */#if 0		g_debug("%s.%d: network_mysqld_result_parse_fields() got called on a non-resultset. 
"F_SIZE_T" != 5", __FILE__, __LINE__, packet->len);#endif		return NULL;	}		field_count = packet->str[NET_HEADER_SIZE]; /* the byte after the net-header is the field-count */	/* the next chunk, the field-def */	for (i = 0; i < field_count; i++) {		guint off = NET_HEADER_SIZE;		MYSQL_FIELD *field;		chunk = chunk->next;		packet = chunk->data;		field = network_mysqld_proto_field_init();		field->catalog   = network_mysqld_proto_get_lenenc_string(packet, &off);		field->db        = network_mysqld_proto_get_lenenc_string(packet, &off);		field->table     = network_mysqld_proto_get_lenenc_string(packet, &off);		field->org_table = network_mysqld_proto_get_lenenc_string(packet, &off);		field->name      = network_mysqld_proto_get_lenenc_string(packet, &off);		field->org_name  = network_mysqld_proto_get_lenenc_string(packet, &off);		network_mysqld_proto_skip(packet, &off, 1); /* filler */		field->charsetnr = network_mysqld_proto_get_int16(packet, &off);		field->length    = network_mysqld_proto_get_int32(packet, &off);		field->type      = network_mysqld_proto_get_int8(packet, &off);		field->flags     = network_mysqld_proto_get_int16(packet, &off);		field->decimals  = network_mysqld_proto_get_int8(packet, &off);		network_mysqld_proto_skip(packet, &off, 2); /* filler */		g_ptr_array_add(fields, field);	}	/* this should be EOF chunk */	chunk = chunk->next;	packet = chunk->data;		g_assert(packet->str[NET_HEADER_SIZE] == MYSQLD_PACKET_EOF);	return chunk;}static void g_hash_table_reset_gstring(gpointer UNUSED_PARAM(_key), gpointer _value, gpointer UNUSED_PARAM(ser_data)) {	GString *value = _value;	g_string_truncate(value, 0);}/** * handle the events of a idling server connection in the pool  * * make sure we know about connection close from the server side * - wait_timeout */static void network_mysqld_con_idle_handle(int event_fd, short events, void *user_data) {	network_connection_pool_entry *pool_entry = user_data;	network_connection_pool *pool             = pool_entry->pool;	if 
(events == EV_READ) {		int b = -1;		/**		 * @todo we have to handle the case that the server really sent use something		 *        up to now we just ignore it		 */		if (ioctlsocket(event_fd, FIONREAD, &b)) {			g_critical("ioctl(%d, FIONREAD, ...) failed: %s", event_fd, strerror(errno));		} else if (b != 0) {			g_critical("ioctl(%d, FIONREAD, ...) said there is something to read, oops: %d", event_fd, b);		} else {			/* the server decided the close the connection (wait_timeout, crash, ... )			 *			 * remove us from the connection pool and close the connection */					network_connection_pool_remove(pool, pool_entry);		}	}}/** * move the con->server into connection pool and disconnect the  * proxy from its backend  */static int proxy_connection_pool_add_connection(network_mysqld_con *con) {	network_mysqld *srv = con->srv;	network_connection_pool_entry *pool_entry = NULL;	plugin_con_state *st = con->plugin_con_state;	/* con-server is already disconnected, got out */	if (!con->server) return 0;	/* the server connection is still authed */	con->server->is_authed = 1;	/* insert the server socket into the connection pool */	pool_entry = network_connection_pool_add(st->backend->pool, con->server);	event_set(&(con->server->event), con->server->fd, EV_READ, network_mysqld_con_idle_handle, pool_entry);	event_base_set(srv->event_base, &(con->server->event));	event_add(&(con->server->event), NULL);		st->backend->connected_clients--;	st->backend = NULL;	st->backend_ndx = -1;		con->server = NULL;	return 0;}/** * swap the server connection with a connection from * the connection pool * * we can only switch backends if we have a authed connection in the pool. 
* * @return NULL if swapping failed *         the new backend on success */static network_socket *proxy_connection_pool_swap(network_mysqld_con *con, int backend_ndx) {	backend_t *backend = NULL;	network_socket *send_sock;	plugin_con_state *st = con->plugin_con_state;	plugin_srv_state *g = st->global_state;	/*	 * we can only change to another backend if the backend is already	 * in the connection pool and connected	 */	if (backend_ndx < 0 || 	    backend_ndx >= g->backend_pool->len) {		/* backend_ndx is out of range */		return NULL;	} 	backend = g->backend_pool->pdata[backend_ndx];	/**	 * get a connection from the pool which matches our basic requirements	 * - username has to match	 * - default_db should match	 */		#ifdef DEBUG_CONN_POOL	g_debug("%s: (swap) check if we have a connection for this user in the pool '%s'", G_STRLOC, con->client->username->str);#endif	if (NULL == (send_sock = network_connection_pool_get(backend->pool, 					con->client->username,					con->client->default_db))) {		/**		 * no connections in the pool		 */		st->backend_ndx = -1;		return NULL;	}	/* the backend is up and cool, take and move the current backend into the pool */#ifdef DEBUG_CONN_POOL	g_debug("%s: (swap) added the previous connection to the pool", G_STRLOC);#endif	proxy_connection_pool_add_connection(con);	/* connect to the new backend */	st->backend = backend;	st->backend->connected_clients++;	st->backend_ndx = backend_ndx;	return send_sock;}#ifdef HAVE_LUA_H/** * load the lua script * * wraps luaL_loadfile and prints warnings when needed * * @see luaL_loadfile */lua_State *lua_load_script(lua_State *L, const gchar *name) {	if (0 != luaL_loadfile(L, name)) {		/* oops, an error, return it */		g_warning("luaL_loadfile(%s) failed", name);		return L;	}	/**	 * pcall() needs the function on the stack	 *	 * as pcall() will pop the script from the stack when done, we have to	 * duplicate it here	 */	g_assert(lua_isfunction(L, -1));	return L;}/** * get the info connection pool  * * 
@return nil or requested information */static int proxy_pool_queue_get(lua_State *L) {	GQueue *queue = *(GQueue **)luaL_checkudata(L, 1, "proxy.backend.pool.queue"); 	const char *key = luaL_checkstring(L, 2); /** ... cur_idle */	if (0 == strcmp(key, "cur_idle_connections")) {		lua_pushinteger(L, queue ? queue->length : 0);	} else {		lua_pushnil(L);	}	return 1;}/** * get the info connection pool  * * @return nil or requested information */static int proxy_pool_users_get(lua_State *L) {	network_connection_pool *pool = *(network_connection_pool **)luaL_checkudata(L, 1, "proxy.backend.pool.users"); 	const char *key = luaL_checkstring(L, 2); /** the username */	GString *s = g_string_new(key);	GQueue **q_p = NULL;	q_p = lua_newuserdata(L, sizeof(*q_p)); 	*q_p = network_connection_pool_get_conns(pool, s, NULL);	g_string_free(s, TRUE);	/* if the meta-table is new, add __index to it */	if (1 == luaL_newmetatable(L, "proxy.backend.pool.queue")) {		lua_pushcfunction(L, proxy_pool_queue_get);            /* (sp += 1) */		lua_setfield(L, -2, "__index");                        /* (sp -= 1) */	}	lua_setmetatable(L, -2); /* tie the metatable to the table   (sp -= 1) */	return 1;}static int proxy_pool_get(lua_State *L) {	network_connection_pool *pool = *(network_connection_pool **)luaL_checkudata(L, 1, "proxy.backend.pool"); 	const char *key = luaL_checkstring(L, 2);	if (0 == strcmp(key, "max_idle_connections")) {		lua_pushinteger(L, pool->max_idle_connections);	} else if (0 == strcmp(key, "min_idle_connections")) {		lua_pushinteger(L, pool->min_idle_connections);	} else if (0 == strcmp(key, "users")) {		network_connection_pool **pool_p;		pool_p = lua_newuserdata(L, sizeof(*pool_p)); 		*pool_p = pool;		/* if the meta-table is new, add __index to it */		if (1 == luaL_newmetatable(L, "proxy.backend.pool.users")) {			lua_pushcfunction(L, proxy_pool_users_get);            /* (sp += 1) */			lua_setfield(L, -2, "__index");                        /* (sp -= 1) */		}		lua_setmetatable(L, 
-2); /* tie the metatable to the table   (sp -= 1) */	} else {		lua_pushnil(L);	}	return 1;}static int proxy_pool_set(lua_State *L) {	network_connection_pool *pool = *(network_connection_pool **)luaL_checkudata(L, 1, "proxy.backend.pool"); 	const char *key = luaL_checkstring(L, 2);	if (0 == strcmp(key, "max_idle_connections")) {		pool->max_idle_connections = lua_tointeger(L, -1);	} else if (0 == strcmp(key, "min_idle_connections")) {		pool->min_idle_connections = lua_tointeger(L, -1);	} else {		return luaL_error(L, "proxy.backend[...].%s is not writable", key);	}	return 0;}/** * get the info about a backend * * proxy.backend[0]. *   connected_clients => clients using this backend *   address           => ip:port or unix-path of to the backend *   state             => int(BACKEND_STATE_UP|BACKEND_STATE_DOWN)  *   type              => int(BACKEND_TYPE_RW|BACKEND_TYPE_RO)  * * @return nil or requested information * @see backend_state_t backend_type_t */static int proxy_backend_get(lua_State *L) {	backend_t *backend = *(backend_t **)luaL_checkudata(L, 1, "proxy.backend"); 	const char *key = luaL_checkstring(L, 2);	if (0 == strcmp(key, "connected_clients")) {		lua_pushinteger(L, backend->connected_clients);	} else if (0 == strcmp(key, "address")) {		lua_pushstring(L, backend->addr.str);	} else if (0 == strcmp(key, "state")) {		lua_pushinteger(L, backend->state);	} else if (0 == strcmp(key, "type")) {		lua_pushinteger(L, backend->type);	} else if (0 == strcmp(key, "pool")) {		network_connection_pool *pool; 		network_connection_pool **pool_p;		pool_p = lua_newuserdata(L, sizeof(pool)); 		*pool_p = backend->pool;		/* if the meta-table is new, add __index to it */		if (1 == luaL_newmetatable(L, "proxy.backend.pool")) {			lua_pushcfunction(L, proxy_pool_get);                  /* (sp += 1) */			lua_setfield(L, -2, "__index");                        /* (sp -= 1) */			lua_pushcfunction(L, proxy_pool_set);                  /* (sp += 1) */			lua_setfield(L, -2, "__newindex");     
                /* (sp -= 1) */		}		lua_setmetatable(L, -2); /* tie the metatable to the table   (sp -= 1) */	} else {		lua_pushnil(L);	}	return 1;}/** * get proxy.backends[ndx] * * get the backend from the array of mysql backends.  * * @return nil or the backend * @see proxy_backend_get */static int proxy_backends_get(lua_State *L) {	plugin_con_state *st;	backend_t *backend; 	backend_t **backend_p;	network_mysqld_con *con = *(network_mysqld_con **)luaL_checkudata(L, 1, "proxy.backends"); 	int backend_ndx = luaL_checkinteger(L, 2) - 1; /** lua is indexes from 1, C from 0 */	st = con->plugin_con_state;	if (backend_ndx < 0 ||	    backend_ndx >= st->global_state->backend_pool->len) {		lua_pushnil(L);		return 1;	}	backend = st->global_state->backend_pool->pdata[backend_ndx];	backend_p = lua_newuserdata(L, sizeof(backend)); /* the table underneat proxy.backends[ndx] */	*backend_p = backend;	/* if the meta-table is new, add __index to it */	if (1 == luaL_newmetatable(L, "proxy.backend")) {		lua_pushcfunction(L, proxy_backend_get);                  /* (sp += 1) */		lua_setfield(L, -2, "__index");                           /* (sp -= 1) */	}	lua_setmetatable(L, -2); /* tie the metatable to the table   (sp -= 1) */	return 1;}static int proxy_backends_len(lua_State *L) {	network_mysqld_con *con = *(network_mysqld_con **)luaL_checkudata(L, 1, "proxy.backends"); 	plugin_con_state *st;	st = con->plugin_con_state;        lua_pushinteger(L, st->global_state->backend_pool->len);        return 1;}static int proxy_socket_get(lua_State *L) {	network_socket *sock = *(network_socket **)luaL_checkudata(L, 1, "proxy.socket"); 	const char *key = luaL_checkstring(L, 2);	/**	 * we to split it in .client and .server here	 */	if (0 == strcmp(key, "default_db")) {		lua_pushlstring(L, sock->default_db->str, sock->default_db->len);	} else if (0 == strcmp(key, "username")) {		lua_pushlstring(L, sock->username->str, sock->username->len);	} else if (0 == strcmp(key, "address")) {		lua_pushstring(L, 
sock->addr.str);	} else if (0 == strcmp(key, "scrambled_password")) {		lua_pushlstring(L, sock->scrambled_password->str, sock->scrambled_password->len);	} else if (sock->mysqld_version) { /* only the server-side has mysqld_version set */		if (0 == strcmp(key, "mysqld_version")) {			lua_pushinteger(L, sock->mysqld_version);		} else if (0 == strcmp(key, "thread_id")) {			lua_pushinteger(L, sock->thread_id);		} else if (0 == strcmp(key, "scramble_buffer")) {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -