⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 network-mysqld-proxy.c

📁 MySQL Proxy 本身是一个很好的 MySQL 负载均衡工具,但它有一个 bug:当多个 MySQL 实例作为 slave 时,如果其中一个 slave 死掉,会连带影响其他 slave,使它们也死掉。这个文件修复了这个 bug。
💻 C
📖 第 1 页 / 共 5 页
字号:
				/* push the parameters on the stack */			rows_p = lua_newuserdata(L, sizeof(rows));			*rows_p = rows;				/* if the meta-table is new, add __index to it */			if (1 == luaL_newmetatable(L, "proxy.resultset.light")) {				lua_pushcfunction(L, proxy_resultset_gc_light);           /* (sp += 1) */				lua_setfield(L, -2, "__gc");                              /* (sp -= 1) */			}			lua_setmetatable(L, -2);         /* tie the metatable to the table   (sp -= 1) */				/* return a interator */			lua_pushcclosure(L, proxy_resultset_rows_iter, 1);		} else {			lua_pushnil(L);		}	} else if (0 == strcmp(key, "raw")) {		GString *s = res->result_queue->head->data;		lua_pushlstring(L, s->str + 4, s->len - 4);	} else if (0 == strcmp(key, "flags")) {		lua_newtable(L);		lua_pushboolean(L, (res->qstat.server_status & SERVER_STATUS_IN_TRANS) != 0);		lua_setfield(L, -2, "in_trans");		lua_pushboolean(L, (res->qstat.server_status & SERVER_STATUS_AUTOCOMMIT) != 0);		lua_setfield(L, -2, "auto_commit");				lua_pushboolean(L, (res->qstat.server_status & SERVER_QUERY_NO_GOOD_INDEX_USED) != 0);		lua_setfield(L, -2, "no_good_index_used");				lua_pushboolean(L, (res->qstat.server_status & SERVER_QUERY_NO_INDEX_USED) != 0);		lua_setfield(L, -2, "no_index_used");	} else if (0 == strcmp(key, "warning_count")) {		lua_pushinteger(L, res->qstat.warning_count);	} else if (0 == strcmp(key, "affected_rows")) {		/**		 * if the query had a result-set (SELECT, ...) 		 
* affected_rows and insert_id are not valid		 */		if (res->qstat.was_resultset) {			lua_pushnil(L);		} else {			lua_pushnumber(L, res->qstat.affected_rows);		}	} else if (0 == strcmp(key, "insert_id")) {		if (res->qstat.was_resultset) {			lua_pushnil(L);		} else {			lua_pushnumber(L, res->qstat.insert_id);		}	} else if (0 == strcmp(key, "query_status")) {		if (0 != parse_resultset_fields(res)) {			/* not a result-set */			lua_pushnil(L);		} else {			lua_pushinteger(L, res->qstat.query_status);		}	} else {		lua_pushnil(L);	}	return 1;}static int proxy_injection_get(lua_State *L) {	injection *inj = *(injection **)luaL_checkudata(L, 1, "proxy.injection"); 	const char *key = luaL_checkstring(L, 2);	if (0 == strcmp(key, "type")) {		lua_pushinteger(L, inj->id); /** DEPRECATED: use "inj.id" instead */	} else if (0 == strcmp(key, "id")) {		lua_pushinteger(L, inj->id);	} else if (0 == strcmp(key, "query")) {		lua_pushlstring(L, inj->query->str, inj->query->len);	} else if (0 == strcmp(key, "query_time")) {		lua_pushinteger(L, TIME_DIFF_US(inj->ts_read_query_result_first, inj->ts_read_query));	} else if (0 == strcmp(key, "response_time")) {		lua_pushinteger(L, TIME_DIFF_US(inj->ts_read_query_result_last, inj->ts_read_query));	} else if (0 == strcmp(key, "resultset")) {		/* fields, rows */		proxy_resultset_t *res;		proxy_resultset_t **res_p;		res_p = lua_newuserdata(L, sizeof(res));		*res_p = res = proxy_resultset_init();		res->result_queue = inj->result_queue;		res->qstat = inj->qstat;		/* if the meta-table is new, add __index to it */		if (1 == luaL_newmetatable(L, "proxy.resultset")) {			lua_pushcfunction(L, proxy_resultset_get);                /* (sp += 1) */			lua_setfield(L, -2, "__index");                           /* (sp -= 1) */			lua_pushcfunction(L, proxy_resultset_gc);               /* (sp += 1) */			lua_setfield(L, -2, "__gc");                              /* (sp -= 1) */		}		lua_setmetatable(L, -2); /* tie the metatable to the table   (sp -= 1) */	} else {		
g_message("%s.%d: inj[%s] ... not found", __FILE__, __LINE__, key);		lua_pushnil(L);	}	return 1;}#endifstatic proxy_stmt_ret proxy_lua_read_query_result(network_mysqld_con *con) {#ifdef HAVE_LUA_H	network_socket *send_sock = con->client;#endif	injection *inj = NULL;	plugin_con_state *st = con->plugin_con_state;	proxy_stmt_ret ret = PROXY_NO_DECISION;	/**	 * check if we want to forward the statement to the client 	 *	 * if not, clean the send-queue 	 */	if (0 == st->injected.queries->length) return PROXY_NO_DECISION;	inj = g_queue_pop_head(st->injected.queries);#ifdef HAVE_LUA_H	/* call the lua script to pick a backend	 * */	lua_register_callback(con);	if (st->injected.L) {		lua_State *L = st->injected.L;		g_assert(lua_isfunction(L, -1));		lua_getfenv(L, -1);		g_assert(lua_istable(L, -1));				lua_getfield(L, -1, "read_query_result");		if (lua_isfunction(L, -1)) {			injection **inj_p;			GString *packet;			inj_p = lua_newuserdata(L, sizeof(inj));			*inj_p = inj;			inj->result_queue = con->client->send_queue->chunks;			inj->qstat = st->injected.qstat;			/* if the meta-table is new, add __index to it */			if (1 == luaL_newmetatable(L, "proxy.injection")) {				lua_pushcfunction(L, proxy_injection_get);                /* (sp += 1) */				lua_setfield(L, -2, "__index");                           /* (sp -= 1) */			}			lua_setmetatable(L, -2); /* tie the metatable to the table   (sp -= 1) */			if (lua_pcall(L, 1, 1, 0) != 0) {				g_critical("(read_query_result) %s", lua_tostring(L, -1));				lua_pop(L, 1); /* err-msg */				ret = PROXY_NO_DECISION;			} else {				if (lua_isnumber(L, -1)) {					ret = lua_tonumber(L, -1);				}				lua_pop(L, 1);			}			switch (ret) {			case PROXY_SEND_RESULT:				/**				 * replace the result-set the server sent us 				 */				while ((packet = g_queue_pop_head(send_sock->send_queue->chunks))) g_string_free(packet, TRUE);								/**				 * we are a response to the client packet, hence one packet id more 				 */				send_sock->packet_id++;				if 
(proxy_lua_handle_proxy_response(con)) {					/**					 * handling proxy.response failed					 *					 * send a ERR packet in case there was no result-set sent yet					 */								if (!st->injected.sent_resultset) {						network_mysqld_con_send_error(con->client, C("(lua) handling proxy.response failed, check error-log"));					}				}				/* fall through */			case PROXY_NO_DECISION:				if (!st->injected.sent_resultset) {					/**					 * make sure we send only one result-set per client-query					 */					st->injected.sent_resultset++;					break;				}				g_warning("%s.%d: got asked to send a resultset, but ignoring it as we already have sent %d resultset(s). injection-id: %d",						__FILE__, __LINE__,						st->injected.sent_resultset,						inj->id);				st->injected.sent_resultset++;				/* fall through */			case PROXY_IGNORE_RESULT:				/* trash the packets for the injection query */				while ((packet = g_queue_pop_head(send_sock->send_queue->chunks))) g_string_free(packet, TRUE);				break;			default:				/* invalid return code */				g_message("%s.%d: return-code for read_query_result() was neither PROXY_SEND_RESULT or PROXY_IGNORE_RESULT, will ignore the result",						__FILE__, __LINE__);				while ((packet = g_queue_pop_head(send_sock->send_queue->chunks))) g_string_free(packet, TRUE);				break;			}		} else if (lua_isnil(L, -1)) {			/* no function defined, let's send the result-set */			lua_pop(L, 1); /* pop the nil */		} else {			g_message("%s.%d: (network_mysqld_con_handle_proxy_resultset) got wrong type: %s", __FILE__, __LINE__, lua_typename(L, lua_type(L, -1)));			lua_pop(L, 1); /* pop the nil */		}		lua_pop(L, 1); /* fenv */		g_assert(lua_isfunction(L, -1));	}#endif	injection_free(inj);	return ret;}/** * call the lua function to intercept the handshake packet * * @return PROXY_SEND_QUERY  to send the packet from the client *         PROXY_NO_DECISION to pass the server packet unmodified */static proxy_stmt_ret proxy_lua_read_handshake(network_mysqld_con *con) {	
proxy_stmt_ret ret = PROXY_NO_DECISION; /* send what the server gave us */#ifdef HAVE_LUA_H	plugin_con_state *st = con->plugin_con_state;	network_socket   *recv_sock = con->server;	network_socket   *send_sock = con->client;	lua_State *L;	/* call the lua script to pick a backend	 * */	lua_register_callback(con);	if (!st->injected.L) return ret;	L = st->injected.L;	g_assert(lua_isfunction(L, -1));	lua_getfenv(L, -1);	g_assert(lua_istable(L, -1));		lua_getfield(L, -1, "read_handshake");	if (lua_isfunction(L, -1)) {		/* export		 *		 * every thing we know about it		 *  */		lua_newtable(L);		lua_pushlstring(L, recv_sock->scramble_buf->str, recv_sock->scramble_buf->len);		lua_setfield(L, -2, "scramble");		lua_pushinteger(L, recv_sock->mysqld_version);		lua_setfield(L, -2, "mysqld_version");		lua_pushinteger(L, recv_sock->thread_id);		lua_setfield(L, -2, "thread_id");		lua_pushstring(L, recv_sock->addr.str);		lua_setfield(L, -2, "server_addr");		lua_pushstring(L, send_sock->addr.str);		lua_setfield(L, -2, "client_addr");		if (lua_pcall(L, 1, 1, 0) != 0) {			g_critical("(read_handshake) %s", lua_tostring(L, -1));			lua_pop(L, 1); /* errmsg */			/* the script failed, but we have a useful default */		} else {			if (lua_isnumber(L, -1)) {				ret = lua_tonumber(L, -1);			}			lua_pop(L, 1);		}			switch (ret) {		case PROXY_NO_DECISION:			break;		case PROXY_SEND_QUERY:			g_warning("%s.%d: (read_handshake) return proxy.PROXY_SEND_QUERY is deprecated, use PROXY_SEND_RESULT instead",					__FILE__, __LINE__);			ret = PROXY_SEND_RESULT;		case PROXY_SEND_RESULT:			/**			 * proxy.response.type = ERR, RAW, ...			 
*/			if (proxy_lua_handle_proxy_response(con)) {				/**				 * handling proxy.response failed				 *				 * send a ERR packet				 */						network_mysqld_con_send_error(con->client, C("(lua) handling proxy.response failed, check error-log"));			}			break;		default:			ret = PROXY_NO_DECISION;			break;		}	} else if (lua_isnil(L, -1)) {		lua_pop(L, 1); /* pop the nil */	} else {		g_message("%s.%d: %s", __FILE__, __LINE__, lua_typename(L, lua_type(L, -1)));		lua_pop(L, 1); /* pop the ... */	}	lua_pop(L, 1); /* fenv */	g_assert(lua_isfunction(L, -1));#endif	return ret;}/** * parse the hand-shake packet from the server * * * @note the SSL and COMPRESS flags are disabled as we can't  *       intercept or parse them. */NETWORK_MYSQLD_PLUGIN_PROTO(proxy_read_handshake) {	GString *packet;	GList *chunk;	network_socket *recv_sock, *send_sock;	guint off = 0;	int maj, min, patch;	guint16 server_cap = 0;	guint8  server_lang = 0;	guint16 server_status = 0;	gchar *scramble_1, *scramble_2;	send_sock = con->client;	recv_sock = con->server;	chunk = recv_sock->recv_queue->chunks->tail;	packet = chunk->data;	if (packet->len != recv_sock->packet_len + NET_HEADER_SIZE) {		/**		 * packet is too short, looks nasty.		 
*		 * report an error and let the core send a error to the 		 * client		 */		recv_sock->packet_len = PACKET_LEN_UNSET;		g_queue_delete_link(recv_sock->recv_queue->chunks, chunk);		return RET_ERROR;	}	if (packet->str[NET_HEADER_SIZE + 0] == '\xff') {		/* the server doesn't like us and sends a ERR packet		 *		 * forward it to the client */		network_queue_append_chunk(send_sock->send_queue, packet);		recv_sock->packet_len = PACKET_LEN_UNSET;		g_queue_delete_link(recv_sock->recv_queue->chunks, chunk);		return RET_ERROR;	} else if (packet->str[NET_HEADER_SIZE + 0] != '\x0a') {		/* the server isn't 4.1+ server, send a client a ERR packet		 */		recv_sock->packet_len = PACKET_LEN_UNSET;		g_queue_delete_link(recv_sock->recv_queue->chunks, chunk);		network_mysqld_con_send_error(send_sock, C("unknown protocol"));		return RET_ERROR;	}	/* scan for a \0 */	for (off = NET_HEADER_SIZE + 1; packet->str[off] && off < packet->len + NET_HEADER_SIZE; off++);	if (packet->str[off] != '\0') {		/* the server has sent us garbage */		recv_sock->packet_len = PACKET_LEN_UNSET;		g_queue_delete_link(recv_sock->recv_queue->chunks, chunk);		network_mysqld_con_send_error(send_sock, C("protocol 10, but version number not terminated"));		return RET_ERROR;	}	if (3 != sscanf(packet->str + NET_HEADER_SIZE + 1, "%d.%d.%d%*s", &maj, &min, &patch)) {		/* can't parse the protocol */		recv_sock->packet_len = PACKET_LEN_UNSET;		g_queue_delete_link(recv_sock->recv_queue->chunks, chunk);		network_mysqld_con_send_error(send_sock, C("protocol 10, but version number not parsable"));		return RET_ERROR;	}	/**	 * out of range 	 */	if (min   < 0 || min   > 100 ||	    patch < 0 || patch > 100 ||	    maj   < 0 || maj   > 10) {		recv_sock->packet_len = PACKET_LEN_UNSET;		g_queue_delete_link(recv_sock->recv_queue->chunks, chunk);		network_mysqld_con_send_error(send_sock, C("protocol 10, but version number out of range"));		return RET_ERROR;	}	recv_sock->mysqld_version = 		maj * 10000 +		min *   100 +		patch;	/* skip the 
\0 */	off++;	recv_sock->thread_id = network_mysqld_proto_get_int32(packet, &off);	send_sock->thread_id = recv_sock->thread_id;	/**	 * get the scramble buf	 *	 * 8 byte here and some the other 12 somewhen later	 */		scramble_1 = network_mysqld_proto_get_string_len(packet, &off, 8);	network_mysqld_proto_skip(packet, &off, 1);	/* we can't sniff compressed packets nor do we support SSL */	packet->str[off] &= ~(CLIENT_COMPRESS);	packet->str[off] &= ~(CLIENT_SSL);	server_cap    = network_mysqld_proto_get_int16(packet, &off);	if (server_cap & CLIENT_COMPRESS) {		packet->str[off-2] &= ~(CLIENT_COMPRESS);	}	if (server_cap & CLIENT_SSL) {		packet->str[off-1] &= ~(CLIENT_SSL >> 8);	}		server_lang   = network_mysqld_proto_get_int8(packet, &off);	server_statu

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -