⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ptllnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 2 页
字号:
                break;        }        /* I don't have to handle kiovs */        LASSERT (payload_nob == 0 || payload_iov != NULL);        tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);        if (tx == NULL) {                CERROR("Can't send %s to %s: can't allocate descriptor\n",                       lnet_msgtyp2str(type), libcfs_id2str(target));                rc = -ENOMEM;                goto out;        }        tx->tx_lnet_msg = lntmsg;        tx->tx_msg->ptlm_u.immediate.kptlim_hdr = *hdr;        if (payload_nob == 0) {                nfrag = 0;        } else {                tx->tx_frags->iov[0].iov_base = tx->tx_msg;                tx->tx_frags->iov[0].iov_len = offsetof(kptl_msg_t,                                                        ptlm_u.immediate.kptlim_payload);                /* NB relying on lustre not asking for PTL_MD_MAX_IOV                 * fragments!! */#ifdef _USING_LUSTRE_PORTALS_                nfrag = 1 + lnet_extract_iov(PTL_MD_MAX_IOV - 1,                                              &tx->tx_frags->iov[1],                                             payload_niov, payload_iov,                                             payload_offset, payload_nob);#else                nfrag = 1 + kptllnd_extract_iov(PTL_MD_MAX_IOV - 1,                                                &tx->tx_frags->iov[1],                                                payload_niov, payload_iov,                                                payload_offset, payload_nob);#endif        }                nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]);        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, nob);        CDEBUG(D_NETTRACE, "%s: immediate %s p %d %p\n",               libcfs_id2str(target),               lnet_msgtyp2str(lntmsg->msg_type),               (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_PUT) ?                le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index) :               (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_GET) ?                le32_to_cpu(lntmsg->msg_hdr.msg.get.ptl_index) : -1,               tx);        kptllnd_tx_launch(peer, tx, nfrag); out:        kptllnd_peer_decref(peer);        return rc;}int kptllnd_eager_recv(struct lnet_ni *ni, void *private,                   lnet_msg_t *msg, void **new_privatep){        kptl_rx_t        *rx = private;        CDEBUG(D_NET, "Eager RX=%p RXB=%p\n", rx, rx->rx_rxb);        /* I have to release my ref on rxb (if I have one) to ensure I'm an         * eager receiver, so I copy the incoming request from the buffer it         * landed in, into space reserved in the descriptor... */#if (PTL_MD_LOCAL_ALIGN8 == 0)        if (rx->rx_rxb == NULL)                 /* already copied */                return 0;                       /* to fix alignment */#else        LASSERT(rx->rx_rxb != NULL);#endif        LASSERT(rx->rx_nob <= *kptllnd_tunables.kptl_max_msg_size);        memcpy(rx->rx_space, rx->rx_msg, rx->rx_nob);        rx->rx_msg = (kptl_msg_t *)rx->rx_space;        kptllnd_rx_buffer_decref(rx->rx_rxb);        rx->rx_rxb = NULL;        return 0;}int kptllnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,              unsigned int offset, unsigned int mlen, unsigned int rlen){        kptl_rx_t    *rx = private;        kptl_msg_t   *rxmsg = rx->rx_msg;        int           nob;        int           rc;        CDEBUG(D_NET, "%s niov=%d offset=%d mlen=%d rlen=%d\n",               kptllnd_msgtype2str(rxmsg->ptlm_type),               niov, offset, mlen, rlen);        LASSERT (mlen <= rlen);        LASSERT (mlen >= 0);        LASSERT (!in_interrupt());        LASSERT (!(kiov != NULL && iov != NULL)); /* never both */        LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */#ifdef CRAY_XT3        if (lntmsg != NULL &&            rx->rx_uid != 0) {                /* Set the UID if the sender's uid isn't 0; i.e. non-root                 * running in userspace (e.g. a catamount node; linux kernel                 * senders, including routers have uid 0).  If this is a lustre                 * RPC request, this tells lustre not to trust the creds in the                 * RPC message body. */                lnet_set_msg_uid(ni, lntmsg, rx->rx_uid);        }#endif        switch(rxmsg->ptlm_type)        {        default:                LBUG();                rc = -EINVAL;                break;        case PTLLND_MSG_TYPE_IMMEDIATE:                CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE %d,%d\n", mlen, rlen);                nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[rlen]);                if (nob > rx->rx_nob) {                        CERROR ("Immediate message from %s too big: %d(%d)\n",                                libcfs_id2str(rx->rx_peer->peer_id), nob,                                rx->rx_nob);                        rc = -EINVAL;                        break;                }                if (kiov != NULL)                        lnet_copy_flat2kiov(                                niov, kiov, offset,                                *kptllnd_tunables.kptl_max_msg_size,                                rxmsg->ptlm_u.immediate.kptlim_payload,                                0,                                mlen);                else                        lnet_copy_flat2iov(                                niov, iov, offset,                                *kptllnd_tunables.kptl_max_msg_size,                                rxmsg->ptlm_u.immediate.kptlim_payload,                                0,                                mlen);                lnet_finalize (ni, lntmsg, 0);                rc = 0;                break;        case PTLLND_MSG_TYPE_GET:                CDEBUG(D_NET, "PTLLND_MSG_TYPE_GET %d,%d\n", mlen, rlen);                /* NB always send RDMA so the peer can complete.  I send                 * success/failure in the portals 'hdr_data' */                if (lntmsg == NULL)                        rc = kptllnd_active_rdma(rx, NULL,                                                 TX_TYPE_GET_RESPONSE,                                                 0, NULL, NULL, 0, 0);                else                        rc = kptllnd_active_rdma(rx, lntmsg,                                                  TX_TYPE_GET_RESPONSE,                                                 lntmsg->msg_niov,                                                 lntmsg->msg_iov,                                                  lntmsg->msg_kiov,                                                 lntmsg->msg_offset,                                                  lntmsg->msg_len);                break;        case PTLLND_MSG_TYPE_PUT:                CDEBUG(D_NET, "PTLLND_MSG_TYPE_PUT %d,%d\n", mlen, rlen);                /* NB always send RDMA so the peer can complete; it'll be 0                 * bytes if there was no match (lntmsg == NULL). I have no way                 * to let my peer know this, but she's only interested in when                 * the net has stopped accessing her buffer in any case. */                rc = kptllnd_active_rdma(rx, lntmsg, TX_TYPE_PUT_RESPONSE,                                         niov, iov, kiov, offset, mlen);                break;        }        /*         * We're done with the RX         */        kptllnd_rx_done(rx);        return rc;}voidkptllnd_eq_callback(ptl_event_t *ev){        kptl_eventarg_t *eva = ev->md.user_ptr;        switch (eva->eva_type) {        default:                LBUG();                        case PTLLND_EVENTARG_TYPE_MSG:        case PTLLND_EVENTARG_TYPE_RDMA:                kptllnd_tx_callback(ev);                break;                        case PTLLND_EVENTARG_TYPE_BUF:                kptllnd_rx_buffer_callback(ev);                break;        }}voidkptllnd_thread_fini (void){        atomic_dec(&kptllnd_data.kptl_nthreads);}intkptllnd_thread_start (int (*fn)(void *arg), void *arg){        long                pid;        atomic_inc(&kptllnd_data.kptl_nthreads);        pid = kernel_thread (fn, arg, 0);        if (pid >= 0)                return 0;                CERROR("Failed to start kernel_thread: error %d\n", (int)pid);        kptllnd_thread_fini();        return (int)pid;}intkptllnd_watchdog(void *arg){        int                 id = (long)arg;        char                name[16];        wait_queue_t        waitlink;        int                 stamp = 0;        int                 peer_index = 0;        unsigned long       deadline = jiffies;        int                 timeout;        int                 i;        snprintf(name, sizeof(name), "kptllnd_wd_%02d", id);        cfs_daemonize(name);        cfs_block_allsigs();        init_waitqueue_entry(&waitlink, current);        /* threads shut down in phase 2 after all peers have been destroyed */        while (kptllnd_data.kptl_shutdown < 2) {                timeout = (int)(deadline - jiffies);                                if (timeout <= 0) {                        const int n = 4;                        const int p = 1;                        int       chunk = kptllnd_data.kptl_peer_hash_size;                        /* Time to check for RDMA timeouts on a few more                         * peers: I do checks every 'p' seconds on a                         * proportion of the peer table and I need to check                         * every connection 'n' times within a timeout                         * interval, to ensure I detect a timeout on any                         * connection within (n+1)/n times the timeout                         * interval. */                        if ((*kptllnd_tunables.kptl_timeout) > n * p)                                chunk = (chunk * n * p) /                                        (*kptllnd_tunables.kptl_timeout);                        if (chunk == 0)                                chunk = 1;                        for (i = 0; i < chunk; i++) {                                kptllnd_peer_check_bucket(peer_index, stamp);                                peer_index = (peer_index + 1) %                                     kptllnd_data.kptl_peer_hash_size;                        }                        deadline += p * HZ;                        stamp++;                        continue;                }                kptllnd_handle_closing_peers();                set_current_state(TASK_INTERRUPTIBLE);                add_wait_queue_exclusive(&kptllnd_data.kptl_watchdog_waitq,                                         &waitlink);                schedule_timeout(timeout);                                set_current_state (TASK_RUNNING);                remove_wait_queue(&kptllnd_data.kptl_watchdog_waitq, &waitlink);        }        kptllnd_thread_fini();        CDEBUG(D_NET, "<<<\n");        return (0);};intkptllnd_scheduler (void *arg){        int                 id = (long)arg;        char                name[16];        wait_queue_t        waitlink;        unsigned long       flags;        int                 did_something;        int                 counter = 0;        kptl_rx_t          *rx;        kptl_rx_buffer_t   *rxb;        kptl_tx_t          *tx;        snprintf(name, sizeof(name), "kptllnd_sd_%02d", id);        cfs_daemonize(name);        cfs_block_allsigs();        init_waitqueue_entry(&waitlink, current);        spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);        /* threads shut down in phase 2 after all peers have been destroyed */        while (kptllnd_data.kptl_shutdown < 2) {                did_something = 0;                if (!list_empty(&kptllnd_data.kptl_sched_rxq)) {                        rx = list_entry (kptllnd_data.kptl_sched_rxq.next,                                         kptl_rx_t, rx_list);                        list_del(&rx->rx_list);                                                spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,                                               flags);                        kptllnd_rx_parse(rx);                        did_something = 1;                        spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);                }                if (!list_empty(&kptllnd_data.kptl_sched_rxbq)) {                        rxb = list_entry (kptllnd_data.kptl_sched_rxbq.next,                                          kptl_rx_buffer_t, rxb_repost_list);                        list_del(&rxb->rxb_repost_list);                        spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,                                               flags);                        kptllnd_rx_buffer_post(rxb);                        did_something = 1;                        spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);                }                if (!list_empty(&kptllnd_data.kptl_sched_txq)) {                        tx = list_entry (kptllnd_data.kptl_sched_txq.next,                                         kptl_tx_t, tx_list);                        list_del_init(&tx->tx_list);                        spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);                        kptllnd_tx_fini(tx);                        did_something = 1;                        spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);                }                if (did_something) {                        if (++counter != *kptllnd_tunables.kptl_reschedule_loops)                                continue;                }                set_current_state(TASK_INTERRUPTIBLE);                add_wait_queue_exclusive(&kptllnd_data.kptl_sched_waitq,                                         &waitlink);                spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);                if (!did_something)                        schedule();                 else                        cond_resched();                set_current_state(TASK_RUNNING);                remove_wait_queue(&kptllnd_data.kptl_sched_waitq, &waitlink);                spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);                counter = 0;        }        spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);        kptllnd_thread_fini();        return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -