⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 conn.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 3 页
字号:
                        CERROR("Can't create peer: network shutdown\n");                        return -ESHUTDOWN;                }                                /* peer table will take 1 of my refs on peer */                usocklnd_peer_addref(peer);                list_add_tail (&peer->up_list,                               usocklnd_nid2peerlist(id.nid));        } else {                usocklnd_peer_decref(peer); /* should destroy peer */                peer = peer2;        }        pthread_rwlock_unlock(&usock_data.ud_peers_lock);          find_or_create_peer_done:                *peerp = peer;        return 0;}/* NB: both peer and conn locks are held */static intusocklnd_enqueue_zcack(usock_conn_t *conn, usock_zc_ack_t *zc_ack){                if (conn->uc_state == UC_READY &&            list_empty(&conn->uc_tx_list) &&            list_empty(&conn->uc_zcack_list) &&            !conn->uc_sending) {                int rc = usocklnd_add_pollrequest(conn, POLL_TX_SET_REQUEST,                                                  POLLOUT);                if (rc != 0)                        return rc;        }                        list_add_tail(&zc_ack->zc_list, &conn->uc_zcack_list);        return 0;}/* NB: both peer and conn locks are held * NB: if sending isn't in progress.  the caller *MUST* send tx * immediately after we'll return */static voidusocklnd_enqueue_tx(usock_conn_t *conn, usock_tx_t *tx,                    int *send_immediately){                if (conn->uc_state == UC_READY &&            list_empty(&conn->uc_tx_list) &&            list_empty(&conn->uc_zcack_list) &&            !conn->uc_sending) {                conn->uc_sending = 1;                *send_immediately = 1;                return;        }                        *send_immediately = 0;        list_add_tail(&tx->tx_list, &conn->uc_tx_list);}/* Safely create new conn if needed. Save result in *connp. * Returns 0 on success, <0 else */intusocklnd_find_or_create_conn(usock_peer_t *peer, int type,                             usock_conn_t **connp,                             usock_tx_t *tx, usock_zc_ack_t *zc_ack,                             int *send_immediately){        usock_conn_t *conn;        int           idx;        int           rc;        lnet_pid_t    userflag = peer->up_peerid.pid & LNET_PID_USERFLAG;                if (userflag)                type = SOCKLND_CONN_ANY;        idx = usocklnd_type2idx(type);                pthread_mutex_lock(&peer->up_lock);        if (peer->up_conns[idx] != NULL) {                conn = peer->up_conns[idx];                LASSERT(conn->uc_type == type);        } else {                if (userflag) {                        CERROR("Refusing to create a connection to "                               "userspace process %s\n",                               libcfs_id2str(peer->up_peerid));                        rc = -EHOSTUNREACH;                        goto find_or_create_conn_failed;                }                                rc = usocklnd_create_active_conn(peer, type, &conn);                if (rc) {                        peer->up_errored = 1;                        usocklnd_del_conns_locked(peer);                        goto find_or_create_conn_failed;                }                /* peer takes 1 of conn refcount */                usocklnd_link_conn_to_peer(conn, peer, idx);                                rc = usocklnd_add_pollrequest(conn, POLL_ADD_REQUEST, POLLOUT);                if (rc) {                        peer->up_conns[idx] = NULL;                        usocklnd_conn_decref(conn); /* should destroy conn */                        goto find_or_create_conn_failed;                }                usocklnd_wakeup_pollthread(conn->uc_pt_idx);        }                pthread_mutex_lock(&conn->uc_lock);        LASSERT(conn->uc_peer == peer);        LASSERT(tx == NULL || zc_ack == NULL);        if (tx != NULL) {                usocklnd_enqueue_tx(conn, tx, send_immediately);        } else {                rc = usocklnd_enqueue_zcack(conn, zc_ack);                        if (rc != 0) {                        usocklnd_conn_kill_locked(conn);                        pthread_mutex_unlock(&conn->uc_lock);                        goto find_or_create_conn_failed;                }        }        pthread_mutex_unlock(&conn->uc_lock);                 usocklnd_conn_addref(conn);        pthread_mutex_unlock(&peer->up_lock);        *connp = conn;        return 0;  find_or_create_conn_failed:        pthread_mutex_unlock(&peer->up_lock);        return rc;}voidusocklnd_link_conn_to_peer(usock_conn_t *conn, usock_peer_t *peer, int idx){        peer->up_conns[idx] = conn;                peer->up_errored    = 0; /* this new fresh conn will try                                  * revitalize even stale errored peer */}intusocklnd_invert_type(int type){        switch (type)        {        case SOCKLND_CONN_ANY:        case SOCKLND_CONN_CONTROL:                return (type);        case SOCKLND_CONN_BULK_IN:                return SOCKLND_CONN_BULK_OUT;        case SOCKLND_CONN_BULK_OUT:                return SOCKLND_CONN_BULK_IN;        default:                return SOCKLND_CONN_NONE;        }}voidusocklnd_conn_new_state(usock_conn_t *conn, int new_state){        pthread_mutex_lock(&conn->uc_lock);        if (conn->uc_state != UC_DEAD)                conn->uc_state = new_state;        pthread_mutex_unlock(&conn->uc_lock);}/* NB: peer is locked by caller */voidusocklnd_cleanup_stale_conns(usock_peer_t *peer, __u64 incrn,                             usock_conn_t *skip_conn){        int i;                if (!peer->up_incrn_is_set) {                peer->up_incarnation = incrn;                peer->up_incrn_is_set = 1;                return;        }        if (peer->up_incarnation == incrn)                return;        peer->up_incarnation = incrn;                for (i = 0; i < N_CONN_TYPES; i++) {                usock_conn_t *conn = peer->up_conns[i];                                if (conn == NULL || conn == skip_conn)                        continue;                pthread_mutex_lock(&conn->uc_lock);                        LASSERT (conn->uc_peer == peer);                conn->uc_peer = NULL;                peer->up_conns[i] = NULL;                if (conn->uc_state != UC_DEAD)                        usocklnd_conn_kill_locked(conn);                                pthread_mutex_unlock(&conn->uc_lock);                usocklnd_conn_decref(conn);                usocklnd_peer_decref(peer);        }}/* RX state transition to UC_RX_HELLO_MAGIC: update RX part to receive * MAGIC part of hello and set uc_rx_state */voidusocklnd_rx_hellomagic_state_transition(usock_conn_t *conn){        LASSERT(conn->uc_rx_hello != NULL);        conn->uc_rx_niov = 1;        conn->uc_rx_iov = conn->uc_rx_iova;        conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_magic;        conn->uc_rx_iov[0].iov_len =                conn->uc_rx_nob_wanted =                conn->uc_rx_nob_left =                sizeof(conn->uc_rx_hello->kshm_magic);        conn->uc_rx_state = UC_RX_HELLO_MAGIC;        conn->uc_rx_flag = 1; /* waiting for incoming hello */        conn->uc_rx_deadline = cfs_time_shift(usock_tuns.ut_timeout);}/* RX state transition to UC_RX_HELLO_VERSION: update RX part to receive * VERSION part of hello and set uc_rx_state */voidusocklnd_rx_helloversion_state_transition(usock_conn_t *conn){        LASSERT(conn->uc_rx_hello != NULL);        conn->uc_rx_niov = 1;        conn->uc_rx_iov = conn->uc_rx_iova;        conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_version;        conn->uc_rx_iov[0].iov_len =                conn->uc_rx_nob_wanted =                conn->uc_rx_nob_left =                sizeof(conn->uc_rx_hello->kshm_version);                conn->uc_rx_state = UC_RX_HELLO_VERSION;}/* RX state transition to UC_RX_HELLO_BODY: update RX part to receive * the rest  of hello and set uc_rx_state */voidusocklnd_rx_hellobody_state_transition(usock_conn_t *conn){        LASSERT(conn->uc_rx_hello != NULL);        conn->uc_rx_niov = 1;        conn->uc_rx_iov = conn->uc_rx_iova;        conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_src_nid;        conn->uc_rx_iov[0].iov_len =                conn->uc_rx_nob_wanted =                conn->uc_rx_nob_left =                offsetof(ksock_hello_msg_t, kshm_ips) -                offsetof(ksock_hello_msg_t, kshm_src_nid);                conn->uc_rx_state = UC_RX_HELLO_BODY;}/* RX state transition to UC_RX_HELLO_IPS: update RX part to receive * array of IPs and set uc_rx_state */voidusocklnd_rx_helloIPs_state_transition(usock_conn_t *conn){        LASSERT(conn->uc_rx_hello != NULL);        conn->uc_rx_niov = 1;        conn->uc_rx_iov = conn->uc_rx_iova;        conn->uc_rx_iov[0].iov_base = &conn->uc_rx_hello->kshm_ips;        conn->uc_rx_iov[0].iov_len =                conn->uc_rx_nob_wanted =                conn->uc_rx_nob_left =                conn->uc_rx_hello->kshm_nips *                sizeof(conn->uc_rx_hello->kshm_ips[0]);                conn->uc_rx_state = UC_RX_HELLO_IPS;}/* RX state transition to UC_RX_LNET_HEADER: update RX part to receive * LNET header and set uc_rx_state */voidusocklnd_rx_lnethdr_state_transition(usock_conn_t *conn){        conn->uc_rx_niov = 1;        conn->uc_rx_iov = conn->uc_rx_iova;        conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg.ksm_u.lnetmsg;                        conn->uc_rx_iov[0].iov_len =                conn->uc_rx_nob_wanted =                conn->uc_rx_nob_left =                sizeof(ksock_lnet_msg_t);                conn->uc_rx_state = UC_RX_LNET_HEADER;        conn->uc_rx_flag = 1;}/* RX state transition to UC_RX_KSM_HEADER: update RX part to receive * KSM header and set uc_rx_state */voidusocklnd_rx_ksmhdr_state_transition(usock_conn_t *conn){        conn->uc_rx_niov = 1;        conn->uc_rx_iov = conn->uc_rx_iova;        conn->uc_rx_iov[0].iov_base = &conn->uc_rx_msg;                        conn->uc_rx_iov[0].iov_len =                conn->uc_rx_nob_wanted =                conn->uc_rx_nob_left =                                        offsetof(ksock_msg_t, ksm_u);                conn->uc_rx_state = UC_RX_KSM_HEADER;        conn->uc_rx_flag = 0;}/* RX state transition to UC_RX_SKIPPING: update RX part for * skipping and set uc_rx_state */voidusocklnd_rx_skipping_state_transition(usock_conn_t *conn){        static char    skip_buffer[4096];        int            nob;        unsigned int   niov = 0;        int            skipped = 0;        int            nob_to_skip = conn->uc_rx_nob_left;                LASSERT(nob_to_skip != 0);        conn->uc_rx_iov = conn->uc_rx_iova;        /* Set up to skip as much as possible now.  If there's more left         * (ran out of iov entries) we'll get called again */        do {                nob = MIN (nob_to_skip, sizeof(skip_buffer));                conn->uc_rx_iov[niov].iov_base = skip_buffer;                conn->uc_rx_iov[niov].iov_len  = nob;                niov++;                skipped += nob;                nob_to_skip -=nob;        } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */                 niov < sizeof(conn->uc_rx_iova) / sizeof (struct iovec));        conn->uc_rx_niov = niov;        conn->uc_rx_nob_wanted = skipped;        conn->uc_rx_state = UC_RX_SKIPPING;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -