⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcp.c

📁 LINUX 2.6.17.4的源码
💻 C
📖 第 1 页 / 共 4 页
字号:
		goto out;	}	o2net_register_callbacks(sc->sc_sock->sk, sc);	spin_lock(&nn->nn_lock);	/* handshake completion will set nn->nn_sc_valid */	o2net_set_nn_state(nn, sc, 0, 0);	spin_unlock(&nn->nn_lock);	remoteaddr.sin_family = AF_INET;	remoteaddr.sin_addr.s_addr = (__force u32)node->nd_ipv4_address;	remoteaddr.sin_port = (__force u16)node->nd_ipv4_port;	ret = sc->sc_sock->ops->connect(sc->sc_sock,					(struct sockaddr *)&remoteaddr,					sizeof(remoteaddr),					O_NONBLOCK);	if (ret == -EINPROGRESS)		ret = 0;out:	if (ret) {		mlog(ML_NOTICE, "connect attempt to " SC_NODEF_FMT " failed "		     "with errno %d\n", SC_NODEF_ARGS(sc), ret);		/* 0 err so that another will be queued and attempted		 * from set_nn_state */		if (sc)			o2net_ensure_shutdown(nn, sc, 0);	}	if (sc)		sc_put(sc);	if (node)		o2nm_node_put(node);	if (mynode)		o2nm_node_put(mynode);	return;}static void o2net_connect_expired(void *arg){	struct o2net_node *nn = arg;	spin_lock(&nn->nn_lock);	if (!nn->nn_sc_valid) {		mlog(ML_ERROR, "no connection established with node %u after "		     "%u seconds, giving up and returning errors.\n",		     o2net_num_from_nn(nn), O2NET_IDLE_TIMEOUT_SECS);		o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);	}	spin_unlock(&nn->nn_lock);}static void o2net_still_up(void *arg){	struct o2net_node *nn = arg;	o2quo_hb_still_up(o2net_num_from_nn(nn));}/* ------------------------------------------------------------ */void o2net_disconnect_node(struct o2nm_node *node){	struct o2net_node *nn = o2net_nn_from_num(node->nd_num);	/* don't reconnect until it's heartbeating again */	spin_lock(&nn->nn_lock);	o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);	spin_unlock(&nn->nn_lock);	if (o2net_wq) {		cancel_delayed_work(&nn->nn_connect_expired);		cancel_delayed_work(&nn->nn_connect_work);		cancel_delayed_work(&nn->nn_still_up);		flush_workqueue(o2net_wq);	}}static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num,				  void *data){	o2quo_hb_down(node_num);	if (node_num != o2nm_this_node())		o2net_disconnect_node(node);}static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,				void *data){	struct o2net_node *nn = o2net_nn_from_num(node_num);	o2quo_hb_up(node_num);	/* ensure an immediate connect attempt */	nn->nn_last_connect_attempt = jiffies -		(msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS) + 1);	if (node_num != o2nm_this_node()) {		/* heartbeat doesn't work unless a local node number is		 * configured and doing so brings up the o2net_wq, so we can		 * use it.. */		queue_delayed_work(o2net_wq, &nn->nn_connect_expired,				   O2NET_IDLE_TIMEOUT_SECS * HZ);		/* believe it or not, accept and node hearbeating testing		 * can succeed for this node before we got here.. so		 * only use set_nn_state to clear the persistent error		 * if that hasn't already happened */		spin_lock(&nn->nn_lock);		if (nn->nn_persistent_error)			o2net_set_nn_state(nn, NULL, 0, 0);		spin_unlock(&nn->nn_lock);	}}void o2net_unregister_hb_callbacks(void){	int ret;	ret = o2hb_unregister_callback(&o2net_hb_up);	if (ret < 0)		mlog(ML_ERROR, "Status return %d unregistering heartbeat up "		     "callback!\n", ret);	ret = o2hb_unregister_callback(&o2net_hb_down);	if (ret < 0)		mlog(ML_ERROR, "Status return %d unregistering heartbeat down "		     "callback!\n", ret);}int o2net_register_hb_callbacks(void){	int ret;	o2hb_setup_callback(&o2net_hb_down, O2HB_NODE_DOWN_CB,			    o2net_hb_node_down_cb, NULL, O2NET_HB_PRI);	o2hb_setup_callback(&o2net_hb_up, O2HB_NODE_UP_CB,			    o2net_hb_node_up_cb, NULL, O2NET_HB_PRI);	ret = o2hb_register_callback(&o2net_hb_up);	if (ret == 0)		ret = o2hb_register_callback(&o2net_hb_down);	if (ret)		o2net_unregister_hb_callbacks();	return ret;}/* ------------------------------------------------------------ */static int o2net_accept_one(struct socket *sock){	int ret, slen;	struct sockaddr_in sin;	struct socket *new_sock = NULL;	struct o2nm_node *node = NULL;	struct o2net_sock_container *sc = NULL;	struct o2net_node *nn;	BUG_ON(sock == NULL);	ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type,			       sock->sk->sk_protocol, &new_sock);	if (ret)		goto out;	new_sock->type = sock->type;	new_sock->ops = sock->ops;	ret = sock->ops->accept(sock, new_sock, O_NONBLOCK);	if (ret < 0)		goto out;	new_sock->sk->sk_allocation = GFP_ATOMIC;	ret = o2net_set_nodelay(new_sock);	if (ret) {		mlog(ML_ERROR, "setting TCP_NODELAY failed with %d\n", ret);		goto out;	}	slen = sizeof(sin);	ret = new_sock->ops->getname(new_sock, (struct sockaddr *) &sin,				       &slen, 1);	if (ret < 0)		goto out;	node = o2nm_get_node_by_ip((__force __be32)sin.sin_addr.s_addr);	if (node == NULL) {		mlog(ML_NOTICE, "attempt to connect from unknown node at "		     "%u.%u.%u.%u:%d\n", NIPQUAD(sin.sin_addr.s_addr),		     ntohs((__force __be16)sin.sin_port));		ret = -EINVAL;		goto out;	}	if (o2nm_this_node() > node->nd_num) {		mlog(ML_NOTICE, "unexpected connect attempted from a lower "		     "numbered node '%s' at " "%u.%u.%u.%u:%d with num %u\n",		     node->nd_name, NIPQUAD(sin.sin_addr.s_addr),		     ntohs((__force __be16)sin.sin_port), node->nd_num);		ret = -EINVAL;		goto out;	}	/* this happens all the time when the other node sees our heartbeat	 * and tries to connect before we see their heartbeat */	if (!o2hb_check_node_heartbeating_from_callback(node->nd_num)) {		mlog(ML_CONN, "attempt to connect from node '%s' at "		     "%u.%u.%u.%u:%d but it isn't heartbeating\n",		     node->nd_name, NIPQUAD(sin.sin_addr.s_addr),		     ntohs((__force __be16)sin.sin_port));		ret = -EINVAL;		goto out;	}	nn = o2net_nn_from_num(node->nd_num);	spin_lock(&nn->nn_lock);	if (nn->nn_sc)		ret = -EBUSY;	else		ret = 0;	spin_unlock(&nn->nn_lock);	if (ret) {		mlog(ML_NOTICE, "attempt to connect from node '%s' at "		     "%u.%u.%u.%u:%d but it already has an open connection\n",		     node->nd_name, NIPQUAD(sin.sin_addr.s_addr),		     ntohs((__force __be16)sin.sin_port));		goto out;	}	sc = sc_alloc(node);	if (sc == NULL) {		ret = -ENOMEM;		goto out;	}	sc->sc_sock = new_sock;	new_sock = NULL;	spin_lock(&nn->nn_lock);	o2net_set_nn_state(nn, sc, 0, 0);	spin_unlock(&nn->nn_lock);	o2net_register_callbacks(sc->sc_sock->sk, sc);	o2net_sc_queue_work(sc, &sc->sc_rx_work);	o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));out:	if (new_sock)		sock_release(new_sock);	if (node)		o2nm_node_put(node);	if (sc)		sc_put(sc);	return ret;}static void o2net_accept_many(void *arg){	struct socket *sock = arg;	while (o2net_accept_one(sock) == 0)		cond_resched();}static void o2net_listen_data_ready(struct sock *sk, int bytes){	void (*ready)(struct sock *sk, int bytes);	read_lock(&sk->sk_callback_lock);	ready = sk->sk_user_data;	if (ready == NULL) { /* check for teardown race */		ready = sk->sk_data_ready;		goto out;	}	/* ->sk_data_ready is also called for a newly established child socket	 * before it has been accepted and the acceptor has set up their	 * data_ready.. we only want to queue listen work for our listening	 * socket */	if (sk->sk_state == TCP_LISTEN) {		mlog(ML_TCP, "bytes: %d\n", bytes);		queue_work(o2net_wq, &o2net_listen_work);	}out:	read_unlock(&sk->sk_callback_lock);	ready(sk, bytes);}static int o2net_open_listening_sock(__be16 port){	struct socket *sock = NULL;	int ret;	struct sockaddr_in sin = {		.sin_family = PF_INET,		.sin_addr = { .s_addr = (__force u32)htonl(INADDR_ANY) },		.sin_port = (__force u16)port,	};	ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock);	if (ret < 0) {		mlog(ML_ERROR, "unable to create socket, ret=%d\n", ret);		goto out;	}	sock->sk->sk_allocation = GFP_ATOMIC;	write_lock_bh(&sock->sk->sk_callback_lock);	sock->sk->sk_user_data = sock->sk->sk_data_ready;	sock->sk->sk_data_ready = o2net_listen_data_ready;	write_unlock_bh(&sock->sk->sk_callback_lock);	o2net_listen_sock = sock;	INIT_WORK(&o2net_listen_work, o2net_accept_many, sock);	sock->sk->sk_reuse = 1;	ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));	if (ret < 0) {		mlog(ML_ERROR, "unable to bind socket to port %d, ret=%d\n",		     ntohs(port), ret);		goto out;	}	ret = sock->ops->listen(sock, 64);	if (ret < 0) {		mlog(ML_ERROR, "unable to listen on port %d, ret=%d\n",		     ntohs(port), ret);	}out:	if (ret) {		o2net_listen_sock = NULL;		if (sock)			sock_release(sock);	}	return ret;}/* * called from node manager when we should bring up our network listening * socket.  node manager handles all the serialization to only call this * once and to match it with o2net_stop_listening().  note, * o2nm_this_node() doesn't work yet as we're being called while it * is being set up. */int o2net_start_listening(struct o2nm_node *node){	int ret = 0;	BUG_ON(o2net_wq != NULL);	BUG_ON(o2net_listen_sock != NULL);	mlog(ML_KTHREAD, "starting o2net thread...\n");	o2net_wq = create_singlethread_workqueue("o2net");	if (o2net_wq == NULL) {		mlog(ML_ERROR, "unable to launch o2net thread\n");		return -ENOMEM; /* ? */	}	ret = o2net_open_listening_sock(node->nd_ipv4_port);	if (ret) {		destroy_workqueue(o2net_wq);		o2net_wq = NULL;	} else		o2quo_conn_up(node->nd_num);	return ret;}/* again, o2nm_this_node() doesn't work here as we're involved in * tearing it down */void o2net_stop_listening(struct o2nm_node *node){	struct socket *sock = o2net_listen_sock;	size_t i;	BUG_ON(o2net_wq == NULL);	BUG_ON(o2net_listen_sock == NULL);	/* stop the listening socket from generating work */	write_lock_bh(&sock->sk->sk_callback_lock);	sock->sk->sk_data_ready = sock->sk->sk_user_data;	sock->sk->sk_user_data = NULL;	write_unlock_bh(&sock->sk->sk_callback_lock);	for (i = 0; i < ARRAY_SIZE(o2net_nodes); i++) {		struct o2nm_node *node = o2nm_get_node_by_num(i);		if (node) {			o2net_disconnect_node(node);			o2nm_node_put(node);		}	}	/* finish all work and tear down the work queue */	mlog(ML_KTHREAD, "waiting for o2net thread to exit....\n");	destroy_workqueue(o2net_wq);	o2net_wq = NULL;	sock_release(o2net_listen_sock);	o2net_listen_sock = NULL;	o2quo_conn_err(node->nd_num);}/* ------------------------------------------------------------ */int o2net_init(void){	unsigned long i;	o2quo_init();	o2net_hand = kcalloc(1, sizeof(struct o2net_handshake), GFP_KERNEL);	o2net_keep_req = kcalloc(1, sizeof(struct o2net_msg), GFP_KERNEL);	o2net_keep_resp = kcalloc(1, sizeof(struct o2net_msg), GFP_KERNEL);	if (!o2net_hand || !o2net_keep_req || !o2net_keep_resp) {		kfree(o2net_hand);		kfree(o2net_keep_req);		kfree(o2net_keep_resp);		return -ENOMEM;	}	o2net_hand->protocol_version = cpu_to_be64(O2NET_PROTOCOL_VERSION);	o2net_hand->connector_id = cpu_to_be64(1);	o2net_keep_req->magic = cpu_to_be16(O2NET_MSG_KEEP_REQ_MAGIC);	o2net_keep_resp->magic = cpu_to_be16(O2NET_MSG_KEEP_RESP_MAGIC);	for (i = 0; i < ARRAY_SIZE(o2net_nodes); i++) {		struct o2net_node *nn = o2net_nn_from_num(i);		spin_lock_init(&nn->nn_lock);		INIT_WORK(&nn->nn_connect_work, o2net_start_connect, nn);		INIT_WORK(&nn->nn_connect_expired, o2net_connect_expired, nn);		INIT_WORK(&nn->nn_still_up, o2net_still_up, nn);		/* until we see hb from a node we'll return einval */		nn->nn_persistent_error = -ENOTCONN;		init_waitqueue_head(&nn->nn_sc_wq);		idr_init(&nn->nn_status_idr);		INIT_LIST_HEAD(&nn->nn_status_list);	}	return 0;}void o2net_exit(void){	o2quo_exit();	kfree(o2net_hand);	kfree(o2net_keep_req);	kfree(o2net_keep_resp);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -