📄 o2iblnd_cb.c
字号:
write_unlock_irqrestore(g_lock, flags); kiblnd_connect_peer(peer); kiblnd_peer_decref(peer);}intkiblnd_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg){ lnet_hdr_t *hdr = &lntmsg->msg_hdr; int type = lntmsg->msg_type; lnet_process_id_t target = lntmsg->msg_target; int target_is_router = lntmsg->msg_target_is_router; int routing = lntmsg->msg_routing; unsigned int payload_niov = lntmsg->msg_niov; struct iovec *payload_iov = lntmsg->msg_iov; lnet_kiov_t *payload_kiov = lntmsg->msg_kiov; unsigned int payload_offset = lntmsg->msg_offset; unsigned int payload_nob = lntmsg->msg_len; kib_msg_t *ibmsg; kib_tx_t *tx; int nob; int rc; /* NB 'private' is different depending on what we're sending.... */ CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n", payload_nob, payload_niov, libcfs_id2str(target)); LASSERT (payload_nob == 0 || payload_niov > 0); LASSERT (payload_niov <= LNET_MAX_IOV); /* Thread context */ LASSERT (!in_interrupt()); /* payload is either all vaddrs or all pages */ LASSERT (!(payload_kiov != NULL && payload_iov != NULL)); switch (type) { default: LBUG(); return (-EIO); case LNET_MSG_ACK: LASSERT (payload_nob == 0); break; case LNET_MSG_GET: if (routing || target_is_router) break; /* send IMMEDIATE */ /* is the REPLY message too small for RDMA? */ nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]); if (nob <= IBLND_MSG_SIZE) break; /* send IMMEDIATE */ tx = kiblnd_get_idle_tx(ni); if (tx == NULL) { CERROR("Can allocate txd for GET to %s: \n", libcfs_nid2str(target.nid)); return -ENOMEM; } ibmsg = tx->tx_msg; ibmsg->ibm_u.get.ibgm_hdr = *hdr; ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie; if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0) rc = kiblnd_setup_rd_iov(ni, tx, &ibmsg->ibm_u.get.ibgm_rd, lntmsg->msg_md->md_niov, lntmsg->msg_md->md_iov.iov, 0, lntmsg->msg_md->md_length); else rc = kiblnd_setup_rd_kiov(ni, tx, &ibmsg->ibm_u.get.ibgm_rd, lntmsg->msg_md->md_niov, lntmsg->msg_md->md_iov.kiov, 0, lntmsg->msg_md->md_length); if (rc != 0) { CERROR("Can't setup GET sink for %s: %d\n", libcfs_nid2str(target.nid), rc); kiblnd_tx_done(ni, tx); return -EIO; }#if IBLND_MAP_ON_DEMAND nob = sizeof(kib_get_msg_t);#else nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[tx->tx_nfrags]);#endif kiblnd_init_tx_msg(ni, tx, IBLND_MSG_GET_REQ, nob); tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg); if (tx->tx_lntmsg[1] == NULL) { CERROR("Can't create reply for GET -> %s\n", libcfs_nid2str(target.nid)); kiblnd_tx_done(ni, tx); return -EIO; } tx->tx_lntmsg[0] = lntmsg; /* finalise lntmsg[0,1] on completion */ tx->tx_waiting = 1; /* waiting for GET_DONE */ kiblnd_launch_tx(ni, tx, target.nid); return 0; case LNET_MSG_REPLY: case LNET_MSG_PUT: /* Is the payload small enough not to need RDMA? */ nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]); if (nob <= IBLND_MSG_SIZE) break; /* send IMMEDIATE */ tx = kiblnd_get_idle_tx(ni); if (tx == NULL) { CERROR("Can't allocate %s txd for %s\n", type == LNET_MSG_PUT ? "PUT" : "REPLY", libcfs_nid2str(target.nid)); return -ENOMEM; } if (payload_kiov == NULL) rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd, payload_niov, payload_iov, payload_offset, payload_nob); else rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd, payload_niov, payload_kiov, payload_offset, payload_nob); if (rc != 0) { CERROR("Can't setup PUT src for %s: %d\n", libcfs_nid2str(target.nid), rc); kiblnd_tx_done(ni, tx); return -EIO; } ibmsg = tx->tx_msg; ibmsg->ibm_u.putreq.ibprm_hdr = *hdr; ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie; kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_REQ, sizeof(kib_putreq_msg_t)); tx->tx_lntmsg[0] = lntmsg; /* finalise lntmsg on completion */ tx->tx_waiting = 1; /* waiting for PUT_{ACK,NAK} */ kiblnd_launch_tx(ni, tx, target.nid); return 0; } /* send IMMEDIATE */ LASSERT (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]) <= IBLND_MSG_SIZE); tx = kiblnd_get_idle_tx(ni); if (tx == NULL) { CERROR ("Can't send %d to %s: tx descs exhausted\n", type, libcfs_nid2str(target.nid)); return -ENOMEM; } ibmsg = tx->tx_msg; ibmsg->ibm_u.immediate.ibim_hdr = *hdr; if (payload_kiov != NULL) lnet_copy_kiov2flat(IBLND_MSG_SIZE, ibmsg, offsetof(kib_msg_t, ibm_u.immediate.ibim_payload), payload_niov, payload_kiov, payload_offset, payload_nob); else lnet_copy_iov2flat(IBLND_MSG_SIZE, ibmsg, offsetof(kib_msg_t, ibm_u.immediate.ibim_payload), payload_niov, payload_iov, payload_offset, payload_nob); nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]); kiblnd_init_tx_msg(ni, tx, IBLND_MSG_IMMEDIATE, nob); tx->tx_lntmsg[0] = lntmsg; /* finalise lntmsg on completion */ kiblnd_launch_tx(ni, tx, target.nid); return 0;}voidkiblnd_reply (lnet_ni_t *ni, kib_rx_t *rx, lnet_msg_t *lntmsg){ lnet_process_id_t target = lntmsg->msg_target; unsigned int niov = lntmsg->msg_niov; struct iovec *iov = lntmsg->msg_iov; lnet_kiov_t *kiov = lntmsg->msg_kiov; unsigned int offset = lntmsg->msg_offset; unsigned int nob = lntmsg->msg_len; kib_tx_t *tx; int rc; tx = kiblnd_get_idle_tx(ni); if (tx == NULL) { CERROR("Can't get tx for REPLY to %s\n", libcfs_nid2str(target.nid)); goto failed_0; } if (nob == 0) rc = 0; else if (kiov == NULL) rc = kiblnd_setup_rd_iov(ni, tx, tx->tx_rd, niov, iov, offset, nob); else rc = kiblnd_setup_rd_kiov(ni, tx, tx->tx_rd, niov, kiov, offset, nob); if (rc != 0) { CERROR("Can't setup GET src for %s: %d\n", libcfs_nid2str(target.nid), rc); goto failed_1; } rc = kiblnd_init_rdma(ni, tx, IBLND_MSG_GET_DONE, nob, &rx->rx_msg->ibm_u.get.ibgm_rd, rx->rx_msg->ibm_u.get.ibgm_cookie); if (rc < 0) { CERROR("Can't setup rdma for GET from %s: %d\n", libcfs_nid2str(target.nid), rc); goto failed_1; } if (nob == 0) { /* No RDMA: local completion may happen now! */ lnet_finalize(ni, lntmsg, 0); } else { /* RDMA: lnet_finalize(lntmsg) when it * completes */ tx->tx_lntmsg[0] = lntmsg; } kiblnd_queue_tx(tx, rx->rx_conn); return; failed_1: kiblnd_tx_done(ni, tx); failed_0: lnet_finalize(ni, lntmsg, -EIO);}intkiblnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed, unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov, unsigned int offset, unsigned int mlen, unsigned int rlen){ kib_rx_t *rx = private; kib_msg_t *rxmsg = rx->rx_msg; kib_conn_t *conn = rx->rx_conn; kib_tx_t *tx; kib_msg_t *txmsg; int nob; int post_credit = IBLND_POSTRX_PEER_CREDIT; int rc = 0; LASSERT (mlen <= rlen); LASSERT (!in_interrupt()); /* Either all pages or all vaddrs */ LASSERT (!(kiov != NULL && iov != NULL)); switch (rxmsg->ibm_type) { default: LBUG(); case IBLND_MSG_IMMEDIATE: nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]); if (nob > rx->rx_nob) { CERROR ("Immediate message from %s too big: %d(%d)\n", libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid), nob, rx->rx_nob); rc = -EPROTO; break; } if (kiov != NULL) lnet_copy_flat2kiov(niov, kiov, offset, IBLND_MSG_SIZE, rxmsg, offsetof(kib_msg_t, ibm_u.immediate.ibim_payload), mlen); else lnet_copy_flat2iov(niov, iov, offset, IBLND_MSG_SIZE, rxmsg, offsetof(kib_msg_t, ibm_u.immediate.ibim_payload), mlen); lnet_finalize (ni, lntmsg, 0); break; case IBLND_MSG_PUT_REQ: if (mlen == 0) { lnet_finalize(ni, lntmsg, 0); kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, 0, rxmsg->ibm_u.putreq.ibprm_cookie); break; } tx = kiblnd_get_idle_tx(ni); if (tx == NULL) { CERROR("Can't allocate tx for %s\n", libcfs_nid2str(conn->ibc_peer->ibp_nid)); /* Not replying will break the connection */ rc = -ENOMEM; break; } txmsg = tx->tx_msg; if (kiov == NULL) rc = kiblnd_setup_rd_iov(ni, tx, &txmsg->ibm_u.putack.ibpam_rd, niov, iov, offset, mlen); else rc = kiblnd_setup_rd_kiov(ni, tx, &txmsg->ibm_u.putack.ibpam_rd, niov, kiov, offset, mlen); if (rc != 0) { CERROR("Can't setup PUT sink for %s: %d\n", libcfs_nid2str(conn->ibc_peer->ibp_nid), rc); kiblnd_tx_done(ni, tx); /* tell peer it's over */ kiblnd_send_completion(rx->rx_conn, IBLND_MSG_PUT_NAK, rc, rxmsg->ibm_u.putreq.ibprm_cookie); break; } txmsg->ibm_u.putack.ibpam_src_cookie = rxmsg->ibm_u.putreq.ibprm_cookie; txmsg->ibm_u.putack.ibpam_dst_cookie = tx->tx_cookie;#if IBLND_MAP_ON_DEMAND nob = sizeof(kib_putack_msg_t);#else nob = offsetof(kib_putack_msg_t, ibpam_rd.rd_frags[tx->tx_nfrags]);#endif kiblnd_init_tx_msg(ni, tx, IBLND_MSG_PUT_ACK, nob); tx->tx_lntmsg[0] = lntmsg; /* finalise lntmsg on completion */ tx->tx_waiting = 1; /* waiting for PUT_DONE */ kiblnd_queue_tx(tx, conn); /* reposted buffer reserved for PUT_DONE */ post_credit = IBLND_POSTRX_NO_CREDIT; break; case IBLND_MSG_GET_REQ: if (lntmsg != NULL) { /* Optimized GET; RDMA lntmsg's payload */ kiblnd_reply(ni, rx, lntmsg); } else { /* GET didn't match anything */ kiblnd_send_completion(rx->rx_conn, IBLND_MSG_GET_DONE, -ENODATA, rxmsg->ibm_u.get.ibgm_cookie); } break; } kiblnd_post_rx(rx, post_credit); return rc;}intkiblnd_thread_start (int (*fn)(void *arg), void *arg){ long pid = kernel_thread (fn, arg, 0); if (pid < 0)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -