📄 network-mysqld-proxy.c
字号:
/* push the parameters on the stack */ rows_p = lua_newuserdata(L, sizeof(rows)); *rows_p = rows; /* if the meta-table is new, add __index to it */ if (1 == luaL_newmetatable(L, "proxy.resultset.light")) { lua_pushcfunction(L, proxy_resultset_gc_light); /* (sp += 1) */ lua_setfield(L, -2, "__gc"); /* (sp -= 1) */ } lua_setmetatable(L, -2); /* tie the metatable to the table (sp -= 1) */ /* return a interator */ lua_pushcclosure(L, proxy_resultset_rows_iter, 1); } else { lua_pushnil(L); } } else if (0 == strcmp(key, "raw")) { GString *s = res->result_queue->head->data; lua_pushlstring(L, s->str + 4, s->len - 4); } else if (0 == strcmp(key, "flags")) { lua_newtable(L); lua_pushboolean(L, (res->qstat.server_status & SERVER_STATUS_IN_TRANS) != 0); lua_setfield(L, -2, "in_trans"); lua_pushboolean(L, (res->qstat.server_status & SERVER_STATUS_AUTOCOMMIT) != 0); lua_setfield(L, -2, "auto_commit"); lua_pushboolean(L, (res->qstat.server_status & SERVER_QUERY_NO_GOOD_INDEX_USED) != 0); lua_setfield(L, -2, "no_good_index_used"); lua_pushboolean(L, (res->qstat.server_status & SERVER_QUERY_NO_INDEX_USED) != 0); lua_setfield(L, -2, "no_index_used"); } else if (0 == strcmp(key, "warning_count")) { lua_pushinteger(L, res->qstat.warning_count); } else if (0 == strcmp(key, "affected_rows")) { /** * if the query had a result-set (SELECT, ...) * affected_rows and insert_id are not valid */ if (res->qstat.was_resultset) { lua_pushnil(L); } else { lua_pushnumber(L, res->qstat.affected_rows); } } else if (0 == strcmp(key, "insert_id")) { if (res->qstat.was_resultset) { lua_pushnil(L); } else { lua_pushnumber(L, res->qstat.insert_id); } } else if (0 == strcmp(key, "query_status")) { if (0 != parse_resultset_fields(res)) { /* not a result-set */ lua_pushnil(L); } else { lua_pushinteger(L, res->qstat.query_status); } } else { lua_pushnil(L); } return 1;}static int proxy_injection_get(lua_State *L) { injection *inj = *(injection **)luaL_checkudata(L, 1, "proxy.injection"); const char *key = luaL_checkstring(L, 2); if (0 == strcmp(key, "type")) { lua_pushinteger(L, inj->id); /** DEPRECATED: use "inj.id" instead */ } else if (0 == strcmp(key, "id")) { lua_pushinteger(L, inj->id); } else if (0 == strcmp(key, "query")) { lua_pushlstring(L, inj->query->str, inj->query->len); } else if (0 == strcmp(key, "query_time")) { lua_pushinteger(L, TIME_DIFF_US(inj->ts_read_query_result_first, inj->ts_read_query)); } else if (0 == strcmp(key, "response_time")) { lua_pushinteger(L, TIME_DIFF_US(inj->ts_read_query_result_last, inj->ts_read_query)); } else if (0 == strcmp(key, "resultset")) { /* fields, rows */ proxy_resultset_t *res; proxy_resultset_t **res_p; res_p = lua_newuserdata(L, sizeof(res)); *res_p = res = proxy_resultset_init(); res->result_queue = inj->result_queue; res->qstat = inj->qstat; /* if the meta-table is new, add __index to it */ if (1 == luaL_newmetatable(L, "proxy.resultset")) { lua_pushcfunction(L, proxy_resultset_get); /* (sp += 1) */ lua_setfield(L, -2, "__index"); /* (sp -= 1) */ lua_pushcfunction(L, proxy_resultset_gc); /* (sp += 1) */ lua_setfield(L, -2, "__gc"); /* (sp -= 1) */ } lua_setmetatable(L, -2); /* tie the metatable to the table (sp -= 1) */ } else { g_message("%s.%d: inj[%s] ... not found", __FILE__, __LINE__, key); lua_pushnil(L); } return 1;}#endifstatic proxy_stmt_ret proxy_lua_read_query_result(network_mysqld_con *con) {#ifdef HAVE_LUA_H network_socket *send_sock = con->client;#endif injection *inj = NULL; plugin_con_state *st = con->plugin_con_state; proxy_stmt_ret ret = PROXY_NO_DECISION; /** * check if we want to forward the statement to the client * * if not, clean the send-queue */ if (0 == st->injected.queries->length) return PROXY_NO_DECISION; inj = g_queue_pop_head(st->injected.queries);#ifdef HAVE_LUA_H /* call the lua script to pick a backend * */ lua_register_callback(con); if (st->injected.L) { lua_State *L = st->injected.L; g_assert(lua_isfunction(L, -1)); lua_getfenv(L, -1); g_assert(lua_istable(L, -1)); lua_getfield(L, -1, "read_query_result"); if (lua_isfunction(L, -1)) { injection **inj_p; GString *packet; inj_p = lua_newuserdata(L, sizeof(inj)); *inj_p = inj; inj->result_queue = con->client->send_queue->chunks; inj->qstat = st->injected.qstat; /* if the meta-table is new, add __index to it */ if (1 == luaL_newmetatable(L, "proxy.injection")) { lua_pushcfunction(L, proxy_injection_get); /* (sp += 1) */ lua_setfield(L, -2, "__index"); /* (sp -= 1) */ } lua_setmetatable(L, -2); /* tie the metatable to the table (sp -= 1) */ if (lua_pcall(L, 1, 1, 0) != 0) { g_critical("(read_query_result) %s", lua_tostring(L, -1)); lua_pop(L, 1); /* err-msg */ ret = PROXY_NO_DECISION; } else { if (lua_isnumber(L, -1)) { ret = lua_tonumber(L, -1); } lua_pop(L, 1); } switch (ret) { case PROXY_SEND_RESULT: /** * replace the result-set the server sent us */ while ((packet = g_queue_pop_head(send_sock->send_queue->chunks))) g_string_free(packet, TRUE); /** * we are a response to the client packet, hence one packet id more */ send_sock->packet_id++; if (proxy_lua_handle_proxy_response(con)) { /** * handling proxy.response failed * * send a ERR packet in case there was no result-set sent yet */ if (!st->injected.sent_resultset) { network_mysqld_con_send_error(con->client, C("(lua) handling proxy.response failed, check error-log")); } } /* fall through */ case PROXY_NO_DECISION: if (!st->injected.sent_resultset) { /** * make sure we send only one result-set per client-query */ st->injected.sent_resultset++; break; } g_warning("%s.%d: got asked to send a resultset, but ignoring it as we already have sent %d resultset(s). injection-id: %d", __FILE__, __LINE__, st->injected.sent_resultset, inj->id); st->injected.sent_resultset++; /* fall through */ case PROXY_IGNORE_RESULT: /* trash the packets for the injection query */ while ((packet = g_queue_pop_head(send_sock->send_queue->chunks))) g_string_free(packet, TRUE); break; default: /* invalid return code */ g_message("%s.%d: return-code for read_query_result() was neither PROXY_SEND_RESULT or PROXY_IGNORE_RESULT, will ignore the result", __FILE__, __LINE__); while ((packet = g_queue_pop_head(send_sock->send_queue->chunks))) g_string_free(packet, TRUE); break; } } else if (lua_isnil(L, -1)) { /* no function defined, let's send the result-set */ lua_pop(L, 1); /* pop the nil */ } else { g_message("%s.%d: (network_mysqld_con_handle_proxy_resultset) got wrong type: %s", __FILE__, __LINE__, lua_typename(L, lua_type(L, -1))); lua_pop(L, 1); /* pop the nil */ } lua_pop(L, 1); /* fenv */ g_assert(lua_isfunction(L, -1)); }#endif injection_free(inj); return ret;}/** * call the lua function to intercept the handshake packet * * @return PROXY_SEND_QUERY to send the packet from the client * PROXY_NO_DECISION to pass the server packet unmodified */static proxy_stmt_ret proxy_lua_read_handshake(network_mysqld_con *con) { proxy_stmt_ret ret = PROXY_NO_DECISION; /* send what the server gave us */#ifdef HAVE_LUA_H plugin_con_state *st = con->plugin_con_state; network_socket *recv_sock = con->server; network_socket *send_sock = con->client; lua_State *L; /* call the lua script to pick a backend * */ lua_register_callback(con); if (!st->injected.L) return ret; L = st->injected.L; g_assert(lua_isfunction(L, -1)); lua_getfenv(L, -1); g_assert(lua_istable(L, -1)); lua_getfield(L, -1, "read_handshake"); if (lua_isfunction(L, -1)) { /* export * * every thing we know about it * */ lua_newtable(L); lua_pushlstring(L, recv_sock->scramble_buf->str, recv_sock->scramble_buf->len); lua_setfield(L, -2, "scramble"); lua_pushinteger(L, recv_sock->mysqld_version); lua_setfield(L, -2, "mysqld_version"); lua_pushinteger(L, recv_sock->thread_id); lua_setfield(L, -2, "thread_id"); lua_pushstring(L, recv_sock->addr.str); lua_setfield(L, -2, "server_addr"); lua_pushstring(L, send_sock->addr.str); lua_setfield(L, -2, "client_addr"); if (lua_pcall(L, 1, 1, 0) != 0) { g_critical("(read_handshake) %s", lua_tostring(L, -1)); lua_pop(L, 1); /* errmsg */ /* the script failed, but we have a useful default */ } else { if (lua_isnumber(L, -1)) { ret = lua_tonumber(L, -1); } lua_pop(L, 1); } switch (ret) { case PROXY_NO_DECISION: break; case PROXY_SEND_QUERY: g_warning("%s.%d: (read_handshake) return proxy.PROXY_SEND_QUERY is deprecated, use PROXY_SEND_RESULT instead", __FILE__, __LINE__); ret = PROXY_SEND_RESULT; case PROXY_SEND_RESULT: /** * proxy.response.type = ERR, RAW, ... */ if (proxy_lua_handle_proxy_response(con)) { /** * handling proxy.response failed * * send a ERR packet */ network_mysqld_con_send_error(con->client, C("(lua) handling proxy.response failed, check error-log")); } break; default: ret = PROXY_NO_DECISION; break; } } else if (lua_isnil(L, -1)) { lua_pop(L, 1); /* pop the nil */ } else { g_message("%s.%d: %s", __FILE__, __LINE__, lua_typename(L, lua_type(L, -1))); lua_pop(L, 1); /* pop the ... */ } lua_pop(L, 1); /* fenv */ g_assert(lua_isfunction(L, -1));#endif return ret;}/** * parse the hand-shake packet from the server * * * @note the SSL and COMPRESS flags are disabled as we can't * intercept or parse them. */NETWORK_MYSQLD_PLUGIN_PROTO(proxy_read_handshake) { GString *packet; GList *chunk; network_socket *recv_sock, *send_sock; guint off = 0; int maj, min, patch; guint16 server_cap = 0; guint8 server_lang = 0; guint16 server_status = 0; gchar *scramble_1, *scramble_2; send_sock = con->client; recv_sock = con->server; chunk = recv_sock->recv_queue->chunks->tail; packet = chunk->data; if (packet->len != recv_sock->packet_len + NET_HEADER_SIZE) { /** * packet is too short, looks nasty. * * report an error and let the core send a error to the * client */ recv_sock->packet_len = PACKET_LEN_UNSET; g_queue_delete_link(recv_sock->recv_queue->chunks, chunk); return RET_ERROR; } if (packet->str[NET_HEADER_SIZE + 0] == '\xff') { /* the server doesn't like us and sends a ERR packet * * forward it to the client */ network_queue_append_chunk(send_sock->send_queue, packet); recv_sock->packet_len = PACKET_LEN_UNSET; g_queue_delete_link(recv_sock->recv_queue->chunks, chunk); return RET_ERROR; } else if (packet->str[NET_HEADER_SIZE + 0] != '\x0a') { /* the server isn't 4.1+ server, send a client a ERR packet */ recv_sock->packet_len = PACKET_LEN_UNSET; g_queue_delete_link(recv_sock->recv_queue->chunks, chunk); network_mysqld_con_send_error(send_sock, C("unknown protocol")); return RET_ERROR; } /* scan for a \0 */ for (off = NET_HEADER_SIZE + 1; packet->str[off] && off < packet->len + NET_HEADER_SIZE; off++); if (packet->str[off] != '\0') { /* the server has sent us garbage */ recv_sock->packet_len = PACKET_LEN_UNSET; g_queue_delete_link(recv_sock->recv_queue->chunks, chunk); network_mysqld_con_send_error(send_sock, C("protocol 10, but version number not terminated")); return RET_ERROR; } if (3 != sscanf(packet->str + NET_HEADER_SIZE + 1, "%d.%d.%d%*s", &maj, &min, &patch)) { /* can't parse the protocol */ recv_sock->packet_len = PACKET_LEN_UNSET; g_queue_delete_link(recv_sock->recv_queue->chunks, chunk); network_mysqld_con_send_error(send_sock, C("protocol 10, but version number not parsable")); return RET_ERROR; } /** * out of range */ if (min < 0 || min > 100 || patch < 0 || patch > 100 || maj < 0 || maj > 10) { recv_sock->packet_len = PACKET_LEN_UNSET; g_queue_delete_link(recv_sock->recv_queue->chunks, chunk); network_mysqld_con_send_error(send_sock, C("protocol 10, but version number out of range")); return RET_ERROR; } recv_sock->mysqld_version = maj * 10000 + min * 100 + patch; /* skip the \0 */ off++; recv_sock->thread_id = network_mysqld_proto_get_int32(packet, &off); send_sock->thread_id = recv_sock->thread_id; /** * get the scramble buf * * 8 byte here and some the other 12 somewhen later */ scramble_1 = network_mysqld_proto_get_string_len(packet, &off, 8); network_mysqld_proto_skip(packet, &off, 1); /* we can't sniff compressed packets nor do we support SSL */ packet->str[off] &= ~(CLIENT_COMPRESS); packet->str[off] &= ~(CLIENT_SSL); server_cap = network_mysqld_proto_get_int16(packet, &off); if (server_cap & CLIENT_COMPRESS) { packet->str[off-2] &= ~(CLIENT_COMPRESS); } if (server_cap & CLIENT_SSL) { packet->str[off-1] &= ~(CLIENT_SSL >> 8); } server_lang = network_mysqld_proto_get_int8(packet, &off); server_statu
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -