⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 network-mysqld-proxy-function.c

📁 Mysql Proxy本身是个很好的mysql负载均衡工具,但是其本身有bug:当多个mysql 做slave的时候,如果一个slave死掉,会影响别的slave也死掉!这个文件修复了这个bug!
💻 C
字号:
// in src/network-mysqld-proxy.c
/**
 * connect to a backend
 *
 * The function can be entered twice for the same connection: when a previous
 * call started a non-blocking connect() that was still in progress,
 * con->server is already set and the outcome of that connect() is read from
 * the socket via SO_ERROR.
 *
 * For a fresh attempt it
 *  - moves backends that have been marked DOWN for longer than the wake-up
 *    interval back to UNKNOWN (checked when more than a second has passed
 *    since g->backend_last_check),
 *  - lets proxy_lua_connect_server() decide whether to answer directly,
 *    continue, or use a pooled connection,
 *  - if no usable backend index is set, picks the RW backend that is not
 *    DOWN and has the fewest connected clients,
 *  - opens a new server socket, or, if a server connection is already
 *    assigned, queues its stored handshake packet for the client.
 *
 * @return
 *   RET_SUCCESS        - connected successfully (or a result was already queued for the client)
 *   RET_ERROR_RETRY    - connecting backend failed or is still in progress, call again
 *   RET_ERROR          - getsockopt() failed, or no backends available
 *                        (in the latter case an ERR packet is added to the client queue)
 */
NETWORK_MYSQLD_PLUGIN_PROTO(proxy_connect_server) {
	plugin_con_state *st = con->plugin_con_state;
	plugin_srv_state *g = st->global_state;
	/* seconds a backend stays marked as down before it is tried again */
	const int backend_wakeup_sec = 4;
	guint min_connected_clients = G_MAXUINT;
	guint i;
	GTimeVal now;
	gboolean use_pooled_connection = FALSE;

	if (con->server) {
		int so_error = 0;
		socklen_t so_error_len = sizeof(so_error);

		/**
		 * we might get called a 2nd time after a connect() == EINPROGRESS
		 */
		if (getsockopt(con->server->fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_len)) {
			/* getsockopt failed */
			g_critical("%s.%d: getsockopt(%s) failed: %s", 
					__FILE__, __LINE__,
					con->server->addr.str, strerror(errno));
			return RET_ERROR;
		}

		if (so_error == 0) {
			/* the pending connect() finished successfully */
			if (st->backend->state != BACKEND_STATE_UP) {
				st->backend->state = BACKEND_STATE_UP;
				g_get_current_time(&(st->backend->state_since));
			}

			con->state = CON_STATE_READ_HANDSHAKE;

			return RET_SUCCESS;
		}

		/* the pending connect() failed: mark the backend as down and
		 * fall through to select another backend below */
		g_critical("%s.%d: connect(%s) failed: %s", 
				__FILE__, __LINE__,
				con->server->addr.str, strerror(so_error));

		st->backend->state = BACKEND_STATE_DOWN;
		g_get_current_time(&(st->backend->state_since));

		/* also mark the pool entry this connection was assigned to;
		 * this has to be an assignment, a comparison has no effect */
		if (st->backend_ndx >= 0 && 
		    st->backend_ndx < g->backend_pool->len) {
			backend_t *cur = g->backend_pool->pdata[st->backend_ndx];

			cur->state = BACKEND_STATE_DOWN;
			g_get_current_time(&(cur->state_since));
		}

		/* undo the increment done when the connect() was started */
		st->backend->connected_clients--;

		network_socket_free(con->server);
		con->server = NULL;
	}

	st->backend = NULL;
	st->backend_ndx = -1;

	g_get_current_time(&now);

	if (now.tv_sec - g->backend_last_check.tv_sec > 1) {
		/* check once a second if we have to wakeup a connection */
		for (i = 0; i < g->backend_pool->len; i++) {
			backend_t *cur = g->backend_pool->pdata[i];

			if (cur->state != BACKEND_STATE_DOWN) continue;

			/* check if a backend is marked as down for more than backend_wakeup_sec */
			if (now.tv_sec - cur->state_since.tv_sec > backend_wakeup_sec) {
				g_debug("%s.%d: backend %s was down for more than %d sec, waking it up", 
						__FILE__, __LINE__,
						cur->addr.str, backend_wakeup_sec);

				cur->state = BACKEND_STATE_UNKNOWN;
				cur->state_since = now;
			}
		}
	}

	switch (proxy_lua_connect_server(con)) {
	case PROXY_SEND_RESULT:
		/* we answered directly ... like denial ...
		 *
		 * for sure we have something in the send-queue 
		 *  */
		return RET_SUCCESS;
	case PROXY_NO_DECISION:
		/* just go on */
		break;
	case PROXY_IGNORE_RESULT:
		use_pooled_connection = TRUE;
		break;
	default:
		g_error("%s.%d: ... ", __FILE__, __LINE__);
		break;
	}

	/**
	 * if the current backend is down, ignore it 
	 */
	if (st->backend_ndx >= 0 && 
	    st->backend_ndx < g->backend_pool->len) {
		backend_t *cur = g->backend_pool->pdata[st->backend_ndx];

		if (cur->state == BACKEND_STATE_DOWN) {
			st->backend_ndx = -1;
		}
	}

	if (con->server && !use_pooled_connection) {
		gint bndx = st->backend_ndx;
		/* we already have a connection assigned, 
		 * but the script said we don't want to use it
		 */

		proxy_connection_pool_add_connection(con);

		/* restore the backend index saved before the call above */
		st->backend_ndx = bndx;
	}

	if (st->backend_ndx < 0) {
		/**
		 * we can choose between different back addresses 
		 *
		 * prefer SQF (shorted queue first) to load all backends equally
		 */ 
		for (i = 0; i < g->backend_pool->len; i++) {
			backend_t *cur = g->backend_pool->pdata[i];

			/**
			 * skip backends which are down or not writable
			 */
			if (cur->state == BACKEND_STATE_DOWN ||
			    cur->type != BACKEND_TYPE_RW) continue;

			/* remember the backend with the fewest connected clients */
			if (cur->connected_clients < min_connected_clients) {
				st->backend_ndx = i;
				min_connected_clients = cur->connected_clients;
			}
		}

		if (st->backend_ndx >= 0 && 
		    st->backend_ndx < g->backend_pool->len) {
			st->backend = g->backend_pool->pdata[st->backend_ndx];
		}
	} else if (NULL == st->backend &&
		   st->backend_ndx >= 0 && 
		   st->backend_ndx < g->backend_pool->len) {
		st->backend = g->backend_pool->pdata[st->backend_ndx];
	}

	if (NULL == st->backend) {
		network_mysqld_con_send_error(con->client, C("(proxy) all backends are down"));
		return RET_ERROR;
	}

	/**
	 * check if we have a connection in the pool for this backend
	 */
	if (NULL == con->server) {
		con->server = network_socket_init();
		con->server->addr = st->backend->addr;
		/* the socket gets its own copy of the address string */
		con->server->addr.str = g_strdup(st->backend->addr.str);

		st->backend->connected_clients++;

		if (0 != network_mysqld_con_connect(con->server)) {
			if (errno == E_NET_INPROGRESS || errno == E_NET_WOULDBLOCK) {
				/* connect() is still pending; the result is picked up
				 * at the top of this function on the next call */
				return RET_ERROR_RETRY;
			}

			g_message("%s.%d: connecting to backend (%s) failed, marking it as down for ...", 
					__FILE__, __LINE__, con->server->addr.str);

			st->backend->state = BACKEND_STATE_DOWN;
			g_get_current_time(&(st->backend->state_since));

			/* undo the increment above: st->backend is reset on the
			 * next call, so nothing else would correct the counter */
			st->backend->connected_clients--;

			network_socket_free(con->server);
			con->server = NULL;

			return RET_ERROR_RETRY;
		}

		if (st->backend->state != BACKEND_STATE_UP) {
			st->backend->state = BACKEND_STATE_UP;
			g_get_current_time(&(st->backend->state_since));
		}

		con->state = CON_STATE_READ_HANDSHAKE;
	} else {
		/**
		 * send the old hand-shake packet
		 */
		network_queue_append(con->client->send_queue, 
				con->server->auth_handshake_packet->str, 
				con->server->auth_handshake_packet->len,
				0); /* packet-id */

		con->state = CON_STATE_SEND_HANDSHAKE;

		/**
		 * connect_clients is already incremented 
		 */
	}

	return RET_SUCCESS;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -