ralnd_cb.c
        kranal_unmap_buffer(tx);

        lnetmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lnetmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;

        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;
        tx->tx_conn = NULL;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        /* finalize AFTER freeing lnet msgs */
        for (i = 0; i < 2; i++) {
                if (lnetmsg[i] == NULL)
                        continue;

                lnet_finalize(kranal_data.kra_ni, lnetmsg[i], completion);
        }
}

kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->rap_conns) {
                return list_entry(tmp, kra_conn_t, rac_list);
        }

        return NULL;
}

void
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
{
        unsigned long flags;

        tx->tx_conn = conn;

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);

        kranal_schedule_conn(conn);
}

void
kranal_launch_tx (kra_tx_t *tx, lnet_nid_t nid)
{
        unsigned long    flags;
        kra_peer_t      *peer;
        kra_conn_t      *conn;
        int              rc;
        int              retry;
        rwlock_t        *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);      /* only set when assigned a conn */

        for (retry = 0; ; retry = 1) {

                read_lock(g_lock);

                peer = kranal_find_peer_locked(nid);
                if (peer != NULL) {
                        conn = kranal_find_conn_locked(peer);
                        if (conn != NULL) {
                                kranal_post_fma(conn, tx);
                                read_unlock(g_lock);
                                return;
                        }
                }

                /* Making connections; I'll need a write lock... */
                read_unlock(g_lock);
                write_lock_irqsave(g_lock, flags);

                peer = kranal_find_peer_locked(nid);
                if (peer != NULL)
                        break;

                write_unlock_irqrestore(g_lock, flags);

                if (retry) {
                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                rc = kranal_add_persistent_peer(nid, LNET_NIDADDR(nid),
                                                lnet_acceptor_port());
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_nid2str(nid), rc);
                        kranal_tx_done(tx, rc);
                        return;
                }
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                write_unlock_irqrestore(g_lock, flags);
                return;
        }

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {
                LASSERT (list_empty(&peer->rap_tx_queue));

                if (!(peer->rap_reconnect_interval == 0 || /* first attempt */
                      time_after_eq(jiffies, peer->rap_reconnect_time))) {
                        write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                peer->rap_connecting = 1;
                kranal_peer_addref(peer);       /* extra ref for connd */

                spin_lock(&kranal_data.kra_connd_lock);

                list_add_tail(&peer->rap_connd_list,
                              &kranal_data.kra_connd_peers);
                wake_up(&kranal_data.kra_connd_waitq);

                spin_unlock(&kranal_data.kra_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}
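/* kranal_launch_tx() above cannot atomically upgrade its read lock to a
 * write lock, so it drops the read lock, takes the write lock, and then
 * re-validates the peer/connection state before mutating it, retrying
 * once after adding a persistent peer.  A minimal sketch of that
 * drop-and-revalidate pattern follows; it is illustrative only, not code
 * from this driver ('table_lock', 'lookup', 'create', 'get_ref' and the
 * kra_entry_t/kra_key_t types are hypothetical).  Note the real code
 * posts the tx while still holding the lock; a real table lookup would
 * likewise need to take a reference before unlocking. */
#if 0
static kra_entry_t *
lookup_or_create (kra_key_t key)
{
        unsigned long  flags;
        kra_entry_t   *entry;

        read_lock(&table_lock);
        entry = lookup(key);            /* fast path: look, don't touch */
        if (entry != NULL)
                get_ref(entry);         /* pin it before dropping the lock */
        read_unlock(&table_lock);
        if (entry != NULL)
                return entry;

        write_lock_irqsave(&table_lock, flags);
        entry = lookup(key);            /* re-check: someone may have created
                                         * it while no lock was held */
        if (entry == NULL)
                entry = create(key);
        get_ref(entry);
        write_unlock_irqrestore(&table_lock, flags);
        return entry;
}
#endif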
void
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
        kra_conn_t   *conn = tx->tx_conn;
        RAP_RETURN    rrc;
        unsigned long flags;

        LASSERT (kranal_tx_mapped(tx));
        LASSERT (nob <= sink->rard_nob);
        LASSERT (nob <= tx->tx_nob);

        /* No actual race with scheduler sending CLOSE (I'm she!) */
        LASSERT (current == conn->rac_device->rad_scheduler);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));

        tx->tx_rdma_desc.SrcPtr.AddressBits =
                (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
        tx->tx_rdma_desc.DstKey = sink->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
}

int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
        __u32      nob_received = nob;
        RAP_RETURN rrc;

        LASSERT (conn->rac_rxmsg != NULL);
        CDEBUG(D_NET, "Consuming %p\n", conn);

        rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
                             &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received < nob) {
                CWARN("Incomplete immediate msg from %s: expected %d, got %d\n",
                      libcfs_nid2str(conn->rac_peer->rap_nid),
                      nob, nob_received);
                return -EPROTO;
        }

        return 0;
}
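/* kranal_send() below picks one of two transmit paths for each LNET
 * message: a payload that fits in a single FMA message goes out inline as
 * RANAL_MSG_IMMEDIATE, while paged or larger payloads are negotiated as
 * an RDMA via RANAL_MSG_GET_REQ / RANAL_MSG_PUT_REQ.  A sketch of the
 * size test it applies; 'payload_fits_immediate' is a hypothetical helper,
 * but the three conditions are exactly the ones the PUT/REPLY case checks
 * below (the GET case applies the same limits to the sink MD). */
#if 0
static int
payload_fits_immediate (lnet_kiov_t *kiov, unsigned int nob)
{
        return kiov == NULL &&                  /* not paged */
               nob <= RANAL_FMA_MAX_DATA &&     /* fits in one FMA message */
               nob <= *kranal_tunables.kra_max_immediate; /* within tunable */
}
#endif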
int
kranal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      niov = lntmsg->msg_niov;
        struct iovec     *iov = lntmsg->msg_iov;
        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
        unsigned int      offset = lntmsg->msg_offset;
        unsigned int      nob = lntmsg->msg_len;
        kra_tx_t         *tx;
        int               rc;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
               nob, niov, libcfs_id2str(target));

        LASSERT (nob == 0 || niov > 0);
        LASSERT (niov <= LNET_MAX_IOV);

        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(kiov != NULL && iov != NULL));

        if (routing) {
                CERROR ("Can't route\n");
                return -EIO;
        }

        switch(type) {
        default:
                LBUG();

        case LNET_MSG_ACK:
                LASSERT (nob == 0);
                break;

        case LNET_MSG_GET:
                LASSERT (niov == 0);
                LASSERT (nob == 0);
                /* We have to consider the eventual sink buffer rather than any
                 * payload passed here (there isn't any, and strictly, looking
                 * inside lntmsg is a layering violation).  We send a simple
                 * IMMEDIATE GET if the sink buffer is mapped already and small
                 * enough for FMA */

                if (routing || target_is_router)
                        break;                  /* send IMMEDIATE */

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0 &&
                    lntmsg->msg_md->md_length <= RANAL_FMA_MAX_DATA &&
                    lntmsg->msg_md->md_length <= *kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(RANAL_MSG_GET_REQ);
                if (tx == NULL)
                        return -ENOMEM;

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
                        rc = kranal_setup_virt_buffer(tx, lntmsg->msg_md->md_niov,
                                                      lntmsg->msg_md->md_iov.iov,
                                                      0, lntmsg->msg_md->md_length);
                else
                        rc = kranal_setup_phys_buffer(tx, lntmsg->msg_md->md_niov,
                                                      lntmsg->msg_md->md_iov.kiov,
                                                      0, lntmsg->msg_md->md_length);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
                if (tx->tx_lntmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to %s\n",
                               libcfs_nid2str(target.nid));
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[0] = lntmsg;
                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, target.nid);
                return 0;

        case LNET_MSG_REPLY:
        case LNET_MSG_PUT:
                if (kiov == NULL &&              /* not paged */
                    nob <= RANAL_FMA_MAX_DATA && /* small enough */
                    nob <= *kranal_tunables.kra_max_immediate)
                        break;                   /* send IMMEDIATE */

                tx = kranal_new_tx_msg(RANAL_MSG_PUT_REQ);
                if (tx == NULL)
                        return -ENOMEM;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[0] = lntmsg;
                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, target.nid);
                return 0;
        }

        /* send IMMEDIATE */

        LASSERT (kiov == NULL);
        LASSERT (nob <= RANAL_FMA_MAX_DATA);

        tx = kranal_new_tx_msg(RANAL_MSG_IMMEDIATE);
        if (tx == NULL)
                return -ENOMEM;

        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
        if (rc != 0) {
                kranal_tx_done(tx, rc);
                return -EIO;
        }

        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
        tx->tx_lntmsg[0] = lntmsg;
        kranal_launch_tx(tx, target.nid);
        return 0;
}

void
kranal_reply(lnet_ni_t *ni, kra_conn_t *conn, lnet_msg_t *lntmsg)
{
        kra_msg_t     *rxmsg = conn->rac_rxmsg;
        unsigned int   niov = lntmsg->msg_niov;
        struct iovec  *iov = lntmsg->msg_iov;
        lnet_kiov_t   *kiov = lntmsg->msg_kiov;
        unsigned int   offset = lntmsg->msg_offset;
        unsigned int   nob = lntmsg->msg_len;
        kra_tx_t      *tx;
        int            rc;

        tx = kranal_get_idle_tx();
        if (tx == NULL)
                goto failed_0;

        rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
        if (rc != 0)
                goto failed_1;

        tx->tx_conn = conn;

        rc = kranal_map_buffer(tx);
        if (rc != 0)
                goto failed_1;

        tx->tx_lntmsg[0] = lntmsg;

        kranal_rdma(tx, RANAL_MSG_GET_DONE,