📄 o2iblnd_cb.c
字号:
/* NOTE(review): the statements up to the first closing brace are the tail of
 * a function whose opening lies above this point; they are left unchanged. */
        kiblnd_conn_addref(conn); /* 1 ref for me.... */
        spin_unlock(&conn->ibc_lock);

        if (idle)
                kiblnd_tx_done(conn->ibc_peer->ibp_ni, tx);

        kiblnd_check_sends(conn);

        kiblnd_conn_decref(conn); /* ...until here */
}

/*
 * Append the message-send work request to 'tx'.
 *
 * Initialises tx->tx_msg through kiblnd_init_msg() and fills the next free
 * sge/work-request slot (index tx->tx_nwrq) with a signaled IB_WR_SEND that
 * covers offsetof(kib_msg_t, ibm_u) + body_nob bytes starting at
 * tx->tx_msgaddr.  The new work request terminates the chain (next == NULL)
 * and tx->tx_nwrq is incremented.
 *
 * ni        network interface; ni->ni_data supplies the kib_net_t whose
 *           device memory region provides the lkey
 * tx        transmit descriptor being built
 * type      message type handed to kiblnd_init_msg()
 * body_nob  number of payload bytes following the message header
 */
void
kiblnd_init_tx_msg (lnet_ni_t *ni, kib_tx_t *tx, int type, int body_nob)
{
        kib_net_t *net = ni->ni_data;
        struct ib_sge *sge = &tx->tx_sge[tx->tx_nwrq];
        struct ib_send_wr *wrq = &tx->tx_wrq[tx->tx_nwrq];
        int nob = offsetof (kib_msg_t, ibm_u) + body_nob;

        LASSERT (net != NULL);
        LASSERT (tx->tx_nwrq >= 0);
        /* up to IBLND_MAX_RDMA_FRAGS RDMA slots may precede this one */
        LASSERT (tx->tx_nwrq < IBLND_MAX_RDMA_FRAGS + 1);
        LASSERT (nob <= IBLND_MSG_SIZE);

        kiblnd_init_msg(tx->tx_msg, type, body_nob);

        sge->addr = tx->tx_msgaddr;
        sge->lkey = net->ibn_dev->ibd_mr->lkey;
        sge->length = nob;

        memset(wrq, 0, sizeof(*wrq));

        wrq->next = NULL;
        wrq->wr_id = kiblnd_ptr2wreqid(tx, IBLND_WID_TX);
        wrq->sg_list = sge;
        wrq->num_sge = 1;
        wrq->opcode = IB_WR_SEND;
        wrq->send_flags = IB_SEND_SIGNALED;

        tx->tx_nwrq++;
}

/*
 * Set up the RDMA write(s) for 'tx' followed by its completion message.
 *
 * The source of the transfer is described by tx->tx_rd and the destination
 * by 'dstrd'.  After the RDMA work requests have been built, a completion
 * message of the given 'type' (must be IBLND_MSG_GET_DONE or
 * IBLND_MSG_PUT_DONE) carrying the result and 'dstcookie' is appended via
 * kiblnd_init_tx_msg().
 *
 * With IBLND_MAP_ON_DEMAND a single RDMA write of 'nob' bytes is built.
 * Otherwise one RDMA write is built per overlapping piece of the source and
 * destination fragment lists; this path advances/shrinks the fragments of
 * BOTH descriptors in place as it goes.
 *
 * Returns 'nob' on success.  In the fragment-list build it returns -EPROTO
 * when either fragment list runs out before 'nob' bytes are covered, or
 * -EMSGSIZE when more than IBLND_MAX_RDMA_FRAGS work requests would be
 * needed; in those cases the RDMA work requests are dropped and only the
 * completion message (with ibcm_status set to the error) remains in 'tx'.
 */
int
kiblnd_init_rdma (lnet_ni_t *ni, kib_tx_t *tx, int type, int nob, kib_rdma_desc_t *dstrd, __u64 dstcookie)
{
        kib_msg_t *ibmsg = tx->tx_msg;
        kib_rdma_desc_t *srcrd = tx->tx_rd;
        struct ib_sge *sge = &tx->tx_sge[0];
        struct ib_send_wr *wrq = &tx->tx_wrq[0];
        int rc = nob;

#if IBLND_MAP_ON_DEMAND
        LASSERT (!in_interrupt());
        LASSERT (tx->tx_nwrq == 0);
        LASSERT (type == IBLND_MSG_GET_DONE || type == IBLND_MSG_PUT_DONE);

        sge->addr = srcrd->rd_addr;
        sge->lkey = srcrd->rd_key;
        sge->length = nob;

        wrq = &tx->tx_wrq[0];

        /* chain to slot 1, which kiblnd_init_tx_msg() fills in below */
        wrq->next = &tx->tx_wrq[1];
        wrq->wr_id = kiblnd_ptr2wreqid(tx, IBLND_WID_RDMA);
        wrq->sg_list = sge;
        wrq->num_sge = 1;
        wrq->opcode = IB_WR_RDMA_WRITE;
        wrq->send_flags = 0;

        wrq->wr.rdma.remote_addr = dstrd->rd_addr;
        wrq->wr.rdma.rkey = dstrd->rd_key;

        tx->tx_nwrq = 1;
#else
        /* CAVEAT EMPTOR: this 'consumes' the frags in 'dstrd' */
        int resid = nob;
        kib_rdma_frag_t *srcfrag;
        int srcidx;
        kib_rdma_frag_t *dstfrag;
        int dstidx;
        int wrknob;

        LASSERT (!in_interrupt());
        LASSERT (tx->tx_nwrq == 0);
        LASSERT (type == IBLND_MSG_GET_DONE || type ==
                 IBLND_MSG_PUT_DONE);

        srcidx = dstidx = 0;
        srcfrag = &srcrd->rd_frags[0];
        dstfrag = &dstrd->rd_frags[0];

        /* one RDMA write per iteration until all 'nob' bytes are covered */
        while (resid > 0) {
                if (srcidx >= srcrd->rd_nfrags) {
                        CERROR("Src buffer exhausted: %d frags\n", srcidx);
                        rc = -EPROTO;
                        break;
                }

                if (dstidx == dstrd->rd_nfrags) {
                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);
                        rc = -EPROTO;
                        break;
                }

                if (tx->tx_nwrq == IBLND_MAX_RDMA_FRAGS) {
                        CERROR("RDMA too fragmented: %d/%d src %d/%d dst frags\n",
                               srcidx, srcrd->rd_nfrags,
                               dstidx, dstrd->rd_nfrags);
                        rc = -EMSGSIZE;
                        break;
                }

                /* largest piece that fits the current src frag, the current
                 * dst frag and what is left to transfer */
                wrknob = MIN(MIN(srcfrag->rf_nob, dstfrag->rf_nob), resid);

                sge = &tx->tx_sge[tx->tx_nwrq];
                sge->addr = srcfrag->rf_addr;
                sge->length = wrknob;
                sge->lkey = srcrd->rd_key;

                wrq = &tx->tx_wrq[tx->tx_nwrq];

                /* the slot after the last RDMA write is filled in by
                 * kiblnd_init_tx_msg() below */
                wrq->next = wrq + 1;
                wrq->wr_id = kiblnd_ptr2wreqid(tx, IBLND_WID_RDMA);
                wrq->sg_list = sge;
                wrq->num_sge = 1;
                wrq->opcode = IB_WR_RDMA_WRITE;
                wrq->send_flags = 0;

                wrq->wr.rdma.remote_addr = dstfrag->rf_addr;
                wrq->wr.rdma.rkey = dstrd->rd_key;

                /* both pointers are recomputed from tx->tx_nwrq at the top
                 * of the next iteration */
                wrq++;
                sge++;

                resid -= wrknob;
                if (wrknob < srcfrag->rf_nob) {
                        /* src frag only partly used: trim it in place */
                        srcfrag->rf_nob -= wrknob;
                        srcfrag->rf_addr += wrknob;
                } else {
                        srcfrag++;
                        srcidx++;
                }

                if (wrknob < dstfrag->rf_nob) {
                        /* dst frag only partly used: trim it in place */
                        dstfrag->rf_nob -= wrknob;
                        dstfrag->rf_addr += wrknob;
                } else {
                        dstfrag++;
                        dstidx++;
                }

                tx->tx_nwrq++;
        }

        if (rc < 0) /* no RDMA if completing with failure */
                tx->tx_nwrq = 0;
#endif

        /* the completion message reports 'rc' (byte count or -errno) */
        ibmsg->ibm_u.completion.ibcm_status = rc;
        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;
        kiblnd_init_tx_msg(ni, tx, type, sizeof (kib_completion_msg_t));

        return rc;
}

/*
 * Put 'tx' on the tail of the appropriate send queue of 'conn'.
 *
 * Marks the tx queued, stamps its deadline (now + kib_timeout seconds) and,
 * if the tx is not yet attached to a connection, attaches it to 'conn'
 * taking a connection reference.  The queue is chosen by message type:
 *   PUT_REQ, GET_REQ                        -> ibc_tx_queue_rsrvd
 *   PUT_NAK, PUT_ACK, PUT_DONE, GET_DONE    -> ibc_tx_queue_nocred
 *   NOOP, IMMEDIATE                         -> ibc_tx_queue
 *
 * kiblnd_queue_tx() calls this with conn->ibc_lock held.
 */
void
kiblnd_queue_tx_locked (kib_tx_t *tx, kib_conn_t *conn)
{
        struct list_head *q;

        LASSERT (tx->tx_nwrq > 0); /* work items set up */
        LASSERT (!tx->tx_queued); /* not queued for sending already */

        tx->tx_queued = 1;
        tx->tx_deadline = jiffies + (*kiblnd_tunables.kib_timeout * HZ);

        if (tx->tx_conn == NULL) {
                kiblnd_conn_addref(conn); /* tx's ref on conn */
                tx->tx_conn = conn;
                LASSERT (tx->tx_msg->ibm_type != IBLND_MSG_PUT_DONE);
        } else {
                /* PUT_DONE first attached to conn as a PUT_REQ */
                LASSERT (tx->tx_conn == conn);
                LASSERT (tx->tx_msg->ibm_type == IBLND_MSG_PUT_DONE);
        }

        switch (tx->tx_msg->ibm_type) {
        default:
                /* unexpected message type; NOTE(review): presumably LBUG()
                 * does not return - confirm */
                LBUG();

        case IBLND_MSG_PUT_REQ:
        case IBLND_MSG_GET_REQ:
                q = &conn->ibc_tx_queue_rsrvd;
                break;

        case IBLND_MSG_PUT_NAK:
        case IBLND_MSG_PUT_ACK:
        case IBLND_MSG_PUT_DONE:
        case IBLND_MSG_GET_DONE:
                q = &conn->ibc_tx_queue_nocred;
                break;

        case IBLND_MSG_NOOP:
        case IBLND_MSG_IMMEDIATE:
                q = &conn->ibc_tx_queue;
                break;
        }

        list_add_tail(&tx->tx_list, q);
}

/*
 * Queue 'tx' on 'conn' under conn->ibc_lock, then call
 * kiblnd_check_sends() on the connection once the lock is dropped.
 */
void
kiblnd_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
{
        spin_lock(&conn->ibc_lock);
        kiblnd_queue_tx_locked(tx, conn);
        spin_unlock(&conn->ibc_lock);

        kiblnd_check_sends(conn);
}

/*
 * Start an active connection attempt to 'peer'.
 *
 * Creates an RDMA CM id (callback kiblnd_cm_callback, context 'peer') and
 * starts address resolution for the IPv4 address taken from the peer's NID
 * and the port in the kib_service tunable, with a timeout of kib_timeout
 * seconds expressed in milliseconds.  On success the function returns with
 * the cmid holding a reference on 'peer'.  If the cmid cannot be created or
 * resolution cannot be started, the failure is reported through
 * kiblnd_peer_connect_failed().
 *
 * The caller must already have marked the peer as connecting
 * (peer->ibp_connecting > 0).
 */
void
kiblnd_connect_peer (kib_peer_t *peer)
{
        struct rdma_cm_id *cmid;
        struct sockaddr_in sockaddr;
        int rc;

        LASSERT (peer->ibp_connecting > 0);

        cmid = rdma_create_id(kiblnd_cm_callback, peer, RDMA_PS_TCP);
        if (IS_ERR(cmid)) {
                CERROR("Can't create CMID for %s: %ld\n",
                       libcfs_nid2str(peer->ibp_nid), PTR_ERR(cmid));
                rc = PTR_ERR(cmid);
                goto failed;
        }

        memset(&sockaddr, 0, sizeof(sockaddr));
        sockaddr.sin_family = AF_INET;
        sockaddr.sin_port = htons(*kiblnd_tunables.kib_service);
        sockaddr.sin_addr.s_addr = htonl(LNET_NIDADDR(peer->ibp_nid));

        kiblnd_peer_addref(peer); /* cmid's ref */

        rc = rdma_resolve_addr(cmid, NULL, (struct sockaddr *)&sockaddr,
                               *kiblnd_tunables.kib_timeout * 1000);
        if (rc == 0)
                return;

        /* Can't initiate address resolution: */
        CERROR("Can't resolve addr for %s: %d\n",
               libcfs_nid2str(peer->ibp_nid), rc);

        kiblnd_peer_decref(peer); /* cmid's ref */
        rdma_destroy_id(cmid);
 failed:
        kiblnd_peer_connect_failed(peer, 1, rc);
}

/*
 * Send 'tx' to 'nid'.
 *
 * Looks the peer up under the global lock: if it has an established
 * connection the tx is queued on it; if the peer exists but is still
 * connecting the tx is parked on the peer's ibp_tx_queue.  If no peer is
 * found a new one is created and the lookup is repeated under the write
 * lock, in case another thread added the peer meanwhile.
 */
void
kiblnd_launch_tx (lnet_ni_t *ni, kib_tx_t *tx, lnet_nid_t nid)
{
        kib_peer_t *peer;
        kib_peer_t *peer2;
        kib_conn_t *conn;
        rwlock_t *g_lock = &kiblnd_data.kib_global_lock;
        unsigned long flags;
        int rc;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */
        LASSERT (tx->tx_nwrq > 0); /* work items have been set up */

        /* First time, just use a read lock since I expect to find my peer
         * connected */
        read_lock_irqsave(g_lock, flags);

        peer = kiblnd_find_peer_locked(nid);
        if (peer != NULL && !list_empty(&peer->ibp_conns)) {
                /* Found a peer with an established connection */
                conn = kiblnd_get_conn_locked(peer);
                kiblnd_conn_addref(conn); /* 1 ref for me... */

                read_unlock_irqrestore(g_lock, flags);

                kiblnd_queue_tx(tx, conn);
                kiblnd_conn_decref(conn); /* ...to here */
                return;
        }

        /* The plain (non-irq) unlock/lock pair keeps the 'flags' saved by
         * read_lock_irqsave() above; they are handed back by the
         * write_unlock_irqrestore() calls below.
         * NOTE(review): presumably interrupts stay disabled across the
         * swap - confirm */
        read_unlock(g_lock);
        /* Re-try with a write lock */
        write_lock(g_lock);

        peer = kiblnd_find_peer_locked(nid);
        if (peer != NULL) {
                if (list_empty(&peer->ibp_conns)) {
                        /* found a peer, but it's still connecting... */
                        LASSERT (peer->ibp_connecting != 0 ||
                                 peer->ibp_accepting != 0);
                        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);
                        write_unlock_irqrestore(g_lock, flags);
                } else {
                        conn = kiblnd_get_conn_locked(peer);
                        kiblnd_conn_addref(conn); /* 1 ref for me... */

                        write_unlock_irqrestore(g_lock, flags);

                        kiblnd_queue_tx(tx, conn);
                        kiblnd_conn_decref(conn); /* ...to here */
                }
                return;
        }

        write_unlock_irqrestore(g_lock, flags);

        /* Allocate a peer ready to add to the peer table and retry */
        rc = kiblnd_create_peer(ni, &peer, nid);
        if (rc != 0) {
                CERROR("Can't create peer %s\n", libcfs_nid2str(nid));
                /* complete the tx with failure */
                tx->tx_status = -EHOSTUNREACH;
                tx->tx_waiting = 0;
                kiblnd_tx_done(ni, tx);
                return;
        }

        write_lock_irqsave(g_lock, flags);

        /* the lock was dropped while creating the peer: look again */
        peer2 = kiblnd_find_peer_locked(nid);
        if (peer2 != NULL) {
                if (list_empty(&peer2->ibp_conns)) {
                        /* found a peer, but it's still connecting... */
                        LASSERT (peer2->ibp_connecting != 0 ||
                                 peer2->ibp_accepting != 0);
                        list_add_tail (&tx->tx_list, &peer2->ibp_tx_queue);
                        write_unlock_irqrestore(g_lock, flags);
                } else {
                        conn = kiblnd_get_conn_locked(peer2);
                        kiblnd_conn_addref(conn); /* 1 ref for me...
                        */

                        write_unlock_irqrestore(g_lock, flags);

                        kiblnd_queue_tx(tx, conn);
                        kiblnd_conn_decref(conn); /* ...to here */
                }

                /* the peer just created is not needed: drop my ref on it */
                kiblnd_peer_decref(peer);
                return;
        }

        /* Brand new peer */
        LASSERT (peer->ibp_connecting == 0);
        peer->ibp_connecting = 1;

        /* always called with a ref on ni, which prevents ni being shutdown */
        LASSERT (((kib_net_t *)ni->ni_data)->ibn_shutdown == 0);

        list_add_tail(&tx->tx_list, &peer->ibp_tx_queue);

        kiblnd_peer_addref(peer);
        list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -