📄 tcp.c
字号:
goto out; } o2net_register_callbacks(sc->sc_sock->sk, sc); spin_lock(&nn->nn_lock); /* handshake completion will set nn->nn_sc_valid */ o2net_set_nn_state(nn, sc, 0, 0); spin_unlock(&nn->nn_lock); remoteaddr.sin_family = AF_INET; remoteaddr.sin_addr.s_addr = (__force u32)node->nd_ipv4_address; remoteaddr.sin_port = (__force u16)node->nd_ipv4_port; ret = sc->sc_sock->ops->connect(sc->sc_sock, (struct sockaddr *)&remoteaddr, sizeof(remoteaddr), O_NONBLOCK); if (ret == -EINPROGRESS) ret = 0;out: if (ret) { mlog(ML_NOTICE, "connect attempt to " SC_NODEF_FMT " failed " "with errno %d\n", SC_NODEF_ARGS(sc), ret); /* 0 err so that another will be queued and attempted * from set_nn_state */ if (sc) o2net_ensure_shutdown(nn, sc, 0); } if (sc) sc_put(sc); if (node) o2nm_node_put(node); if (mynode) o2nm_node_put(mynode); return;}static void o2net_connect_expired(void *arg){ struct o2net_node *nn = arg; spin_lock(&nn->nn_lock); if (!nn->nn_sc_valid) { mlog(ML_ERROR, "no connection established with node %u after " "%u seconds, giving up and returning errors.\n", o2net_num_from_nn(nn), O2NET_IDLE_TIMEOUT_SECS); o2net_set_nn_state(nn, NULL, 0, -ENOTCONN); } spin_unlock(&nn->nn_lock);}static void o2net_still_up(void *arg){ struct o2net_node *nn = arg; o2quo_hb_still_up(o2net_num_from_nn(nn));}/* ------------------------------------------------------------ */void o2net_disconnect_node(struct o2nm_node *node){ struct o2net_node *nn = o2net_nn_from_num(node->nd_num); /* don't reconnect until it's heartbeating again */ spin_lock(&nn->nn_lock); o2net_set_nn_state(nn, NULL, 0, -ENOTCONN); spin_unlock(&nn->nn_lock); if (o2net_wq) { cancel_delayed_work(&nn->nn_connect_expired); cancel_delayed_work(&nn->nn_connect_work); cancel_delayed_work(&nn->nn_still_up); flush_workqueue(o2net_wq); }}static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num, void *data){ o2quo_hb_down(node_num); if (node_num != o2nm_this_node()) o2net_disconnect_node(node);}static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num, void *data){ struct o2net_node *nn = o2net_nn_from_num(node_num); o2quo_hb_up(node_num); /* ensure an immediate connect attempt */ nn->nn_last_connect_attempt = jiffies - (msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS) + 1); if (node_num != o2nm_this_node()) { /* heartbeat doesn't work unless a local node number is * configured and doing so brings up the o2net_wq, so we can * use it.. */ queue_delayed_work(o2net_wq, &nn->nn_connect_expired, O2NET_IDLE_TIMEOUT_SECS * HZ); /* believe it or not, accept and node hearbeating testing * can succeed for this node before we got here.. so * only use set_nn_state to clear the persistent error * if that hasn't already happened */ spin_lock(&nn->nn_lock); if (nn->nn_persistent_error) o2net_set_nn_state(nn, NULL, 0, 0); spin_unlock(&nn->nn_lock); }}void o2net_unregister_hb_callbacks(void){ int ret; ret = o2hb_unregister_callback(&o2net_hb_up); if (ret < 0) mlog(ML_ERROR, "Status return %d unregistering heartbeat up " "callback!\n", ret); ret = o2hb_unregister_callback(&o2net_hb_down); if (ret < 0) mlog(ML_ERROR, "Status return %d unregistering heartbeat down " "callback!\n", ret);}int o2net_register_hb_callbacks(void){ int ret; o2hb_setup_callback(&o2net_hb_down, O2HB_NODE_DOWN_CB, o2net_hb_node_down_cb, NULL, O2NET_HB_PRI); o2hb_setup_callback(&o2net_hb_up, O2HB_NODE_UP_CB, o2net_hb_node_up_cb, NULL, O2NET_HB_PRI); ret = o2hb_register_callback(&o2net_hb_up); if (ret == 0) ret = o2hb_register_callback(&o2net_hb_down); if (ret) o2net_unregister_hb_callbacks(); return ret;}/* ------------------------------------------------------------ */static int o2net_accept_one(struct socket *sock){ int ret, slen; struct sockaddr_in sin; struct socket *new_sock = NULL; struct o2nm_node *node = NULL; struct o2net_sock_container *sc = NULL; struct o2net_node *nn; BUG_ON(sock == NULL); ret = sock_create_lite(sock->sk->sk_family, sock->sk->sk_type, sock->sk->sk_protocol, &new_sock); if (ret) goto out; new_sock->type = sock->type; new_sock->ops = sock->ops; ret = sock->ops->accept(sock, new_sock, O_NONBLOCK); if (ret < 0) goto out; new_sock->sk->sk_allocation = GFP_ATOMIC; ret = o2net_set_nodelay(new_sock); if (ret) { mlog(ML_ERROR, "setting TCP_NODELAY failed with %d\n", ret); goto out; } slen = sizeof(sin); ret = new_sock->ops->getname(new_sock, (struct sockaddr *) &sin, &slen, 1); if (ret < 0) goto out; node = o2nm_get_node_by_ip((__force __be32)sin.sin_addr.s_addr); if (node == NULL) { mlog(ML_NOTICE, "attempt to connect from unknown node at " "%u.%u.%u.%u:%d\n", NIPQUAD(sin.sin_addr.s_addr), ntohs((__force __be16)sin.sin_port)); ret = -EINVAL; goto out; } if (o2nm_this_node() > node->nd_num) { mlog(ML_NOTICE, "unexpected connect attempted from a lower " "numbered node '%s' at " "%u.%u.%u.%u:%d with num %u\n", node->nd_name, NIPQUAD(sin.sin_addr.s_addr), ntohs((__force __be16)sin.sin_port), node->nd_num); ret = -EINVAL; goto out; } /* this happens all the time when the other node sees our heartbeat * and tries to connect before we see their heartbeat */ if (!o2hb_check_node_heartbeating_from_callback(node->nd_num)) { mlog(ML_CONN, "attempt to connect from node '%s' at " "%u.%u.%u.%u:%d but it isn't heartbeating\n", node->nd_name, NIPQUAD(sin.sin_addr.s_addr), ntohs((__force __be16)sin.sin_port)); ret = -EINVAL; goto out; } nn = o2net_nn_from_num(node->nd_num); spin_lock(&nn->nn_lock); if (nn->nn_sc) ret = -EBUSY; else ret = 0; spin_unlock(&nn->nn_lock); if (ret) { mlog(ML_NOTICE, "attempt to connect from node '%s' at " "%u.%u.%u.%u:%d but it already has an open connection\n", node->nd_name, NIPQUAD(sin.sin_addr.s_addr), ntohs((__force __be16)sin.sin_port)); goto out; } sc = sc_alloc(node); if (sc == NULL) { ret = -ENOMEM; goto out; } sc->sc_sock = new_sock; new_sock = NULL; spin_lock(&nn->nn_lock); o2net_set_nn_state(nn, sc, 0, 0); spin_unlock(&nn->nn_lock); o2net_register_callbacks(sc->sc_sock->sk, sc); o2net_sc_queue_work(sc, &sc->sc_rx_work); o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));out: if (new_sock) sock_release(new_sock); if (node) o2nm_node_put(node); if (sc) sc_put(sc); return ret;}static void o2net_accept_many(void *arg){ struct socket *sock = arg; while (o2net_accept_one(sock) == 0) cond_resched();}static void o2net_listen_data_ready(struct sock *sk, int bytes){ void (*ready)(struct sock *sk, int bytes); read_lock(&sk->sk_callback_lock); ready = sk->sk_user_data; if (ready == NULL) { /* check for teardown race */ ready = sk->sk_data_ready; goto out; } /* ->sk_data_ready is also called for a newly established child socket * before it has been accepted and the acceptor has set up their * data_ready.. we only want to queue listen work for our listening * socket */ if (sk->sk_state == TCP_LISTEN) { mlog(ML_TCP, "bytes: %d\n", bytes); queue_work(o2net_wq, &o2net_listen_work); }out: read_unlock(&sk->sk_callback_lock); ready(sk, bytes);}static int o2net_open_listening_sock(__be16 port){ struct socket *sock = NULL; int ret; struct sockaddr_in sin = { .sin_family = PF_INET, .sin_addr = { .s_addr = (__force u32)htonl(INADDR_ANY) }, .sin_port = (__force u16)port, }; ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); if (ret < 0) { mlog(ML_ERROR, "unable to create socket, ret=%d\n", ret); goto out; } sock->sk->sk_allocation = GFP_ATOMIC; write_lock_bh(&sock->sk->sk_callback_lock); sock->sk->sk_user_data = sock->sk->sk_data_ready; sock->sk->sk_data_ready = o2net_listen_data_ready; write_unlock_bh(&sock->sk->sk_callback_lock); o2net_listen_sock = sock; INIT_WORK(&o2net_listen_work, o2net_accept_many, sock); sock->sk->sk_reuse = 1; ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin)); if (ret < 0) { mlog(ML_ERROR, "unable to bind socket to port %d, ret=%d\n", ntohs(port), ret); goto out; } ret = sock->ops->listen(sock, 64); if (ret < 0) { mlog(ML_ERROR, "unable to listen on port %d, ret=%d\n", ntohs(port), ret); }out: if (ret) { o2net_listen_sock = NULL; if (sock) sock_release(sock); } return ret;}/* * called from node manager when we should bring up our network listening * socket. node manager handles all the serialization to only call this * once and to match it with o2net_stop_listening(). note, * o2nm_this_node() doesn't work yet as we're being called while it * is being set up. */int o2net_start_listening(struct o2nm_node *node){ int ret = 0; BUG_ON(o2net_wq != NULL); BUG_ON(o2net_listen_sock != NULL); mlog(ML_KTHREAD, "starting o2net thread...\n"); o2net_wq = create_singlethread_workqueue("o2net"); if (o2net_wq == NULL) { mlog(ML_ERROR, "unable to launch o2net thread\n"); return -ENOMEM; /* ? */ } ret = o2net_open_listening_sock(node->nd_ipv4_port); if (ret) { destroy_workqueue(o2net_wq); o2net_wq = NULL; } else o2quo_conn_up(node->nd_num); return ret;}/* again, o2nm_this_node() doesn't work here as we're involved in * tearing it down */void o2net_stop_listening(struct o2nm_node *node){ struct socket *sock = o2net_listen_sock; size_t i; BUG_ON(o2net_wq == NULL); BUG_ON(o2net_listen_sock == NULL); /* stop the listening socket from generating work */ write_lock_bh(&sock->sk->sk_callback_lock); sock->sk->sk_data_ready = sock->sk->sk_user_data; sock->sk->sk_user_data = NULL; write_unlock_bh(&sock->sk->sk_callback_lock); for (i = 0; i < ARRAY_SIZE(o2net_nodes); i++) { struct o2nm_node *node = o2nm_get_node_by_num(i); if (node) { o2net_disconnect_node(node); o2nm_node_put(node); } } /* finish all work and tear down the work queue */ mlog(ML_KTHREAD, "waiting for o2net thread to exit....\n"); destroy_workqueue(o2net_wq); o2net_wq = NULL; sock_release(o2net_listen_sock); o2net_listen_sock = NULL; o2quo_conn_err(node->nd_num);}/* ------------------------------------------------------------ */int o2net_init(void){ unsigned long i; o2quo_init(); o2net_hand = kcalloc(1, sizeof(struct o2net_handshake), GFP_KERNEL); o2net_keep_req = kcalloc(1, sizeof(struct o2net_msg), GFP_KERNEL); o2net_keep_resp = kcalloc(1, sizeof(struct o2net_msg), GFP_KERNEL); if (!o2net_hand || !o2net_keep_req || !o2net_keep_resp) { kfree(o2net_hand); kfree(o2net_keep_req); kfree(o2net_keep_resp); return -ENOMEM; } o2net_hand->protocol_version = cpu_to_be64(O2NET_PROTOCOL_VERSION); o2net_hand->connector_id = cpu_to_be64(1); o2net_keep_req->magic = cpu_to_be16(O2NET_MSG_KEEP_REQ_MAGIC); o2net_keep_resp->magic = cpu_to_be16(O2NET_MSG_KEEP_RESP_MAGIC); for (i = 0; i < ARRAY_SIZE(o2net_nodes); i++) { struct o2net_node *nn = o2net_nn_from_num(i); spin_lock_init(&nn->nn_lock); INIT_WORK(&nn->nn_connect_work, o2net_start_connect, nn); INIT_WORK(&nn->nn_connect_expired, o2net_connect_expired, nn); INIT_WORK(&nn->nn_still_up, o2net_still_up, nn); /* until we see hb from a node we'll return einval */ nn->nn_persistent_error = -ENOTCONN; init_waitqueue_head(&nn->nn_sc_wq); idr_init(&nn->nn_status_idr); INIT_LIST_HEAD(&nn->nn_status_list); } return 0;}void o2net_exit(void){ o2quo_exit(); kfree(o2net_hand); kfree(o2net_keep_req); kfree(o2net_keep_resp);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -