socklnd_cb.c
void
ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
{
        ksock_sched_t *sched = conn->ksnc_scheduler;
        ksock_msg_t   *msg = &tx->tx_msg;
        ksock_tx_t    *ztx;
        int            bufnob = 0;

        /* called holding global lock (read or irq-write) and caller may
         * not have dropped this lock between finding conn and calling me,
         * so we don't need the {get,put}connsock dance to deref
         * ksnc_sock... */
        LASSERT(!conn->ksnc_closing);

        CDEBUG (D_NET, "Sending to %s ip %d.%d.%d.%d:%d\n",
                libcfs_id2str(conn->ksnc_peer->ksnp_id),
                HIPQUAD(conn->ksnc_ipaddr),
                conn->ksnc_port);

        tx->tx_checked_zc = 0;
        conn->ksnc_proto->pro_pack(tx);

        /* Ensure the frags we've been given EXACTLY match the number of
         * bytes we want to send.  Many TCP/IP stacks disregard any total
         * size parameters passed to them and just look at the frags.
         *
         * We always expect at least 1 mapped fragment containing the
         * complete ksocknal message header. */
        LASSERT (lnet_iov_nob (tx->tx_niov, tx->tx_iov) +
                 lnet_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
        LASSERT (tx->tx_niov >= 1);
        LASSERT (tx->tx_resid == tx->tx_nob);

        CDEBUG (D_NET, "Packet %p type %d, nob %d niov %d nkiov %d\n",
                tx, (tx->tx_lnetmsg != NULL) ? tx->tx_lnetmsg->msg_hdr.type :
                                               KSOCK_MSG_NOOP,
                tx->tx_nob, tx->tx_niov, tx->tx_nkiov);

        atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
        tx->tx_conn = conn;
        ksocknal_conn_addref(conn);             /* +1 ref for tx */

        /*
         * NB Darwin: SOCK_WMEM_QUEUED()->sock_getsockopt() will take
         * a blockable lock (socket lock), so SOCK_WMEM_QUEUED can't be
         * called inside a spinlock.
         */
        bufnob = SOCK_WMEM_QUEUED(conn->ksnc_sock);
        spin_lock_bh (&sched->kss_lock);

        if (list_empty(&conn->ksnc_tx_queue) && bufnob == 0) {
                /* First packet starts the timeout */
                conn->ksnc_tx_deadline =
                        cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
                conn->ksnc_tx_bufnob = 0;
                mb();                   /* order with adding to tx_queue */
        }

        ztx = NULL;

        if (msg->ksm_type == KSOCK_MSG_NOOP) {
                /* The packet is a noop ZC ACK; try to piggyback the
                 * ack_cookie on a normal packet so I don't need to send it */
                LASSERT(msg->ksm_zc_req_cookie == 0);
                LASSERT(msg->ksm_zc_ack_cookie != 0);

                if (conn->ksnc_tx_mono != NULL) {
                        if (ksocknal_piggyback_zcack(conn, msg->ksm_zc_ack_cookie)) {
                                /* zc-ack cookie is piggybacked */
                                atomic_sub (tx->tx_nob, &conn->ksnc_tx_nob);
                                ztx = tx;       /* Put on freelist later */
                        } else {
                                /* no packet can piggyback the zc-ack cookie */
                                list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
                        }
                } else {
                        /* It's the first mono-packet */
                        conn->ksnc_tx_mono = tx;
                        list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
                }
        } else {
                /* It's a normal packet - can it piggyback a noop zc-ack that
                 * has been queued already? */
                LASSERT(msg->ksm_zc_ack_cookie == 0);

                if (conn->ksnc_proto == &ksocknal_protocol_v2x &&  /* V2.x packet */
                    conn->ksnc_tx_mono != NULL) {
                        if (conn->ksnc_tx_mono->tx_msg.ksm_type == KSOCK_MSG_NOOP) {
                                /* There is a noop zc-ack that can be piggybacked */
                                ztx = conn->ksnc_tx_mono;

                                msg->ksm_zc_ack_cookie =
                                        ztx->tx_msg.ksm_zc_ack_cookie;
                                ksocknal_next_mono_tx(conn);

                                /* use tx to replace the noop zc-ack packet;
                                 * ztx will be put on the freelist later */
                                list_add(&tx->tx_list, &ztx->tx_list);
                                list_del(&ztx->tx_list);

                                atomic_sub (ztx->tx_nob, &conn->ksnc_tx_nob);
                        } else {
                                /* no noop zc-ack packet, just enqueue it */
                                LASSERT(conn->ksnc_tx_mono->tx_msg.ksm_type ==
                                        KSOCK_MSG_LNET);
                                list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
                        }

                } else if (conn->ksnc_proto == &ksocknal_protocol_v2x) {
                        /* it's the first mono-packet, enqueue it */
                        conn->ksnc_tx_mono = tx;
                        list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
                } else {
                        /* V1.x packet, just enqueue it */
                        list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
                }
        }

        if (ztx != NULL)
                list_add_tail(&ztx->tx_list, &sched->kss_zombie_noop_txs);

        if (conn->ksnc_tx_ready &&      /* able to send */
            !conn->ksnc_tx_scheduled) { /* not scheduled to send */
                /* +1 ref for scheduler */
                ksocknal_conn_addref(conn);
                list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
                conn->ksnc_tx_scheduled = 1;
                cfs_waitq_signal (&sched->kss_waitq);
        }

        spin_unlock_bh (&sched->kss_lock);
}
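/*
 * NB: illustrative sketch only, not from the original file.  The zc-ack
 * piggyback above avoids a dedicated NOOP message on the wire.  Assuming
 * the v2.x wire header carries both ksm_zc_req_cookie and
 * ksm_zc_ack_cookie, the two cases reduce to:
 *
 *      tx is NOOP : fold tx->tx_msg.ksm_zc_ack_cookie into an already
 *                   queued LNET message and park tx on
 *                   sched->kss_zombie_noop_txs for the scheduler to free;
 *      tx is LNET : steal the cookie from a queued NOOP (ztx) and retire
 *                   ztx the same way.
 *
 * Either way exactly one queued message carries the cookie to the peer.
 */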
ksock_route_t *
ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
{
        struct list_head *tmp;
        ksock_route_t    *route;

        list_for_each (tmp, &peer->ksnp_routes) {
                route = list_entry (tmp, ksock_route_t, ksnr_list);

                LASSERT (!route->ksnr_connecting || route->ksnr_scheduled);

                if (route->ksnr_scheduled)      /* connections being established */
                        continue;

                /* all route types connected ? */
                if ((ksocknal_route_mask() & ~route->ksnr_connected) == 0)
                        continue;

                /* too soon to retry this guy? */
                if (!(route->ksnr_retry_interval == 0 || /* first attempt */
                      cfs_time_aftereq (cfs_time_current(), route->ksnr_timeout)))
                        continue;

                return (route);
        }

        return (NULL);
}

ksock_route_t *
ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
{
        struct list_head *tmp;
        ksock_route_t    *route;

        list_for_each (tmp, &peer->ksnp_routes) {
                route = list_entry (tmp, ksock_route_t, ksnr_list);

                LASSERT (!route->ksnr_connecting || route->ksnr_scheduled);

                if (route->ksnr_scheduled)
                        return (route);
        }

        return (NULL);
}
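/*
 * NB: illustrative note, not from this file.  A route is "connectable"
 * only once its retry backoff has expired; the connection daemon is
 * assumed to arm the gate roughly as:
 *
 *      route->ksnr_timeout = cfs_time_add(cfs_time_current(),
 *                                         route->ksnr_retry_interval);
 *
 * so ksocknal_find_connectable_route_locked() skips any route whose
 * ksnr_timeout still lies in the future.
 */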
int
ksocknal_launch_packet (lnet_ni_t *ni, ksock_tx_t *tx, lnet_process_id_t id)
{
        ksock_peer_t  *peer;
        ksock_conn_t  *conn;
        ksock_route_t *route;
        rwlock_t      *g_lock;
        int            retry;
        int            rc;

        LASSERT (tx->tx_conn == NULL);
        LASSERT (tx->tx_lnetmsg != NULL);

        g_lock = &ksocknal_data.ksnd_global_lock;

        for (retry = 0;; retry = 1) {
#if !SOCKNAL_ROUND_ROBIN
                read_lock (g_lock);
                peer = ksocknal_find_peer_locked(ni, id);
                if (peer != NULL) {
                        if (ksocknal_find_connectable_route_locked(peer) == NULL) {
                                conn = ksocknal_find_conn_locked (tx->tx_lnetmsg->msg_len, peer);
                                if (conn != NULL) {
                                        /* I've got no routes that need to be
                                         * connecting and I do have an actual
                                         * connection... */
                                        ksocknal_queue_tx_locked (tx, conn);
                                        read_unlock (g_lock);
                                        return (0);
                                }
                        }
                }

                /* I'll need a write lock... */
                read_unlock (g_lock);
#endif
                write_lock_bh (g_lock);

                peer = ksocknal_find_peer_locked(ni, id);
                if (peer != NULL)
                        break;

                write_unlock_bh (g_lock);

                if ((id.pid & LNET_PID_USERFLAG) != 0) {
                        CERROR("Refusing to create a connection to "
                               "userspace process %s\n", libcfs_id2str(id));
                        return -EHOSTUNREACH;
                }

                if (retry) {
                        CERROR("Can't find peer %s\n", libcfs_id2str(id));
                        return -EHOSTUNREACH;
                }

                rc = ksocknal_add_peer(ni, id,
                                       LNET_NIDADDR(id.nid),
                                       lnet_acceptor_port());
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_id2str(id), rc);
                        return rc;
                }
        }

        for (;;) {
                /* launch any/all connections that need it */
                route = ksocknal_find_connectable_route_locked (peer);
                if (route == NULL)
                        break;

                ksocknal_launch_connection_locked (route);
        }

        conn = ksocknal_find_conn_locked (tx->tx_lnetmsg->msg_len, peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                ksocknal_queue_tx_locked (tx, conn);
                write_unlock_bh (g_lock);
                return (0);
        }

        if (peer->ksnp_accepting > 0 ||
            ksocknal_find_connecting_route_locked (peer) != NULL) {
                /* Queue the message until a connection is established */
                list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
                write_unlock_bh (g_lock);
                return 0;
        }

        write_unlock_bh (g_lock);

        /* NB Routes may be ignored if connections to them failed recently */
        CDEBUG(D_NETERROR, "No usable routes to %s\n", libcfs_id2str(id));
        return (-EHOSTUNREACH);
}
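/*
 * NB: summary sketch, not from this file.  Once it holds the write lock
 * with a valid peer, ksocknal_launch_packet() resolves a tx in decreasing
 * order of luck:
 *
 *      1. launch a connection attempt on every connectable route;
 *      2. if a usable conn already exists, queue tx on it and return 0;
 *      3. else, if an attempt is in flight (or the peer is accepting),
 *         park tx on peer->ksnp_tx_queue to be drained by the first
 *         connection that establishes;
 *      4. else fail with -EHOSTUNREACH.
 */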
int
ksocknal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        int                type = lntmsg->msg_type;
        lnet_process_id_t  target = lntmsg->msg_target;
        unsigned int       payload_niov = lntmsg->msg_niov;
        struct iovec      *payload_iov = lntmsg->msg_iov;
        lnet_kiov_t       *payload_kiov = lntmsg->msg_kiov;
        unsigned int       payload_offset = lntmsg->msg_offset;
        unsigned int       payload_nob = lntmsg->msg_len;
        ksock_tx_t        *tx;
        int                desc_size;
        int                rc;

        /* NB 'private' is different depending on what we're sending.
         * Just ignore it... */

        CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
        LASSERT (!in_interrupt ());

        if (payload_iov != NULL)
                desc_size = offsetof(ksock_tx_t,
                                     tx_frags.virt.iov[1 + payload_niov]);
        else
                desc_size = offsetof(ksock_tx_t,
                                     tx_frags.paged.kiov[payload_niov]);

        tx = ksocknal_alloc_tx(desc_size);
        if (tx == NULL) {
                CERROR("Can't allocate tx desc type %d size %d\n",
                       type, desc_size);
                return (-ENOMEM);
        }

        tx->tx_conn = NULL;             /* set when assigned a conn */
        tx->tx_lnetmsg = lntmsg;

        if (payload_iov != NULL) {
                tx->tx_kiov = NULL;
                tx->tx_nkiov = 0;
                tx->tx_iov = tx->tx_frags.virt.iov;
                tx->tx_niov = 1 +
                              lnet_extract_iov(payload_niov, &tx->tx_iov[1],
                                               payload_niov, payload_iov,
                                               payload_offset, payload_nob);
        } else {
                tx->tx_niov = 1;
                tx->tx_iov = &tx->tx_frags.paged.iov;
                tx->tx_kiov = tx->tx_frags.paged.kiov;
                tx->tx_nkiov = lnet_extract_kiov(payload_niov, tx->tx_kiov,
                                                 payload_niov, payload_kiov,
                                                 payload_offset, payload_nob);
        }

        ksocknal_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);

        /* The first fragment will be set later in pro_pack */
        rc = ksocknal_launch_packet(ni, tx, target);
        if (rc == 0)
                return (0);

        ksocknal_free_tx(tx);
        return (-EIO);
}

int
ksocknal_thread_start (int (*fn)(void *arg), void *arg)
{
        long    pid = cfs_kernel_thread (fn, arg, 0);

        if (pid < 0)
                return ((int)pid);

        write_lock_bh (&ksocknal_data.ksnd_global_lock);
        ksocknal_data.ksnd_nthreads++;
        write_unlock_bh (&ksocknal_data.ksnd_global_lock);
        return (0);
}

void
ksocknal_thread_fini (void)
{
        write_lock_bh (&ksocknal_data.ksnd_global_lock);
        ksocknal_data.ksnd_nthreads--;
        write_unlock_bh (&ksocknal_data.ksnd_global_lock);
}
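/*
 * NB: usage sketch with a hypothetical caller, not part of this file.
 * A daemon launched via ksocknal_thread_start() must call
 * ksocknal_thread_fini() on its way out, so ksnd_nthreads remains an
 * accurate count of live threads:
 *
 *      int my_daemon(void *arg)                // hypothetical daemon
 *      {
 *              // ... service loop ...
 *              ksocknal_thread_fini();         // drop the thread count
 *              return 0;
 *      }
 *
 *      rc = ksocknal_thread_start(my_daemon, arg);
 *      if (rc != 0)
 *              CERROR("Can't spawn daemon: %d\n", rc);
 */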