⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socklnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
                        } else if (rc == 0 && conn->ksnc_rx_started) {                                /* EOF in the middle of a message */                                rc = -EPROTO;                        }                        break;                }                /* Completed a fragment */                if (conn->ksnc_rx_nob_wanted == 0) {                        rc = 1;                        break;                }        }        ksocknal_connsock_decref(conn);        RETURN (rc);}voidksocknal_tx_done (lnet_ni_t *ni, ksock_tx_t *tx){        lnet_msg_t  *lnetmsg = tx->tx_lnetmsg;        int          rc = (tx->tx_resid == 0) ? 0 : -EIO;        ENTRY;        LASSERT(ni != NULL || tx->tx_conn != NULL);        if (tx->tx_conn != NULL)                ksocknal_conn_decref(tx->tx_conn);        if (ni == NULL && tx->tx_conn != NULL)                 ni = tx->tx_conn->ksnc_peer->ksnp_ni;        ksocknal_free_tx (tx);        if (lnetmsg != NULL) /* KSOCK_MSG_NOOP go without lnetmsg */                lnet_finalize (ni, lnetmsg, rc);        EXIT;}voidksocknal_txlist_done (lnet_ni_t *ni, struct list_head *txlist, int error){        ksock_tx_t *tx;                while (!list_empty (txlist)) {                tx = list_entry (txlist->next, ksock_tx_t, tx_list);                if (error && tx->tx_lnetmsg != NULL) {                        CDEBUG (D_NETERROR, "Deleting packet type %d len %d %s->%s\n",                                le32_to_cpu (tx->tx_lnetmsg->msg_hdr.type),                                le32_to_cpu (tx->tx_lnetmsg->msg_hdr.payload_length),                                libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.src_nid)),                                libcfs_nid2str(le64_to_cpu (tx->tx_lnetmsg->msg_hdr.dest_nid)));                } else if (error) {                        CDEBUG (D_NETERROR, "Deleting noop packet\n");                }                list_del (&tx->tx_list);                LASSERT (atomic_read(&tx->tx_refcount) == 1);                ksocknal_tx_done (ni, tx);        }}static voidksocknal_check_zc_req(ksock_tx_t *tx){        ksock_conn_t   *conn = tx->tx_conn;        ksock_peer_t   *peer = conn->ksnc_peer;        lnet_kiov_t    *kiov = tx->tx_kiov;        int             nkiov = tx->tx_nkiov;        /* Set tx_msg.ksm_zc_req_cookie to a unique non-zero cookie and add tx         * to ksnp_zc_req_list if some fragment of this message should be sent         * zero-copy.  Our peer will send an ACK containing this cookie when         * she has received this message to tell us we can signal completion.         * tx_msg.ksm_zc_req_cookie remains non-zero while tx is on         * ksnp_zc_req_list. */        if (conn->ksnc_proto != &ksocknal_protocol_v2x ||            !conn->ksnc_zc_capable)                return;                while (nkiov > 0) {                if (kiov->kiov_len >= *ksocknal_tunables.ksnd_zc_min_frag)                        break;                --nkiov;                ++kiov;        }        if (nkiov == 0)                return;                /* assign cookie and queue tx to pending list, it will be released when         * a matching ack is received. See ksocknal_handle_zc_ack() */        ksocknal_tx_addref(tx);        spin_lock(&peer->ksnp_lock);        LASSERT (tx->tx_msg.ksm_zc_req_cookie == 0);        tx->tx_msg.ksm_zc_req_cookie = peer->ksnp_zc_next_cookie++;         list_add_tail(&tx->tx_zc_list, &peer->ksnp_zc_req_list);        spin_unlock(&peer->ksnp_lock);}static voidksocknal_unzc_req(ksock_tx_t *tx){        ksock_peer_t   *peer = tx->tx_conn->ksnc_peer;        spin_lock(&peer->ksnp_lock);        if (tx->tx_msg.ksm_zc_req_cookie == 0) {                /* Not waiting for an ACK */                spin_unlock(&peer->ksnp_lock);                return;        }        tx->tx_msg.ksm_zc_req_cookie = 0;        list_del(&tx->tx_zc_list);        spin_unlock(&peer->ksnp_lock);        ksocknal_tx_decref(tx);}intksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx){        int            rc;        if (!tx->tx_checked_zc) {                tx->tx_checked_zc = 1;                ksocknal_check_zc_req(tx);        }              rc = ksocknal_transmit (conn, tx);        CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);        if (tx->tx_resid == 0) {                /* Sent everything OK */                LASSERT (rc == 0);                return (0);        }        if (rc == -EAGAIN)                return (rc);        if (rc == -ENOMEM) {                static int counter;                counter++;   /* exponential backoff warnings */                if ((counter & (-counter)) == counter)                        CWARN("%u ENOMEM tx %p (%u allocated)\n",                              counter, conn, atomic_read(&libcfs_kmemory));                /* Queue on ksnd_enomem_conns for retry after a timeout */                spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);                /* enomem list takes over scheduler's ref... */                LASSERT (conn->ksnc_tx_scheduled);                list_add_tail(&conn->ksnc_tx_list,                              &ksocknal_data.ksnd_enomem_conns);                if (!cfs_time_aftereq(cfs_time_add(cfs_time_current(),                                                   SOCKNAL_ENOMEM_RETRY),                                   ksocknal_data.ksnd_reaper_waketime))                        cfs_waitq_signal (&ksocknal_data.ksnd_reaper_waitq);                                spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);                return (rc);        }        /* Actual error */        LASSERT (rc < 0);        if (!conn->ksnc_closing) {                switch (rc) {                case -ECONNRESET:                        LCONSOLE_WARN("Host %u.%u.%u.%u reset our connection "                                      "while we were sending data; it may have "                                      "rebooted.\n",                                      HIPQUAD(conn->ksnc_ipaddr));                        break;                default:                        LCONSOLE_WARN("There was an unexpected network error "                                      "while writing to %u.%u.%u.%u: %d.\n",                                      HIPQUAD(conn->ksnc_ipaddr), rc);                        break;                }                CDEBUG(D_NET, "[%p] Error %d on write to %s"                       " ip %d.%d.%d.%d:%d\n", conn, rc,                       libcfs_id2str(conn->ksnc_peer->ksnp_id),                       HIPQUAD(conn->ksnc_ipaddr),                       conn->ksnc_port);        }                   ksocknal_unzc_req(tx);        /* it's not an error if conn is being closed */        ksocknal_close_conn_and_siblings (conn,                                           (conn->ksnc_closing) ? 0 : rc);        return (rc);}voidksocknal_launch_connection_locked (ksock_route_t *route){        /* called holding write lock on ksnd_global_lock */        LASSERT (!route->ksnr_scheduled);        LASSERT (!route->ksnr_connecting);        LASSERT ((ksocknal_route_mask() & ~route->ksnr_connected) != 0);                route->ksnr_scheduled = 1;              /* scheduling conn for connd */        ksocknal_route_addref(route);           /* extra ref for connd */                spin_lock_bh (&ksocknal_data.ksnd_connd_lock);                list_add_tail (&route->ksnr_connd_list,                       &ksocknal_data.ksnd_connd_routes);        cfs_waitq_signal (&ksocknal_data.ksnd_connd_waitq);                spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);}ksock_conn_t *ksocknal_find_conn_locked (int payload_nob, ksock_peer_t *peer){        struct list_head *tmp;        ksock_conn_t     *typed = NULL;        int               tnob  = 0;        ksock_conn_t     *fallback = NULL;        int               fnob     = 0;        ksock_conn_t     *conn;        list_for_each (tmp, &peer->ksnp_conns) {                ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);                int           hdr_nob = 0;#if SOCKNAL_ROUND_ROBIN                const int     nob = 0;#else                int           nob = atomic_read(&c->ksnc_tx_nob) +                                        SOCK_WMEM_QUEUED(c->ksnc_sock);#endif                LASSERT (!c->ksnc_closing);                LASSERT (c->ksnc_proto != NULL);                if (fallback == NULL || nob < fnob) {                        fallback = c;                        fnob     = nob;                }                if (!*ksocknal_tunables.ksnd_typed_conns)                        continue;                if (payload_nob == 0) {                        /* noop packet */                        hdr_nob = offsetof(ksock_msg_t, ksm_u);                } else {                        /* lnet packet */                        hdr_nob = (c->ksnc_proto == &ksocknal_protocol_v2x)?                                  offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload):                                  sizeof(lnet_hdr_t);                }                switch (c->ksnc_type) {                default:                        CERROR("ksnc_type bad: %u\n", c->ksnc_type);                        LBUG();                case SOCKLND_CONN_ANY:                        break;                case SOCKLND_CONN_BULK_IN:                        continue;                case SOCKLND_CONN_BULK_OUT:                        if ((hdr_nob + payload_nob) < *ksocknal_tunables.ksnd_min_bulk)                                continue;                        break;                case SOCKLND_CONN_CONTROL:                        if ((hdr_nob + payload_nob) >= *ksocknal_tunables.ksnd_min_bulk)                                continue;                        break;                }                if (typed == NULL || nob < tnob) {                        typed = c;                        tnob  = nob;                }        }        /* prefer the typed selection */        conn = (typed != NULL) ? typed : fallback;#if SOCKNAL_ROUND_ROBIN        if (conn != NULL) {                /* round-robin all else being equal */                list_del (&conn->ksnc_list);                list_add_tail (&conn->ksnc_list, &peer->ksnp_conns);        }#endif        return conn;}voidksocknal_next_mono_tx(ksock_conn_t *conn){        ksock_tx_t     *tx = conn->ksnc_tx_mono;        /* Called holding BH lock: conn->ksnc_scheduler->kss_lock */        LASSERT(conn->ksnc_proto == &ksocknal_protocol_v2x);        LASSERT(!list_empty(&conn->ksnc_tx_queue));        LASSERT(tx != NULL);        if (tx->tx_list.next == &conn->ksnc_tx_queue) {                /* no more packets queued */                conn->ksnc_tx_mono = NULL;        } else {                conn->ksnc_tx_mono = list_entry(tx->tx_list.next, ksock_tx_t, tx_list);                LASSERT(conn->ksnc_tx_mono->tx_msg.ksm_type == tx->tx_msg.ksm_type);        }}intksocknal_piggyback_zcack(ksock_conn_t *conn, __u64 cookie){        ksock_tx_t     *tx = conn->ksnc_tx_mono;        /* Called holding BH lock: conn->ksnc_scheduler->kss_lock */        if (tx == NULL)                return 0;        if (tx->tx_msg.ksm_type == KSOCK_MSG_NOOP) {                /* tx is noop zc-ack, can't piggyback zc-ack cookie */                return 0;        }        LASSERT(tx->tx_msg.ksm_type == KSOCK_MSG_LNET);        LASSERT(tx->tx_msg.ksm_zc_ack_cookie == 0);        /* piggyback the zc-ack cookie */        tx->tx_msg.ksm_zc_ack_cookie = cookie;        ksocknal_next_mono_tx(conn);        return 1;}voidksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn){        ksock_sched_t *sched = conn->ksnc_scheduler;        ksock_msg_t   *msg = &tx->tx_msg;        ksock_tx_t    *ztx;        int            bufnob = 0;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -