⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ptllnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
                       libcfs_id2str(srcid));                ptllnd_peer_decref(plp);                return;        }        /* Check peer only sends when I've sent her credits */        if (plp->plp_sent_credits == 0) {                CERROR("%s[%d/%d+%d(%d)]: unexpected message\n",                       libcfs_id2str(plp->plp_id),                       plp->plp_credits, plp->plp_outstanding_credits,                        plp->plp_sent_credits,                       plni->plni_peer_credits + plp->plp_lazy_credits);                return;        }        plp->plp_sent_credits--;                /* No check for credit overflow - the peer may post new buffers after         * the startup handshake. */        if (msg->ptlm_credits > 0) {                plp->plp_credits += msg->ptlm_credits;                ptllnd_check_sends(plp);        }        /* All OK so far; assume the message is good... */        rx.rx_peer      = plp;        rx.rx_msg       = msg;        rx.rx_nob       = nob;        plni->plni_nrxs++;        switch (msg->ptlm_type) {        default: /* message types have been checked already */                ptllnd_rx_done(&rx);                break;        case PTLLND_MSG_TYPE_PUT:        case PTLLND_MSG_TYPE_GET:                rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr,                                msg->ptlm_srcnid, &rx, 1);                if (rc < 0)                        ptllnd_rx_done(&rx);                break;        case PTLLND_MSG_TYPE_IMMEDIATE:                rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,                                msg->ptlm_srcnid, &rx, 0);                if (rc < 0)                        ptllnd_rx_done(&rx);                break;        }        ptllnd_peer_decref(plp);}voidptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event){        ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr);        ptllnd_ni_t     *plni = ni->ni_data;        char            *msg = &buf->plb_buffer[event->offset];        int              repost;        int              unlinked = event->type == PTL_EVENT_UNLINK;        LASSERT (buf->plb_ni == ni);        LASSERT (event->type == PTL_EVENT_PUT_END ||                 event->type == PTL_EVENT_UNLINK);        if (event->ni_fail_type != PTL_NI_OK) {                CERROR("event type %s(%d), status %s(%d) from %s\n",                       ptllnd_evtype2str(event->type), event->type,                       ptllnd_errtype2str(event->ni_fail_type),                        event->ni_fail_type,                       ptllnd_ptlid2str(event->initiator));        } else if (event->type == PTL_EVENT_PUT_END) {#if (PTL_MD_LOCAL_ALIGN8 == 0)                /* Portals can't force message alignment - someone sending an                 * odd-length message could misalign subsequent messages */                if ((event->mlength & 7) != 0) {                        CERROR("Message from %s has odd length %llu: "                               "probable version incompatibility\n",                               ptllnd_ptlid2str(event->initiator),                               event->mlength);                        LBUG();                }#endif                LASSERT ((event->offset & 7) == 0);                ptllnd_parse_request(ni, event->initiator,                                     (kptl_msg_t *)msg, event->mlength);        }#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS        /* UNLINK event only on explicit unlink */        repost = (event->unlinked && event->type != PTL_EVENT_UNLINK);        if (event->unlinked)                unlinked = 1;#else        /* UNLINK event only on implicit unlink */        repost = (event->type == PTL_EVENT_UNLINK);#endif        if (unlinked) {                LASSERT(buf->plb_posted);                buf->plb_posted = 0;                plni->plni_nposted_buffers--;        }        if (repost)                (void) ptllnd_post_buffer(buf);}voidptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event){        ptllnd_ni_t *plni = ni->ni_data;        ptllnd_tx_t *tx = ptllnd_eventarg2obj(event->md.user_ptr);        int          error = (event->ni_fail_type != PTL_NI_OK);        int          isreq;        int          isbulk;#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS        int          unlinked = event->unlinked;#else        int          unlinked = (event->type == PTL_EVENT_UNLINK);#endif        if (error)                CERROR("Error %s(%d) event %s(%d) unlinked %d, %s(%d) for %s\n",                       ptllnd_errtype2str(event->ni_fail_type),                       event->ni_fail_type,                       ptllnd_evtype2str(event->type), event->type,                       unlinked, ptllnd_msgtype2str(tx->tx_type), tx->tx_type,                       libcfs_id2str(tx->tx_peer->plp_id));        LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));        isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);        if (isreq) {                LASSERT (event->md.start == (void *)&tx->tx_msg);                if (unlinked) {                        tx->tx_reqmdh = PTL_INVALID_HANDLE;                        gettimeofday(&tx->tx_req_done, NULL);                }        }        isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);        if ( isbulk && unlinked ) {                tx->tx_bulkmdh = PTL_INVALID_HANDLE;                gettimeofday(&tx->tx_bulk_done, NULL);        }        LASSERT (!isreq != !isbulk);            /* always one and only 1 match */        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: TX done %p %s%s",                       libcfs_id2str(tx->tx_peer->plp_id),                        tx->tx_peer->plp_credits,                       tx->tx_peer->plp_outstanding_credits,                       tx->tx_peer->plp_sent_credits,                       plni->plni_peer_credits + tx->tx_peer->plp_lazy_credits,                       tx, isreq ? "REQ" : "BULK", unlinked ? "(unlinked)" : "");        LASSERT (!isreq != !isbulk);            /* always one and only 1 match */        switch (tx->tx_type) {        default:                LBUG();        case PTLLND_MSG_TYPE_NOOP:        case PTLLND_MSG_TYPE_HELLO:        case PTLLND_MSG_TYPE_IMMEDIATE:                LASSERT (event->type == PTL_EVENT_UNLINK ||                         event->type == PTL_EVENT_SEND_END);                LASSERT (isreq);                break;        case PTLLND_MSG_TYPE_GET:                LASSERT (event->type == PTL_EVENT_UNLINK ||                         (isreq && event->type == PTL_EVENT_SEND_END) ||                         (isbulk && event->type == PTL_EVENT_PUT_END));                if (isbulk && !error && event->type == PTL_EVENT_PUT_END) {                        /* Check GET matched */                        if (event->hdr_data == PTLLND_RDMA_OK) {                                lnet_set_reply_msg_len(ni,                                                        tx->tx_lnetreplymsg,                                                       event->mlength);                        } else {                                CERROR ("Unmatched GET with %s\n",                                        libcfs_id2str(tx->tx_peer->plp_id));                                tx->tx_status = -EIO;                        }                }                break;        case PTLLND_MSG_TYPE_PUT:                LASSERT (event->type == PTL_EVENT_UNLINK ||                         (isreq && event->type == PTL_EVENT_SEND_END) ||                         (isbulk && event->type == PTL_EVENT_GET_END));                break;        case PTLLND_RDMA_READ:                LASSERT (event->type == PTL_EVENT_UNLINK ||                         event->type == PTL_EVENT_SEND_END ||                         event->type == PTL_EVENT_REPLY_END);                LASSERT (isbulk);                break;        case PTLLND_RDMA_WRITE:                LASSERT (event->type == PTL_EVENT_UNLINK ||                         event->type == PTL_EVENT_SEND_END);                LASSERT (isbulk);        }        /* Schedule ptllnd_tx_done() on error or last completion event */        if (error ||            (PtlHandleIsEqual(tx->tx_bulkmdh, PTL_INVALID_HANDLE) &&             PtlHandleIsEqual(tx->tx_reqmdh, PTL_INVALID_HANDLE))) {                if (error)                        tx->tx_status = -EIO;                list_del(&tx->tx_list);                list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);        }}ptllnd_tx_t *ptllnd_find_timed_out_tx(ptllnd_peer_t *peer){        time_t            now = cfs_time_current_sec();        struct list_head *tmp;        list_for_each(tmp, &peer->plp_txq) {                ptllnd_tx_t *tx = list_entry(tmp, ptllnd_tx_t, tx_list);                                if (tx->tx_deadline < now)                        return tx;        }                list_for_each(tmp, &peer->plp_activeq) {                ptllnd_tx_t *tx = list_entry(tmp, ptllnd_tx_t, tx_list);                                if (tx->tx_deadline < now)                        return tx;        }        return NULL;}voidptllnd_check_peer(ptllnd_peer_t *peer){        ptllnd_tx_t *tx = ptllnd_find_timed_out_tx(peer);                if (tx == NULL)                return;                CERROR("%s: timed out\n", libcfs_id2str(peer->plp_id));        ptllnd_close_peer(peer, -ETIMEDOUT);}voidptllnd_watchdog (lnet_ni_t *ni, time_t now){        ptllnd_ni_t      *plni = ni->ni_data;        const int         n = 4;        int               p = plni->plni_watchdog_interval;        int               chunk = plni->plni_peer_hash_size;        int               interval = now - (plni->plni_watchdog_nextt - p);        int               i;        struct list_head *hashlist;        struct list_head *tmp;        struct list_head *nxt;        /* Time to check for RDMA timeouts on a few more peers:          * I try to do checks every 'p' seconds on a proportion of the peer         * table and I need to check every connection 'n' times within a         * timeout interval, to ensure I detect a timeout on any connection         * within (n+1)/n times the timeout interval. */        LASSERT (now >= plni->plni_watchdog_nextt);        if (plni->plni_timeout > n * interval) { /* Scan less than the whole table? */                chunk = (chunk * n * interval) / plni->plni_timeout;                if (chunk == 0)                        chunk = 1;        }        for (i = 0; i < chunk; i++) {                hashlist = &plni->plni_peer_hash[plni->plni_watchdog_peeridx];                                list_for_each_safe(tmp, nxt, hashlist) {                        ptllnd_check_peer(list_entry(tmp, ptllnd_peer_t, plp_list));                }                                plni->plni_watchdog_peeridx = (plni->plni_watchdog_peeridx + 1) %                                              plni->plni_peer_hash_size;        }        plni->plni_watchdog_nextt = now + p;}voidptllnd_wait (lnet_ni_t *ni, int milliseconds){        static struct timeval  prevt;        static int             prevt_count;        static int             call_count;        struct timeval         start;        struct timeval         then;        struct timeval         now;        struct timeval         deadline;                ptllnd_ni_t   *plni = ni->ni_data;        ptllnd_tx_t   *tx;        ptl_event_t    event;        int            which;        int            rc;        int            found = 0;        int            timeout = 0;        /* Handle any currently queued events, returning immediately if any.         * Otherwise block for the timeout and handle all events queued         * then. */        gettimeofday(&start, NULL);        call_count++;        if (milliseconds <= 0) {                deadline = start;        } else {                deadline.tv_sec  = start.tv_sec  +  milliseconds/1000;                deadline.tv_usec = start.tv_usec + (milliseconds % 1000)*1000;                if (deadline.tv_usec >= 1000000) {                        start.tv_usec -= 1000000;                        start.tv_sec++;                }        }        for (;;) {                gettimeofday(&then, NULL);                                rc = PtlEQPoll(&plni->plni_eqh, 1, timeout, &event, &which);                gettimeofday(&now, NULL);                if ((now.tv_sec*1000 + now.tv_usec/1000) -                     (then.tv_sec*1000 + then.tv_usec/1000) > timeout + 1000) {                        /* 1000 mS grace...........................^ */                        CERROR("SLOW PtlEQPoll(%d): %dmS elapsed\n", timeout,                               (int)(now.tv_sec*1000 + now.tv_usec/1000) -                                (int)(then.tv_sec*1000 + then.tv_usec/1000));                }                if (rc == PTL_EQ_EMPTY) {                        if (found)              /* handled some events */                                break;                        if (now.tv_sec >= plni->plni_watchdog_nextt) { /* check timeouts? */                                ptllnd_watchdog(ni, now.tv_sec);                                LASSERT (now.tv_sec < plni->plni_watchdog_nextt);                        }                                                if (now.tv_sec > deadline.tv_sec || /* timeout expired */                            (now.tv_sec == deadline.tv_sec &&                             now.tv_usec >= deadline.tv_usec))                            

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -