⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 network-mysqld.c

📁 MySQL Proxy 本身是一个很好的 MySQL 负载均衡工具，但它存在一个 bug：当多个 MySQL 实例作为 slave 时，如果其中一个 slave 宕机，其他 slave 也会受到影响而不可用。这个文件修复了该 bug。
💻 C
📖 第 1 页 / 共 4 页
字号:
		return -1;	}	/**	 * make the connect() call non-blocking	 *	 */	network_mysqld_con_set_non_blocking(con->fd);	if (-1 == connect(con->fd, (struct sockaddr *) &(con->addr.addr), con->addr.len)) {		/**		 * in most TCP cases we connect() will return with 		 * EINPROGRESS ... 3-way handshake		 */#ifdef _WIN32		errno = WSAGetLastError();#endif		switch (errno) {		case E_NET_INPROGRESS:                case E_NET_WOULDBLOCK:			return -2;		default:			g_critical("%s.%d: connect(%s) failed: %s (%d)", 					__FILE__, __LINE__,					con->addr.str,					strerror(errno), errno);			return -1;		}	}	/**	 * set the same options as the mysql client 	 */#ifdef IP_TOS	val = 8;	setsockopt(con->fd, IPPROTO_IP,     IP_TOS, &val, sizeof(val));#endif	val = 1;	setsockopt(con->fd, IPPROTO_TCP,    TCP_NODELAY, &val, sizeof(val) );	val = 1;	setsockopt(con->fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val) );	return 0;}/** * connect a socket * * the con->addr has to be set before  *  * @param con    a socket  * @return       0 on connected, -1 on error, -2 for try again * @see network_mysqld_set_address() */int network_mysqld_con_bind(network_socket * con) {	int val = 1;	g_assert(con->addr.len);	if (-1 == (con->fd = socket(con->addr.addr.ipv4.sin_family, SOCK_STREAM, 0))) {#ifdef _WIN32		errno = WSAGetLastError();#endif		g_critical("%s.%d: socket(%s) failed: %s", 				__FILE__, __LINE__,				con->addr.str, strerror(errno));		return -1;	}	setsockopt(con->fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));	setsockopt(con->fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));	if (-1 == bind(con->fd, (struct sockaddr *) &(con->addr.addr), con->addr.len)) {#ifdef _WIN32		errno = WSAGetLastError();#endif		g_critical("%s.%d: bind(%s) failed: %s", 				__FILE__, __LINE__,				con->addr.str,				strerror(errno));		return -1;	}	if (-1 == listen(con->fd, 8)) {#ifdef _WIN32		errno = WSAGetLastError();#endif		g_critical("%s.%d: listen() failed: %s",				__FILE__, __LINE__,				strerror(errno));		return -1;	}	return 0;}#if 0 
static void dump_str(const char *msg, const unsigned char *s, size_t len) {	GString *hex;	size_t i;		       	hex = g_string_new(NULL);	for (i = 0; i < len; i++) {		g_string_append_printf(hex, "%02x", s[i]);		if ((i + 1) % 16 == 0) {			g_string_append(hex, "\n");		} else {			g_string_append_c(hex, ' ');		}	}	g_message("(%s): %s", msg, hex->str);	g_string_free(hex, TRUE);}#endif/** * create a OK packet and append it to the send-queue * * @param con             a client socket  * @param affected_rows   affected rows  * @param insert_id       insert_id  * @param server_status   server_status (bitfield of SERVER_STATUS_*)  * @param warnings        number of warnings to fetch with SHOW WARNINGS  * @return 0 * * @todo move to network_mysqld_proto */int network_mysqld_con_send_ok_full(network_socket *con, guint64 affected_rows, guint64 insert_id, guint16 server_status, guint16 warnings ) {	GString *packet = g_string_new(NULL);	network_mysqld_proto_append_ok_packet(packet, affected_rows, insert_id, server_status, warnings);		network_queue_append(con->send_queue, packet->str, packet->len, con->packet_id);	g_string_free(packet, TRUE);	return 0;}/** * send a simple OK packet * * - no affected rows * - no insert-id * - AUTOCOMMIT * - no warnings * * @param con             a client socket  */int network_mysqld_con_send_ok(network_socket *con) {	return network_mysqld_con_send_ok_full(con, 0, 0, SERVER_STATUS_AUTOCOMMIT, 0);}/** * send a error packet to the client connection * * @note the sqlstate has to match the SQL standard. 
If no matching SQL state is known, leave it at NULL * * @param con         the client connection * @param errmsg      the error message * @param errmsg_len  byte-len of the error-message * @param errorcode   mysql error-code we want to send * @param sqlstate    if none-NULL, 5-char SQL state to send, if NULL, default SQL state is used * * @return 0 on success */int network_mysqld_con_send_error_full(network_socket *con, const char *errmsg, gsize errmsg_len, guint errorcode, const gchar *sqlstate) {	GString *packet;		packet = g_string_sized_new(10 + errmsg_len);	network_mysqld_proto_append_error_packet(packet, errmsg, errmsg_len, errorcode, sqlstate);	network_queue_append(con->send_queue, packet->str, packet->len, con->packet_id);	g_string_free(packet, TRUE);	return 0;}/** * send a error-packet to the client connection * * errorcode is 1000, sqlstate is NULL * * @param con         the client connection * @param errmsg      the error message * @param errmsg_len  byte-len of the error-message * * @see network_mysqld_con_send_error_full */int network_mysqld_con_send_error(network_socket *con, const char *errmsg, gsize errmsg_len) {	return network_mysqld_con_send_error_full(con, errmsg, errmsg_len, ER_UNKNOWN_ERROR, NULL);}/** * read a data from the socket * */retval_t network_mysqld_read_raw(network_mysqld *UNUSED_PARAM(srv), network_socket *con, GString *dest, size_t we_want) {	gssize len;	/**	 * nothing to read, let's get out of here 	 */	if (we_want - dest->len == 0) {		return RET_SUCCESS;	}	if (-1 == (len = recv(con->fd, dest->str + dest->len, we_want - dest->len, 0))) {#ifdef _WIN32		errno = WSAGetLastError();#endif		switch (errno) {		case E_NET_CONNABORTED:		case E_NET_CONNRESET: /** nothing to read, let's let ioctl() handle the close for us */		case E_NET_WOULDBLOCK: /** the buffers are empty, try again later */		case EAGAIN:     			return RET_WAIT_FOR_EVENT;		default:			g_debug("%s: recv() failed: %s (errno=%d)", G_STRLOC, strerror(errno), errno);			return 
RET_ERROR;		}	} else if (len == 0) {		/**		 * connection close		 *		 * let's call the ioctl() and let it handle it for use		 */		return RET_WAIT_FOR_EVENT;	}	dest->len += len;	dest->str[dest->len] = '\0';	if (dest->len < we_want) {		/* we don't have enough */		return RET_WAIT_FOR_EVENT;	}	return RET_SUCCESS;}/** * read a MySQL packet from the socket * * the packet is added to the con->recv_queue and contains a full mysql packet * with packet-header and everything  */retval_t network_mysqld_read(network_mysqld *srv, network_socket *con) {	GString *packet = NULL;	/** 	 * read the packet header (4 bytes)	 */	if (con->packet_len == PACKET_LEN_UNSET) {		switch (network_mysqld_read_raw(srv, con, con->header, NET_HEADER_SIZE)) {		case RET_WAIT_FOR_EVENT:			return RET_WAIT_FOR_EVENT;		case RET_ERROR:			return RET_ERROR;		case RET_SUCCESS:			break;		case RET_ERROR_RETRY:			g_error("RET_ERROR_RETRY wasn't expected");			break;		}		con->packet_len = network_mysqld_proto_get_header((unsigned char *)(con->header->str));		con->packet_id  = (unsigned char)(con->header->str[3]); /* packet-id if the next packet */		packet = g_string_sized_new(con->packet_len + NET_HEADER_SIZE + 1); /* we need some space for the \0 */		g_string_append_len(packet, con->header->str, NET_HEADER_SIZE); /* copy the header */		network_queue_append_chunk(con->recv_queue, packet);		g_string_truncate(con->header, 0);	} else {		packet = con->recv_queue->chunks->tail->data;	}	g_assert(packet->allocated_len >= con->packet_len + NET_HEADER_SIZE);	switch (network_mysqld_read_raw(srv, con, packet, con->packet_len + NET_HEADER_SIZE)) {	case RET_WAIT_FOR_EVENT:		return RET_WAIT_FOR_EVENT;	case RET_ERROR:		return RET_ERROR;	case RET_SUCCESS:		break;	case RET_ERROR_RETRY:		g_error("RET_ERROR_RETRY wasn't expected");		break;	}	return RET_SUCCESS;}/** * write data to the socket * */retval_t network_mysqld_write_len(network_mysqld *UNUSED_PARAM(srv), network_socket *con, int send_chunks) {	/* send the whole queue */	GList 
*chunk;	if (send_chunks == 0) return RET_SUCCESS;	for (chunk = con->send_queue->chunks->head; chunk; ) {		GString *s = chunk->data;		gssize len;		g_assert(con->send_queue->offset < s->len);		if (-1 == (len = send(con->fd, s->str + con->send_queue->offset, s->len - con->send_queue->offset, 0))) {#ifdef _WIN32			errno = WSAGetLastError();#endif			switch (errno) {			case E_NET_WOULDBLOCK:			case EAGAIN:				return RET_WAIT_FOR_EVENT;			case EPIPE:			case E_NET_CONNRESET:			case E_NET_CONNABORTED:				/** remote side closed the connection */				return RET_ERROR;			default:				g_message("%s.%d: send(%s, %ld) failed: %s", 						__FILE__, __LINE__, 						con->addr.str, 						s->len - con->send_queue->offset, 						strerror(errno));				return RET_ERROR;			}		} else if (len == 0) {			return RET_ERROR;		}		con->send_queue->offset += len;		if (con->send_queue->offset == s->len) {			g_string_free(s, TRUE);						g_queue_delete_link(con->send_queue->chunks, chunk);			con->send_queue->offset = 0;			if (send_chunks > 0 && --send_chunks == 0) break;			chunk = con->send_queue->chunks->head;		} else {			return RET_WAIT_FOR_EVENT;		}	}	return RET_SUCCESS;}retval_t network_mysqld_write(network_mysqld *srv, network_socket *con) {	retval_t ret;	ret = network_mysqld_write_len(srv, con, -1);	return ret;}/** * call the hooks of the plugins for each state * * if the plugin doesn't implement a hook, we provide a default operation * * @param srv      the global context * @param con      the connection context * @param state    state to handle * @return         RET_SUCCESS on success */retval_t plugin_call(network_mysqld *srv, network_mysqld_con *con, int state) {	NETWORK_MYSQLD_PLUGIN_FUNC(func) = NULL;	switch (state) {	case CON_STATE_INIT:		func = con->plugins.con_init;		if (!func) { /* default implementation */			con->state = CON_STATE_CONNECT_SERVER;		}		break;	case CON_STATE_CONNECT_SERVER:		func = con->plugins.con_connect_server;		if (!func) { /* default implementation */			con->state = 
CON_STATE_READ_HANDSHAKE;		}		break;	case CON_STATE_SEND_HANDSHAKE:		func = con->plugins.con_send_handshake;		if (!func) { /* default implementation */			con->state = CON_STATE_READ_AUTH;		}		break;	case CON_STATE_READ_HANDSHAKE:		func = con->plugins.con_read_handshake;		break;	case CON_STATE_READ_AUTH:		func = con->plugins.con_read_auth;		break;	case CON_STATE_SEND_AUTH:		func = con->plugins.con_send_auth;		if (!func) { /* default implementation */			con->state = CON_STATE_READ_AUTH_RESULT;		}		break;	case CON_STATE_READ_AUTH_RESULT:		func = con->plugins.con_read_auth_result;		break;	case CON_STATE_SEND_AUTH_RESULT:		func = con->plugins.con_send_auth_result;		if (!func) { /* default implementation */			switch (con->parse.state.auth_result.state) {			case MYSQLD_PACKET_OK:				con->state = CON_STATE_READ_QUERY;				break;			case MYSQLD_PACKET_ERR:				con->state = CON_STATE_ERROR;				break;			case MYSQLD_PACKET_EOF:				/**				 * the MySQL 4.0 hash in a MySQL 4.1+ connection				 */				con->state = CON_STATE_READ_AUTH_OLD_PASSWORD;				break;			default:				g_error("%s.%d: unexpected state for SEND_AUTH_RESULT: %02x", 						__FILE__, __LINE__,						con->parse.state.auth_result.state);			}		}		break;	case CON_STATE_READ_AUTH_OLD_PASSWORD: {		/** move the packet to the send queue */		GString *packet;		GList *chunk;		network_socket *recv_sock, *send_sock;		recv_sock = con->client;		send_sock = con->server;		if (NULL == con->server) {			/**			 * we have to auth against same backend as we did before			 * but the user changed it			 */			g_message("%s.%d: (lua) read-auth-old-password failed as backend_ndx got reset.", __FILE__, __LINE__);			network_mysqld_con_send_error(con->client, C("(lua) read-auth-old-password failed as backend_ndx got reset."));			con->state = CON_STATE_SEND_ERROR;			break;		}		chunk = recv_sock->recv_queue->chunks->head;		packet = chunk->data;		/* we aren't finished yet */		if (packet->len != recv_sock->packet_len + NET_HEADER_SIZE) return RET_SUCCESS;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -