⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ralnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
                    &rxmsg->ram_u.get.ragm_desc, nob,                    rxmsg->ram_u.get.ragm_cookie);        return; failed_1:        kranal_tx_done(tx, -EIO); failed_0:        lnet_finalize(ni, lntmsg, -EIO);}intkranal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,                   void **new_private){        kra_conn_t *conn = (kra_conn_t *)private;        LCONSOLE_ERROR_MSG(0x12b, "Dropping message from %s: no buffers free.\n",                           libcfs_nid2str(conn->rac_peer->rap_nid));        return -EDEADLK;}intkranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,             int delayed, unsigned int niov,              struct iovec *iov, lnet_kiov_t *kiov,             unsigned int offset, unsigned int mlen, unsigned int rlen){        kra_conn_t  *conn = private;        kra_msg_t   *rxmsg = conn->rac_rxmsg;        kra_tx_t    *tx;        void        *buffer;        int          rc;        LASSERT (mlen <= rlen);        LASSERT (!in_interrupt());        /* Either all pages or all vaddrs */        LASSERT (!(kiov != NULL && iov != NULL));        CDEBUG(D_NET, "conn %p, rxmsg %p, lntmsg %p\n", conn, rxmsg, lntmsg);        switch(rxmsg->ram_type) {        default:                LBUG();        case RANAL_MSG_IMMEDIATE:                if (mlen == 0) {                        buffer = NULL;                } else if (kiov != NULL) {                        CERROR("Can't recv immediate into paged buffer\n");                        return -EIO;                } else {                        LASSERT (niov > 0);                        while (offset >= iov->iov_len) {                                offset -= iov->iov_len;                                iov++;                                niov--;                                LASSERT (niov > 0);                        }                        if (mlen > iov->iov_len - offset) {                                CERROR("Can't handle immediate frags\n");                                return -EIO;                        }                        buffer = ((char *)iov->iov_base) + offset;                }                rc = kranal_consume_rxmsg(conn, buffer, mlen);                lnet_finalize(ni, lntmsg, (rc == 0) ? 0 : -EIO);                return 0;        case RANAL_MSG_PUT_REQ:                tx = kranal_new_tx_msg(RANAL_MSG_PUT_ACK);                if (tx == NULL) {                        kranal_consume_rxmsg(conn, NULL, 0);                        return -ENOMEM;                }                                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);                if (rc != 0) {                        kranal_tx_done(tx, rc);                        kranal_consume_rxmsg(conn, NULL, 0);                        return -EIO;                }                tx->tx_conn = conn;                rc = kranal_map_buffer(tx);                if (rc != 0) {                        kranal_tx_done(tx, rc);                        kranal_consume_rxmsg(conn, NULL, 0);                        return -EIO;                }                tx->tx_msg.ram_u.putack.rapam_src_cookie =                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =                        (__u64)((unsigned long)tx->tx_buffer);                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;                tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */                kranal_post_fma(conn, tx);                kranal_consume_rxmsg(conn, NULL, 0);                return 0;        case RANAL_MSG_GET_REQ:                if (lntmsg != NULL) {                        /* Matched! */                        kranal_reply(ni, conn, lntmsg);                } else {                        /* No match */                        tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK);                        if (tx != NULL) {                                tx->tx_msg.ram_u.completion.racm_cookie =                                         rxmsg->ram_u.get.ragm_cookie;                                kranal_post_fma(conn, tx);                        }                }                kranal_consume_rxmsg(conn, NULL, 0);                return 0;        }}intkranal_thread_start (int(*fn)(void *arg), void *arg){        long    pid = kernel_thread(fn, arg, 0);        if (pid < 0)                return(int)pid;        atomic_inc(&kranal_data.kra_nthreads);        return 0;}voidkranal_thread_fini (void){        atomic_dec(&kranal_data.kra_nthreads);}intkranal_check_conn_timeouts (kra_conn_t *conn){        kra_tx_t          *tx;        struct list_head  *ttmp;        unsigned long      flags;        long               timeout;        unsigned long      now = jiffies;        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||                 conn->rac_state == RANAL_CONN_CLOSING);        if (!conn->rac_close_sent &&            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {                /* not sent in a while; schedule conn so scheduler sends a keepalive */                CDEBUG(D_NET, "Scheduling keepalive %p->%s\n",                       conn, libcfs_nid2str(conn->rac_peer->rap_nid));                kranal_schedule_conn(conn);        }        timeout = conn->rac_timeout * HZ;        if (!conn->rac_close_recvd &&            time_after_eq(now, conn->rac_last_rx + timeout)) {                CERROR("%s received from %s within %lu seconds\n",                       (conn->rac_state == RANAL_CONN_ESTABLISHED) ?                       "Nothing" : "CLOSE not",                       libcfs_nid2str(conn->rac_peer->rap_nid),                        (now - conn->rac_last_rx)/HZ);                return -ETIMEDOUT;        }        if (conn->rac_state != RANAL_CONN_ESTABLISHED)                return 0;        /* Check the conn's queues are moving.  These are "belt+braces" checks,         * in case of hardware/software errors that make this conn seem         * responsive even though it isn't progressing its message queues. */        spin_lock_irqsave(&conn->rac_lock, flags);        list_for_each (ttmp, &conn->rac_fmaq) {                tx = list_entry(ttmp, kra_tx_t, tx_list);                if (time_after_eq(now, tx->tx_qtime + timeout)) {                        spin_unlock_irqrestore(&conn->rac_lock, flags);                        CERROR("tx on fmaq for %s blocked %lu seconds\n",                               libcfs_nid2str(conn->rac_peer->rap_nid),                               (now - tx->tx_qtime)/HZ);                        return -ETIMEDOUT;                }        }        list_for_each (ttmp, &conn->rac_rdmaq) {                tx = list_entry(ttmp, kra_tx_t, tx_list);                if (time_after_eq(now, tx->tx_qtime + timeout)) {                        spin_unlock_irqrestore(&conn->rac_lock, flags);                        CERROR("tx on rdmaq for %s blocked %lu seconds\n",                               libcfs_nid2str(conn->rac_peer->rap_nid),                                (now - tx->tx_qtime)/HZ);                        return -ETIMEDOUT;                }        }        list_for_each (ttmp, &conn->rac_replyq) {                tx = list_entry(ttmp, kra_tx_t, tx_list);                if (time_after_eq(now, tx->tx_qtime + timeout)) {                        spin_unlock_irqrestore(&conn->rac_lock, flags);                        CERROR("tx on replyq for %s blocked %lu seconds\n",                               libcfs_nid2str(conn->rac_peer->rap_nid),                               (now - tx->tx_qtime)/HZ);                        return -ETIMEDOUT;                }        }        spin_unlock_irqrestore(&conn->rac_lock, flags);        return 0;}voidkranal_reaper_check (int idx, unsigned long *min_timeoutp){        struct list_head  *conns = &kranal_data.kra_conns[idx];        struct list_head  *ctmp;        kra_conn_t        *conn;        unsigned long      flags;        int                rc; again:        /* NB. We expect to check all the conns and not find any problems, so         * we just use a shared lock while we take a look... */        read_lock(&kranal_data.kra_global_lock);        list_for_each (ctmp, conns) {                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);                if (conn->rac_timeout < *min_timeoutp )                        *min_timeoutp = conn->rac_timeout;                if (conn->rac_keepalive < *min_timeoutp )                        *min_timeoutp = conn->rac_keepalive;                rc = kranal_check_conn_timeouts(conn);                if (rc == 0)                        continue;                kranal_conn_addref(conn);                read_unlock(&kranal_data.kra_global_lock);                CERROR("Conn to %s, cqid %d timed out\n",                       libcfs_nid2str(conn->rac_peer->rap_nid),                        conn->rac_cqid);                write_lock_irqsave(&kranal_data.kra_global_lock, flags);                switch (conn->rac_state) {                default:                        LBUG();                case RANAL_CONN_ESTABLISHED:                        kranal_close_conn_locked(conn, -ETIMEDOUT);                        break;                case RANAL_CONN_CLOSING:                        kranal_terminate_conn_locked(conn);                        break;                }                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);                kranal_conn_decref(conn);                /* start again now I've dropped the lock */                goto again;        }        read_unlock(&kranal_data.kra_global_lock);}intkranal_connd (void *arg){        long               id = (long)arg;        char               name[16];        wait_queue_t       wait;        unsigned long      flags;        kra_peer_t        *peer;        kra_acceptsock_t  *ras;        int                did_something;        snprintf(name, sizeof(name), "kranal_connd_%02ld", id);        cfs_daemonize(name);        cfs_block_allsigs();        init_waitqueue_entry(&wait, current);        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);        while (!kranal_data.kra_shutdown) {                did_something = 0;                if (!list_empty(&kranal_data.kra_connd_acceptq)) {                        ras = list_entry(kranal_data.kra_connd_acceptq.next,                                         kra_acceptsock_t, ras_list);                        list_del(&ras->ras_list);                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);                        CDEBUG(D_NET,"About to handshake someone\n");                        kranal_conn_handshake(ras->ras_sock, NULL);                        kranal_free_acceptsock(ras);                        CDEBUG(D_NET,"Finished handshaking someone\n");                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);                        did_something = 1;                }                if (!list_empty(&kranal_data.kra_connd_peers)) {                        peer = list_entry(kranal_data.kra_connd_peers.next,                                          kra_peer_t, rap_connd_list);                        list_del_init(&peer->rap_connd_list);                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);                        kranal_connect(peer);                        kranal_peer_decref(peer);                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);                        did_something = 1;                }                if (did_something)                        continue;                set_current_state(TASK_INTERRUPTIBLE);                add_wait_queue_exclusive(&kranal_data.kra_connd_waitq, &wait);                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);                schedule ();                set_current_state(TASK_RUNNING);                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);        }        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);        kranal_thread_fini();        return 0;}voidkranal_update_reaper_timeout(long timeout){        unsigned long   flags;        LASSERT (timeout > 0);        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);        if (timeout < kranal_data.kra_new_min_timeout)                kranal_data.kra_new_min_timeout = timeout;        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);}intkranal_reaper (void *arg){        wait_queue_t       wait;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -