📄 network-mysqld-proxy.c
字号:
/* Copyright (C) 2007 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #ifdef HAVE_CONFIG_H#include "config.h"#endif/** * @page proxy_states The internal states of the Proxy * * The MySQL Proxy implements the MySQL Protocol in its own way. * * -# connect @msc * client, proxy, backend; * --- [ label = "connect to backend" ]; * client->proxy [ label = "INIT" ]; * proxy->backend [ label = "CONNECT_SERVER", URL="\ref proxy_connect_server" ]; * @endmsc * -# auth @msc * client, proxy, backend; * --- [ label = "authenticate" ]; * backend->proxy [ label = "READ_HANDSHAKE", URL="\ref proxy_read_handshake" ]; * proxy->client [ label = "SEND_HANDSHAKE" ]; * client->proxy [ label = "READ_AUTH", URL="\ref proxy_read_auth" ]; * proxy->backend [ label = "SEND_AUTH" ]; * backend->proxy [ label = "READ_AUTH_RESULT", URL="\ref proxy_read_auth_result" ]; * proxy->client [ label = "SEND_AUTH_RESULT" ]; * @endmsc * -# query @msc * client, proxy, backend; * --- [ label = "query result phase" ]; * client->proxy [ label = "READ_QUERY", URL="\ref proxy_read_query" ]; * proxy->backend [ label = "SEND_QUERY" ]; * backend->proxy [ label = "READ_QUERY_RESULT", URL="\ref proxy_read_query_result" ]; * proxy->client [ label = "SEND_QUERY_RESULT", URL="\ref proxy_send_query_result" ]; * @endmsc * * - network_mysqld_proxy_connection_init() * -# registers the callbacks * - proxy_connect_server() (CON_STATE_CONNECT_SERVER) * -# calls the connect_server() function in the lua script which might decide to * -# send a handshake packet without contacting the backend server (CON_STATE_SEND_HANDSHAKE) * -# closing the connection (CON_STATE_ERROR) * -# picking a active connection from the connection pool * -# pick a backend to authenticate against * -# do nothing * -# by default, pick a backend from the backend list on the backend with the least active connctions * -# opens the connection to the backend with connect() * -# when done CON_STATE_READ_HANDSHAKE * - proxy_read_handshake() (CON_STATE_READ_HANDSHAKE) * -# reads the handshake packet from the server * - proxy_read_auth() (CON_STATE_READ_AUTH) * -# reads the auth packet from the client * - proxy_read_auth_result() (CON_STATE_READ_AUTH_RESULT) * -# reads the auth-result packet from the server * - proxy_send_auth_result() (CON_STATE_SEND_AUTH_RESULT) * - proxy_read_query() (CON_STATE_READ_QUERY) * -# reads the query from the client * - proxy_read_query_result() (CON_STATE_READ_QUERY_RESULT) * -# reads the query-result from the server * - proxy_send_query_result() (CON_STATE_SEND_QUERY_RESULT) * -# called after the data is written to the client * -# if scripts wants to close connections, goes to CON_STATE_ERROR * -# if queries are in the injection queue, goes to CON_STATE_SEND_QUERY * -# otherwise goes to CON_STATE_READ_QUERY * -# does special handling for COM_BINLOG_DUMP (go to CON_STATE_READ_QUERY_RESULT) */#ifdef HAVE_SYS_FILIO_H/** * required for FIONREAD on solaris */#include <sys/filio.h>#endif#ifndef _WIN32#include <sys/ioctl.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#define ioctlsocket ioctl#endif#include <string.h>#include <stdlib.h>#include <fcntl.h>#include <time.h>#include <stdio.h>#include <errno.h>#include <glib.h>#ifdef HAVE_LUA_H/** * embedded lua support */#include <lua.h>#include <lauxlib.h>#include <lualib.h>#endif/* for solaris 2.5 and NetBSD 1.3.x */#ifndef HAVE_SOCKLEN_Ttypedef int socklen_t;#endif#include <mysqld_error.h> /** for ER_UNKNOWN_ERROR */#include "network-mysqld.h"#include "network-mysqld-proto.h"#include "network-conn-pool.h"#include "sys-pedantic.h"#include "sql-tokenizer.h"#ifdef _WIN32#define E_NET_CONNRESET WSAECONNRESET#define E_NET_CONNABORTED WSAECONNABORTED#define E_NET_WOULDBLOCK WSAEWOULDBLOCK#define E_NET_INPROGRESS WSAEINPROGRESS#else#define E_NET_CONNRESET ECONNRESET#define E_NET_CONNABORTED ECONNABORTED#define E_NET_INPROGRESS EINPROGRESS#if EWOULDBLOCK == EAGAIN/** * some system make EAGAIN == EWOULDBLOCK which would lead to a * error in the case handling * * set it to -1 as this error should never happen */#define E_NET_WOULDBLOCK -1#else#define E_NET_WOULDBLOCK EWOULDBLOCK#endif#endif#define TIME_DIFF_US(t2, t1) \ ((t2.tv_sec - t1.tv_sec) * 1000000.0 + (t2.tv_usec - t1.tv_usec))#define C(x) x, sizeof(x) - 1#define HASH_INSERT(hash, key, expr) \ do { \ GString *hash_value; \ if ((hash_value = g_hash_table_lookup(hash, key))) { \ expr; \ } else { \ hash_value = g_string_new(NULL); \ expr; \ g_hash_table_insert(hash, g_strdup(key), hash_value); \ } \ } while(0);#define CRASHME() do { char *_crashme = NULL; *_crashme = 0; } while(0);typedef enum { PROXY_NO_DECISION, PROXY_SEND_QUERY, PROXY_SEND_RESULT, PROXY_SEND_INJECTION, PROXY_IGNORE_RESULT /** for read_query_result */} proxy_stmt_ret;typedef enum { BACKEND_STATE_UNKNOWN, BACKEND_STATE_UP, BACKEND_STATE_DOWN} backend_state_t;typedef enum { BACKEND_TYPE_UNKNOWN, BACKEND_TYPE_RW, BACKEND_TYPE_RO} backend_type_t;typedef struct { network_address addr; backend_state_t state; /**< UP or DOWN */ backend_type_t type; /**< ReadWrite or ReadOnly */ GTimeVal state_since; /**< timestamp of the last state-change */ network_connection_pool *pool; /**< the pool of open connections */ guint connected_clients; /**< number of open connections to this backend for SQF */} backend_t;/** * the shared information across all connections * */typedef struct { /** * our pool if backends * * GPtrArray<backend_t> */ GPtrArray *backend_pool; GTimeVal backend_last_check;#ifdef HAVE_LUA_H lua_State *L; /**< the global lua_State */#endif} plugin_srv_state;typedef struct { /** * the content of the OK packet */ int server_status; int warning_count; guint64 affected_rows; guint64 insert_id; int was_resultset; /** if set, affected_rows and insert_id are ignored */ /** * MYSQLD_PACKET_OK or MYSQLD_PACKET_ERR */ int query_status;} query_status;typedef struct { struct { GQueue *queries; /** queries we want to executed */ query_status qstat;#ifdef HAVE_LUA_H lua_State *L; int L_ref;#endif int sent_resultset; /** make sure we send only one result back to the client */ } injected; plugin_srv_state *global_state; backend_t *backend; int backend_ndx;} plugin_con_state;typedef struct { GString *query; int id; /* a unique id set by the scripts to map the query to a handler */ /* the userdata's need them */ GQueue *result_queue; /* the data to parse */ query_status qstat; GTimeVal ts_read_query; /* timestamp when we added this query to the queues */ GTimeVal ts_read_query_result_first; /* when we first finished it */ GTimeVal ts_read_query_result_last; /* when we first finished it */} injection;static injection *injection_init(int id, GString *query) { injection *i; i = g_new0(injection, 1); i->id = id; i->query = query; /** * we have to assume that injection_init() is only used by the read_query call * which should be fine */ g_get_current_time(&(i->ts_read_query)); return i;}static void injection_free(injection *i) { if (!i) return; if (i->query) g_string_free(i->query, TRUE); g_free(i);}/** * reset the script context of the connection */static void proxy_lua_free_script(plugin_con_state *st) {#ifdef HAVE_LUA_H lua_State *L = st->injected.L; plugin_srv_state *g = st->global_state; if (!st->injected.L) return; g_assert(lua_isfunction(L, -1)); lua_pop(L, 1); /* function */ g_assert(lua_gettop(L) == 0); luaL_unref(g->L, LUA_REGISTRYINDEX, st->injected.L_ref); /** * clean up our object */ lua_gc(g->L, LUA_GCCOLLECT, 0); st->injected.L = NULL;#endif}static plugin_con_state *plugin_con_state_init() { plugin_con_state *st; st = g_new0(plugin_con_state, 1); st->injected.queries = g_queue_new(); return st;}static void plugin_con_state_free(plugin_con_state *st) { injection *inj; if (!st) return; proxy_lua_free_script(st); while ((inj = g_queue_pop_head(st->injected.queries))) injection_free(inj); g_queue_free(st->injected.queries); g_free(st);}#ifdef HAVE_LUA_H/** * init the global proxy object */static void proxy_lua_init_global_fenv(lua_State *L) { lua_newtable(L); /* my empty environment aka {} (sp += 1) */#define DEF(x) \ lua_pushinteger(L, x); \ lua_setfield(L, -2, #x); DEF(PROXY_SEND_QUERY); DEF(PROXY_SEND_RESULT); DEF(PROXY_IGNORE_RESULT); DEF(MYSQLD_PACKET_OK); DEF(MYSQLD_PACKET_ERR); DEF(MYSQLD_PACKET_RAW); DEF(BACKEND_STATE_UNKNOWN); DEF(BACKEND_STATE_UP); DEF(BACKEND_STATE_DOWN); DEF(BACKEND_TYPE_UNKNOWN); DEF(BACKEND_TYPE_RW); DEF(BACKEND_TYPE_RO); DEF(COM_SLEEP); DEF(COM_QUIT); DEF(COM_INIT_DB); DEF(COM_QUERY); DEF(COM_FIELD_LIST); DEF(COM_CREATE_DB); DEF(COM_DROP_DB); DEF(COM_REFRESH); DEF(COM_SHUTDOWN); DEF(COM_STATISTICS); DEF(COM_PROCESS_INFO); DEF(COM_CONNECT); DEF(COM_PROCESS_KILL); DEF(COM_DEBUG); DEF(COM_PING); DEF(COM_TIME); DEF(COM_DELAYED_INSERT); DEF(COM_CHANGE_USER); DEF(COM_BINLOG_DUMP); DEF(COM_TABLE_DUMP); DEF(COM_CONNECT_OUT); DEF(COM_REGISTER_SLAVE); DEF(COM_STMT_PREPARE); DEF(COM_STMT_EXECUTE); DEF(COM_STMT_SEND_LONG_DATA); DEF(COM_STMT_CLOSE); DEF(COM_STMT_RESET); DEF(COM_SET_OPTION);#if MYSQL_VERSION_ID >= 50000 DEF(COM_STMT_FETCH);#if MYSQL_VERSION_ID >= 50100 DEF(COM_DAEMON);#endif#endif DEF(MYSQL_TYPE_DECIMAL);#if MYSQL_VERSION_ID >= 50000 DEF(MYSQL_TYPE_NEWDECIMAL);#endif DEF(MYSQL_TYPE_TINY); DEF(MYSQL_TYPE_SHORT); DEF(MYSQL_TYPE_LONG); DEF(MYSQL_TYPE_FLOAT); DEF(MYSQL_TYPE_DOUBLE); DEF(MYSQL_TYPE_NULL); DEF(MYSQL_TYPE_TIMESTAMP); DEF(MYSQL_TYPE_LONGLONG); DEF(MYSQL_TYPE_INT24); DEF(MYSQL_TYPE_DATE); DEF(MYSQL_TYPE_TIME); DEF(MYSQL_TYPE_DATETIME); DEF(MYSQL_TYPE_YEAR); DEF(MYSQL_TYPE_NEWDATE); DEF(MYSQL_TYPE_ENUM); DEF(MYSQL_TYPE_SET); DEF(MYSQL_TYPE_TINY_BLOB); DEF(MYSQL_TYPE_MEDIUM_BLOB); DEF(MYSQL_TYPE_LONG_BLOB); DEF(MYSQL_TYPE_BLOB); DEF(MYSQL_TYPE_VAR_STRING); DEF(MYSQL_TYPE_STRING); DEF(MYSQL_TYPE_GEOMETRY);#if MYSQL_VERSION_ID >= 50000 DEF(MYSQL_TYPE_BIT);#endif /* cheat with DEF() a bit :) */#define PROXY_VERSION PACKAGE_VERSION_ID DEF(PROXY_VERSION);#undef DEF /** * create * - proxy.global * - proxy.global.config */ lua_newtable(L); lua_newtable(L); lua_setfield(L, -2, "config"); lua_setfield(L, -2, "global"); lua_setglobal(L, "proxy");}#endifstatic backend_t *backend_init() { backend_t *b; b = g_new0(backend_t, 1); b->pool = network_connection_pool_init(); return b;}void backend_free(backend_t *b) { if (!b) return; network_connection_pool_free(b->pool); if (b->addr.str) g_free(b->addr.str); g_free(b);}static plugin_srv_state *plugin_srv_state_init() { plugin_srv_state *g; g = g_new0(plugin_srv_state, 1); g->backend_pool = g_ptr_array_new();#ifdef HAVE_LUA_H
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -