📄 network-mysqld-proxy.c
字号:
g->L = luaL_newstate(); luaL_openlibs(g->L); proxy_lua_init_global_fenv(g->L);#endif return g;}void plugin_srv_state_free(plugin_srv_state *g) { gsize i; if (!g) return;#ifdef HAVE_LUA_H lua_close(g->L);#endif for (i = 0; i < g->backend_pool->len; i++) { backend_t *backend = g->backend_pool->pdata[i]; backend_free(backend); } g_ptr_array_free(g->backend_pool, TRUE); g_free(g);}/** * parse the result-set packet and extract the fields * * @param chunk list of mysql packets * @param fields empty array where the fields shall be stored in * * @return NULL if there is no resultset * pointer to the chunk after the fields (to the EOF packet) */ static GList *network_mysqld_result_parse_fields(GList *chunk, GPtrArray *fields) { GString *packet = chunk->data; guint8 field_count; guint i; /* * read(6, "\1\0\0\1", 4) = 4 * read(6, "\2", 1) = 1 * read(6, "6\0\0\2", 4) = 4 * read(6, "\3def\0\6STATUS\0\rVariable_name\rVariable_name\f\10\0P\0\0\0\375\1\0\0\0\0", 54) = 54 * read(6, "&\0\0\3", 4) = 4 * read(6, "\3def\0\6STATUS\0\5Value\5Value\f\10\0\0\2\0\0\375\1\0\0\0\0", 38) = 38 * read(6, "\5\0\0\4", 4) = 4 * read(6, "\376\0\0\"\0", 5) = 5 * read(6, "\23\0\0\5", 4) = 4 * read(6, "\17Aborted_clients\00298", 19) = 19 * */ g_assert(packet->len > NET_HEADER_SIZE); /* the first chunk is the length * */ if (packet->len != NET_HEADER_SIZE + 1) { /* * looks like this isn't a result-set * * read(6, "\1\0\0\1", 4) = 4 * read(6, "\2", 1) = 1 * * is expected. We might got called on a non-result, tell the user about it. */#if 0 g_debug("%s.%d: network_mysqld_result_parse_fields() got called on a non-resultset. 
"F_SIZE_T" != 5", __FILE__, __LINE__, packet->len);#endif return NULL; } field_count = packet->str[NET_HEADER_SIZE]; /* the byte after the net-header is the field-count */ /* the next chunk, the field-def */ for (i = 0; i < field_count; i++) { guint off = NET_HEADER_SIZE; MYSQL_FIELD *field; chunk = chunk->next; packet = chunk->data; field = network_mysqld_proto_field_init(); field->catalog = network_mysqld_proto_get_lenenc_string(packet, &off); field->db = network_mysqld_proto_get_lenenc_string(packet, &off); field->table = network_mysqld_proto_get_lenenc_string(packet, &off); field->org_table = network_mysqld_proto_get_lenenc_string(packet, &off); field->name = network_mysqld_proto_get_lenenc_string(packet, &off); field->org_name = network_mysqld_proto_get_lenenc_string(packet, &off); network_mysqld_proto_skip(packet, &off, 1); /* filler */ field->charsetnr = network_mysqld_proto_get_int16(packet, &off); field->length = network_mysqld_proto_get_int32(packet, &off); field->type = network_mysqld_proto_get_int8(packet, &off); field->flags = network_mysqld_proto_get_int16(packet, &off); field->decimals = network_mysqld_proto_get_int8(packet, &off); network_mysqld_proto_skip(packet, &off, 2); /* filler */ g_ptr_array_add(fields, field); } /* this should be EOF chunk */ chunk = chunk->next; packet = chunk->data; g_assert(packet->str[NET_HEADER_SIZE] == MYSQLD_PACKET_EOF); return chunk;}static void g_hash_table_reset_gstring(gpointer UNUSED_PARAM(_key), gpointer _value, gpointer UNUSED_PARAM(ser_data)) { GString *value = _value; g_string_truncate(value, 0);}/** * handle the events of a idling server connection in the pool * * make sure we know about connection close from the server side * - wait_timeout */static void network_mysqld_con_idle_handle(int event_fd, short events, void *user_data) { network_connection_pool_entry *pool_entry = user_data; network_connection_pool *pool = pool_entry->pool; if (events == EV_READ) { int b = -1; /** * @todo we have to handle the 
case that the server really sent use something * up to now we just ignore it */ if (ioctlsocket(event_fd, FIONREAD, &b)) { g_critical("ioctl(%d, FIONREAD, ...) failed: %s", event_fd, strerror(errno)); } else if (b != 0) { g_critical("ioctl(%d, FIONREAD, ...) said there is something to read, oops: %d", event_fd, b); } else { /* the server decided the close the connection (wait_timeout, crash, ... ) * * remove us from the connection pool and close the connection */ network_connection_pool_remove(pool, pool_entry); } }}/** * move the con->server into connection pool and disconnect the * proxy from its backend */static int proxy_connection_pool_add_connection(network_mysqld_con *con) { network_mysqld *srv = con->srv; network_connection_pool_entry *pool_entry = NULL; plugin_con_state *st = con->plugin_con_state; /* con-server is already disconnected, got out */ if (!con->server) return 0; /* the server connection is still authed */ con->server->is_authed = 1; /* insert the server socket into the connection pool */ pool_entry = network_connection_pool_add(st->backend->pool, con->server); event_set(&(con->server->event), con->server->fd, EV_READ, network_mysqld_con_idle_handle, pool_entry); event_base_set(srv->event_base, &(con->server->event)); event_add(&(con->server->event), NULL); st->backend->connected_clients--; st->backend = NULL; st->backend_ndx = -1; con->server = NULL; return 0;}/** * swap the server connection with a connection from * the connection pool * * we can only switch backends if we have a authed connection in the pool. 
* * @return NULL if swapping failed * the new backend on success */static network_socket *proxy_connection_pool_swap(network_mysqld_con *con, int backend_ndx) { backend_t *backend = NULL; network_socket *send_sock; plugin_con_state *st = con->plugin_con_state; plugin_srv_state *g = st->global_state; /* * we can only change to another backend if the backend is already * in the connection pool and connected */ if (backend_ndx < 0 || backend_ndx >= g->backend_pool->len) { /* backend_ndx is out of range */ return NULL; } backend = g->backend_pool->pdata[backend_ndx]; /** * get a connection from the pool which matches our basic requirements * - username has to match * - default_db should match */ #ifdef DEBUG_CONN_POOL g_debug("%s: (swap) check if we have a connection for this user in the pool '%s'", G_STRLOC, con->client->username->str);#endif if (NULL == (send_sock = network_connection_pool_get(backend->pool, con->client->username, con->client->default_db))) { /** * no connections in the pool */ st->backend_ndx = -1; return NULL; } /* the backend is up and cool, take and move the current backend into the pool */#ifdef DEBUG_CONN_POOL g_debug("%s: (swap) added the previous connection to the pool", G_STRLOC);#endif proxy_connection_pool_add_connection(con); /* connect to the new backend */ st->backend = backend; st->backend->connected_clients++; st->backend_ndx = backend_ndx; return send_sock;}#ifdef HAVE_LUA_H/** * load the lua script * * wraps luaL_loadfile and prints warnings when needed * * @see luaL_loadfile */lua_State *lua_load_script(lua_State *L, const gchar *name) { if (0 != luaL_loadfile(L, name)) { /* oops, an error, return it */ g_warning("luaL_loadfile(%s) failed", name); return L; } /** * pcall() needs the function on the stack * * as pcall() will pop the script from the stack when done, we have to * duplicate it here */ g_assert(lua_isfunction(L, -1)); return L;}/** * get the info connection pool * * @return nil or requested information */static int 
proxy_pool_queue_get(lua_State *L) { GQueue *queue = *(GQueue **)luaL_checkudata(L, 1, "proxy.backend.pool.queue"); const char *key = luaL_checkstring(L, 2); /** ... cur_idle */ if (0 == strcmp(key, "cur_idle_connections")) { lua_pushinteger(L, queue ? queue->length : 0); } else { lua_pushnil(L); } return 1;}/** * get the info connection pool * * @return nil or requested information */static int proxy_pool_users_get(lua_State *L) { network_connection_pool *pool = *(network_connection_pool **)luaL_checkudata(L, 1, "proxy.backend.pool.users"); const char *key = luaL_checkstring(L, 2); /** the username */ GString *s = g_string_new(key); GQueue **q_p = NULL; q_p = lua_newuserdata(L, sizeof(*q_p)); *q_p = network_connection_pool_get_conns(pool, s, NULL); g_string_free(s, TRUE); /* if the meta-table is new, add __index to it */ if (1 == luaL_newmetatable(L, "proxy.backend.pool.queue")) { lua_pushcfunction(L, proxy_pool_queue_get); /* (sp += 1) */ lua_setfield(L, -2, "__index"); /* (sp -= 1) */ } lua_setmetatable(L, -2); /* tie the metatable to the table (sp -= 1) */ return 1;}static int proxy_pool_get(lua_State *L) { network_connection_pool *pool = *(network_connection_pool **)luaL_checkudata(L, 1, "proxy.backend.pool"); const char *key = luaL_checkstring(L, 2); if (0 == strcmp(key, "max_idle_connections")) { lua_pushinteger(L, pool->max_idle_connections); } else if (0 == strcmp(key, "min_idle_connections")) { lua_pushinteger(L, pool->min_idle_connections); } else if (0 == strcmp(key, "users")) { network_connection_pool **pool_p; pool_p = lua_newuserdata(L, sizeof(*pool_p)); *pool_p = pool; /* if the meta-table is new, add __index to it */ if (1 == luaL_newmetatable(L, "proxy.backend.pool.users")) { lua_pushcfunction(L, proxy_pool_users_get); /* (sp += 1) */ lua_setfield(L, -2, "__index"); /* (sp -= 1) */ } lua_setmetatable(L, -2); /* tie the metatable to the table (sp -= 1) */ } else { lua_pushnil(L); } return 1;}static int proxy_pool_set(lua_State *L) { 
network_connection_pool *pool = *(network_connection_pool **)luaL_checkudata(L, 1, "proxy.backend.pool"); const char *key = luaL_checkstring(L, 2); if (0 == strcmp(key, "max_idle_connections")) { pool->max_idle_connections = lua_tointeger(L, -1); } else if (0 == strcmp(key, "min_idle_connections")) { pool->min_idle_connections = lua_tointeger(L, -1); } else { return luaL_error(L, "proxy.backend[...].%s is not writable", key); } return 0;}/** * get the info about a backend * * proxy.backend[0]. * connected_clients => clients using this backend * address => ip:port or unix-path of to the backend * state => int(BACKEND_STATE_UP|BACKEND_STATE_DOWN) * type => int(BACKEND_TYPE_RW|BACKEND_TYPE_RO) * * @return nil or requested information * @see backend_state_t backend_type_t */static int proxy_backend_get(lua_State *L) { backend_t *backend = *(backend_t **)luaL_checkudata(L, 1, "proxy.backend"); const char *key = luaL_checkstring(L, 2); if (0 == strcmp(key, "connected_clients")) { lua_pushinteger(L, backend->connected_clients); } else if (0 == strcmp(key, "address")) { lua_pushstring(L, backend->addr.str); } else if (0 == strcmp(key, "state")) { lua_pushinteger(L, backend->state); } else if (0 == strcmp(key, "type")) { lua_pushinteger(L, backend->type); } else if (0 == strcmp(key, "pool")) { network_connection_pool *pool; network_connection_pool **pool_p; pool_p = lua_newuserdata(L, sizeof(pool)); *pool_p = backend->pool; /* if the meta-table is new, add __index to it */ if (1 == luaL_newmetatable(L, "proxy.backend.pool")) { lua_pushcfunction(L, proxy_pool_get); /* (sp += 1) */ lua_setfield(L, -2, "__index"); /* (sp -= 1) */ lua_pushcfunction(L, proxy_pool_set); /* (sp += 1) */ lua_setfield(L, -2, "__newindex"); /* (sp -= 1) */ } lua_setmetatable(L, -2); /* tie the metatable to the table (sp -= 1) */ } else { lua_pushnil(L); } return 1;}/** * get proxy.backends[ndx] * * get the backend from the array of mysql backends. 
*
 * @return nil or the backend
 * @see proxy_backend_get
 */
static int proxy_backends_get(lua_State *L) {
	plugin_con_state *st;
	backend_t *backend;
	backend_t **backend_p;

	network_mysqld_con *con = *(network_mysqld_con **)luaL_checkudata(L, 1, "proxy.backends");
	int backend_ndx = luaL_checkinteger(L, 2) - 1; /** lua is indexes from 1, C from 0 */

	st = con->plugin_con_state;

	/* the cast is safe: negative values are rejected first */
	if (backend_ndx < 0 ||
	    (guint)backend_ndx >= st->global_state->backend_pool->len) {
		lua_pushnil(L);

		return 1;
	}

	backend = st->global_state->backend_pool->pdata[backend_ndx];

	/* the userdata underneath proxy.backends[ndx], it only stores the pointer */
	backend_p = lua_newuserdata(L, sizeof(*backend_p));
	*backend_p = backend;

	/* if the meta-table is new, add __index to it */
	if (1 == luaL_newmetatable(L, "proxy.backend")) {
		lua_pushcfunction(L, proxy_backend_get); /* (sp += 1) */
		lua_setfield(L, -2, "__index");          /* (sp -= 1) */
	}

	lua_setmetatable(L, -2); /* tie the metatable to the table   (sp -= 1) */

	return 1;
}

/**
 * push the number of entries in the backend pool
 *
 * @return 1, the number of values left on the lua stack
 */
static int proxy_backends_len(lua_State *L) {
	network_mysqld_con *con = *(network_mysqld_con **)luaL_checkudata(L, 1, "proxy.backends");
	plugin_con_state *st;

	st = con->plugin_con_state;

	lua_pushinteger(L, st->global_state->backend_pool->len);

	return 1;
}

static int proxy_socket_get(lua_State *L) {
	network_socket *sock = *(network_socket **)luaL_checkudata(L, 1, "proxy.socket");
	const char *key = luaL_checkstring(L, 2);

	/**
	 * we to split it in .client and .server here
	 */

	if (0 == strcmp(key, "default_db")) {
		lua_pushlstring(L, sock->default_db->str, sock->default_db->len);
	} else if (0 == strcmp(key, "username")) {
		lua_pushlstring(L, sock->username->str, sock->username->len);
	} else if (0 == strcmp(key, "address")) {
		lua_pushstring(L, sock->addr.str);
	} else if (0 == strcmp(key, "scrambled_password")) {
		lua_pushlstring(L, sock->scrambled_password->str, sock->scrambled_password->len);
	} else if (sock->mysqld_version) { /* only the server-side has mysqld_version set */
		if (0 == strcmp(key, "mysqld_version")) {
			lua_pushinteger(L, sock->mysqld_version);
		} else if (0 == strcmp(key, "thread_id")) {
			lua_pushinteger(L, sock->thread_id);
		} else if (0 == strcmp(key, "scramble_buffer")) {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -