📄 ralnd_cb.c
字号:
case RANAL_MSG_PUT_REQ: rc = kranal_map_buffer(tx); LASSERT (rc != -EAGAIN); if (rc != 0) break; tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie; rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0); expect_reply = 1; break; case RANAL_MSG_PUT_ACK: rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0); expect_reply = 1; break; case RANAL_MSG_GET_REQ: rc = kranal_map_buffer(tx); LASSERT (rc != -EAGAIN); if (rc != 0) break; tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie; tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key; tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits = (__u64)((unsigned long)tx->tx_buffer); tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob; rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0); expect_reply = 1; break; } if (rc == -EAGAIN) { /* I need credits to send this. Replace tx at the head of the * fmaq and I'll get rescheduled when credits appear */ CDEBUG(D_NET, "EAGAIN on %p\n", conn); spin_lock_irqsave(&conn->rac_lock, flags); list_add(&tx->tx_list, &conn->rac_fmaq); spin_unlock_irqrestore(&conn->rac_lock, flags); return; } if (!expect_reply || rc != 0) { kranal_tx_done(tx, rc); } else { /* LASSERT(current) above ensures this doesn't race with reply * processing */ spin_lock_irqsave(&conn->rac_lock, flags); list_add_tail(&tx->tx_list, &conn->rac_replyq); tx->tx_qtime = jiffies; spin_unlock_irqrestore(&conn->rac_lock, flags); } if (more_to_do) { CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn); kranal_schedule_conn(conn); }}static inline voidkranal_swab_rdma_desc (kra_rdma_desc_t *d){ __swab64s(&d->rard_key.Key); __swab16s(&d->rard_key.Cookie); __swab16s(&d->rard_key.MdHandle); __swab32s(&d->rard_key.Flags); __swab64s(&d->rard_addr.AddressBits); __swab32s(&d->rard_nob);}kra_tx_t *kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie){ struct list_head *ttmp; kra_tx_t *tx; unsigned long flags; spin_lock_irqsave(&conn->rac_lock, flags); list_for_each(ttmp, &conn->rac_replyq) { tx = list_entry(ttmp, kra_tx_t, tx_list); CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n", tx, tx->tx_msg.ram_type, tx->tx_cookie); if (tx->tx_cookie != cookie) continue; if (tx->tx_msg.ram_type != type) { spin_unlock_irqrestore(&conn->rac_lock, flags); CWARN("Unexpected type %x (%x expected) " "matched reply from %s\n", tx->tx_msg.ram_type, type, libcfs_nid2str(conn->rac_peer->rap_nid)); return NULL; } list_del(&tx->tx_list); spin_unlock_irqrestore(&conn->rac_lock, flags); return tx; } spin_unlock_irqrestore(&conn->rac_lock, flags); CWARN("Unmatched reply %02x/"LPX64" from %s\n", type, cookie, libcfs_nid2str(conn->rac_peer->rap_nid)); return NULL;}voidkranal_check_fma_rx (kra_conn_t *conn){ unsigned long flags; __u32 seq; kra_tx_t *tx; kra_msg_t *msg; void *prefix; RAP_RETURN rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix); kra_peer_t *peer = conn->rac_peer; int rc = 0; int repost = 1; if (rrc == RAP_NOT_DONE) return; CDEBUG(D_NET, "RX on %p\n", conn); LASSERT (rrc == RAP_SUCCESS); conn->rac_last_rx = jiffies; seq = conn->rac_rx_seq++; msg = (kra_msg_t *)prefix; /* stash message for portals callbacks they'll NULL * rac_rxmsg if they consume it */ LASSERT (conn->rac_rxmsg == NULL); conn->rac_rxmsg = msg; if (msg->ram_magic != RANAL_MSG_MAGIC) { if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) { CERROR("Unexpected magic %08x from %s\n", msg->ram_magic, libcfs_nid2str(peer->rap_nid)); rc = -EPROTO; goto out; } __swab32s(&msg->ram_magic); __swab16s(&msg->ram_version); __swab16s(&msg->ram_type); __swab64s(&msg->ram_srcnid); __swab64s(&msg->ram_connstamp); __swab32s(&msg->ram_seq); /* NB message type checked below; NOT here... */ switch (msg->ram_type) { case RANAL_MSG_PUT_ACK: kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc); break; case RANAL_MSG_GET_REQ: kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc); break; default: break; } } if (msg->ram_version != RANAL_MSG_VERSION) { CERROR("Unexpected protocol version %d from %s\n", msg->ram_version, libcfs_nid2str(peer->rap_nid)); rc = -EPROTO; goto out; } if (msg->ram_srcnid != peer->rap_nid) { CERROR("Unexpected peer %s from %s\n", libcfs_nid2str(msg->ram_srcnid), libcfs_nid2str(peer->rap_nid)); rc = -EPROTO; goto out; } if (msg->ram_connstamp != conn->rac_peer_connstamp) { CERROR("Unexpected connstamp "LPX64"("LPX64 " expected) from %s\n", msg->ram_connstamp, conn->rac_peer_connstamp, libcfs_nid2str(peer->rap_nid)); rc = -EPROTO; goto out; } if (msg->ram_seq != seq) { CERROR("Unexpected sequence number %d(%d expected) from %s\n", msg->ram_seq, seq, libcfs_nid2str(peer->rap_nid)); rc = -EPROTO; goto out; } if ((msg->ram_type & RANAL_MSG_FENCE) != 0) { /* This message signals RDMA completion... */ rrc = RapkFmaSyncWait(conn->rac_rihandle); if (rrc != RAP_SUCCESS) { CERROR("RapkFmaSyncWait failed: %d\n", rrc); rc = -ENETDOWN; goto out; } } if (conn->rac_close_recvd) { CERROR("Unexpected message %d after CLOSE from %s\n", msg->ram_type, libcfs_nid2str(conn->rac_peer->rap_nid)); rc = -EPROTO; goto out; } if (msg->ram_type == RANAL_MSG_CLOSE) { CWARN("RX CLOSE from %s\n", libcfs_nid2str(conn->rac_peer->rap_nid)); conn->rac_close_recvd = 1; write_lock_irqsave(&kranal_data.kra_global_lock, flags); if (conn->rac_state == RANAL_CONN_ESTABLISHED) kranal_close_conn_locked(conn, 0); else if (conn->rac_state == RANAL_CONN_CLOSING && conn->rac_close_sent) kranal_terminate_conn_locked(conn); write_unlock_irqrestore(&kranal_data.kra_global_lock, flags); goto out; } if (conn->rac_state != RANAL_CONN_ESTABLISHED) goto out; switch (msg->ram_type) { case RANAL_MSG_NOOP: /* Nothing to do; just a keepalive */ CDEBUG(D_NET, "RX NOOP on %p\n", conn); break; case RANAL_MSG_IMMEDIATE: CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn); rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.immediate.raim_hdr, msg->ram_srcnid, conn, 0); repost = rc < 0; break; case RANAL_MSG_PUT_REQ: CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn); rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.putreq.raprm_hdr, msg->ram_srcnid, conn, 1); repost = rc < 0; break; case RANAL_MSG_PUT_NAK: CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn); tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ, msg->ram_u.completion.racm_cookie); if (tx == NULL) break; LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED || tx->tx_buftype == RANAL_BUF_VIRT_MAPPED); kranal_tx_done(tx, -ENOENT); /* no match */ break; case RANAL_MSG_PUT_ACK: CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn); tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ, msg->ram_u.putack.rapam_src_cookie); if (tx == NULL) break; kranal_rdma(tx, RANAL_MSG_PUT_DONE, &msg->ram_u.putack.rapam_desc, msg->ram_u.putack.rapam_desc.rard_nob, msg->ram_u.putack.rapam_dst_cookie); break; case RANAL_MSG_PUT_DONE: CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn); tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK, msg->ram_u.completion.racm_cookie); if (tx == NULL) break; LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED || tx->tx_buftype == RANAL_BUF_VIRT_MAPPED); kranal_tx_done(tx, 0); break; case RANAL_MSG_GET_REQ: CDEBUG(D_NET, "RX GET_REQ on %p\n", conn); rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.get.ragm_hdr, msg->ram_srcnid, conn, 1); repost = rc < 0; break; case RANAL_MSG_GET_NAK: CDEBUG(D_NET, "RX GET_NAK on %p\n", conn); tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ, msg->ram_u.completion.racm_cookie); if (tx == NULL) break; LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED || tx->tx_buftype == RANAL_BUF_VIRT_MAPPED); kranal_tx_done(tx, -ENOENT); /* no match */ break; case RANAL_MSG_GET_DONE: CDEBUG(D_NET, "RX GET_DONE on %p\n", conn); tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ, msg->ram_u.completion.racm_cookie); if (tx == NULL) break; LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED || tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);#if 0 /* completion message should send rdma length if we ever allow * GET truncation */ lnet_set_reply_msg_len(kranal_data.kra_ni, tx->tx_lntmsg[1], ???);#endif kranal_tx_done(tx, 0); break; } out: if (rc < 0) /* protocol/comms error */ kranal_close_conn (conn, rc); if (repost && conn->rac_rxmsg != NULL) kranal_consume_rxmsg(conn, NULL, 0); /* check again later */ kranal_schedule_conn(conn);}voidkranal_complete_closed_conn (kra_conn_t *conn){ kra_tx_t *tx; int nfma; int nreplies; LASSERT (conn->rac_state == RANAL_CONN_CLOSED); LASSERT (list_empty(&conn->rac_list)); LASSERT (list_empty(&conn->rac_hashlist)); for (nfma = 0; !list_empty(&conn->rac_fmaq); nfma++) { tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list); list_del(&tx->tx_list); kranal_tx_done(tx, -ECONNABORTED); } LASSERT (list_empty(&conn->rac_rdmaq)); for (nreplies = 0; !list_empty(&conn->rac_replyq); nreplies++) { tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list); list_del(&tx->tx_list); kranal_tx_done(tx, -ECONNABORTED); } CWARN("Closed conn %p -> %s: nmsg %d nreplies %d\n", conn, libcfs_nid2str(conn->rac_peer->rap_nid), nfma, nreplies);}intkranal_process_new_conn (kra_conn_t *conn){ RAP_RETURN rrc; rrc = RapkC
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -