/* 📄 socklnd_cb.c
 * 字号: (web-viewer page header — not part of the original source)
 */
} else if (rc == 0 && conn->ksnc_rx_started) { /* EOF in the middle of a message */ rc = -EPROTO; } break; } /* Completed a fragment */ if (conn->ksnc_rx_nob_wanted == 0) { rc = 1; break; } } ksocknal_connsock_decref(conn); RETURN (rc);}voidksocknal_tx_done (lnet_ni_t *ni, ksock_tx_t *tx){ lnet_msg_t *lnetmsg = tx->tx_lnetmsg; int rc = (tx->tx_resid == 0) ? 0 : -EIO; ENTRY; LASSERT(ni != NULL || tx->tx_conn != NULL); if (tx->tx_conn != NULL) ksocknal_conn_decref(tx->tx_conn); if (ni == NULL && tx->tx_conn != NULL) ni = tx->tx_conn->ksnc_peer->ksnp_ni; ksocknal_free_tx (tx); if (lnetmsg != NULL) /* KSOCK_MSG_NOOP go without lnetmsg */ lnet_finalize (ni, lnetmsg, rc); EXIT;}voidksocknal_txlist_done (lnet_ni_t *ni, struct list_head *txlist, int error){ ksock_tx_t *tx; while (!list_empty (txlist)) { tx = list_entry (txlist->next, ksock_tx_t, tx_list); if (error && tx->tx_lnetmsg != NULL) { CDEBUG (D_NETERROR, "Deleting packet type %d len %d %s->%s\n", le32_to_cpu (tx->tx_lnetmsg->msg_hdr.type), le32_to_cpu (tx->tx_lnetmsg->msg_hdr.payload_length), libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.src_nid)), libcfs_nid2str(le64_to_cpu (tx->tx_lnetmsg->msg_hdr.dest_nid))); } else if (error) { CDEBUG (D_NETERROR, "Deleting noop packet\n"); } list_del (&tx->tx_list); LASSERT (atomic_read(&tx->tx_refcount) == 1); ksocknal_tx_done (ni, tx); }}static voidksocknal_check_zc_req(ksock_tx_t *tx){ ksock_conn_t *conn = tx->tx_conn; ksock_peer_t *peer = conn->ksnc_peer; lnet_kiov_t *kiov = tx->tx_kiov; int nkiov = tx->tx_nkiov; /* Set tx_msg.ksm_zc_req_cookie to a unique non-zero cookie and add tx * to ksnp_zc_req_list if some fragment of this message should be sent * zero-copy. Our peer will send an ACK containing this cookie when * she has received this message to tell us we can signal completion. * tx_msg.ksm_zc_req_cookie remains non-zero while tx is on * ksnp_zc_req_list. 
 */
        /* Zero-copy only applies to v2.x connections that advertise the
         * capability */
        if (conn->ksnc_proto != &ksocknal_protocol_v2x ||
            !conn->ksnc_zc_capable)
                return;

        /* Skip leading fragments smaller than the zero-copy threshold;
         * only request zero-copy if at least one large fragment remains */
        while (nkiov > 0) {
                if (kiov->kiov_len >= *ksocknal_tunables.ksnd_zc_min_frag)
                        break;
                --nkiov;
                ++kiov;
        }

        if (nkiov == 0)
                return;

        /* assign cookie and queue tx to pending list, it will be released when
         * a matching ack is received. See ksocknal_handle_zc_ack() */
        ksocknal_tx_addref(tx);

        spin_lock(&peer->ksnp_lock);

        LASSERT (tx->tx_msg.ksm_zc_req_cookie == 0);
        tx->tx_msg.ksm_zc_req_cookie = peer->ksnp_zc_next_cookie++;
        list_add_tail(&tx->tx_zc_list, &peer->ksnp_zc_req_list);

        spin_unlock(&peer->ksnp_lock);
}

/* Cancel a pending zero-copy ACK wait for this tx: clear its cookie,
 * unlink it from the peer's ksnp_zc_req_list (both under ksnp_lock)
 * and drop the reference taken in ksocknal_check_zc_req().
 * No-op when no ACK was expected (cookie == 0). */
static void
ksocknal_unzc_req(ksock_tx_t *tx)
{
        ksock_peer_t *peer = tx->tx_conn->ksnc_peer;

        spin_lock(&peer->ksnp_lock);

        if (tx->tx_msg.ksm_zc_req_cookie == 0) {
                /* Not waiting for an ACK */
                spin_unlock(&peer->ksnp_lock);
                return;
        }

        tx->tx_msg.ksm_zc_req_cookie = 0;
        list_del(&tx->tx_zc_list);

        spin_unlock(&peer->ksnp_lock);

        ksocknal_tx_decref(tx);
}

/* Push (more of) tx out on conn.  Returns 0 when the whole tx has been
 * sent, -EAGAIN when the socket would block, -ENOMEM after queueing the
 * conn on ksnd_enomem_conns for a timed retry, or a negative error
 * after closing the connection. */
int
ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
{
        int rc;

        /* Decide (once per tx) whether to request a zero-copy ACK */
        if (!tx->tx_checked_zc) {
                tx->tx_checked_zc = 1;
                ksocknal_check_zc_req(tx);
        }

        rc = ksocknal_transmit (conn, tx);

        CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);

        if (tx->tx_resid == 0) {
                /* Sent everything OK */
                LASSERT (rc == 0);
                return (0);
        }

        if (rc == -EAGAIN)
                return (rc);

        if (rc == -ENOMEM) {
                static int counter;

                counter++;   /* exponential backoff warnings: warn only
                              * when counter is a power of two */
                if ((counter & (-counter)) == counter)
                        CWARN("%u ENOMEM tx %p (%u allocated)\n",
                              counter, conn, atomic_read(&libcfs_kmemory));

                /* Queue on ksnd_enomem_conns for retry after a timeout */
                spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);

                /* enomem list takes over scheduler's ref...
 */
                LASSERT (conn->ksnc_tx_scheduled);
                list_add_tail(&conn->ksnc_tx_list,
                              &ksocknal_data.ksnd_enomem_conns);
                /* Wake the reaper only if it is not already due to wake
                 * before the ENOMEM retry deadline */
                if (!cfs_time_aftereq(cfs_time_add(cfs_time_current(),
                                                   SOCKNAL_ENOMEM_RETRY),
                                   ksocknal_data.ksnd_reaper_waketime))
                        cfs_waitq_signal (&ksocknal_data.ksnd_reaper_waitq);

                spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);
                return (rc);
        }

        /* Actual error */
        LASSERT (rc < 0);

        /* Log only on the first failure; once ksnc_closing is set the
         * error has already been reported */
        if (!conn->ksnc_closing) {
                switch (rc) {
                case -ECONNRESET:
                        LCONSOLE_WARN("Host %u.%u.%u.%u reset our connection "
                                      "while we were sending data; it may have "
                                      "rebooted.\n",
                                      HIPQUAD(conn->ksnc_ipaddr));
                        break;
                default:
                        LCONSOLE_WARN("There was an unexpected network error "
                                      "while writing to %u.%u.%u.%u: %d.\n",
                                      HIPQUAD(conn->ksnc_ipaddr), rc);
                        break;
                }
                CDEBUG(D_NET, "[%p] Error %d on write to %s"
                       " ip %d.%d.%d.%d:%d\n",
                       conn, rc,
                       libcfs_id2str(conn->ksnc_peer->ksnp_id),
                       HIPQUAD(conn->ksnc_ipaddr),
                       conn->ksnc_port);
        }

        /* The tx will not complete normally now: stop waiting for a
         * zero-copy ACK for it */
        ksocknal_unzc_req(tx);

        /* it's not an error if conn is being closed */
        ksocknal_close_conn_and_siblings (conn,
                                          (conn->ksnc_closing) ?
0 : rc);
        return (rc);
}

/* Hand route over to a connd thread to establish a connection.
 * Called holding the write lock on ksnd_global_lock (per the comment
 * below); takes an extra route ref for the connd. */
void
ksocknal_launch_connection_locked (ksock_route_t *route)
{
        /* called holding write lock on ksnd_global_lock */

        LASSERT (!route->ksnr_scheduled);
        LASSERT (!route->ksnr_connecting);
        LASSERT ((ksocknal_route_mask() & ~route->ksnr_connected) != 0);

        route->ksnr_scheduled = 1;             /* scheduling conn for connd */
        ksocknal_route_addref(route);          /* extra ref for connd */

        spin_lock_bh (&ksocknal_data.ksnd_connd_lock);

        list_add_tail (&route->ksnr_connd_list,
                       &ksocknal_data.ksnd_connd_routes);
        cfs_waitq_signal (&ksocknal_data.ksnd_connd_waitq);

        spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);
}

/* Choose a connection to peer for a message carrying payload_nob
 * payload bytes.  With typed connections enabled, prefer a connection
 * whose type matches the message size class (control vs bulk), picking
 * the one with the smallest nob load metric; otherwise fall back to the
 * least-loaded connection of any type. */
ksock_conn_t *
ksocknal_find_conn_locked (int payload_nob, ksock_peer_t *peer)
{
        struct list_head *tmp;
        ksock_conn_t     *typed = NULL;
        int               tnob  = 0;
        ksock_conn_t     *fallback = NULL;
        int               fnob     = 0;
        ksock_conn_t     *conn;

        list_for_each (tmp, &peer->ksnp_conns) {
                ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
                int           hdr_nob = 0;
#if SOCKNAL_ROUND_ROBIN
                const int     nob = 0;              /* load ignored: pure round-robin */
#else
                /* load metric: bytes queued in the LND plus bytes queued
                 * in the socket's send buffer */
                int           nob = atomic_read(&c->ksnc_tx_nob) +
                                    SOCK_WMEM_QUEUED(c->ksnc_sock);
#endif
                LASSERT (!c->ksnc_closing);
                LASSERT (c->ksnc_proto != NULL);

                if (fallback == NULL || nob < fnob) {
                        fallback = c;
                        fnob     = nob;
                }

                if (!*ksocknal_tunables.ksnd_typed_conns)
                        continue;

                if (payload_nob == 0) {
                        /* noop packet */
                        hdr_nob = offsetof(ksock_msg_t, ksm_u);
                } else {
                        /* lnet packet */
                        hdr_nob = (c->ksnc_proto == &ksocknal_protocol_v2x)?
                                  offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload):
                                  sizeof(lnet_hdr_t);
                }

                /* Filter by connection type vs message size class */
                switch (c->ksnc_type) {
                default:
                        CERROR("ksnc_type bad: %u\n", c->ksnc_type);
                        LBUG();
                case SOCKLND_CONN_ANY:
                        break;
                case SOCKLND_CONN_BULK_IN:
                        continue;
                case SOCKLND_CONN_BULK_OUT:
                        /* bulk-out link only carries large messages */
                        if ((hdr_nob + payload_nob) < *ksocknal_tunables.ksnd_min_bulk)
                                continue;
                        break;
                case SOCKLND_CONN_CONTROL:
                        /* control link only carries small messages */
                        if ((hdr_nob + payload_nob) >= *ksocknal_tunables.ksnd_min_bulk)
                                continue;
                        break;
                }

                if (typed == NULL || nob < tnob) {
                        typed = c;
                        tnob  = nob;
                }
        }

        /* prefer the typed selection */
        conn = (typed != NULL) ?
typed : fallback;

#if SOCKNAL_ROUND_ROBIN
        if (conn != NULL) {
                /* round-robin all else being equal: rotate the chosen
                 * conn to the tail of the peer's conn list */
                list_del (&conn->ksnc_list);
                list_add_tail (&conn->ksnc_list, &peer->ksnp_conns);
        }
#endif
        return conn;
}

/* Advance conn->ksnc_tx_mono to the next tx on the send queue, or NULL
 * when the current tx is the last one queued.  v2.x protocol only. */
void
ksocknal_next_mono_tx(ksock_conn_t *conn)
{
        ksock_tx_t *tx = conn->ksnc_tx_mono;

        /* Called holding BH lock: conn->ksnc_scheduler->kss_lock */
        LASSERT(conn->ksnc_proto == &ksocknal_protocol_v2x);
        LASSERT(!list_empty(&conn->ksnc_tx_queue));
        LASSERT(tx != NULL);

        if (tx->tx_list.next == &conn->ksnc_tx_queue) {
                /* no more packets queued */
                conn->ksnc_tx_mono = NULL;
        } else {
                conn->ksnc_tx_mono = list_entry(tx->tx_list.next,
                                                ksock_tx_t, tx_list);
                /* successive mono txs must carry the same message type */
                LASSERT(conn->ksnc_tx_mono->tx_msg.ksm_type ==
                        tx->tx_msg.ksm_type);
        }
}

/* Try to piggyback a zero-copy ACK cookie on the next unsent LNET
 * message queued on conn.  Returns 1 when the cookie was attached,
 * 0 when there is nothing suitable (empty queue, or the candidate is a
 * NOOP which is itself a zc-ack carrier). */
int
ksocknal_piggyback_zcack(ksock_conn_t *conn, __u64 cookie)
{
        ksock_tx_t *tx = conn->ksnc_tx_mono;

        /* Called holding BH lock: conn->ksnc_scheduler->kss_lock */

        if (tx == NULL)
                return 0;

        if (tx->tx_msg.ksm_type == KSOCK_MSG_NOOP) {
                /* tx is noop zc-ack, can't piggyback zc-ack cookie */
                return 0;
        }

        LASSERT(tx->tx_msg.ksm_type == KSOCK_MSG_LNET);
        LASSERT(tx->tx_msg.ksm_zc_ack_cookie == 0);

        /* piggyback the zc-ack cookie */
        tx->tx_msg.ksm_zc_ack_cookie = cookie;
        /* this tx is now spoken for; move the cursor to the next one */
        ksocknal_next_mono_tx(conn);

        return 1;
}

void
ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
{
        ksock_sched_t *sched = conn->ksnc_scheduler;
        ksock_msg_t   *msg = &tx->tx_msg;
        ksock_tx_t    *ztx;
        int            bufnob = 0;
/* Web-viewer keyboard-shortcut help captured along with the page — not
 * part of the original source file:
 * ⌨️ 快捷键说明 (keyboard shortcuts)
 * 复制代码 (copy code): Ctrl + C
 * 搜索代码 (search code): Ctrl + F
 * 全屏模式 (full-screen mode): F11
 * 切换主题 (toggle theme): Ctrl + Shift + D
 * 显示快捷键 (show shortcuts): ?
 * 增大字号 (increase font size): Ctrl + =
 * 减小字号 (decrease font size): Ctrl + -
 */