⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 network-mysqld.c

📁 Mysql Proxy本身是个很好的mysql负载均衡工具,但是其本身有bug:当多个mysql 做slave的时候,如果一个slave死掉,会影响别的slave也死掉!这个文件修复了这个bug!
💻 C
📖 第 1 页 / 共 4 页
字号:
					} else {					con->parse.command = s->str[4];						if (con->parse.len == PACKET_LEN_MAX) {						con->is_overlong_packet = 1;					}							/* init the parser for the commands */					switch (con->parse.command) {					case COM_QUERY:					case COM_STMT_EXECUTE:						con->parse.state.query = PARSE_COM_QUERY_INIT;						break;					case COM_STMT_PREPARE:						con->parse.state.prepare.first_packet = 1;						break;					case COM_INIT_DB:						if (s->str[NET_HEADER_SIZE] == COM_INIT_DB && 						    (s->len > NET_HEADER_SIZE + 1)) {							con->parse.state.init_db.db_name = g_string_new(NULL);											g_string_truncate(con->parse.state.init_db.db_name, 0);							g_string_append_len(con->parse.state.init_db.db_name, 									s->str + NET_HEADER_SIZE + 1, 									s->len - NET_HEADER_SIZE - 1);						} else {							con->parse.state.init_db.db_name = NULL;						}						break;					default:						break;					}				}			}				switch (network_mysqld_write_len(srv, con->server, 1)) {			case RET_SUCCESS:				break;			case RET_WAIT_FOR_EVENT:				WAIT_FOR_EVENT(con->server, EV_WRITE, NULL);				return;			case RET_ERROR_RETRY:			case RET_ERROR:				g_debug("%s.%d: network_mysqld_write(CON_STATE_SEND_QUERY) returned an error", __FILE__, __LINE__);				/**				 * write() failed, close the connections 				 */				con->state = CON_STATE_ERROR;				break;			}			if (con->is_overlong_packet) {				con->state = CON_STATE_READ_QUERY;				break;			}			/* some statements don't have a server response */			switch (con->parse.command) {			case COM_STMT_SEND_LONG_DATA: /* not acked */			case COM_STMT_CLOSE:				con->state = CON_STATE_READ_QUERY;				break;			case COM_QUERY:				if (con->parse.state.query == PARSE_COM_QUERY_LOAD_DATA) {					con->state = CON_STATE_READ_QUERY;				} else {					con->state = CON_STATE_READ_QUERY_RESULT;				}				break;			default:				con->state = CON_STATE_READ_QUERY_RESULT;				break;			}							break; 		case CON_STATE_READ_QUERY_RESULT: 			do {				network_socket *recv_sock;				recv_sock = con->server;				g_assert(events == 0 || event_fd == recv_sock->fd);				switch (network_mysqld_read(srv, recv_sock)) {				case RET_SUCCESS:					break;				case RET_WAIT_FOR_EVENT:					WAIT_FOR_EVENT(con->server, EV_READ, NULL);					return;				case RET_ERROR_RETRY:				case RET_ERROR:					g_error("%s.%d: network_mysqld_read(CON_STATE_READ_QUERY_RESULT) returned an error", __FILE__, __LINE__);					return;				}				switch (plugin_call(srv, con, con->state)) {				case RET_SUCCESS:					break;				default:					g_error("%s.%d: ...", __FILE__, __LINE__);					break;				}			} while (con->state == CON_STATE_READ_QUERY_RESULT);			if (con->parse.command == COM_INIT_DB) {				if (con->parse.state.init_db.db_name) {					g_string_free(con->parse.state.init_db.db_name, TRUE);					con->parse.state.init_db.db_name = NULL;				}			}				break; 		case CON_STATE_SEND_QUERY_RESULT:			/**			 * send the query result-set to the client */			switch (network_mysqld_write(srv, con->client)) {			case RET_SUCCESS:				break;			case RET_WAIT_FOR_EVENT:				WAIT_FOR_EVENT(con->client, EV_WRITE, NULL);				return;			case RET_ERROR_RETRY:			case RET_ERROR:				/**				 * client is gone away				 *				 * close the connection and clean up				 */				con->state = CON_STATE_ERROR;				break;			}			/* if the write failed, don't call the plugin handlers */			if (con->state != CON_STATE_SEND_QUERY_RESULT) break;			switch (plugin_call(srv, con, con->state)) {			case RET_SUCCESS:				break;			default:				g_error("%s.%d: ...", __FILE__, __LINE__);				break;			}			break;		case CON_STATE_SEND_ERROR:			/**			 * send error to the client			 * and close the connections afterwards			 *  */			switch (network_mysqld_write(srv, con->client)) {			case RET_SUCCESS:				break;			case RET_WAIT_FOR_EVENT:				WAIT_FOR_EVENT(con->client, EV_WRITE, NULL);				return;			case RET_ERROR_RETRY:			case RET_ERROR:				g_critical("%s.%d: network_mysqld_write(CON_STATE_SEND_ERROR) returned an error", __FILE__, __LINE__);				con->state = CON_STATE_ERROR;				break;			}							con->state = CON_STATE_ERROR;			break;		}		event_fd = -1;		events   = 0;	} while (ostate != con->state);	return;}/** * accept a connection * * event handler for listening connections * * @param event_fd     fd on which the event was fired * @param events       the event that was fired * @param user_data    the listening connection handle *  */void network_mysqld_con_accept(int event_fd, short events, void *user_data) {	network_mysqld_con *con = user_data;	network_mysqld_con *client_con;	socklen_t addr_len;	struct sockaddr_in ipv4;	int fd;	g_assert(events == EV_READ);	g_assert(con->server);	addr_len = sizeof(struct sockaddr_in);	if (-1 == (fd = accept(event_fd, (struct sockaddr *)&ipv4, &addr_len))) {		return ;	}	network_mysqld_con_set_non_blocking(fd);	/* looks like we open a client connection */	client_con = network_mysqld_con_init(con->srv);	client_con->client = network_socket_init();	client_con->client->addr.addr.ipv4 = ipv4;	client_con->client->addr.len = addr_len;	client_con->client->fd   = fd;	/* resolve the peer-addr if necessary */	if (!client_con->client->addr.str) {		switch (client_con->client->addr.addr.common.sa_family) {		case AF_INET:			client_con->client->addr.str = g_strdup_printf("%s:%d", 					inet_ntoa(client_con->client->addr.addr.ipv4.sin_addr),					client_con->client->addr.addr.ipv4.sin_port);			break;		default:			g_message("%s.%d: can't convert addr-type %d into a string", 					 __FILE__, __LINE__, 					 client_con->client->addr.addr.common.sa_family);			break;		}	}	/* copy the config	 *	 * @todo replace network-type by a function pointer for the init 	 *	 */	client_con->config = con->config;	client_con->config.network_type = con->config.network_type;	switch (con->config.network_type) {	case NETWORK_TYPE_SERVER:		network_mysqld_server_connection_init(client_con);		break;	case NETWORK_TYPE_PROXY:		network_mysqld_proxy_connection_init(client_con);		break;	default:		g_error("%s.%d", __FILE__, __LINE__);		break;	}	network_mysqld_con_handle(-1, 0, client_con);	return;}/** * timeout handler for the event-loop  */static void handle_timeout() {	if (!agent_shutdown) return;	/* we have to shutdown, disable all events to leave the dispatch */}void *network_mysqld_thread(void *_srv) {	network_mysqld *srv = _srv;	network_mysqld_con *proxy_con = NULL, *admin_con = NULL;#ifdef _WIN32	WORD wVersionRequested;	WSADATA wsaData;	int err;	wVersionRequested = MAKEWORD( 2, 2 );	err = WSAStartup( wVersionRequested, &wsaData );	if ( err != 0 ) {		/* Tell the user that we could not find a usable */		/* WinSock DLL.                                  */		return NULL;	}#endif	/* setup the different handlers */	if (srv->config.admin.address) {		network_mysqld_con *con = NULL;		con = network_mysqld_con_init(srv);		con->config = srv->config;		con->config.network_type = NETWORK_TYPE_SERVER;				con->server = network_socket_init();		if (0 != network_mysqld_server_init(con)) {			g_critical("%s.%d: network_mysqld_server_init() failed", __FILE__, __LINE__);			return NULL;		}		/* keep the listen socket alive */		event_set(&(con->server->event), con->server->fd, EV_READ|EV_PERSIST, network_mysqld_con_accept, con);		event_base_set(srv->event_base, &(con->server->event));		event_add(&(con->server->event), NULL);				admin_con = con;	}	if (srv->config.proxy.address) {		network_mysqld_con *con = NULL;		con = network_mysqld_con_init(srv);		con->config = srv->config;		con->config.network_type = NETWORK_TYPE_PROXY;				con->server = network_socket_init();		if (0 != network_mysqld_proxy_init(con)) {			g_critical("%s.%d: network_mysqld_server_init() failed", __FILE__, __LINE__);			return NULL;		}			/* keep the listen socket alive */		event_set(&(con->server->event), con->server->fd, EV_READ|EV_PERSIST, network_mysqld_con_accept, con);		event_base_set(srv->event_base, &(con->server->event));		event_add(&(con->server->event), NULL);		proxy_con = con;	}	/**	 * check once a second if we shall shutdown the proxy	 */	while (!agent_shutdown) {		struct timeval timeout;		int r;		timeout.tv_sec = 1;		timeout.tv_usec = 0;		g_assert(event_base_loopexit(srv->event_base, &timeout) == 0);		r = event_base_dispatch(srv->event_base);		if (r == -1) {			if (errno == EINTR) continue;			break;		}	}	/**	 * cleanup	 *	 */	if (proxy_con) {		/**		 * we still might have connections pointing to the close scope */		event_del(&(proxy_con->server->event));		network_mysqld_con_free(proxy_con);	}		if (admin_con) {		event_del(&(admin_con->server->event));		network_mysqld_con_free(admin_con);	}	return NULL;}/** * @todo move to network_mysqld_proto */int network_mysqld_con_send_resultset(network_socket *con, GPtrArray *fields, GPtrArray *rows) {	GString *s;	gsize i, j;	g_assert(fields->len > 0 && fields->len < 251);	s = g_string_new(NULL);	/* - len = 99	 *  \1\0\0\1 	 *    \1 - one field	 *  \'\0\0\2 	 *    \3def 	 *    \0 	 *    \0 	 *    \0 	 *    \21@@version_comment 	 *    \0            - org-name	 *    \f            - filler	 *    \10\0         - charset	 *    \34\0\0\0     - length	 *    \375          - type 	 *    \1\0          - flags	 *    \37           - decimals	 *    \0\0          - filler 	 *  \5\0\0\3 	 *    \376\0\0\2\0	 *  \35\0\0\4	 *    \34MySQL Community Server (GPL)	 *  \5\0\0\5	 *    \376\0\0\2\0	 */	g_string_append_c(s, fields->len); /* the field-count */	network_queue_append(con->send_queue, s->str, s->len, con->packet_id++);	for (i = 0; i < fields->len; i++) {		MYSQL_FIELD *field = fields->pdata[i];				g_string_truncate(s, 0);		network_mysqld_proto_append_lenenc_string(s, field->catalog ? field->catalog : "def");   /* catalog */		network_mysqld_proto_append_lenenc_string(s, field->db ? field->db : "");                /* database */		network_mysqld_proto_append_lenenc_string(s, field->table ? field->table : "");          /* table */		network_mysqld_proto_append_lenenc_string(s, field->org_table ? field->org_table : "");  /* org_table */		network_mysqld_proto_append_lenenc_string(s, field->name ? field->name : "");            /* name */		network_mysqld_proto_append_lenenc_string(s, field->org_name ? field->org_name : "");    /* org_name */		g_string_append_c(s, '\x0c');                  /* length of the following block, 12 byte */		g_string_append_len(s, "\x08\x00", 2);         /* charset */		g_string_append_c(s, (field->length >> 0) & 0xff); /* len */		g_string_append_c(s, (field->length >> 8) & 0xff); /* len */		g_string_append_c(s, (field->length >> 16) & 0xff); /* len */		g_string_append_c(s, (field->length >> 24) & 0xff); /* len */		g_string_append_c(s, field->type);             /* type */		g_string_append_c(s, field->flags & 0xff);     /* flags */		g_string_append_c(s, (field->flags >> 8) & 0xff); /* flags */		g_string_append_c(s, 0);                       /* decimals */		g_string_append_len(s, "\x00\x00", 2);         /* filler */#if 0		/* this is in the docs, but not on the network */		network_mysqld_proto_append_lenenc_string(s, field->def);         /* default-value */#endif		network_queue_append(con->send_queue, s->str, s->len, con->packet_id++);	}	g_string_truncate(s, 0);		/* EOF */		g_string_append_len(s, "\xfe", 1); /* EOF */	g_string_append_len(s, "\x00\x00", 2); /* warning count */	g_string_append_len(s, "\x02\x00", 2); /* flags */		network_queue_append(con->send_queue, s->str, s->len, con->packet_id++);	for (i = 0; i < rows->len; i++) {		GPtrArray *row = rows->pdata[i];		g_string_truncate(s, 0);		for (j = 0; j < row->len; j++) {			network_mysqld_proto_append_lenenc_string(s, row->pdata[j]);		}		network_queue_append(con->send_queue, s->str, s->len, con->packet_id++);	}	g_string_truncate(s, 0);	/* EOF */		g_string_append_len(s, "\xfe", 1); /* EOF */	g_string_append_len(s, "\x00\x00", 2); /* warning count */	g_string_append_len(s, "\x02\x00", 2); /* flags */	network_queue_append(con->send_queue, s->str, s->len, con->packet_id++);	g_string_free(s, TRUE);	return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -