📄 tcp.c
字号:
goto out; } /* wait on other node's handler */ wait_event(nsw.ns_wq, o2net_nsw_completed(nn, &nsw)); /* Note that we avoid overwriting the callers status return * variable if a system error was reported on the other * side. Callers beware. */ ret = o2net_sys_err_to_errno(nsw.ns_sys_status); if (status && !ret) *status = nsw.ns_status; mlog(0, "woken, returning system status %d, user status %d\n", ret, nsw.ns_status);out: if (sc) sc_put(sc); if (vec) kfree(vec); if (msg) kfree(msg); o2net_complete_nsw(nn, &nsw, 0, 0, 0); return ret;}EXPORT_SYMBOL_GPL(o2net_send_message_vec);int o2net_send_message(u32 msg_type, u32 key, void *data, u32 len, u8 target_node, int *status){ struct kvec vec = { .iov_base = data, .iov_len = len, }; return o2net_send_message_vec(msg_type, key, &vec, 1, target_node, status);}EXPORT_SYMBOL_GPL(o2net_send_message);static int o2net_send_status_magic(struct socket *sock, struct o2net_msg *hdr, enum o2net_system_error syserr, int err){ struct kvec vec = { .iov_base = hdr, .iov_len = sizeof(struct o2net_msg), }; BUG_ON(syserr >= O2NET_ERR_MAX); /* leave other fields intact from the incoming message, msg_num * in particular */ hdr->sys_status = cpu_to_be32(syserr); hdr->status = cpu_to_be32(err); hdr->magic = cpu_to_be16(O2NET_MSG_STATUS_MAGIC); // twiddle the magic hdr->data_len = 0; msglog(hdr, "about to send status magic %d\n", err); /* hdr has been in host byteorder this whole time */ return o2net_send_tcp_msg(sock, &vec, 1, sizeof(struct o2net_msg));}/* this returns -errno if the header was unknown or too large, etc. * after this is called the buffer us reused for the next message */static int o2net_process_message(struct o2net_sock_container *sc, struct o2net_msg *hdr){ struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num); int ret = 0, handler_status; enum o2net_system_error syserr; struct o2net_msg_handler *nmh = NULL; void *ret_data = NULL; msglog(hdr, "processing message\n"); o2net_sc_postpone_idle(sc); switch(be16_to_cpu(hdr->magic)) { case O2NET_MSG_STATUS_MAGIC: /* special type for returning message status */ o2net_complete_nsw(nn, NULL, be32_to_cpu(hdr->msg_num), be32_to_cpu(hdr->sys_status), be32_to_cpu(hdr->status)); goto out; case O2NET_MSG_KEEP_REQ_MAGIC: o2net_sendpage(sc, o2net_keep_resp, sizeof(*o2net_keep_resp)); goto out; case O2NET_MSG_KEEP_RESP_MAGIC: goto out; case O2NET_MSG_MAGIC: break; default: msglog(hdr, "bad magic\n"); ret = -EINVAL; goto out; break; } /* find a handler for it */ handler_status = 0; nmh = o2net_handler_get(be16_to_cpu(hdr->msg_type), be32_to_cpu(hdr->key)); if (!nmh) { mlog(ML_TCP, "couldn't find handler for type %u key %08x\n", be16_to_cpu(hdr->msg_type), be32_to_cpu(hdr->key)); syserr = O2NET_ERR_NO_HNDLR; goto out_respond; } syserr = O2NET_ERR_NONE; if (be16_to_cpu(hdr->data_len) > nmh->nh_max_len) syserr = O2NET_ERR_OVERFLOW; if (syserr != O2NET_ERR_NONE) goto out_respond; do_gettimeofday(&sc->sc_tv_func_start); sc->sc_msg_key = be32_to_cpu(hdr->key); sc->sc_msg_type = be16_to_cpu(hdr->msg_type); handler_status = (nmh->nh_func)(hdr, sizeof(struct o2net_msg) + be16_to_cpu(hdr->data_len), nmh->nh_func_data, &ret_data); do_gettimeofday(&sc->sc_tv_func_stop);out_respond: /* this destroys the hdr, so don't use it after this */ mutex_lock(&sc->sc_send_lock); ret = o2net_send_status_magic(sc->sc_sock, hdr, syserr, handler_status); mutex_unlock(&sc->sc_send_lock); hdr = NULL; mlog(0, "sending handler status %d, syserr %d returned %d\n", handler_status, syserr, ret); if (nmh) { BUG_ON(ret_data != NULL && nmh->nh_post_func == NULL); if (nmh->nh_post_func) (nmh->nh_post_func)(handler_status, nmh->nh_func_data, ret_data); }out: if (nmh) o2net_handler_put(nmh); return ret;}static int o2net_check_handshake(struct o2net_sock_container *sc){ struct o2net_handshake *hand = page_address(sc->sc_page); struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num); if (hand->protocol_version != cpu_to_be64(O2NET_PROTOCOL_VERSION)) { mlog(ML_NOTICE, SC_NODEF_FMT " advertised net protocol " "version %llu but %llu is required, disconnecting\n", SC_NODEF_ARGS(sc), (unsigned long long)be64_to_cpu(hand->protocol_version), O2NET_PROTOCOL_VERSION); /* don't bother reconnecting if its the wrong version. */ o2net_ensure_shutdown(nn, sc, -ENOTCONN); return -1; } /* * Ensure timeouts are consistent with other nodes, otherwise * we can end up with one node thinking that the other must be down, * but isn't. This can ultimately cause corruption. */ if (be32_to_cpu(hand->o2net_idle_timeout_ms) != o2net_idle_timeout(sc->sc_node)) { mlog(ML_NOTICE, SC_NODEF_FMT " uses a network idle timeout of " "%u ms, but we use %u ms locally. disconnecting\n", SC_NODEF_ARGS(sc), be32_to_cpu(hand->o2net_idle_timeout_ms), o2net_idle_timeout(sc->sc_node)); o2net_ensure_shutdown(nn, sc, -ENOTCONN); return -1; } if (be32_to_cpu(hand->o2net_keepalive_delay_ms) != o2net_keepalive_delay(sc->sc_node)) { mlog(ML_NOTICE, SC_NODEF_FMT " uses a keepalive delay of " "%u ms, but we use %u ms locally. disconnecting\n", SC_NODEF_ARGS(sc), be32_to_cpu(hand->o2net_keepalive_delay_ms), o2net_keepalive_delay(sc->sc_node)); o2net_ensure_shutdown(nn, sc, -ENOTCONN); return -1; } if (be32_to_cpu(hand->o2hb_heartbeat_timeout_ms) != O2HB_MAX_WRITE_TIMEOUT_MS) { mlog(ML_NOTICE, SC_NODEF_FMT " uses a heartbeat timeout of " "%u ms, but we use %u ms locally. disconnecting\n", SC_NODEF_ARGS(sc), be32_to_cpu(hand->o2hb_heartbeat_timeout_ms), O2HB_MAX_WRITE_TIMEOUT_MS); o2net_ensure_shutdown(nn, sc, -ENOTCONN); return -1; } sc->sc_handshake_ok = 1; spin_lock(&nn->nn_lock); /* set valid and queue the idle timers only if it hasn't been * shut down already */ if (nn->nn_sc == sc) { o2net_sc_reset_idle_timer(sc); o2net_set_nn_state(nn, sc, 1, 0); } spin_unlock(&nn->nn_lock); /* shift everything up as though it wasn't there */ sc->sc_page_off -= sizeof(struct o2net_handshake); if (sc->sc_page_off) memmove(hand, hand + 1, sc->sc_page_off); return 0;}/* this demuxes the queued rx bytes into header or payload bits and calls * handlers as each full message is read off the socket. it returns -error, * == 0 eof, or > 0 for progress made.*/static int o2net_advance_rx(struct o2net_sock_container *sc){ struct o2net_msg *hdr; int ret = 0; void *data; size_t datalen; sclog(sc, "receiving\n"); do_gettimeofday(&sc->sc_tv_advance_start); if (unlikely(sc->sc_handshake_ok == 0)) { if(sc->sc_page_off < sizeof(struct o2net_handshake)) { data = page_address(sc->sc_page) + sc->sc_page_off; datalen = sizeof(struct o2net_handshake) - sc->sc_page_off; ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen); if (ret > 0) sc->sc_page_off += ret; } if (sc->sc_page_off == sizeof(struct o2net_handshake)) { o2net_check_handshake(sc); if (unlikely(sc->sc_handshake_ok == 0)) ret = -EPROTO; } goto out; } /* do we need more header? */ if (sc->sc_page_off < sizeof(struct o2net_msg)) { data = page_address(sc->sc_page) + sc->sc_page_off; datalen = sizeof(struct o2net_msg) - sc->sc_page_off; ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen); if (ret > 0) { sc->sc_page_off += ret; /* only swab incoming here.. we can * only get here once as we cross from * being under to over */ if (sc->sc_page_off == sizeof(struct o2net_msg)) { hdr = page_address(sc->sc_page); if (be16_to_cpu(hdr->data_len) > O2NET_MAX_PAYLOAD_BYTES) ret = -EOVERFLOW; } } if (ret <= 0) goto out; } if (sc->sc_page_off < sizeof(struct o2net_msg)) { /* oof, still don't have a header */ goto out; } /* this was swabbed above when we first read it */ hdr = page_address(sc->sc_page); msglog(hdr, "at page_off %zu\n", sc->sc_page_off); /* do we need more payload? */ if (sc->sc_page_off - sizeof(struct o2net_msg) < be16_to_cpu(hdr->data_len)) { /* need more payload */ data = page_address(sc->sc_page) + sc->sc_page_off; datalen = (sizeof(struct o2net_msg) + be16_to_cpu(hdr->data_len)) - sc->sc_page_off; ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen); if (ret > 0) sc->sc_page_off += ret; if (ret <= 0) goto out; } if (sc->sc_page_off - sizeof(struct o2net_msg) == be16_to_cpu(hdr->data_len)) { /* we can only get here once, the first time we read * the payload.. so set ret to progress if the handler * works out. after calling this the message is toast */ ret = o2net_process_message(sc, hdr); if (ret == 0) ret = 1; sc->sc_page_off = 0; }out: sclog(sc, "ret = %d\n", ret); do_gettimeofday(&sc->sc_tv_advance_stop); return ret;}/* this work func is triggerd by data ready. it reads until it can read no * more. it interprets 0, eof, as fatal. if data_ready hits while we're doing * our work the work struct will be marked and we'll be called again. */static void o2net_rx_until_empty(struct work_struct *work){ struct o2net_sock_container *sc = container_of(work, struct o2net_sock_container, sc_rx_work); int ret; do { ret = o2net_advance_rx(sc); } while (ret > 0); if (ret <= 0 && ret != -EAGAIN) { struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num); sclog(sc, "saw error %d, closing\n", ret); /* not permanent so read failed handshake can retry */ o2net_ensure_shutdown(nn, sc, 0); } sc_put(sc);}static int o2net_set_nodelay(struct socket *sock){ int ret, val = 1; mm_segment_t oldfs; oldfs = get_fs(); set_fs(KERNEL_DS); /* * Dear unsuspecting programmer, * * Don't use sock_setsockopt() for SOL_TCP. It doesn't check its level * argument and assumes SOL_SOCKET so, say, your TCP_NODELAY will * silently turn into SO_DEBUG. * * Yours, * Keeper of hilariously fragile interfaces. */ ret = sock->ops->setsockopt(sock, SOL_TCP, TCP_NODELAY, (char __user *)&val, sizeof(val)); set_fs(oldfs); return ret;}static void o2net_initialize_handshake(void){ o2net_hand->o2hb_heartbeat_timeout_ms = cpu_to_be32( O2HB_MAX_WRITE_TIMEOUT_MS); o2net_hand->o2net_idle_timeout_ms = cpu_to_be32( o2net_idle_timeout(NULL)); o2net_hand->o2net_keepalive_delay_ms = cpu_to_be32( o2net_keepalive_delay(NULL)); o2net_hand->o2net_reconnect_delay_ms = cpu_to_be32( o2net_reconnect_delay(NULL));}/* ------------------------------------------------------------ *//* called when a connect completes and after a sock is accepted. the * rx path will see the response and mark the sc valid */static void o2net_sc_connect_completed(struct work_struct *work){ struct o2net_sock_container *sc = container_of(work, struct o2net_sock_container, sc_connect_work); mlog(ML_MSG, "sc sending handshake with ver %llu id %llx\n", (unsigned long long)O2NET_PROTOCOL_VERSION, (unsigned long long)be64_to_cpu(o2net_hand->connector_id)); o2net_initialize_handshake(); o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand)); sc_put(sc);}/* this is called as a work_struct func. */static void o2net_sc_send_keep_req(struct work_struct *work){ struct o2net_sock_container *sc = container_of(work, struct o2net_sock_container, sc_keepalive_work.work); o2net_sendpage(sc, o2net_keep_req, sizeof(*o2net_keep_req)); sc_put(sc);}/* socket shutdown does a del_timer_sync against this as it tears down. * we can't start this timer until we've got to the point in sc buildup * where shutdown is going to be involved */static void o2net_idle_timer(unsigned long data){ struct o2net_sock_container *sc = (struct o2net_sock_container *)data; struct timeval now; do_gettimeofday(&now); printk(KERN_INFO "o2net: connection to " SC_NODEF_FMT " has been idle for %u.%u " "seconds, shutting it down.\n", SC_NODEF_ARGS(sc), o2net_idle_timeout(sc->sc_node) / 1000, o2net_idle_timeout(sc->sc_node) % 1000); mlog(ML_NOTICE, "here are some times that might help debug the " "situation: (tmr %ld.%ld now %ld.%ld dr %ld.%ld adv " "%ld.%ld:%ld.%ld func (%08x:%u) %ld.%ld:%ld.%ld)\n", sc->sc_tv_timer.tv_sec, (long) sc->sc_tv_timer.tv_usec, now.tv_sec, (long) now.tv_usec, sc->sc_tv_data_ready.tv_sec, (long) sc->sc_tv_data_ready.tv_usec, sc->sc_tv_advance_start.tv_sec, (long) sc->sc_tv_advance_start.tv_usec, sc->sc_tv_advance_stop.tv_sec, (long) sc->sc_tv_advance_stop.tv_usec, sc->sc_msg_key, sc->sc_msg_type, sc->sc_tv_func_start.tv_sec, (long) sc->sc_tv_func_start.tv_usec, sc->sc_tv_func_stop.tv_sec, (long) sc->sc_tv_func_stop.tv_usec); o2net_sc_queue_work(sc, &sc->sc_shutdown_work);}static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc){ o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work); o2net_sc_queue_delayed_work(sc, &sc->sc_keepalive_work, msecs_to_jiffies(o2net_keepalive_delay(sc->sc_node))); do_gettimeofday(&sc->sc_tv_timer); mod_timer(&sc->sc_idle_timeout, jiffies + msecs_to_jiffies(o2net_idle_timeout(sc->sc_node)));}static void o2net_sc_postpone_idle(struct o2net_sock_container *sc){ /* Only push out an existing timer */ if (timer_pending(&sc->sc_idle_timeout)) o2net_sc_reset_idle_timer(sc);}/* this work func is kicked whenever a path sets the nn state which doesn't * have valid set. This includes seeing hb come up, losing a connection, * having a connect attempt fail, etc. This centralizes the logic which decides * if a connect attempt should be made or if we should give up and all future * transmit attempts should fail */static void o2net_start_connect(struct work_struct *work){ struct o2net_node *nn = container_of(work, struct o2net_node, nn_connect_work.work); struct o2net_sock_container *sc = NULL; struct o2nm_node *node = NULL, *mynode = NULL; struct socket *sock = NULL; struct sockaddr_in myaddr = {0, }, remoteaddr = {0, }; int ret = 0, stop; /* if we're greater we initiate tx, otherwise we accept */ if (o2nm_this_node() <= o2net_num_from_nn(nn)) goto out; /* watch for racing with tearing a node down */ node = o2nm_get_node_by_num(o2net_num_from_nn(nn)); if (node == NULL) { ret = 0; goto out; } mynode = o2nm_get_node_by_num(o2nm_this_node()); if (mynode == NULL) { ret = 0; goto out; } spin_lock(&nn->nn_lock); /* see if we already have one pending or have given up */ stop = (nn->nn_sc || nn->nn_persistent_error); spin_unlock(&nn->nn_lock); if (stop) goto out; nn->nn_last_connect_attempt = jiffies;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -