⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 handlers.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 3 页
字号:
                    list_empty(&conn->uc_zcack_list)) {                        conn->uc_tx_flag = 0;                        ret = usocklnd_add_pollrequest(conn,                                                      POLL_TX_SET_REQUEST, 0);                        if (ret)                                rc = ret;                }                pthread_mutex_unlock(&conn->uc_lock);                                break;        case UC_DEAD:                break;        default:                LBUG();        }        if (rc < 0)                usocklnd_conn_kill(conn);                return rc;}/* Return the first tx from tx_list with piggybacked zc_ack * from zcack_list when possible. If tx_list is empty, return * brand new noop tx for zc_ack from zcack_list. Return NULL * if an error happened */usock_tx_t *usocklnd_try_piggyback(struct list_head *tx_list_p,                       struct list_head *zcack_list_p){        usock_tx_t     *tx;        usock_zc_ack_t *zc_ack;        /* assign tx and zc_ack */        if (list_empty(tx_list_p))                tx = NULL;        else {                tx = list_entry(tx_list_p->next, usock_tx_t, tx_list);                list_del(&tx->tx_list);                /* already piggybacked or partially send */                if (tx->tx_msg.ksm_zc_ack_cookie ||                    tx->tx_resid != tx->tx_nob)                        return tx;        }                        if (list_empty(zcack_list_p)) {                /* nothing to piggyback */                return tx;        } else {                zc_ack = list_entry(zcack_list_p->next,                                    usock_zc_ack_t, zc_list);                list_del(&zc_ack->zc_list);        }                                                if (tx != NULL)                /* piggyback the zc-ack cookie */                tx->tx_msg.ksm_zc_ack_cookie = zc_ack->zc_cookie;        else                /* cannot piggyback, need noop */                tx = usocklnd_create_noop_tx(zc_ack->zc_cookie);                                     LIBCFS_FREE (zc_ack, sizeof(*zc_ack));        return tx;}/* All actions that we need after sending hello on active conn: * 1) update RX iov to receive hello * 2) state transition to UC_RECEIVING_HELLO * 3) notify poll_thread that we're waiting for incoming hello */intusocklnd_activeconn_hellosent(usock_conn_t *conn){        int rc = 0;                pthread_mutex_lock(&conn->uc_lock);        if (conn->uc_state != UC_DEAD) {                usocklnd_rx_hellomagic_state_transition(conn);                conn->uc_state = UC_RECEIVING_HELLO;                conn->uc_tx_flag = 0;                rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST, POLLIN);        }        pthread_mutex_unlock(&conn->uc_lock);        return rc;}/* All actions that we need after sending hello on passive conn: * 1) Cope with 1st easy case: conn is already linked to a peer * 2) Cope with 2nd easy case: remove zombie conn  * 3) Resolve race: *    a) find the peer *    b) link the conn to the peer if conn[idx] is empty *    c) if the conn[idx] isn't empty and is in READY state, *       remove the conn as duplicated *    d) if the conn[idx] isn't empty and isn't in READY state, *       override conn[idx] with the conn */intusocklnd_passiveconn_hellosent(usock_conn_t *conn){        usock_conn_t    *conn2;        usock_peer_t    *peer;        struct list_head tx_list;        struct list_head zcack_list;        int              idx;        int              rc = 0;        /* almost nothing to do if conn is already linked to peer hash table */        if (conn->uc_peer != NULL)                goto passive_hellosent_done;        /* conn->uc_peer == NULL, so the conn isn't accessible via         * peer hash list, so nobody can touch the conn but us */                if (conn->uc_ni == NULL) /* remove zombie conn */                goto passive_hellosent_connkill;                /* all code below is race resolution, because normally         * passive conn is linked to peer just after receiving hello */        CFS_INIT_LIST_HEAD (&tx_list);        CFS_INIT_LIST_HEAD (&zcack_list);                /* conn is passive and isn't linked to any peer,           so its tx and zc_ack lists have to be empty */        LASSERT (list_empty(&conn->uc_tx_list) &&                 list_empty(&conn->uc_zcack_list) &&                 conn->uc_sending == 0);        rc = usocklnd_find_or_create_peer(conn->uc_ni, conn->uc_peerid, &peer);        if (rc)                return rc;        idx = usocklnd_type2idx(conn->uc_type);                                /* try to link conn to peer */        pthread_mutex_lock(&peer->up_lock);                if (peer->up_conns[idx] == NULL) {                usocklnd_link_conn_to_peer(conn, peer, idx);                usocklnd_conn_addref(conn);                conn->uc_peer = peer;                usocklnd_peer_addref(peer);        } else {                conn2 = peer->up_conns[idx];                pthread_mutex_lock(&conn2->uc_lock);                if (conn2->uc_state == UC_READY) {                        /* conn2 is in READY state, so conn is "duplicated" */                        pthread_mutex_unlock(&conn2->uc_lock);                        pthread_mutex_unlock(&peer->up_lock);                        usocklnd_peer_decref(peer);                        goto passive_hellosent_connkill;                }                /* uc_state != UC_READY => switch conn and conn2 */                /* Relink txs and zc_acks from conn2 to conn.                 * We're sure that nobody but us can access to conn,                 * nevertheless we use mutex (if we're wrong yet,                 * deadlock is easy to see that corrupted list */                list_add(&tx_list, &conn2->uc_tx_list);                list_del_init(&conn2->uc_tx_list);                list_add(&zcack_list, &conn2->uc_zcack_list);                list_del_init(&conn2->uc_zcack_list);                        pthread_mutex_lock(&conn->uc_lock);                list_add_tail(&conn->uc_tx_list, &tx_list);                list_del_init(&tx_list);                list_add_tail(&conn->uc_zcack_list, &zcack_list);                list_del_init(&zcack_list);                conn->uc_peer = peer;                pthread_mutex_unlock(&conn->uc_lock);                                conn2->uc_peer = NULL; /* make conn2 zombie */                pthread_mutex_unlock(&conn2->uc_lock);                usocklnd_conn_decref(conn2);                usocklnd_link_conn_to_peer(conn, peer, idx);                usocklnd_conn_addref(conn);                conn->uc_peer = peer;        }        lnet_ni_decref(conn->uc_ni);        conn->uc_ni = NULL;        pthread_mutex_unlock(&peer->up_lock);        usocklnd_peer_decref(peer);  passive_hellosent_done:                /* safely transit to UC_READY state */        /* rc == 0 */        pthread_mutex_lock(&conn->uc_lock);        if (conn->uc_state != UC_DEAD) {                usocklnd_rx_ksmhdr_state_transition(conn);                /* we're ready to recive incoming packets and maybe                   already have smth. to transmit */                LASSERT (conn->uc_sending == 0);                if ( list_empty(&conn->uc_tx_list) &&                     list_empty(&conn->uc_zcack_list) ) {                        conn->uc_tx_flag = 0;                        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,                                                 POLLIN);                } else {                        conn->uc_tx_deadline =                                cfs_time_shift(usock_tuns.ut_timeout);                        conn->uc_tx_flag = 1;                        rc = usocklnd_add_pollrequest(conn, POLL_SET_REQUEST,                                                      POLLIN | POLLOUT);                }                if (rc == 0)                        conn->uc_state = UC_READY;        }        pthread_mutex_unlock(&conn->uc_lock);        return rc;  passive_hellosent_connkill:        usocklnd_conn_kill(conn);        return 0;}/* Send as much tx data as possible. * Returns 0 or 1 on succsess, <0 if fatal error. * 0 means partial send or non-fatal error, 1 - complete. * Rely on libcfs_sock_writev() for differentiating fatal and * non-fatal errors. An error should be considered as non-fatal if: * 1) it still makes sense to continue reading && * 2) anyway, poll() will set up POLLHUP|POLLERR flags */intusocklnd_send_tx(usock_conn_t *conn, usock_tx_t *tx){        struct iovec *iov;        int           nob;        int           fd = conn->uc_fd;        cfs_time_t    t;                LASSERT (tx->tx_resid != 0);        do {                usock_peer_t *peer = conn->uc_peer;                LASSERT (tx->tx_niov > 0);                                nob = libcfs_sock_writev(fd, tx->tx_iov, tx->tx_niov);                if (nob < 0)                        conn->uc_errored = 1;                if (nob <= 0) /* write queue is flow-controlled or error */                        return nob;                                LASSERT (nob <= tx->tx_resid);                 tx->tx_resid -= nob;                t = cfs_time_current();                conn->uc_tx_deadline = cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));                if(peer != NULL)                        peer->up_last_alive = t;                /* "consume" iov */                 iov = tx->tx_iov;                do {                         LASSERT (tx->tx_niov > 0);                                                 if (nob < iov->iov_len) {                                 iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);                                iov->iov_len -= nob;                                 break;                         }                         nob -= iov->iov_len;                         tx->tx_iov = ++iov;                         tx->tx_niov--;                 } while (nob != 0);                        } while (tx->tx_resid != 0);        return 1; /* send complete */}/* Read from wire as much data as possible. * Returns 0 or 1 on succsess, <0 if error or EOF. * 0 means partial read, 1 - complete */intusocklnd_read_data(usock_conn_t *conn){        struct iovec *iov;        int           nob;        cfs_time_t    t;        LASSERT (conn->uc_rx_nob_wanted != 0);        do {                usock_peer_t *peer = conn->uc_peer;                                LASSERT (conn->uc_rx_niov > 0);                                nob = libcfs_sock_readv(conn->uc_fd, conn->uc_rx_iov, conn->uc_rx_niov);                                if (nob <= 0) {/* read nothing or error */                        conn->uc_errored = 1;                        return nob;                }                                LASSERT (nob <= conn->uc_rx_nob_wanted);                 conn->uc_rx_nob_wanted -= nob;                conn->uc_rx_nob_left -= nob;                t = cfs_time_current();                conn->uc_rx_deadline = cfs_time_add(t, cfs_time_seconds(usock_tuns.ut_timeout));                if(peer != NULL)                        peer->up_last_alive = t;                                /* "consume" iov */                 iov = conn->uc_rx_iov;                do {                         LASSERT (conn->uc_rx_niov > 0);                                                 if (nob < iov->iov_len) {                                 iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);                                 iov->iov_len -= nob;                                 break;                         }                         nob -= iov->iov_len;                         conn->uc_rx_iov = ++iov;                        conn->uc_rx_niov--;                 } while (nob != 0);                        } while (conn->uc_rx_nob_wanted != 0);        return 1; /* read complete */}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -