📄 network-mysqld.c
字号:
} else { con->parse.command = s->str[4]; if (con->parse.len == PACKET_LEN_MAX) { con->is_overlong_packet = 1; } /* init the parser for the commands */ switch (con->parse.command) { case COM_QUERY: case COM_STMT_EXECUTE: con->parse.state.query = PARSE_COM_QUERY_INIT; break; case COM_STMT_PREPARE: con->parse.state.prepare.first_packet = 1; break; case COM_INIT_DB: if (s->str[NET_HEADER_SIZE] == COM_INIT_DB && (s->len > NET_HEADER_SIZE + 1)) { con->parse.state.init_db.db_name = g_string_new(NULL); g_string_truncate(con->parse.state.init_db.db_name, 0); g_string_append_len(con->parse.state.init_db.db_name, s->str + NET_HEADER_SIZE + 1, s->len - NET_HEADER_SIZE - 1); } else { con->parse.state.init_db.db_name = NULL; } break; default: break; } } } switch (network_mysqld_write_len(srv, con->server, 1)) { case RET_SUCCESS: break; case RET_WAIT_FOR_EVENT: WAIT_FOR_EVENT(con->server, EV_WRITE, NULL); return; case RET_ERROR_RETRY: case RET_ERROR: g_debug("%s.%d: network_mysqld_write(CON_STATE_SEND_QUERY) returned an error", __FILE__, __LINE__); /** * write() failed, close the connections */ con->state = CON_STATE_ERROR; break; } if (con->is_overlong_packet) { con->state = CON_STATE_READ_QUERY; break; } /* some statements don't have a server response */ switch (con->parse.command) { case COM_STMT_SEND_LONG_DATA: /* not acked */ case COM_STMT_CLOSE: con->state = CON_STATE_READ_QUERY; break; case COM_QUERY: if (con->parse.state.query == PARSE_COM_QUERY_LOAD_DATA) { con->state = CON_STATE_READ_QUERY; } else { con->state = CON_STATE_READ_QUERY_RESULT; } break; default: con->state = CON_STATE_READ_QUERY_RESULT; break; } break; case CON_STATE_READ_QUERY_RESULT: do { network_socket *recv_sock; recv_sock = con->server; g_assert(events == 0 || event_fd == recv_sock->fd); switch (network_mysqld_read(srv, recv_sock)) { case RET_SUCCESS: break; case RET_WAIT_FOR_EVENT: WAIT_FOR_EVENT(con->server, EV_READ, NULL); return; case RET_ERROR_RETRY: case RET_ERROR: g_error("%s.%d: network_mysqld_read(CON_STATE_READ_QUERY_RESULT) returned an error", __FILE__, __LINE__); return; } switch (plugin_call(srv, con, con->state)) { case RET_SUCCESS: break; default: g_error("%s.%d: ...", __FILE__, __LINE__); break; } } while (con->state == CON_STATE_READ_QUERY_RESULT); if (con->parse.command == COM_INIT_DB) { if (con->parse.state.init_db.db_name) { g_string_free(con->parse.state.init_db.db_name, TRUE); con->parse.state.init_db.db_name = NULL; } } break; case CON_STATE_SEND_QUERY_RESULT: /** * send the query result-set to the client */ switch (network_mysqld_write(srv, con->client)) { case RET_SUCCESS: break; case RET_WAIT_FOR_EVENT: WAIT_FOR_EVENT(con->client, EV_WRITE, NULL); return; case RET_ERROR_RETRY: case RET_ERROR: /** * client is gone away * * close the connection and clean up */ con->state = CON_STATE_ERROR; break; } /* if the write failed, don't call the plugin handlers */ if (con->state != CON_STATE_SEND_QUERY_RESULT) break; switch (plugin_call(srv, con, con->state)) { case RET_SUCCESS: break; default: g_error("%s.%d: ...", __FILE__, __LINE__); break; } break; case CON_STATE_SEND_ERROR: /** * send error to the client * and close the connections afterwards * */ switch (network_mysqld_write(srv, con->client)) { case RET_SUCCESS: break; case RET_WAIT_FOR_EVENT: WAIT_FOR_EVENT(con->client, EV_WRITE, NULL); return; case RET_ERROR_RETRY: case RET_ERROR: g_critical("%s.%d: network_mysqld_write(CON_STATE_SEND_ERROR) returned an error", __FILE__, __LINE__); con->state = CON_STATE_ERROR; break; } con->state = CON_STATE_ERROR; break; } event_fd = -1; events = 0; } while (ostate != con->state); return;}/** * accept a connection * * event handler for listening connections * * @param event_fd fd on which the event was fired * @param events the event that was fired * @param user_data the listening connection handle * */void network_mysqld_con_accept(int event_fd, short events, void *user_data) { network_mysqld_con *con = user_data; network_mysqld_con *client_con; socklen_t addr_len; struct sockaddr_in ipv4; int fd; g_assert(events == EV_READ); g_assert(con->server); addr_len = sizeof(struct sockaddr_in); if (-1 == (fd = accept(event_fd, (struct sockaddr *)&ipv4, &addr_len))) { return ; } network_mysqld_con_set_non_blocking(fd); /* looks like we open a client connection */ client_con = network_mysqld_con_init(con->srv); client_con->client = network_socket_init(); client_con->client->addr.addr.ipv4 = ipv4; client_con->client->addr.len = addr_len; client_con->client->fd = fd; /* resolve the peer-addr if necessary */ if (!client_con->client->addr.str) { switch (client_con->client->addr.addr.common.sa_family) { case AF_INET: client_con->client->addr.str = g_strdup_printf("%s:%d", inet_ntoa(client_con->client->addr.addr.ipv4.sin_addr), client_con->client->addr.addr.ipv4.sin_port); break; default: g_message("%s.%d: can't convert addr-type %d into a string", __FILE__, __LINE__, client_con->client->addr.addr.common.sa_family); break; } } /* copy the config * * @todo replace network-type by a function pointer for the init * */ client_con->config = con->config; client_con->config.network_type = con->config.network_type; switch (con->config.network_type) { case NETWORK_TYPE_SERVER: network_mysqld_server_connection_init(client_con); break; case NETWORK_TYPE_PROXY: network_mysqld_proxy_connection_init(client_con); break; default: g_error("%s.%d", __FILE__, __LINE__); break; } network_mysqld_con_handle(-1, 0, client_con); return;}/** * timeout handler for the event-loop */static void handle_timeout() { if (!agent_shutdown) return; /* we have to shutdown, disable all events to leave the dispatch */}void *network_mysqld_thread(void *_srv) { network_mysqld *srv = _srv; network_mysqld_con *proxy_con = NULL, *admin_con = NULL;#ifdef _WIN32 WORD wVersionRequested; WSADATA wsaData; int err; wVersionRequested = MAKEWORD( 2, 2 ); err = WSAStartup( wVersionRequested, &wsaData ); if ( err != 0 ) { /* Tell the user that we could not find a usable */ /* WinSock DLL. */ return NULL; }#endif /* setup the different handlers */ if (srv->config.admin.address) { network_mysqld_con *con = NULL; con = network_mysqld_con_init(srv); con->config = srv->config; con->config.network_type = NETWORK_TYPE_SERVER; con->server = network_socket_init(); if (0 != network_mysqld_server_init(con)) { g_critical("%s.%d: network_mysqld_server_init() failed", __FILE__, __LINE__); return NULL; } /* keep the listen socket alive */ event_set(&(con->server->event), con->server->fd, EV_READ|EV_PERSIST, network_mysqld_con_accept, con); event_base_set(srv->event_base, &(con->server->event)); event_add(&(con->server->event), NULL); admin_con = con; } if (srv->config.proxy.address) { network_mysqld_con *con = NULL; con = network_mysqld_con_init(srv); con->config = srv->config; con->config.network_type = NETWORK_TYPE_PROXY; con->server = network_socket_init(); if (0 != network_mysqld_proxy_init(con)) { g_critical("%s.%d: network_mysqld_server_init() failed", __FILE__, __LINE__); return NULL; } /* keep the listen socket alive */ event_set(&(con->server->event), con->server->fd, EV_READ|EV_PERSIST, network_mysqld_con_accept, con); event_base_set(srv->event_base, &(con->server->event)); event_add(&(con->server->event), NULL); proxy_con = con; } /** * check once a second if we shall shutdown the proxy */ while (!agent_shutdown) { struct timeval timeout; int r; timeout.tv_sec = 1; timeout.tv_usec = 0; g_assert(event_base_loopexit(srv->event_base, &timeout) == 0); r = event_base_dispatch(srv->event_base); if (r == -1) { if (errno == EINTR) continue; break; } } /** * cleanup * */ if (proxy_con) { /** * we still might have connections pointing to the close scope */ event_del(&(proxy_con->server->event)); network_mysqld_con_free(proxy_con); } if (admin_con) { event_del(&(admin_con->server->event)); network_mysqld_con_free(admin_con); } return NULL;}/** * @todo move to network_mysqld_proto */int network_mysqld_con_send_resultset(network_socket *con, GPtrArray *fields, GPtrArray *rows) { GString *s; gsize i, j; g_assert(fields->len > 0 && fields->len < 251); s = g_string_new(NULL); /* - len = 99 * \1\0\0\1 * \1 - one field * \'\0\0\2 * \3def * \0 * \0 * \0 * \21@@version_comment * \0 - org-name * \f - filler * \10\0 - charset * \34\0\0\0 - length * \375 - type * \1\0 - flags * \37 - decimals * \0\0 - filler * \5\0\0\3 * \376\0\0\2\0 * \35\0\0\4 * \34MySQL Community Server (GPL) * \5\0\0\5 * \376\0\0\2\0 */ g_string_append_c(s, fields->len); /* the field-count */ network_queue_append(con->send_queue, s->str, s->len, con->packet_id++); for (i = 0; i < fields->len; i++) { MYSQL_FIELD *field = fields->pdata[i]; g_string_truncate(s, 0); network_mysqld_proto_append_lenenc_string(s, field->catalog ? field->catalog : "def"); /* catalog */ network_mysqld_proto_append_lenenc_string(s, field->db ? field->db : ""); /* database */ network_mysqld_proto_append_lenenc_string(s, field->table ? field->table : ""); /* table */ network_mysqld_proto_append_lenenc_string(s, field->org_table ? field->org_table : ""); /* org_table */ network_mysqld_proto_append_lenenc_string(s, field->name ? field->name : ""); /* name */ network_mysqld_proto_append_lenenc_string(s, field->org_name ? field->org_name : ""); /* org_name */ g_string_append_c(s, '\x0c'); /* length of the following block, 12 byte */ g_string_append_len(s, "\x08\x00", 2); /* charset */ g_string_append_c(s, (field->length >> 0) & 0xff); /* len */ g_string_append_c(s, (field->length >> 8) & 0xff); /* len */ g_string_append_c(s, (field->length >> 16) & 0xff); /* len */ g_string_append_c(s, (field->length >> 24) & 0xff); /* len */ g_string_append_c(s, field->type); /* type */ g_string_append_c(s, field->flags & 0xff); /* flags */ g_string_append_c(s, (field->flags >> 8) & 0xff); /* flags */ g_string_append_c(s, 0); /* decimals */ g_string_append_len(s, "\x00\x00", 2); /* filler */#if 0 /* this is in the docs, but not on the network */ network_mysqld_proto_append_lenenc_string(s, field->def); /* default-value */#endif network_queue_append(con->send_queue, s->str, s->len, con->packet_id++); } g_string_truncate(s, 0); /* EOF */ g_string_append_len(s, "\xfe", 1); /* EOF */ g_string_append_len(s, "\x00\x00", 2); /* warning count */ g_string_append_len(s, "\x02\x00", 2); /* flags */ network_queue_append(con->send_queue, s->str, s->len, con->packet_id++); for (i = 0; i < rows->len; i++) { GPtrArray *row = rows->pdata[i]; g_string_truncate(s, 0); for (j = 0; j < row->len; j++) { network_mysqld_proto_append_lenenc_string(s, row->pdata[j]); } network_queue_append(con->send_queue, s->str, s->len, con->packet_id++); } g_string_truncate(s, 0); /* EOF */ g_string_append_len(s, "\xfe", 1); /* EOF */ g_string_append_len(s, "\x00\x00", 2); /* warning count */ g_string_append_len(s, "\x02\x00", 2); /* flags */ network_queue_append(con->send_queue, s->str, s->len, con->packet_id++); g_string_free(s, TRUE); return 0;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -