📄 tcp.c
字号:
}
	o2net_init_msg(msg, caller_bytes, msg_type, key);

	vec[0].iov_len = sizeof(struct o2net_msg);
	vec[0].iov_base = msg;
	memcpy(&vec[1], caller_vec, caller_veclen * sizeof(struct kvec));

	ret = o2net_prep_nsw(nn, &nsw);
	if (ret)
		goto out;

	msg->msg_num = cpu_to_be32(nsw.ns_id);

	/* finally, convert the message header to network byte-order
	 * and send */
	ret = o2net_send_tcp_msg(sc->sc_sock, vec, veclen,
				 sizeof(struct o2net_msg) + caller_bytes);
	msglog(msg, "sending returned %d\n", ret);
	if (ret < 0) {
		mlog(0, "error returned from o2net_send_tcp_msg=%d\n", ret);
		goto out;
	}

	/* wait on other node's handler */
	wait_event(nsw.ns_wq, o2net_nsw_completed(nn, &nsw));

	/* Note that we avoid overwriting the callers status return
	 * variable if a system error was reported on the other
	 * side. Callers beware. */
	ret = o2net_sys_err_to_errno(nsw.ns_sys_status);
	if (status && !ret)
		*status = nsw.ns_status;

	mlog(0, "woken, returning system status %d, user status %d\n",
	     ret, nsw.ns_status);
out:
	/* NOTE(review): kfree(NULL) is a no-op, so the vec/msg guards
	 * below are redundant (harmless). */
	if (sc)
		sc_put(sc);
	if (vec)
		kfree(vec);
	if (msg)
		kfree(msg);
	o2net_complete_nsw(nn, &nsw, 0, 0, 0);
	return ret;
}
EXPORT_SYMBOL_GPL(o2net_send_message_vec);

/*
 * Send one contiguous buffer to target_node.
 *
 * Wraps (data, len) in a single-element kvec and forwards everything to
 * o2net_send_message_vec(). That function writes the remote handler's
 * status into *status only when status is non-NULL and no system error
 * was reported. Returns whatever o2net_send_message_vec() returns.
 */
int o2net_send_message(u32 msg_type, u32 key, void *data, u32 len,
		       u8 target_node, int *status)
{
	struct kvec vec = {
		.iov_base = data,
		.iov_len = len,
	};
	return o2net_send_message_vec(msg_type, key, &vec, 1,
				      target_node, status);
}
EXPORT_SYMBOL_GPL(o2net_send_message);

/*
 * Turn a received message header into its status reply and send it back.
 *
 * The header is rewritten in place:
 *   - sys_status and status carry @syserr and @err;
 *   - the magic becomes O2NET_MSG_STATUS_MAGIC;
 *   - data_len is cleared, so only the header goes on the wire.
 *
 * Every other field is left untouched, msg_num in particular. The
 * O2NET_MSG_STATUS_MAGIC case in o2net_process_message() hands msg_num to
 * o2net_complete_nsw() to find the waiting sender.
 *
 * BUG()s if @syserr is out of range. Returns the result of
 * o2net_send_tcp_msg().
 */
static int o2net_send_status_magic(struct socket *sock, struct o2net_msg *hdr,
				   enum o2net_system_error syserr, int err)
{
	struct kvec vec = {
		.iov_base = hdr,
		.iov_len = sizeof(struct o2net_msg),
	};

	BUG_ON(syserr >= O2NET_ERR_MAX);

	/* leave other fields intact from the incoming message, msg_num
	 * in particular */
	hdr->sys_status = cpu_to_be32(syserr);
	hdr->status = cpu_to_be32(err);
	hdr->magic = cpu_to_be16(O2NET_MSG_STATUS_MAGIC);  // twiddle the magic
	hdr->data_len = 0;

	msglog(hdr, "about to send status magic %d\n", err);
	/* hdr is kept in network byte order: the fields above are stored
	 * with cpu_to_be*(), and readers use be*_to_cpu(). It can therefore
	 * be sent as-is. */
	return o2net_send_tcp_msg(sock, &vec, 1, sizeof(struct o2net_msg));
}

/*
 * Dispatch one fully received message sitting in the rx page.
 *
 * Control messages are handled inline:
 *   O2NET_MSG_STATUS_MAGIC    - a reply to one of our sends; completes the
 *                               waiter identified by msg_num.
 *   O2NET_MSG_KEEP_REQ_MAGIC  - answered with o2net_keep_resp.
 *   O2NET_MSG_KEEP_RESP_MAGIC - nothing beyond the idle postponement every
 *                               message gets.
 *
 * O2NET_MSG_MAGIC messages are routed to the handler registered for their
 * (msg_type, key). A status reply is always sent back. It carries the
 * handler's return value, or a system error when no handler exists
 * (O2NET_ERR_NO_HNDLR) or the payload exceeds the handler's nh_max_len
 * (O2NET_ERR_OVERFLOW).
 *
 * Returns:
 *   -EINVAL for an unknown magic;
 *   0 for control messages;
 *   otherwise the result of sending the status reply.
 *
 * The header is overwritten by the reply. After this returns, the rx
 * buffer is reused for the next message.
 */
static int o2net_process_message(struct o2net_sock_container *sc,
				 struct o2net_msg *hdr)
{
	struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
	int ret = 0, handler_status;
	enum o2net_system_error syserr;
	struct o2net_msg_handler *nmh = NULL;

	msglog(hdr, "processing message\n");

	/* every received message, control or data, pushes back the
	 * keepalive and idle timers */
	o2net_sc_postpone_idle(sc);

	switch(be16_to_cpu(hdr->magic)) {
		case O2NET_MSG_STATUS_MAGIC:
			/* special type for returning message status */
			o2net_complete_nsw(nn, NULL,
					   be32_to_cpu(hdr->msg_num),
					   be32_to_cpu(hdr->sys_status),
					   be32_to_cpu(hdr->status));
			goto out;
		case O2NET_MSG_KEEP_REQ_MAGIC:
			o2net_sendpage(sc, o2net_keep_resp,
				       sizeof(*o2net_keep_resp));
			goto out;
		case O2NET_MSG_KEEP_RESP_MAGIC:
			goto out;
		case O2NET_MSG_MAGIC:
			break;
		default:
			msglog(hdr, "bad magic\n");
			ret = -EINVAL;
			goto out;
			break;
	}

	/* find a handler for it */
	handler_status = 0;
	nmh = o2net_handler_get(be16_to_cpu(hdr->msg_type),
				be32_to_cpu(hdr->key));
	if (!nmh) {
		mlog(ML_TCP, "couldn't find handler for type %u key %08x\n",
		     be16_to_cpu(hdr->msg_type), be32_to_cpu(hdr->key));
		syserr = O2NET_ERR_NO_HNDLR;
		goto out_respond;
	}

	syserr = O2NET_ERR_NONE;

	/* refuse payloads larger than this handler registered for */
	if (be16_to_cpu(hdr->data_len) > nmh->nh_max_len)
		syserr = O2NET_ERR_OVERFLOW;

	if (syserr != O2NET_ERR_NONE)
		goto out_respond;

	/* record which handler ran and when; o2net_idle_timer() prints
	 * these fields to help debug stalls */
	do_gettimeofday(&sc->sc_tv_func_start);
	sc->sc_msg_key = be32_to_cpu(hdr->key);
	sc->sc_msg_type = be16_to_cpu(hdr->msg_type);
	/* the handler sees the whole message: header plus data_len bytes */
	handler_status = (nmh->nh_func)(hdr, sizeof(struct o2net_msg) +
					     be16_to_cpu(hdr->data_len),
					nmh->nh_func_data);
	do_gettimeofday(&sc->sc_tv_func_stop);

out_respond:
	/* this destroys the hdr, so don't use it after this */
	ret = o2net_send_status_magic(sc->sc_sock, hdr, syserr,
				      handler_status);
	hdr = NULL;
	mlog(0, "sending handler status %d, syserr %d returned %d\n",
	     handler_status, syserr, ret);

out:
	if (nmh)
		o2net_handler_put(nmh);
	return ret;
}

/*
 * Validate the handshake the peer sends first on a new connection.
 *
 * On protocol_version mismatch:
 *   - log the mismatch;
 *   - shut the connection down via o2net_ensure_shutdown(..., -ENOTCONN);
 *   - return -1.
 *
 * On success:
 *   - mark the socket handshake-ok;
 *   - under nn_lock, and only if nn->nn_sc still points at this sc (it
 *     hasn't been shut down meanwhile), postpone the idle timer and set
 *     the node state valid;
 *   - remove the handshake bytes from the front of the rx page, so any
 *     bytes received after them start at offset 0 as a message header;
 *   - return 0.
 */
static int o2net_check_handshake(struct o2net_sock_container *sc)
{
	struct o2net_handshake *hand = page_address(sc->sc_page);
	struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);

	if (hand->protocol_version != cpu_to_be64(O2NET_PROTOCOL_VERSION)) {
		mlog(ML_NOTICE, SC_NODEF_FMT " advertised net protocol "
		     "version %llu but %llu is required, disconnecting\n",
		     SC_NODEF_ARGS(sc),
		     (unsigned long long)be64_to_cpu(hand->protocol_version),
		     O2NET_PROTOCOL_VERSION);

		/* don't bother reconnecting if it's the wrong version. */
		o2net_ensure_shutdown(nn, sc, -ENOTCONN);
		return -1;
	}

	sc->sc_handshake_ok = 1;

	spin_lock(&nn->nn_lock);
	/* set valid and queue the idle timers only if it hasn't been
	 * shut down already */
	if (nn->nn_sc == sc) {
		o2net_sc_postpone_idle(sc);
		o2net_set_nn_state(nn, sc, 1, 0);
	}
	spin_unlock(&nn->nn_lock);

	/* shift everything up as though it wasn't there */
	sc->sc_page_off -= sizeof(struct o2net_handshake);
	if (sc->sc_page_off)
		memmove(hand, hand + 1, sc->sc_page_off);

	return 0;
}

/*
 * Demux the queued rx bytes into header or payload bits, and call the
 * handler once a full message has been read off the socket.
 *
 * sc->sc_page_off counts how many bytes of the current message are
 * already in sc->sc_page. Each call issues at most one header read and
 * at most one payload read. While sc_handshake_ok is clear, the first
 * bytes are consumed as the peer's handshake.
 *
 * Returns:
 *   > 0        progress was made;
 *   -EPROTO    the handshake was rejected;
 *   -EOVERFLOW the header's data_len exceeds O2NET_MAX_PAYLOAD_BYTES;
 *   other <= 0 a non-positive o2net_recv_tcp_msg() result, passed back
 *              unchanged.
 *
 * o2net_rx_until_empty() treats -EAGAIN as "drained" and any other
 * value <= 0 as fatal.
 */
static int o2net_advance_rx(struct o2net_sock_container *sc)
{
	struct o2net_msg *hdr;
	int ret = 0;
	void *data;
	size_t datalen;

	sclog(sc, "receiving\n");
	do_gettimeofday(&sc->sc_tv_advance_start);

	/* do we need more header? */
	if (sc->sc_page_off < sizeof(struct o2net_msg)) {
		data = page_address(sc->sc_page) + sc->sc_page_off;
		datalen = sizeof(struct o2net_msg) - sc->sc_page_off;
		ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
		if (ret > 0) {
			sc->sc_page_off += ret;

			/* this working relies on the handshake being
			 * smaller than the normal message header */
			if (sc->sc_page_off >= sizeof(struct o2net_handshake)&&
			    !sc->sc_handshake_ok && o2net_check_handshake(sc)) {
				ret = -EPROTO;
				goto out;
			}

			/* validate data_len once per message, on the read
			 * that completes the header (sc_page_off goes from
			 * below to exactly sizeof(struct o2net_msg)). Nothing
			 * is byte-swapped in place. */
			if (sc->sc_page_off == sizeof(struct o2net_msg)) {
				hdr = page_address(sc->sc_page);
				if (be16_to_cpu(hdr->data_len) >
				    O2NET_MAX_PAYLOAD_BYTES)
					ret = -EOVERFLOW;
			}
		}
		if (ret <= 0)
			goto out;
	}

	if (sc->sc_page_off < sizeof(struct o2net_msg)) {
		/* oof, still don't have a header */
		goto out;
	}

	/* the header is complete. Its fields are still in network byte
	 * order and are read through be16_to_cpu()/be32_to_cpu(). */
	hdr = page_address(sc->sc_page);

	msglog(hdr, "at page_off %zu\n", sc->sc_page_off);

	/* do we need more payload? */
	if (sc->sc_page_off - sizeof(struct o2net_msg) <
	    be16_to_cpu(hdr->data_len)) {
		/* need more payload */
		data = page_address(sc->sc_page) + sc->sc_page_off;
		datalen = (sizeof(struct o2net_msg) +
			   be16_to_cpu(hdr->data_len)) - sc->sc_page_off;
		ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
		if (ret > 0)
			sc->sc_page_off += ret;
		if (ret <= 0)
			goto out;
	}

	if (sc->sc_page_off - sizeof(struct o2net_msg) ==
	    be16_to_cpu(hdr->data_len)) {
		/* we can only get here once, the first time we read
		 * the payload.. so set ret to progress if the handler
		 * works out. after calling this the message is toast */
		ret = o2net_process_message(sc, hdr);
		if (ret == 0)
			ret = 1;
		sc->sc_page_off = 0;
	}

out:
	sclog(sc, "ret = %d\n", ret);
	do_gettimeofday(&sc->sc_tv_advance_stop);
	return ret;
}

/* this work func is triggered by data ready. it reads until it can read no
 * more. it interprets 0, eof, as fatal. if data_ready hits while we're doing
 * our work the work struct will be marked and we'll be called again.
*/static void o2net_rx_until_empty(void *arg){ struct o2net_sock_container *sc = arg; int ret; do { ret = o2net_advance_rx(sc); } while (ret > 0); if (ret <= 0 && ret != -EAGAIN) { struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num); sclog(sc, "saw error %d, closing\n", ret); /* not permanent so read failed handshake can retry */ o2net_ensure_shutdown(nn, sc, 0); } sc_put(sc);}static int o2net_set_nodelay(struct socket *sock){ int ret, val = 1; mm_segment_t oldfs; oldfs = get_fs(); set_fs(KERNEL_DS); /* * Dear unsuspecting programmer, * * Don't use sock_setsockopt() for SOL_TCP. It doesn't check its level * argument and assumes SOL_SOCKET so, say, your TCP_NODELAY will * silently turn into SO_DEBUG. * * Yours, * Keeper of hilariously fragile interfaces. */ ret = sock->ops->setsockopt(sock, SOL_TCP, TCP_NODELAY, (char __user *)&val, sizeof(val)); set_fs(oldfs); return ret;}/* ------------------------------------------------------------ *//* called when a connect completes and after a sock is accepted. the * rx path will see the response and mark the sc valid */static void o2net_sc_connect_completed(void *arg){ struct o2net_sock_container *sc = arg; mlog(ML_MSG, "sc sending handshake with ver %llu id %llx\n", (unsigned long long)O2NET_PROTOCOL_VERSION, (unsigned long long)be64_to_cpu(o2net_hand->connector_id)); o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand)); sc_put(sc);}/* this is called as a work_struct func. */static void o2net_sc_send_keep_req(void *arg){ struct o2net_sock_container *sc = arg; o2net_sendpage(sc, o2net_keep_req, sizeof(*o2net_keep_req)); sc_put(sc);}/* socket shutdown does a del_timer_sync against this as it tears down. 
* we can't start this timer until we've got to the point in sc buildup * where shutdown is going to be involved */static void o2net_idle_timer(unsigned long data){ struct o2net_sock_container *sc = (struct o2net_sock_container *)data; struct timeval now; do_gettimeofday(&now); mlog(ML_NOTICE, "connection to " SC_NODEF_FMT " has been idle for 10 " "seconds, shutting it down.\n", SC_NODEF_ARGS(sc)); mlog(ML_NOTICE, "here are some times that might help debug the " "situation: (tmr %ld.%ld now %ld.%ld dr %ld.%ld adv " "%ld.%ld:%ld.%ld func (%08x:%u) %ld.%ld:%ld.%ld)\n", sc->sc_tv_timer.tv_sec, (long) sc->sc_tv_timer.tv_usec, now.tv_sec, (long) now.tv_usec, sc->sc_tv_data_ready.tv_sec, (long) sc->sc_tv_data_ready.tv_usec, sc->sc_tv_advance_start.tv_sec, (long) sc->sc_tv_advance_start.tv_usec, sc->sc_tv_advance_stop.tv_sec, (long) sc->sc_tv_advance_stop.tv_usec, sc->sc_msg_key, sc->sc_msg_type, sc->sc_tv_func_start.tv_sec, (long) sc->sc_tv_func_start.tv_usec, sc->sc_tv_func_stop.tv_sec, (long) sc->sc_tv_func_stop.tv_usec); o2net_sc_queue_work(sc, &sc->sc_shutdown_work);}static void o2net_sc_postpone_idle(struct o2net_sock_container *sc){ o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work); o2net_sc_queue_delayed_work(sc, &sc->sc_keepalive_work, O2NET_KEEPALIVE_DELAY_SECS * HZ); do_gettimeofday(&sc->sc_tv_timer); mod_timer(&sc->sc_idle_timeout, jiffies + (O2NET_IDLE_TIMEOUT_SECS * HZ));}/* this work func is kicked whenever a path sets the nn state which doesn't * have valid set. This includes seeing hb come up, losing a connection, * having a connect attempt fail, etc. 
This centralizes the logic which decides * if a connect attempt should be made or if we should give up and all future * transmit attempts should fail */static void o2net_start_connect(void *arg){ struct o2net_node *nn = arg; struct o2net_sock_container *sc = NULL; struct o2nm_node *node = NULL, *mynode = NULL; struct socket *sock = NULL; struct sockaddr_in myaddr = {0, }, remoteaddr = {0, }; int ret = 0; /* if we're greater we initiate tx, otherwise we accept */ if (o2nm_this_node() <= o2net_num_from_nn(nn)) goto out; /* watch for racing with tearing a node down */ node = o2nm_get_node_by_num(o2net_num_from_nn(nn)); if (node == NULL) { ret = 0; goto out; } mynode = o2nm_get_node_by_num(o2nm_this_node()); if (mynode == NULL) { ret = 0; goto out; } spin_lock(&nn->nn_lock); /* see if we already have one pending or have given up */ if (nn->nn_sc || nn->nn_persistent_error) arg = NULL; spin_unlock(&nn->nn_lock); if (arg == NULL) /* *shrug*, needed some indicator */ goto out; nn->nn_last_connect_attempt = jiffies; sc = sc_alloc(node); if (sc == NULL) { mlog(0, "couldn't allocate sc\n"); ret = -ENOMEM; goto out; } ret = sock_create(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock); if (ret < 0) { mlog(0, "can't create socket: %d\n", ret); goto out; } sc->sc_sock = sock; /* freed by sc_kref_release */ sock->sk->sk_allocation = GFP_ATOMIC; myaddr.sin_family = AF_INET; myaddr.sin_addr.s_addr = (__force u32)mynode->nd_ipv4_address; myaddr.sin_port = (__force u16)htons(0); /* any port */ ret = sock->ops->bind(sock, (struct sockaddr *)&myaddr, sizeof(myaddr)); if (ret) { mlog(ML_ERROR, "bind failed with %d at address %u.%u.%u.%u\n", ret, NIPQUAD(mynode->nd_ipv4_address)); goto out; } ret = o2net_set_nodelay(sc->sc_sock); if (ret) { mlog(ML_ERROR, "setting TCP_NODELAY failed with %d\n", ret);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -