📄 ptllnd_rx_buf.c
字号:
cfs_mem_cache_free(kptllnd_data.kptl_rx_cache, rx);}voidkptllnd_rx_buffer_callback (ptl_event_t *ev){ kptl_eventarg_t *eva = ev->md.user_ptr; kptl_rx_buffer_t *rxb = kptllnd_eventarg2obj(eva); kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool; kptl_rx_t *rx; int unlinked; unsigned long flags;#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS unlinked = ev->unlinked;#else unlinked = ev->type == PTL_EVENT_UNLINK;#endif CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n", kptllnd_ptlid2str(ev->initiator), kptllnd_evtype2str(ev->type), ev->type, rxb, kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type, unlinked); LASSERT (!rxb->rxb_idle); LASSERT (ev->md.start == rxb->rxb_buffer); LASSERT (ev->offset + ev->mlength <= PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages); LASSERT (ev->type == PTL_EVENT_PUT_END || ev->type == PTL_EVENT_UNLINK); LASSERT (ev->type == PTL_EVENT_UNLINK || ev->match_bits == LNET_MSG_MATCHBITS); if (ev->ni_fail_type != PTL_NI_OK) { CERROR("Portals error from %s: %s(%d) rxb=%p fail=%s(%d) unlink=%dn", kptllnd_ptlid2str(ev->initiator), kptllnd_evtype2str(ev->type), ev->type, rxb, kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type, unlinked); } else if (ev->type == PTL_EVENT_PUT_END && !rxbp->rxbp_shutdown) { /* rxbp_shutdown sampled without locking! I only treat it as a * hint since shutdown can start while rx's are queued on * kptl_sched_rxq. */#if (PTL_MD_LOCAL_ALIGN8 == 0) /* Portals can't force message alignment - someone sending an * odd-length message will misalign subsequent messages and * force the fixup below... */ if ((ev->mlength & 7) != 0) CWARN("Message from %s has odd length "LPU64": " "probable version incompatibility\n", kptllnd_ptlid2str(ev->initiator), (__u64)ev->mlength);#endif rx = kptllnd_rx_alloc(); if (rx == NULL) { CERROR("Message from %s dropped: ENOMEM", kptllnd_ptlid2str(ev->initiator)); } else { if ((ev->offset & 7) == 0) { kptllnd_rx_buffer_addref(rxb); rx->rx_rxb = rxb; rx->rx_nob = ev->mlength; rx->rx_msg = (kptl_msg_t *) (rxb->rxb_buffer + ev->offset); } else {#if (PTL_MD_LOCAL_ALIGN8 == 0) /* Portals can't force alignment - copy into * rx_space (avoiding overflow) to fix */ int maxlen = *kptllnd_tunables.kptl_max_msg_size; rx->rx_rxb = NULL; rx->rx_nob = MIN(maxlen, ev->mlength); rx->rx_msg = (kptl_msg_t *)rx->rx_space; memcpy(rx->rx_msg, rxb->rxb_buffer + ev->offset, rx->rx_nob);#else /* Portals should have forced the alignment */ LBUG();#endif } rx->rx_initiator = ev->initiator; rx->rx_treceived = jiffies;#ifdef CRAY_XT3 rx->rx_uid = ev->uid;#endif /* Queue for attention */ spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags); list_add_tail(&rx->rx_list, &kptllnd_data.kptl_sched_rxq); wake_up(&kptllnd_data.kptl_sched_waitq); spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags); } } if (unlinked) { spin_lock_irqsave(&rxbp->rxbp_lock, flags); rxb->rxb_posted = 0; rxb->rxb_mdh = PTL_INVALID_HANDLE; kptllnd_rx_buffer_decref_locked(rxb); spin_unlock_irqrestore(&rxbp->rxbp_lock, flags); }}voidkptllnd_nak (kptl_rx_t *rx){ /* Fire-and-forget a stub message that will let the peer know my * protocol magic/version and make her drop/refresh any peer state she * might have with me. */ ptl_md_t md = { .start = kptllnd_data.kptl_nak_msg, .length = kptllnd_data.kptl_nak_msg->ptlm_nob, .threshold = 1, .options = 0, .user_ptr = NULL, .eq_handle = PTL_EQ_NONE}; ptl_handle_md_t mdh; int rc; rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh); if (rc != PTL_OK) { CWARN("Can't NAK %s: bind failed %s(%d)\n", kptllnd_ptlid2str(rx->rx_initiator), kptllnd_errtype2str(rc), rc); return; } rc = PtlPut(mdh, PTL_NOACK_REQ, rx->rx_initiator, *kptllnd_tunables.kptl_portal, 0, LNET_MSG_MATCHBITS, 0, 0); if (rc != PTL_OK) CWARN("Can't NAK %s: put failed %s(%d)\n", kptllnd_ptlid2str(rx->rx_initiator), kptllnd_errtype2str(rc), rc);}voidkptllnd_rx_parse(kptl_rx_t *rx){ kptl_msg_t *msg = rx->rx_msg; kptl_peer_t *peer; int rc; unsigned long flags; lnet_process_id_t srcid; LASSERT (rx->rx_peer == NULL); if ((rx->rx_nob >= 4 && (msg->ptlm_magic == LNET_PROTO_MAGIC || msg->ptlm_magic == __swab32(LNET_PROTO_MAGIC))) || (rx->rx_nob >= 6 && ((msg->ptlm_magic == PTLLND_MSG_MAGIC && msg->ptlm_version != PTLLND_MSG_VERSION) || (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC) && msg->ptlm_version != __swab16(PTLLND_MSG_VERSION))))) { /* NAK incompatible versions * See other LNDs for how to handle this if/when ptllnd begins * to allow different versions to co-exist */ CERROR("Bad version: got %04x expected %04x from %s\n", (__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ? msg->ptlm_version : __swab16(msg->ptlm_version)), PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator)); kptllnd_nak(rx); goto rx_done; } rc = kptllnd_msg_unpack(msg, rx->rx_nob); if (rc != 0) { CERROR ("Error %d unpacking rx from %s\n", rc, kptllnd_ptlid2str(rx->rx_initiator)); goto rx_done; } srcid.nid = msg->ptlm_srcnid; srcid.pid = msg->ptlm_srcpid; CDEBUG(D_NETTRACE, "%s: RX %s c %d %p rxb %p queued %lu ticks (%ld s)\n", libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type), msg->ptlm_credits, rx, rx->rx_rxb, jiffies - rx->rx_treceived, cfs_duration_sec(jiffies - rx->rx_treceived)); if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) { CERROR("Bad source id %s from %s\n", libcfs_id2str(srcid), kptllnd_ptlid2str(rx->rx_initiator)); goto rx_done; } if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) { peer = kptllnd_id2peer(srcid); if (peer == NULL) goto rx_done; CWARN("NAK from %s (%s)\n", libcfs_id2str(srcid), kptllnd_ptlid2str(rx->rx_initiator)); rc = -EPROTO; goto failed; } if (msg->ptlm_dstnid != kptllnd_data.kptl_ni->ni_nid || msg->ptlm_dstpid != the_lnet.ln_pid) { CERROR("Bad dstid %s (expected %s) from %s\n", libcfs_id2str((lnet_process_id_t) { .nid = msg->ptlm_dstnid, .pid = msg->ptlm_dstpid}), libcfs_id2str((lnet_process_id_t) { .nid = kptllnd_data.kptl_ni->ni_nid, .pid = the_lnet.ln_pid}), kptllnd_ptlid2str(rx->rx_initiator)); goto rx_done; } if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) { peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg); if (peer == NULL) goto rx_done; } else { peer = kptllnd_id2peer(srcid); if (peer == NULL) { CWARN("NAK %s: no connection; peer must reconnect\n", libcfs_id2str(srcid)); /* NAK to make the peer reconnect */ kptllnd_nak(rx); goto rx_done; } /* Ignore anything apart from HELLO while I'm waiting for it and * any messages for a previous incarnation of the connection */ if (peer->peer_state == PEER_STATE_WAITING_HELLO || msg->ptlm_dststamp < peer->peer_myincarnation) { kptllnd_peer_decref(peer); goto rx_done; } if (msg->ptlm_srcstamp != peer->peer_incarnation) { CERROR("%s: Unexpected srcstamp "LPX64" " "("LPX64" expected)\n", libcfs_id2str(peer->peer_id), msg->ptlm_srcstamp, peer->peer_incarnation); rc = -EPROTO; goto failed; } if (msg->ptlm_dststamp != peer->peer_myincarnation) { CERROR("%s: Unexpected dststamp "LPX64" " "("LPX64" expected)\n", libcfs_id2str(peer->peer_id), msg->ptlm_dststamp, peer->peer_myincarnation); rc = -EPROTO; goto failed; } } LASSERT (msg->ptlm_srcnid == peer->peer_id.nid && msg->ptlm_srcpid == peer->peer_id.pid); spin_lock_irqsave(&peer->peer_lock, flags); /* Check peer only sends when I've sent her credits */ if (peer->peer_sent_credits == 0) { int c = peer->peer_credits; int oc = peer->peer_outstanding_credits; int sc = peer->peer_sent_credits; spin_unlock_irqrestore(&peer->peer_lock, flags); CERROR("%s: buffer overrun [%d/%d+%d]\n", libcfs_id2str(peer->peer_id), c, sc, oc); goto failed; } peer->peer_sent_credits--; /* No check for credit overflow - the peer may post new * buffers after the startup handshake. */ peer->peer_credits += msg->ptlm_credits; spin_unlock_irqrestore(&peer->peer_lock, flags); /* See if something can go out now that credits have come in */ if (msg->ptlm_credits != 0) kptllnd_peer_check_sends(peer); /* ptllnd-level protocol correct - rx takes my ref on peer and increments * peer_outstanding_credits when it completes */ rx->rx_peer = peer; kptllnd_peer_alive(peer); switch (msg->ptlm_type) { default: /* already checked by kptllnd_msg_unpack() */ LBUG(); case PTLLND_MSG_TYPE_HELLO: CDEBUG(D_NET, "PTLLND_MSG_TYPE_HELLO\n"); goto rx_done; case PTLLND_MSG_TYPE_NOOP: CDEBUG(D_NET, "PTLLND_MSG_TYPE_NOOP\n"); goto rx_done; case PTLLND_MSG_TYPE_IMMEDIATE: CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n"); rc = lnet_parse(kptllnd_data.kptl_ni, &msg->ptlm_u.immediate.kptlim_hdr, msg->ptlm_srcnid, rx, 0); if (rc >= 0) /* kptllnd_recv owns 'rx' now */ return; goto failed; case PTLLND_MSG_TYPE_PUT: case PTLLND_MSG_TYPE_GET: CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n", msg->ptlm_type == PTLLND_MSG_TYPE_PUT ? "PUT" : "GET"); /* checked in kptllnd_msg_unpack() */ LASSERT (msg->ptlm_u.rdma.kptlrm_matchbits >= PTL_RESERVED_MATCHBITS); /* Update last match bits seen */ spin_lock_irqsave(&peer->peer_lock, flags); if (msg->ptlm_u.rdma.kptlrm_matchbits > rx->rx_peer->peer_last_matchbits_seen) rx->rx_peer->peer_last_matchbits_seen = msg->ptlm_u.rdma.kptlrm_matchbits; spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags); rc = lnet_parse(kptllnd_data.kptl_ni, &msg->ptlm_u.rdma.kptlrm_hdr, msg->ptlm_srcnid, rx, 1); if (rc >= 0) /* kptllnd_recv owns 'rx' now */ return; goto failed; } failed: kptllnd_peer_close(peer, rc); if (rx->rx_peer == NULL) /* drop ref on peer */ kptllnd_peer_decref(peer); /* unless rx_done will */ rx_done: kptllnd_rx_done(rx);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -