📄 tcp.c
字号:
}

/*
 * Replacement sk_state_change callback; see o2net_register_callbacks().
 *
 * Under a read hold of sk_callback_lock it looks up the container stored in
 * sk_user_data.  If there is none (e.g. we raced with
 * o2net_unregister_callbacks()) it simply chains to the current
 * sk->sk_state_change.  Otherwise it queues sc_connect_work when the socket
 * reaches TCP_ESTABLISHED, ignores the in-progress SYN states, and queues
 * sc_shutdown_work for every other state.  The original callback is always
 * invoked after the lock has been dropped.
 */
static void o2net_state_change(struct sock *sk)
{
	void (*state_change)(struct sock *sk);
	struct o2net_sock_container *sc;

	read_lock(&sk->sk_callback_lock);
	sc = sk->sk_user_data;
	if (sc == NULL) {
		/* no container attached: just chain to the installed callback */
		state_change = sk->sk_state_change;
		goto out;
	}

	sclog(sc, "state_change to %d\n", sk->sk_state);

	/* the callback that was installed before ours */
	state_change = sc->sc_state_change;

	switch(sk->sk_state) {
	/* ignore connecting sockets as they make progress */
	case TCP_SYN_SENT:
	case TCP_SYN_RECV:
		break;
	case TCP_ESTABLISHED:
		o2net_sc_queue_work(sc, &sc->sc_connect_work);
		break;
	default:
		/* every other state tears the sc down */
		o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
		break;
	}
out:
	read_unlock(&sk->sk_callback_lock);

	/* chain to the original callback outside the lock */
	state_change(sk);
}

/*
 * we register callbacks so we can queue work on events before calling
 * the original callbacks.  our callbacks are careful to test user_data
 * to discover when they've raced with o2net_unregister_callbacks().
 *
 * Installs o2net_data_ready/o2net_state_change on @sk and stashes the
 * previous callbacks in @sc so they can be chained to and later restored.
 * A reference on @sc is taken for sk_user_data; it is dropped by whoever
 * successfully unregisters (o2net_shutdown_sc()).
 */
static void o2net_register_callbacks(struct sock *sk,
				     struct o2net_sock_container *sc)
{
	write_lock_bh(&sk->sk_callback_lock);

	/* accepted sockets inherit the old listen socket data ready */
	if (sk->sk_data_ready == o2net_listen_data_ready) {
		/* sk_user_data presumably holds the listener's original
		 * data_ready (set where the listen socket is configured, not
		 * visible here) -- restore it before taking over */
		sk->sk_data_ready = sk->sk_user_data;
		sk->sk_user_data = NULL;
	}

	BUG_ON(sk->sk_user_data != NULL);
	sk->sk_user_data = sc;
	sc_get(sc);

	/* save the originals so our callbacks can chain to them and
	 * o2net_unregister_callbacks() can restore them */
	sc->sc_data_ready = sk->sk_data_ready;
	sc->sc_state_change = sk->sk_state_change;
	sk->sk_data_ready = o2net_data_ready;
	sk->sk_state_change = o2net_state_change;

	write_unlock_bh(&sk->sk_callback_lock);
}

/*
 * Detach @sc from @sk and restore the callbacks saved by
 * o2net_register_callbacks().
 *
 * Returns 1 if this call did the detach (sk_user_data was @sc), 0 if @sc
 * was not attached.  Only one caller can see 1, which o2net_shutdown_sc()
 * relies on to drop the callbacks reference exactly once.
 */
static int o2net_unregister_callbacks(struct sock *sk,
				      struct o2net_sock_container *sc)
{
	int ret = 0;

	write_lock_bh(&sk->sk_callback_lock);
	if (sk->sk_user_data == sc) {
		ret = 1;
		sk->sk_user_data = NULL;
		sk->sk_data_ready = sc->sc_data_ready;
		sk->sk_state_change = sc->sc_state_change;
	}
	write_unlock_bh(&sk->sk_callback_lock);

	return ret;
}

/*
 * this is a little helper that is called by callers who have seen a problem
 * with an sc and want to detach it from the nn if someone hasn't already
 * beaten them to it.  if an error is given then the shutdown will be
 * persistent and pending transmits will be canceled (that handling lives in
 * o2net_set_nn_state()).
 *
 * Under nn_lock, o2net_set_nn_state(nn, NULL, 0, err) is called only if @sc
 * is still nn->nn_sc; otherwise this is a no-op.
 */
static void o2net_ensure_shutdown(struct o2net_node *nn,
				  struct o2net_sock_container *sc,
				  int err)
{
	spin_lock(&nn->nn_lock);
	if (nn->nn_sc == sc)
		o2net_set_nn_state(nn, NULL, 0, err);
	spin_unlock(&nn->nn_lock);
}

/*
 * This work queue function performs the blocking parts of socket shutdown.  A
 * few paths lead here.  set_nn_state will trigger this callback if it sees an
 * sc detached from the nn.  state_change will also trigger this callback
 * directly when it sees errors.  In that case we need to call set_nn_state
 * ourselves as state_change couldn't get the nn_lock and call set_nn_state
 * itself.
 *
 * @arg is the struct o2net_sock_container being shut down.  Only the caller
 * that wins o2net_unregister_callbacks() stops the idle timer, cancels the
 * keepalive work, drops the callbacks reference and shuts the socket down,
 * so those steps happen at most once per sc.  The final sc_put() drops one
 * more reference -- presumably the one held for this queued work; TODO
 * confirm against o2net_sc_queue_work().
 */
static void o2net_shutdown_sc(void *arg)
{
	struct o2net_sock_container *sc = arg;
	struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);

	sclog(sc, "shutting down\n");

	/* drop the callbacks ref and call shutdown only once */
	if (o2net_unregister_callbacks(sc->sc_sock->sk, sc)) {
		/* we shouldn't flush as we're in the thread, the
		 * races with pending sc work structs are harmless */
		del_timer_sync(&sc->sc_idle_timeout);
		o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work);
		/* NOTE(review): sc is still dereferenced after this put; that
		 * is safe only because the reference dropped by the final
		 * sc_put() below is still held here */
		sc_put(sc);
		sc->sc_sock->ops->shutdown(sc->sc_sock,
					   RCV_SHUTDOWN|SEND_SHUTDOWN);
	}

	/* not fatal so failed connects before the other guy has our
	 * heartbeat can be retried */
	o2net_ensure_shutdown(nn, sc, 0);
	sc_put(sc);
}

/* ------------------------------------------------------------ */

/*
 * Ordering used by the handler rbtree: compare by key first, then by
 * message type.  memcmp() compares raw in-memory bytes, so the order is not
 * numeric (it depends on host byte order), but it is consistent, which is
 * all the tree walk needs.  Assumes nh_key/nh_msg_type are u32-sized (they
 * are assigned from u32 values in o2net_register_handler()).
 *
 * Returns <0, 0 or >0 like memcmp().
 */
static int o2net_handler_cmp(struct o2net_msg_handler *nmh, u32 msg_type,
			     u32 key)
{
	int ret = memcmp(&nmh->nh_key, &key, sizeof(key));

	if (ret == 0)
		ret = memcmp(&nmh->nh_msg_type, &msg_type, sizeof(msg_type));

	return ret;
}

/*
 * Look up the handler registered for (@msg_type, @key) in
 * o2net_handler_tree.  Returns the handler or NULL; no reference is taken.
 *
 * If @ret_p / @ret_parent are non-NULL they receive the child link slot and
 * parent node where the walk stopped; when no match was found these form
 * the insertion point for rb_link_node().  Callers in this file hold
 * o2net_handler_lock around the call.
 *
 * The walk descends left when the node compares *below* the target -- the
 * reverse of the usual convention -- but insert and lookup both go through
 * this function, so the tree stays consistent.
 */
static struct o2net_msg_handler *
o2net_handler_tree_lookup(u32 msg_type, u32 key, struct rb_node ***ret_p,
			  struct rb_node **ret_parent)
{
	struct rb_node **p = &o2net_handler_tree.rb_node;
	struct rb_node *parent = NULL;
	struct o2net_msg_handler *nmh, *ret = NULL;
	int cmp;

	while (*p) {
		parent = *p;
		nmh = rb_entry(parent, struct o2net_msg_handler, nh_node);
		cmp = o2net_handler_cmp(nmh, msg_type, key);

		if (cmp < 0)
			p = &(*p)->rb_left;
		else if (cmp > 0)
			p = &(*p)->rb_right;
		else {
			ret = nmh;
			break;
		}
	}

	if (ret_p != NULL)
		*ret_p = p;
	if (ret_parent != NULL)
		*ret_parent = parent;

	return ret;
}

/* kref release callback: frees the handler once its last reference is gone */
static void o2net_handler_kref_release(struct kref *kref)
{
	struct o2net_msg_handler *nmh;
	nmh = container_of(kref, struct o2net_msg_handler, nh_kref);

	kfree(nmh);
}

/* drop a reference obtained from o2net_handler_get() */
static void o2net_handler_put(struct o2net_msg_handler *nmh)
{
	kref_put(&nmh->nh_kref, o2net_handler_kref_release);
}

/* max_len is protection for the handler func.  incoming messages won't
 * be given to the handler if their payload is longer than the max.
 *
 * Registers @func (with opaque @data) for messages of type @msg_type
 * carrying @key.  @msg_type must be non-zero, @func non-NULL and @max_len
 * no larger than O2NET_MAX_PAYLOAD_BYTES.  On success the handler is
 * inserted into o2net_handler_tree and appended to @unreg_list, which the
 * caller later hands to o2net_unregister_handler_list().
 *
 * Returns 0, -EINVAL for bad arguments, -ENOMEM if allocation fails, or
 * -EEXIST if a handler for (@msg_type, @key) is already registered.
 */
int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
			   o2net_msg_handler_func *func, void *data,
			   struct list_head *unreg_list)
{
	struct o2net_msg_handler *nmh = NULL;
	struct rb_node **p, *parent;
	int ret = 0;

	if (max_len > O2NET_MAX_PAYLOAD_BYTES) {
		mlog(0, "max_len for message handler out of range: %u\n",
			max_len);
		ret = -EINVAL;
		goto out;
	}

	if (!msg_type) {
		mlog(0, "no message type provided: %u, %p\n", msg_type, func);
		ret = -EINVAL;
		goto out;

	}
	if (!func) {
		mlog(0, "no message handler provided: %u, %p\n",
		       msg_type, func);
		ret = -EINVAL;
		goto out;
	}

	nmh = kcalloc(1, sizeof(struct o2net_msg_handler), GFP_NOFS);
	if (nmh == NULL) {
		ret = -ENOMEM;
		goto out;
	}

	nmh->nh_func = func;
	nmh->nh_func_data = data;
	nmh->nh_msg_type = msg_type;
	nmh->nh_max_len = max_len;
	nmh->nh_key = key;
	/* the tree and list get this ref.. they're both removed in
	 * unregister when this ref is dropped */
	kref_init(&nmh->nh_kref);
	INIT_LIST_HEAD(&nmh->nh_unregister_item);

	write_lock(&o2net_handler_lock);
	if (o2net_handler_tree_lookup(msg_type, key, &p, &parent))
		ret = -EEXIST;
	else {
		/* p/parent from the failed lookup are the insertion point */
	        rb_link_node(&nmh->nh_node, parent, p);
		rb_insert_color(&nmh->nh_node, &o2net_handler_tree);
		list_add_tail(&nmh->nh_unregister_item, unreg_list);

		mlog(ML_TCP, "registered handler func %p type %u key %08x\n",
		     func, msg_type, key);
		/* we've had some trouble with handlers seemingly vanishing. */
		mlog_bug_on_msg(o2net_handler_tree_lookup(msg_type, key, &p,
							  &parent) == NULL,
			        "couldn't find handler we *just* registerd "
				"for type %u key %08x\n", msg_type, key);
	}
	write_unlock(&o2net_handler_lock);
	/* NOTE(review): redundant -- control falls through to out: anyway */
	if (ret)
		goto out;

out:
	/* every failure path ends here; kfree(NULL) is a no-op */
	if (ret)
		kfree(nmh);

	return ret;
}
EXPORT_SYMBOL_GPL(o2net_register_handler);

/*
 * Undo every o2net_register_handler() whose handler was appended to @list:
 * erase it from o2net_handler_tree, unlink it from @list and drop the
 * registration reference, all under the write lock.  A handler still held
 * through o2net_handler_get() is only freed when that holder calls
 * o2net_handler_put().
 */
void o2net_unregister_handler_list(struct list_head *list)
{
	struct list_head *pos, *n;
	struct o2net_msg_handler *nmh;

	write_lock(&o2net_handler_lock);
	list_for_each_safe(pos, n, list) {
		nmh = list_entry(pos, struct o2net_msg_handler,
				 nh_unregister_item);
		mlog(ML_TCP, "unregistering handler func %p type %u key %08x\n",
		     nmh->nh_func, nmh->nh_msg_type, nmh->nh_key);
		rb_erase(&nmh->nh_node, &o2net_handler_tree);
		list_del_init(&nmh->nh_unregister_item);
		kref_put(&nmh->nh_kref, o2net_handler_kref_release);
	}
	write_unlock(&o2net_handler_lock);
}
EXPORT_SYMBOL_GPL(o2net_unregister_handler_list);

/*
 * Look up the handler for (@msg_type, @key) under the read lock and take a
 * reference on it.  Returns NULL if none is registered; otherwise the
 * caller must release it with o2net_handler_put().
 */
static struct o2net_msg_handler *o2net_handler_get(u32 msg_type, u32 key)
{
	struct o2net_msg_handler *nmh;

	read_lock(&o2net_handler_lock);
	nmh = o2net_handler_tree_lookup(msg_type, key, NULL, NULL);
	if (nmh)
		kref_get(&nmh->nh_kref);
	read_unlock(&o2net_handler_lock);

	return nmh;
}

/* ------------------------------------------------------------ */

/*
 * Non-blocking (MSG_DONTWAIT) receive of up to @len bytes from @sock into
 * the kernel buffer @data.  The kvec is cast to struct iovec and the
 * address limit is switched with set_fs(get_ds()) around the call so the
 * kernel pointer is accepted.  Returns sock_recvmsg()'s result unchanged.
 */
static int o2net_recv_tcp_msg(struct socket *sock, void *data, size_t len)
{
	int ret;
	mm_segment_t oldfs;
	struct kvec vec = {
		.iov_len = len,
		.iov_base = data,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&vec,
       		.msg_flags = MSG_DONTWAIT,
	};

	oldfs = get_fs();
	set_fs(get_ds());
	ret = sock_recvmsg(sock, &msg, len, msg.msg_flags);
	set_fs(oldfs);

	return ret;
}

/*
 * Send the @veclen kernel-space kvecs in @vec (@total bytes in all) on
 * @sock, using the same set_fs(get_ds()) arrangement as the receive side.
 *
 * Returns 0 only if sock_sendmsg() accepted exactly @total bytes.  A short
 * send is reported as -EPIPE, a negative sock_sendmsg() result is passed
 * through, and a NULL @sock gives -EINVAL.
 */
static int o2net_send_tcp_msg(struct socket *sock, struct kvec *vec,
			      size_t veclen, size_t total)
{
	int ret;
	mm_segment_t oldfs;
	struct msghdr msg = {
		.msg_iov = (struct iovec *)vec,
		.msg_iovlen = veclen,
	};

	if (sock == NULL) {
		ret = -EINVAL;
		goto out;
	}

	oldfs = get_fs();
	set_fs(get_ds());
	ret = sock_sendmsg(sock, &msg, total);
	set_fs(oldfs);
	/* ret is converted to size_t here, so a negative ret also differs */
	if (ret != total) {
		mlog(ML_ERROR, "sendmsg returned %d instead of %zu\n", ret,
		     total);
		if (ret >= 0)
			ret = -EPIPE; /* should be smarter, I bet */
		goto out;
	}

	ret = 0;
out:
	if (ret < 0)
		mlog(0, "returning error: %d\n", ret);
	return ret;
}

/*
 * Send @size bytes starting at @kmalloced_virt on sc's socket through the
 * protocol's sendpage op with MSG_DONTWAIT.  Anything short of a complete
 * send is logged and the sc is detached with o2net_ensure_shutdown() using
 * err 0 (not persistent).
 *
 * NOTE(review): only the page containing @kmalloced_virt is passed, so this
 * assumes the buffer does not cross a page boundary -- confirm callers.
 */
static void o2net_sendpage(struct o2net_sock_container *sc,
			   void *kmalloced_virt,
			   size_t size)
{
	struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
	ssize_t ret;


	ret = sc->sc_sock->ops->sendpage(sc->sc_sock,
					 virt_to_page(kmalloced_virt),
					 (long)kmalloced_virt & ~PAGE_MASK,
					 size, MSG_DONTWAIT);
	if (ret != size) {
		mlog(ML_ERROR, "sendpage of size %zu to " SC_NODEF_FMT 
		     " failed with %zd\n", size, SC_NODEF_ARGS(sc), ret);
		o2net_ensure_shutdown(nn, sc, 0);
	}
}

/*
 * Zero @msg and fill in its header fields in big-endian byte order: magic,
 * data_len, msg_type, key, and sys_status = O2NET_ERR_NONE; status is 0.
 */
static void o2net_init_msg(struct o2net_msg *msg, u16 data_len, u16 msg_type, u32 key)
{
	memset(msg, 0, sizeof(struct o2net_msg));
	msg->magic = cpu_to_be16(O2NET_MSG_MAGIC);
	msg->data_len = cpu_to_be16(data_len);
	msg->msg_type = cpu_to_be16(msg_type);
	msg->sys_status = cpu_to_be32(O2NET_ERR_NONE);
	msg->status = 0;
	msg->key = cpu_to_be32(key);
}

/*
 * Wait condition for senders (o2net_send_message_vec() waits on it).
 *
 * Returns 1 when the sender can stop waiting:
 *  - nn has a persistent error: *sc_ret = NULL, *error = that error;
 *  - nn has a valid sc: a reference is taken, *sc_ret = nn->nn_sc,
 *    *error = 0 (the caller must drop the reference).
 * Returns 0 otherwise and leaves *sc_ret / *error untouched.
 */
static int o2net_tx_can_proceed(struct o2net_node *nn,
			        struct o2net_sock_container **sc_ret,
				int *error)
{
	int ret = 0;

	spin_lock(&nn->nn_lock);
	if (nn->nn_persistent_error) {
		ret = 1;
		*sc_ret = NULL;
		*error = nn->nn_persistent_error;
	} else if (nn->nn_sc_valid) {
		kref_get(&nn->nn_sc->sc_kref);

		ret = 1;
		*sc_ret = nn->nn_sc;
		*error = 0;
	}
	spin_unlock(&nn->nn_lock);

	return ret;
}

int o2net_send_message_vec(u32 msg_type, u32 key, struct kvec
*caller_vec, size_t caller_veclen, u8 target_node, int *status){ int ret, error = 0; struct o2net_msg *msg = NULL; size_t veclen, caller_bytes = 0; struct kvec *vec = NULL; struct o2net_sock_container *sc = NULL; struct o2net_node *nn = o2net_nn_from_num(target_node); struct o2net_status_wait nsw = { .ns_node_item = LIST_HEAD_INIT(nsw.ns_node_item), }; if (o2net_wq == NULL) { mlog(0, "attempt to tx without o2netd running\n"); ret = -ESRCH; goto out; } if (caller_veclen == 0) { mlog(0, "bad kvec array length\n"); ret = -EINVAL; goto out; } caller_bytes = iov_length((struct iovec *)caller_vec, caller_veclen); if (caller_bytes > O2NET_MAX_PAYLOAD_BYTES) { mlog(0, "total payload len %zu too large\n", caller_bytes); ret = -EINVAL; goto out; } if (target_node == o2nm_this_node()) { ret = -ELOOP; goto out; } ret = wait_event_interruptible(nn->nn_sc_wq, o2net_tx_can_proceed(nn, &sc, &error)); if (!ret && error) ret = error; if (ret) goto out; veclen = caller_veclen + 1; vec = kmalloc(sizeof(struct kvec) * veclen, GFP_ATOMIC); if (vec == NULL) { mlog(0, "failed to %zu element kvec!\n", veclen); ret = -ENOMEM; goto out; } msg = kmalloc(sizeof(struct o2net_msg), GFP_ATOMIC); if (!msg) { mlog(0, "failed to allocate a o2net_msg!\n"); ret = -ENOMEM; goto out;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -