⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ptllnd_rx_buf.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 2 页
字号:
        cfs_mem_cache_free(kptllnd_data.kptl_rx_cache, rx);}voidkptllnd_rx_buffer_callback (ptl_event_t *ev){        kptl_eventarg_t        *eva = ev->md.user_ptr;        kptl_rx_buffer_t       *rxb = kptllnd_eventarg2obj(eva);        kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;        kptl_rx_t              *rx;        int                     unlinked;        unsigned long           flags;#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS        unlinked = ev->unlinked;#else        unlinked = ev->type == PTL_EVENT_UNLINK;#endif        CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",               kptllnd_ptlid2str(ev->initiator),                kptllnd_evtype2str(ev->type), ev->type, rxb,                kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type,               unlinked);        LASSERT (!rxb->rxb_idle);        LASSERT (ev->md.start == rxb->rxb_buffer);        LASSERT (ev->offset + ev->mlength <=                  PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages);        LASSERT (ev->type == PTL_EVENT_PUT_END ||                  ev->type == PTL_EVENT_UNLINK);        LASSERT (ev->type == PTL_EVENT_UNLINK ||                 ev->match_bits == LNET_MSG_MATCHBITS);        if (ev->ni_fail_type != PTL_NI_OK) {                CERROR("Portals error from %s: %s(%d) rxb=%p fail=%s(%d) unlink=%dn",                       kptllnd_ptlid2str(ev->initiator),                       kptllnd_evtype2str(ev->type), ev->type, rxb,                       kptllnd_errtype2str(ev->ni_fail_type),                       ev->ni_fail_type, unlinked);        } else if (ev->type == PTL_EVENT_PUT_END &&                   !rxbp->rxbp_shutdown) {                /* rxbp_shutdown sampled without locking!  I only treat it as a                 * hint since shutdown can start while rx's are queued on                 * kptl_sched_rxq. */#if (PTL_MD_LOCAL_ALIGN8 == 0)                /* Portals can't force message alignment - someone sending an                 * odd-length message will misalign subsequent messages and                 * force the fixup below...  */                if ((ev->mlength & 7) != 0)                        CWARN("Message from %s has odd length "LPU64": "                              "probable version incompatibility\n",                              kptllnd_ptlid2str(ev->initiator),                              (__u64)ev->mlength);#endif                rx = kptllnd_rx_alloc();                if (rx == NULL) {                        CERROR("Message from %s dropped: ENOMEM",                               kptllnd_ptlid2str(ev->initiator));                } else {                        if ((ev->offset & 7) == 0) {                                kptllnd_rx_buffer_addref(rxb);                                rx->rx_rxb = rxb;                                rx->rx_nob = ev->mlength;                                rx->rx_msg = (kptl_msg_t *)                                             (rxb->rxb_buffer + ev->offset);                        } else {#if (PTL_MD_LOCAL_ALIGN8 == 0)                                /* Portals can't force alignment - copy into                                 * rx_space (avoiding overflow) to fix */                                int maxlen = *kptllnd_tunables.kptl_max_msg_size;                                                                rx->rx_rxb = NULL;                                rx->rx_nob = MIN(maxlen, ev->mlength);                                rx->rx_msg = (kptl_msg_t *)rx->rx_space;                                memcpy(rx->rx_msg, rxb->rxb_buffer + ev->offset,                                       rx->rx_nob);#else                                /* Portals should have forced the alignment */                                LBUG();#endif                        }                        rx->rx_initiator = ev->initiator;                        rx->rx_treceived = jiffies;#ifdef CRAY_XT3                        rx->rx_uid = ev->uid;#endif                        /* Queue for attention */                        spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,                                           flags);                        list_add_tail(&rx->rx_list,                                       &kptllnd_data.kptl_sched_rxq);                        wake_up(&kptllnd_data.kptl_sched_waitq);                        spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,                                                flags);                }        }        if (unlinked) {                spin_lock_irqsave(&rxbp->rxbp_lock, flags);                rxb->rxb_posted = 0;                rxb->rxb_mdh = PTL_INVALID_HANDLE;                kptllnd_rx_buffer_decref_locked(rxb);                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);        }}voidkptllnd_nak (kptl_rx_t *rx){        /* Fire-and-forget a stub message that will let the peer know my         * protocol magic/version and make her drop/refresh any peer state she         * might have with me. */        ptl_md_t md = {                .start        = kptllnd_data.kptl_nak_msg,                .length       = kptllnd_data.kptl_nak_msg->ptlm_nob,                .threshold    = 1,                .options      = 0,                .user_ptr     = NULL,                .eq_handle    = PTL_EQ_NONE};        ptl_handle_md_t   mdh;        int               rc;        rc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &mdh);        if (rc != PTL_OK) {                CWARN("Can't NAK %s: bind failed %s(%d)\n",                      kptllnd_ptlid2str(rx->rx_initiator),                      kptllnd_errtype2str(rc), rc);                return;        }        rc = PtlPut(mdh, PTL_NOACK_REQ, rx->rx_initiator,                    *kptllnd_tunables.kptl_portal, 0,                    LNET_MSG_MATCHBITS, 0, 0);        if (rc != PTL_OK)                CWARN("Can't NAK %s: put failed %s(%d)\n",                      kptllnd_ptlid2str(rx->rx_initiator),                      kptllnd_errtype2str(rc), rc);}voidkptllnd_rx_parse(kptl_rx_t *rx){        kptl_msg_t             *msg = rx->rx_msg;        kptl_peer_t            *peer;        int                     rc;        unsigned long           flags;        lnet_process_id_t       srcid;        LASSERT (rx->rx_peer == NULL);        if ((rx->rx_nob >= 4 &&             (msg->ptlm_magic == LNET_PROTO_MAGIC ||              msg->ptlm_magic == __swab32(LNET_PROTO_MAGIC))) ||            (rx->rx_nob >= 6 &&             ((msg->ptlm_magic == PTLLND_MSG_MAGIC &&               msg->ptlm_version != PTLLND_MSG_VERSION) ||              (msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC) &&               msg->ptlm_version != __swab16(PTLLND_MSG_VERSION))))) {                /* NAK incompatible versions                 * See other LNDs for how to handle this if/when ptllnd begins                 * to allow different versions to co-exist */                CERROR("Bad version: got %04x expected %04x from %s\n",                       (__u32)(msg->ptlm_magic == PTLLND_MSG_MAGIC ?                               msg->ptlm_version : __swab16(msg->ptlm_version)),                        PTLLND_MSG_VERSION, kptllnd_ptlid2str(rx->rx_initiator));                kptllnd_nak(rx);                goto rx_done;        }                rc = kptllnd_msg_unpack(msg, rx->rx_nob);        if (rc != 0) {                CERROR ("Error %d unpacking rx from %s\n",                        rc, kptllnd_ptlid2str(rx->rx_initiator));                goto rx_done;        }        srcid.nid = msg->ptlm_srcnid;        srcid.pid = msg->ptlm_srcpid;        CDEBUG(D_NETTRACE, "%s: RX %s c %d %p rxb %p queued %lu ticks (%ld s)\n",               libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type),               msg->ptlm_credits, rx, rx->rx_rxb,                jiffies - rx->rx_treceived,               cfs_duration_sec(jiffies - rx->rx_treceived));        if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) {                CERROR("Bad source id %s from %s\n",                       libcfs_id2str(srcid),                       kptllnd_ptlid2str(rx->rx_initiator));                goto rx_done;        }        if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {                peer = kptllnd_id2peer(srcid);                if (peer == NULL)                        goto rx_done;                                CWARN("NAK from %s (%s)\n",                      libcfs_id2str(srcid),                      kptllnd_ptlid2str(rx->rx_initiator));                rc = -EPROTO;                goto failed;        }        if (msg->ptlm_dstnid != kptllnd_data.kptl_ni->ni_nid ||            msg->ptlm_dstpid != the_lnet.ln_pid) {                CERROR("Bad dstid %s (expected %s) from %s\n",                       libcfs_id2str((lnet_process_id_t) {                               .nid = msg->ptlm_dstnid,                               .pid = msg->ptlm_dstpid}),                       libcfs_id2str((lnet_process_id_t) {                               .nid = kptllnd_data.kptl_ni->ni_nid,                               .pid = the_lnet.ln_pid}),                       kptllnd_ptlid2str(rx->rx_initiator));                goto rx_done;        }        if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {                peer = kptllnd_peer_handle_hello(rx->rx_initiator, msg);                if (peer == NULL)                        goto rx_done;        } else {                peer = kptllnd_id2peer(srcid);                if (peer == NULL) {                        CWARN("NAK %s: no connection; peer must reconnect\n",                              libcfs_id2str(srcid));                        /* NAK to make the peer reconnect */                        kptllnd_nak(rx);                        goto rx_done;                }                /* Ignore anything apart from HELLO while I'm waiting for it and                 * any messages for a previous incarnation of the connection */                if (peer->peer_state == PEER_STATE_WAITING_HELLO ||                    msg->ptlm_dststamp < peer->peer_myincarnation) {                        kptllnd_peer_decref(peer);                        goto rx_done;                }                if (msg->ptlm_srcstamp != peer->peer_incarnation) {                        CERROR("%s: Unexpected srcstamp "LPX64" "                               "("LPX64" expected)\n",                               libcfs_id2str(peer->peer_id),                               msg->ptlm_srcstamp,                               peer->peer_incarnation);                        rc = -EPROTO;                        goto failed;                }                if (msg->ptlm_dststamp != peer->peer_myincarnation) {                        CERROR("%s: Unexpected dststamp "LPX64" "                               "("LPX64" expected)\n",                               libcfs_id2str(peer->peer_id), msg->ptlm_dststamp,                               peer->peer_myincarnation);                        rc = -EPROTO;                        goto failed;                }        }        LASSERT (msg->ptlm_srcnid == peer->peer_id.nid &&                 msg->ptlm_srcpid == peer->peer_id.pid);        spin_lock_irqsave(&peer->peer_lock, flags);        /* Check peer only sends when I've sent her credits */        if (peer->peer_sent_credits == 0) {                int  c = peer->peer_credits;                int oc = peer->peer_outstanding_credits;                int sc = peer->peer_sent_credits;                                spin_unlock_irqrestore(&peer->peer_lock, flags);                CERROR("%s: buffer overrun [%d/%d+%d]\n",                       libcfs_id2str(peer->peer_id), c, sc, oc);                goto failed;        }        peer->peer_sent_credits--;        /* No check for credit overflow - the peer may post new         * buffers after the startup handshake. */        peer->peer_credits += msg->ptlm_credits;        spin_unlock_irqrestore(&peer->peer_lock, flags);        /* See if something can go out now that credits have come in */        if (msg->ptlm_credits != 0)                kptllnd_peer_check_sends(peer);        /* ptllnd-level protocol correct - rx takes my ref on peer and increments         * peer_outstanding_credits when it completes */        rx->rx_peer = peer;        kptllnd_peer_alive(peer);        switch (msg->ptlm_type) {        default:                /* already checked by kptllnd_msg_unpack() */                LBUG();        case PTLLND_MSG_TYPE_HELLO:                CDEBUG(D_NET, "PTLLND_MSG_TYPE_HELLO\n");                goto rx_done;        case PTLLND_MSG_TYPE_NOOP:                CDEBUG(D_NET, "PTLLND_MSG_TYPE_NOOP\n");                goto rx_done;        case PTLLND_MSG_TYPE_IMMEDIATE:                CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");                rc = lnet_parse(kptllnd_data.kptl_ni,                                &msg->ptlm_u.immediate.kptlim_hdr,                                msg->ptlm_srcnid,                                rx, 0);                if (rc >= 0)                    /* kptllnd_recv owns 'rx' now */                        return;                goto failed;                        case PTLLND_MSG_TYPE_PUT:        case PTLLND_MSG_TYPE_GET:                CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",                        msg->ptlm_type == PTLLND_MSG_TYPE_PUT ?                        "PUT" : "GET");                /* checked in kptllnd_msg_unpack() */                LASSERT (msg->ptlm_u.rdma.kptlrm_matchbits >=                          PTL_RESERVED_MATCHBITS);                /* Update last match bits seen */                spin_lock_irqsave(&peer->peer_lock, flags);                if (msg->ptlm_u.rdma.kptlrm_matchbits >                    rx->rx_peer->peer_last_matchbits_seen)                        rx->rx_peer->peer_last_matchbits_seen =                                msg->ptlm_u.rdma.kptlrm_matchbits;                spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);                rc = lnet_parse(kptllnd_data.kptl_ni,                                &msg->ptlm_u.rdma.kptlrm_hdr,                                msg->ptlm_srcnid,                                rx, 1);                if (rc >= 0)                    /* kptllnd_recv owns 'rx' now */                        return;                goto failed;         } failed:        kptllnd_peer_close(peer, rc);        if (rx->rx_peer == NULL)                /* drop ref on peer */                kptllnd_peer_decref(peer);      /* unless rx_done will */ rx_done:        kptllnd_rx_done(rx);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -