⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 network-mysqld-proxy.c

📁 MySQL Proxy 本身是一个很好的 MySQL 负载均衡工具,但它有一个 bug:当多个 MySQL 实例作为 slave 时,如果其中一个 slave 宕机,其他 slave 也会受到影响而不可用。这个文件修复了这个 bug。
💻 C
📖 第 1 页 / 共 5 页
字号:
								__FILE__, __LINE__,								i,								lua_typename(L, lua_type(L, -1)));					} else {						field->name = g_strdup(lua_tostring(L, -1));					}					lua_pop(L, 1);						lua_getfield(L, -1, "type"); /* proxy.response.resultset.fields[].type */					if (!lua_isnumber(L, -1)) {						g_warning("%s.%d: proxy.response.type = OK, "								"but proxy.response.resultset.fields[" F_SIZE_T "].type is not a integer (is %s), "								"using MYSQL_TYPE_STRING", 								__FILE__, __LINE__,								i,								lua_typename(L, lua_type(L, -1)));							field->type = MYSQL_TYPE_STRING;					} else {						field->type = lua_tonumber(L, -1);					}					lua_pop(L, 1);					field->flags = PRI_KEY_FLAG;					field->length = 32;					g_ptr_array_add(fields, field);										lua_pop(L, 1); /* pop key + value */				} else if (lua_isnil(L, -1)) {					lua_pop(L, 1); /* pop the nil and leave the loop */					break;				} else {					g_error("proxy.response.resultset.fields[%d] should be a table, but is a %s", 							i,							lua_typename(L, lua_type(L, -1)));				}			}			lua_pop(L, 1);				rows = g_ptr_array_new();			lua_getfield(L, -1, "rows"); /* proxy.response.resultset.rows */			g_assert(lua_istable(L, -1));			for (i = 1; ; i++) {				lua_rawgeti(L, -1, i);					if (lua_istable(L, -1)) { /** proxy.response.resultset.rows[i] */					GPtrArray *row;					gsize j;						row = g_ptr_array_new();						/* we should have as many columns as we had fields */							for (j = 1; j < field_count + 1; j++) {						lua_rawgeti(L, -1, j);							if (lua_isnil(L, -1)) {							g_ptr_array_add(row, NULL);						} else {							g_ptr_array_add(row, g_strdup(lua_tostring(L, -1)));						}							lua_pop(L, 1);					}						g_ptr_array_add(rows, row);						lua_pop(L, 1); /* pop value */				} else if (lua_isnil(L, -1)) {					lua_pop(L, 1); /* pop the nil and leave the loop */					break;				} else {					g_error("proxy.response.resultset.rows[%d] should be a table, but is a %s", 							i,							lua_typename(L, lua_type(L, -1)));				}			}		
	lua_pop(L, 1);			network_mysqld_con_send_resultset(con->client, fields, rows);		} else {			guint64 affected_rows = 0;			guint64 insert_id = 0;			lua_getfield(L, -2, "affected_rows"); /* proxy.response.affected_rows */			if (lua_isnumber(L, -1)) {				affected_rows = lua_tonumber(L, -1);			}			lua_pop(L, 1);			lua_getfield(L, -2, "insert_id"); /* proxy.response.affected_rows */			if (lua_isnumber(L, -1)) {				insert_id = lua_tonumber(L, -1);			}			lua_pop(L, 1);			network_mysqld_con_send_ok_full(con->client, affected_rows, insert_id, 0x0002, 0);		}		/**		 * someone should cleanup 		 */		if (fields) {			network_mysqld_proto_fields_free(fields);			fields = NULL;		}		if (rows) {			for (i = 0; i < rows->len; i++) {				GPtrArray *row = rows->pdata[i];				gsize j;				for (j = 0; j < row->len; j++) {					if (row->pdata[j]) g_free(row->pdata[j]);				}				g_ptr_array_free(row, TRUE);			}			g_ptr_array_free(rows, TRUE);			rows = NULL;		}				lua_pop(L, 1); /* .resultset */				break; }	case MYSQLD_PACKET_ERR: {		gint errorcode = ER_UNKNOWN_ERROR;		const gchar *sqlstate = "07000"; /** let's call ourself Dynamic SQL ... 
07000 is "dynamic SQL error" */				lua_getfield(L, -1, "errcode"); /* proxy.response.errcode */		if (lua_isnumber(L, -1)) {			errorcode = lua_tonumber(L, -1);		}		lua_pop(L, 1);		lua_getfield(L, -1, "sqlstate"); /* proxy.response.sqlstate */		sqlstate = lua_tostring(L, -1);		lua_pop(L, 1);		lua_getfield(L, -1, "errmsg"); /* proxy.response.errmsg */		if (lua_isstring(L, -1)) {			str = lua_tolstring(L, -1, &str_len);			network_mysqld_con_send_error_full(con->client, str, str_len, errorcode, sqlstate);		} else {			network_mysqld_con_send_error(con->client, C("(lua) proxy.response.errmsg is nil"));		}		lua_pop(L, 1);		break; }	case MYSQLD_PACKET_RAW:		/**		 * iterate over the packet table and add each packet to the send-queue		 */		lua_getfield(L, -1, "packets"); /* proxy.response.packets */		if (lua_isnil(L, -1)) {			g_message("%s.%d: proxy.response.packets isn't set in %s", __FILE__, __LINE__,					con->config.proxy.lua_script);			lua_pop(L, 3 + 1); /* fenv + proxy + response + nil */			return -1;		} else if (!lua_istable(L, -1)) {			g_message("%s.%d: proxy.response.packets has to be a table, is %s in %s", __FILE__, __LINE__,					lua_typename(L, lua_type(L, -1)),					con->config.proxy.lua_script);			lua_pop(L, 3 + 1); /* fenv + proxy + response + packets */			return -1;		}		for (i = 1; ; i++) {			lua_rawgeti(L, -1, i);			if (lua_isstring(L, -1)) { /** proxy.response.packets[i] */				str = lua_tolstring(L, -1, &str_len);				network_queue_append(con->client->send_queue, str, str_len, con->client->packet_id++);					lua_pop(L, 1); /* pop value */			} else if (lua_isnil(L, -1)) {				lua_pop(L, 1); /* pop the nil and leave the loop */				break;			} else {				g_error("%s.%d: proxy.response.packets should be array of strings, field "F_SIZE_T" was %s", 						__FILE__, __LINE__, 						i,						lua_typename(L, lua_type(L, -1)));			}		}		lua_pop(L, 1); /* .packets */		break;	default:		g_message("proxy.response.type is unknown: %d", resp_type);		lua_pop(L, 2); /* proxy + response 
*/		return -1;	}	lua_pop(L, 2);	return 0;}#endif/** * turn a GTimeVal into string * * @return string in ISO notation with micro-seconds */static gchar * g_timeval_string(GTimeVal *t1, GString *str) {	size_t used_len;		g_string_set_size(str, 63);	used_len = strftime(str->str, str->allocated_len, "%Y-%m-%dT%H:%M:%S", gmtime(&t1->tv_sec));	g_assert(used_len < str->allocated_len);	str->len = used_len;	g_string_append_printf(str, ".%06ld", t1->tv_usec);	return str->str;}#ifdef HAVE_LUA_H/** * parsed result set * *  */typedef struct {	GQueue *result_queue;   /**< where the packets are read from */	GPtrArray *fields;      /**< the parsed fields */	GList *rows_chunk_head; /**< pointer to the EOF packet after the fields */	GList *row;             /**< the current row */	query_status qstat;     /**< state if this query */} proxy_resultset_t;proxy_resultset_t *proxy_resultset_init() {	proxy_resultset_t *res;	res = g_new0(proxy_resultset_t, 1);	return res;}void proxy_resultset_free(proxy_resultset_t *res) {	if (!res) return;	if (res->fields) {		network_mysqld_proto_fields_free(res->fields);	}	g_free(res);}static int proxy_resultset_gc(lua_State *L) {	proxy_resultset_t *res = *(proxy_resultset_t **)lua_touserdata(L, 1);		proxy_resultset_free(res);	return 0;}static int proxy_resultset_gc_light(lua_State *L) {	proxy_resultset_t *res = *(proxy_resultset_t **)lua_touserdata(L, 1);		g_free(res);	return 0;}static int proxy_resultset_fields_len(lua_State *L) {	GPtrArray *fields = *(GPtrArray **)luaL_checkudata(L, 1, "proxy.resultset.fields");        lua_pushinteger(L, fields->len);        return 1;}static int proxy_resultset_field_get(lua_State *L) {	MYSQL_FIELD *field = *(MYSQL_FIELD **)luaL_checkudata(L, 1, "proxy.resultset.fields.field");	const char *key = luaL_checkstring(L, 2);	if (0 == strcmp(key, "type")) {		lua_pushinteger(L, field->type);	} else if (0 == strcmp(key, "name")) {		lua_pushstring(L, field->name);	} else if (0 == strcmp(key, "org_name")) {		lua_pushstring(L, 
field->org_name);	} else if (0 == strcmp(key, "org_table")) {		lua_pushstring(L, field->org_table);	} else if (0 == strcmp(key, "table")) {		lua_pushstring(L, field->table);	} else {		lua_pushnil(L);	}	return 1;}/** * get a field from the result-set * */static int proxy_resultset_fields_get(lua_State *L) {	GPtrArray *fields = *(GPtrArray **)luaL_checkudata(L, 1, "proxy.resultset.fields");	MYSQL_FIELD *field;	MYSQL_FIELD **field_p;	int ndx = luaL_checkinteger(L, 2);	if (ndx < 1 || ndx > fields->len) {		lua_pushnil(L);		return 1;	}	field = fields->pdata[ndx - 1]; /** lua starts at 1, C at 0 */	field_p = lua_newuserdata(L, sizeof(field));	*field_p = field;	/* if the meta-table is new, add __index to it */	if (1 == luaL_newmetatable(L, "proxy.resultset.fields.field")) {		lua_pushcfunction(L, proxy_resultset_field_get);         /* (sp += 1) */		lua_setfield(L, -2, "__index");                          /* (sp -= 1) */	}	lua_setmetatable(L, -2); /* tie the metatable to the table   (sp -= 1) */	return 1;}/** * get the next row from the resultset * * returns a lua-table with the fields (starting at 1) * * @return 0 on error, 1 on success * */static int proxy_resultset_rows_iter(lua_State *L) {	proxy_resultset_t *res = *(proxy_resultset_t **)lua_touserdata(L, lua_upvalueindex(1));	guint32 off = NET_HEADER_SIZE; /* skip the packet-len and sequence-number */	GString *packet;	GPtrArray *fields = res->fields;	gsize i;	GList *chunk = res->row;	if (chunk == NULL) return 0;	packet = chunk->data;	/* if we find the 2nd EOF packet we are done */	if (packet->str[off] == MYSQLD_PACKET_EOF &&	    packet->len < 10) return 0;	/* a ERR packet instead of real rows	 *	 * like "explain select fld3 from t2 ignore index (fld3,not_existing)"	 *	 * see mysql-test/t/select.test	 *  */	if (packet->str[off] == MYSQLD_PACKET_ERR) {		return 0;	}	lua_newtable(L);	for (i = 0; i < fields->len; i++) {		guint64 field_len;		g_assert(off <= packet->len + NET_HEADER_SIZE);		field_len = 
network_mysqld_proto_get_lenenc_int(packet, &off);		if (field_len == 251) { /** @todo use constant */			lua_pushnil(L);						off += 0;		} else {			/**			 * @todo we only support fields in the row-iterator < 16M (packet-len)			 */			g_assert(field_len <= packet->len + NET_HEADER_SIZE);			g_assert(off + field_len <= packet->len + NET_HEADER_SIZE);			lua_pushlstring(L, packet->str + off, field_len);			off += field_len;		}		/* lua starts its tables at 1 */		lua_rawseti(L, -2, i + 1);	}	res->row = res->row->next;	return 1;}/** * parse the result-set of the query * * @return if this is not a result-set we return -1 */static int parse_resultset_fields(proxy_resultset_t *res) {	GString *packet = res->result_queue->head->data;	GList *chunk;	if (res->fields) return 0;	switch (packet->str[NET_HEADER_SIZE]) {	case MYSQLD_PACKET_OK:	case MYSQLD_PACKET_ERR:		res->qstat.query_status = packet->str[NET_HEADER_SIZE];		return 0;	default:		/* OK with a resultset */		res->qstat.query_status = MYSQLD_PACKET_OK;		break;	}	/* parse the fields */	res->fields = network_mysqld_proto_fields_init();	if (!res->fields) return -1;	chunk = network_mysqld_result_parse_fields(res->result_queue->head, res->fields);	/* no result-set found */	if (!chunk) return -1;	/* skip the end-of-fields chunk */	res->rows_chunk_head = chunk->next;	return 0;}static int proxy_resultset_get(lua_State *L) {	proxy_resultset_t *res = *(proxy_resultset_t **)luaL_checkudata(L, 1, "proxy.resultset");	const char *key = luaL_checkstring(L, 2);	if (0 == strcmp(key, "fields")) {		GPtrArray **fields_p;		parse_resultset_fields(res);		if (res->fields) {			fields_p = lua_newuserdata(L, sizeof(res->fields));			*fields_p = res->fields;				/* if the meta-table is new, add __index to it */			if (1 == luaL_newmetatable(L, "proxy.resultset.fields")) {				lua_pushcfunction(L, proxy_resultset_fields_get);         /* (sp += 1) */				lua_setfield(L, -2, "__index");                           /* (sp -= 1) */                                
lua_pushcfunction(L, proxy_resultset_fields_len);        /* (sp += 1) */                                lua_setfield(L, -2, "__len");                            /* (sp -= 1) */			}				lua_setmetatable(L, -2); /* tie the metatable to the table   (sp -= 1) */		} else {			lua_pushnil(L);		}	} else if (0 == strcmp(key, "rows")) {		proxy_resultset_t *rows;		proxy_resultset_t **rows_p;		parse_resultset_fields(res);		if (res->rows_chunk_head) {				rows = proxy_resultset_init();			rows->rows_chunk_head = res->rows_chunk_head;			rows->row    = rows->rows_chunk_head;			rows->fields = res->fields;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -