📄 socklnd_cb.c
字号:
/* Prepare @conn's receive descriptors for the next incoming packet, or to
 * discard @nob_to_skip bytes of unwanted data ("slop").
 *
 * Returns 1 when the connection is positioned at a fresh packet boundary
 * (header receive set up), 0 when it has been set up to skip slop bytes
 * (caller comes back here if the slop didn't fit in one pass). */
int
ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
{
        /* Shared scratch sink for skipped bytes; contents are never read,
         * so concurrent use by several connections is harmless. */
        static char ksocknal_slop_buffer[4096];
        int nob;
        unsigned int niov;
        int skipped;

        LASSERT(conn->ksnc_proto != NULL);

        if ((*ksocknal_tunables.ksnd_eager_ack & conn->ksnc_type) != 0) {
                /* Remind the socket to ack eagerly... */
                ksocknal_lib_eager_ack(conn);
        }

        if (nob_to_skip == 0) {         /* right at next packet boundary now */
                conn->ksnc_rx_started = 0;
                mb ();                  /* racing with timeout thread */

                /* Header layout depends on the protocol version negotiated
                 * at connection time. */
                switch (conn->ksnc_proto->pro_version) {
                case KSOCK_PROTO_V2:
                        /* V2: read the ksock_msg_t header up to (not
                         * including) the message-type union ksm_u. */
                        conn->ksnc_rx_state = SOCKNAL_RX_KSM_HEADER;
                        conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
                        conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg;

                        conn->ksnc_rx_nob_wanted = offsetof(ksock_msg_t, ksm_u);
                        conn->ksnc_rx_nob_left = offsetof(ksock_msg_t, ksm_u);
                        conn->ksnc_rx_iov[0].iov_len = offsetof(ksock_msg_t, ksm_u);
                        break;

                case KSOCK_PROTO_V1:
                        /* Receiving bare lnet_hdr_t */
                        conn->ksnc_rx_state = SOCKNAL_RX_LNET_HEADER;
                        conn->ksnc_rx_nob_wanted = sizeof(lnet_hdr_t);
                        conn->ksnc_rx_nob_left = sizeof(lnet_hdr_t);

                        /* V1 has no ksock_msg_t wrapper: receive straight
                         * into the lnetmsg member of the union. */
                        conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
                        conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg.ksm_u.lnetmsg;
                        conn->ksnc_rx_iov[0].iov_len = sizeof (lnet_hdr_t);
                        break;

                default:
                        LBUG ();
                }
                conn->ksnc_rx_niov = 1;

                conn->ksnc_rx_kiov = NULL;
                conn->ksnc_rx_nkiov = 0;
                conn->ksnc_rx_csum = ~0;        /* reset running checksum */
                return (1);
        }

        /* Set up to skip as much as possible now.  If there's more left
         * (ran out of iov entries) we'll get called again */

        conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
        conn->ksnc_rx_nob_left = nob_to_skip;
        conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
        skipped = 0;
        niov = 0;

        do {
                nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));

                /* Every iov entry points at the same throwaway buffer. */
                conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
                conn->ksnc_rx_iov[niov].iov_len = nob;
                niov++;
                skipped += nob;
                nob_to_skip -= nob;

        } while (nob_to_skip != 0 &&      /* mustn't overflow conn's rx iov */
                 niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));

        conn->ksnc_rx_niov = niov;
        conn->ksnc_rx_kiov = NULL;
        conn->ksnc_rx_nkiov = 0;
        conn->ksnc_rx_nob_wanted = skipped;
        return (0);
}

/* (Sink) handle incoming ZC request from sender */
/* Acknowledge a sender's zero-copy request cookie.  First try to piggyback
 * the ack on a queued tx (under the scheduler lock); failing that, build a
 * dedicated NOOP tx carrying the cookie and queue it.
 *
 * Returns 0 on success, -ECONNRESET if no connection to @peer can be found,
 * -ENOMEM if the NOOP tx cannot be allocated. */
static int
ksocknal_handle_zc_req(ksock_peer_t *peer, __u64 cookie)
{
        ksock_conn_t *conn;
        ksock_tx_t *tx;
        ksock_sched_t *sched;
        int rc;

        read_lock (&ksocknal_data.ksnd_global_lock);

        conn = ksocknal_find_conn_locked (0, peer);
        if (conn == NULL) {
                read_unlock (&ksocknal_data.ksnd_global_lock);
                CERROR("Can't find connection to send zcack.\n");
                return -ECONNRESET;
        }

        sched = conn->ksnc_scheduler;

        /* scheduler lock nests inside the global read lock */
        spin_lock_bh (&sched->kss_lock);
        rc = ksocknal_piggyback_zcack(conn, cookie);
        spin_unlock_bh (&sched->kss_lock);

        read_unlock (&ksocknal_data.ksnd_global_lock);
        if (rc) {
                /* Ack cookie is piggybacked */
                return 0;
        }

        /* No tx to piggyback on: send a standalone NOOP message. */
        tx = ksocknal_alloc_tx(KSOCK_NOOP_TX_SIZE);
        if (tx == NULL) {
                CERROR("Can't allocate noop tx desc\n");
                return -ENOMEM;
        }

        tx->tx_conn = NULL;
        tx->tx_lnetmsg = NULL;
        tx->tx_kiov = NULL;
        tx->tx_nkiov = 0;
        tx->tx_iov = tx->tx_frags.virt.iov;
        tx->tx_niov = 1;

        ksocknal_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
        tx->tx_msg.ksm_zc_ack_cookie = cookie; /* incoming cookie */

        /* Re-take the lock and re-find the conn: it may have been closed
         * while the lock was dropped for the allocation above. */
        read_lock (&ksocknal_data.ksnd_global_lock);

        conn = ksocknal_find_conn_locked (0, peer);
        if (conn == NULL) {
                read_unlock (&ksocknal_data.ksnd_global_lock);
                ksocknal_free_tx(tx);
                CERROR("Can't find connection to send zcack.\n");
                return -ECONNRESET;
        }
        ksocknal_queue_tx_locked(tx, conn);

        read_unlock (&ksocknal_data.ksnd_global_lock);

        return 0;
}

/* (Sender) handle ZC_ACK from sink */
/* The sink has acknowledged zero-copy request @cookie: find the matching tx
 * on the peer's ZC request list, clear its cookie, unlink it and drop the
 * reference held for the pending ack.
 *
 * Returns 0 if the cookie matched a pending request, -EPROTO otherwise. */
static int
ksocknal_handle_zc_ack(ksock_peer_t *peer, __u64 cookie)
{
        ksock_tx_t *tx;
        struct list_head *ctmp;

        spin_lock(&peer->ksnp_lock);

        list_for_each(ctmp, &peer->ksnp_zc_req_list) {
                tx = list_entry (ctmp, ksock_tx_t, tx_zc_list);
                if (tx->tx_msg.ksm_zc_req_cookie != cookie)
                        continue;

                tx->tx_msg.ksm_zc_req_cookie = 0;
                list_del(&tx->tx_zc_list);

                spin_unlock(&peer->ksnp_lock);

                ksocknal_tx_decref(tx);
                return 0;
        }
        spin_unlock(&peer->ksnp_lock);

        /* cookie doesn't match any outstanding ZC request */
        return -EPROTO;
}

/* Drive the per-connection receive state machine: pull bytes off the socket
 * until the current rx descriptor is satisfied, then act on the state
 * (protocol header -> lnet header -> payload -> slop) and loop via `again'
 * until a short read or a packet completes.
 *
 * Returns 0 on progress/completion, -EAGAIN on a short read, or a negative
 * errno after closing the connection on error. */
int
ksocknal_process_receive (ksock_conn_t *conn)
{
        int rc;

        LASSERT (atomic_read(&conn->ksnc_conn_refcount) > 0);

        /* NB: sched lock NOT held */
        /* SOCKNAL_RX_LNET_HEADER is here for backward compatability */
        LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_KSM_HEADER ||
                 conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD ||
                 conn->ksnc_rx_state == SOCKNAL_RX_LNET_HEADER ||
                 conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
 again:
        if (conn->ksnc_rx_nob_wanted != 0) {
                rc = ksocknal_receive(conn);

                if (rc <= 0) {
                        LASSERT (rc != -EAGAIN);

                        if (rc == 0)
                                CDEBUG (D_NET, "[%p] EOF from %s"
                                        " ip %d.%d.%d.%d:%d\n", conn,
                                        libcfs_id2str(conn->ksnc_peer->ksnp_id),
                                        HIPQUAD(conn->ksnc_ipaddr),
                                        conn->ksnc_port);
                        else if (!conn->ksnc_closing)
                                CERROR ("[%p] Error %d on read from %s"
                                        " ip %d.%d.%d.%d:%d\n",
                                        conn, rc,
                                        libcfs_id2str(conn->ksnc_peer->ksnp_id),
                                        HIPQUAD(conn->ksnc_ipaddr),
                                        conn->ksnc_port);

                        /* it's not an error if conn is being closed */
                        ksocknal_close_conn_and_siblings (conn,
                                                          (conn->ksnc_closing) ? 0 : rc);
                        return (rc == 0 ? -ESHUTDOWN : rc);
                }

                if (conn->ksnc_rx_nob_wanted != 0) {
                        /* short read */
                        return (-EAGAIN);
                }
        }
        switch (conn->ksnc_rx_state) {
        case SOCKNAL_RX_KSM_HEADER:
                /* V2 message header fully received; byte-swap if the peer's
                 * endianness differs from ours. */
                if (conn->ksnc_flip) {
                        __swab32s(&conn->ksnc_msg.ksm_type);
                        __swab32s(&conn->ksnc_msg.ksm_csum);
                        __swab64s(&conn->ksnc_msg.ksm_zc_req_cookie);
                        __swab64s(&conn->ksnc_msg.ksm_zc_ack_cookie);
                }

                if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP &&
                    conn->ksnc_msg.ksm_csum != 0 &&     /* has checksum */
                    conn->ksnc_msg.ksm_csum != conn->ksnc_rx_csum) {
                        /* NOOP Checksum error */
                        CERROR("%s: Checksum error, wire:0x%08X data:0x%08X\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               conn->ksnc_msg.ksm_csum, conn->ksnc_rx_csum);
                        ksocknal_new_packet(conn, 0);
                        ksocknal_close_conn_and_siblings(conn, -EPROTO);
                        return (-EIO);
                }

                if (conn->ksnc_msg.ksm_zc_ack_cookie != 0) {
                        /* ZC cookies only exist in the V2 protocol */
                        LASSERT(conn->ksnc_proto == &ksocknal_protocol_v2x);

                        rc = ksocknal_handle_zc_ack(conn->ksnc_peer,
                                                    conn->ksnc_msg.ksm_zc_ack_cookie);
                        if (rc != 0) {
                                CERROR("%s: Unknown zero copy ACK cookie: "LPU64"\n",
                                       libcfs_id2str(conn->ksnc_peer->ksnp_id),
                                       conn->ksnc_msg.ksm_zc_ack_cookie);
                                ksocknal_new_packet(conn, 0);
                                ksocknal_close_conn_and_siblings(conn, -EPROTO);
                                return (rc);
                        }
                }

                if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP) {
                        ksocknal_new_packet (conn, 0);
                        return 0;       /* NOOP is done and just return */
                }
                LASSERT (conn->ksnc_msg.ksm_type == KSOCK_MSG_LNET);

                /* Set up to receive the embedded LNet message next. */
                conn->ksnc_rx_state = SOCKNAL_RX_LNET_HEADER;
                conn->ksnc_rx_nob_wanted = sizeof(ksock_lnet_msg_t);
                conn->ksnc_rx_nob_left = sizeof(ksock_lnet_msg_t);

                conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
                conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg.ksm_u.lnetmsg;
                conn->ksnc_rx_iov[0].iov_len = sizeof(ksock_lnet_msg_t);

                conn->ksnc_rx_niov = 1;
                conn->ksnc_rx_kiov = NULL;
                conn->ksnc_rx_nkiov = 0;

                goto again;     /* read lnet header now */

        case SOCKNAL_RX_LNET_HEADER:
                /* unpack message header */
                conn->ksnc_proto->pro_unpack(&conn->ksnc_msg);

                if ((conn->ksnc_peer->ksnp_id.pid & LNET_PID_USERFLAG) != 0) {
                        /* Userspace peer */
                        lnet_process_id_t *id = &conn->ksnc_peer->ksnp_id;
                        lnet_hdr_t *lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr;

                        /* Substitute process ID assigned at connection time */
                        lhdr->src_pid = cpu_to_le32(id->pid);
                        lhdr->src_nid = cpu_to_le64(id->nid);
                }

                conn->ksnc_rx_state = SOCKNAL_RX_PARSE;
                ksocknal_conn_addref(conn);     /* ++ref while parsing */

                rc = lnet_parse(conn->ksnc_peer->ksnp_ni,
                                &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr,
                                conn->ksnc_peer->ksnp_id.nid, conn, 0);
                if (rc < 0) {
                        /* I just received garbage: give up on this conn */
                        ksocknal_new_packet(conn, 0);
                        ksocknal_close_conn_and_siblings (conn, rc);
                        ksocknal_conn_decref(conn);
                        return (-EPROTO);
                }

                /* I'm racing with ksocknal_recv() */
                LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_PARSE ||
                         conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD);

                if (conn->ksnc_rx_state != SOCKNAL_RX_LNET_PAYLOAD)
                        return 0;
                /* ksocknal_recv() got called */
                goto again;

        case SOCKNAL_RX_LNET_PAYLOAD:
                /* payload all received */
                rc = 0;

                if (conn->ksnc_rx_nob_left == 0 &&   /* not truncating */
                    conn->ksnc_msg.ksm_csum != 0 &&  /* has checksum */
                    conn->ksnc_msg.ksm_csum != conn->ksnc_rx_csum) {
                        CERROR("%s: Checksum error, wire:0x%08X data:0x%08X\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               conn->ksnc_msg.ksm_csum, conn->ksnc_rx_csum);
                        rc = -EIO;
                }

                /* complete the message with LNet before acking the sender */
                lnet_finalize(conn->ksnc_peer->ksnp_ni, conn->ksnc_cookie, rc);

                if (rc == 0 && conn->ksnc_msg.ksm_zc_req_cookie != 0) {
                        LASSERT(conn->ksnc_proto == &ksocknal_protocol_v2x);
                        rc = ksocknal_handle_zc_req(conn->ksnc_peer,
                                                    conn->ksnc_msg.ksm_zc_req_cookie);
                }

                if (rc != 0) {
                        ksocknal_new_packet(conn, 0);
                        ksocknal_close_conn_and_siblings (conn, rc);
                        return (-EPROTO);
                }

                /* Fall through */

        case SOCKNAL_RX_SLOP:
                /* starting new packet? */
                if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
                        return 0;       /* come back later */
                goto again;             /* try to finish reading slop now */

        default:
                break;
        }

        /* Not Reached */
        LBUG ();
        return (-EINVAL);       /* keep gcc happy */
}

/* LND recv callback: LNet hands back the message descriptor so the payload
 * can be received into the caller-supplied iov/kiov fragments.  (Body
 * continues beyond this chunk.) */
int
ksocknal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
               unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
               unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        ksock_conn_t *conn = (ksock_conn_t *)private;
        ksock_sched_t *sched = conn->ksnc_scheduler;

        LASSERT (mlen <= rlen);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -