⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 conn.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 3 页
字号:
                return libcfs_fcntl_nonblock(fd);}voidusocklnd_init_msg(ksock_msg_t *msg, int type){        msg->ksm_type           = type;        msg->ksm_csum           = 0;        msg->ksm_zc_req_cookie  = 0;        msg->ksm_zc_ack_cookie  = 0;}usock_tx_t *usocklnd_create_noop_tx(__u64 cookie){        usock_tx_t *tx;                LIBCFS_ALLOC (tx, sizeof(usock_tx_t));        if (tx == NULL)                return NULL;        tx->tx_size = sizeof(usock_tx_t);        tx->tx_lnetmsg = NULL;        usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);        tx->tx_msg.ksm_zc_ack_cookie = cookie;                tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;        tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =                offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_hdr);        tx->tx_iov = tx->tx_iova;        tx->tx_niov = 1;                return tx;}        usock_tx_t *usocklnd_create_tx(lnet_msg_t *lntmsg){        usock_tx_t   *tx;        unsigned int  payload_niov = lntmsg->msg_niov;         struct iovec *payload_iov = lntmsg->msg_iov;         unsigned int  payload_offset = lntmsg->msg_offset;        unsigned int  payload_nob = lntmsg->msg_len;        int           size = offsetof(usock_tx_t,                                      tx_iova[1 + payload_niov]);        LIBCFS_ALLOC (tx, size);        if (tx == NULL)                return NULL;        tx->tx_size = size;        tx->tx_lnetmsg = lntmsg;        tx->tx_resid = tx->tx_nob =                offsetof(ksock_msg_t,  ksm_u.lnetmsg.ksnm_payload) +                payload_nob;                usocklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);        tx->tx_msg.ksm_u.lnetmsg.ksnm_hdr = lntmsg->msg_hdr;        tx->tx_iova[0].iov_base = (void *)&tx->tx_msg;        tx->tx_iova[0].iov_len = offsetof(ksock_msg_t,                                          ksm_u.lnetmsg.ksnm_payload);        tx->tx_iov = tx->tx_iova;        tx->tx_niov = 1 +                 lnet_extract_iov(payload_niov, &tx->tx_iov[1],                                 payload_niov, payload_iov,                                 payload_offset, payload_nob);        return tx;}voidusocklnd_init_hello_msg(ksock_hello_msg_t *hello,                        lnet_ni_t *ni, int type, lnet_nid_t peer_nid){        usock_net_t *net = (usock_net_t *)ni->ni_data;        hello->kshm_magic       = LNET_PROTO_MAGIC;        hello->kshm_version     = KSOCK_PROTO_V2;        hello->kshm_nips        = 0;        hello->kshm_ctype       = type;                hello->kshm_dst_incarnation = 0; /* not used */        hello->kshm_src_incarnation = net->un_incarnation;        hello->kshm_src_pid = the_lnet.ln_pid;        hello->kshm_src_nid = ni->ni_nid;        hello->kshm_dst_nid = peer_nid;        hello->kshm_dst_pid = 0; /* not used */}usock_tx_t *usocklnd_create_hello_tx(lnet_ni_t *ni,                         int type, lnet_nid_t peer_nid){        usock_tx_t        *tx;        int                size;        ksock_hello_msg_t *hello;        size = sizeof(usock_tx_t) + offsetof(ksock_hello_msg_t, kshm_ips);        LIBCFS_ALLOC (tx, size);        if (tx == NULL)                return NULL;        tx->tx_size = size;        tx->tx_lnetmsg = NULL;        hello = (ksock_hello_msg_t *)&tx->tx_iova[1];        usocklnd_init_hello_msg(hello, ni, type, peer_nid);                tx->tx_iova[0].iov_base = (void *)hello;        tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =                offsetof(ksock_hello_msg_t, kshm_ips);        tx->tx_iov = tx->tx_iova;        tx->tx_niov = 1;        return tx;}usock_tx_t *usocklnd_create_cr_hello_tx(lnet_ni_t *ni,                            int type, lnet_nid_t peer_nid){        usock_tx_t              *tx;        int                      size;        lnet_acceptor_connreq_t *cr;        ksock_hello_msg_t       *hello;        size = sizeof(usock_tx_t) +                sizeof(lnet_acceptor_connreq_t) +                offsetof(ksock_hello_msg_t, kshm_ips);        LIBCFS_ALLOC (tx, size);        if (tx == NULL)                return NULL;        tx->tx_size = size;        tx->tx_lnetmsg = NULL;        cr = (lnet_acceptor_connreq_t *)&tx->tx_iova[1];        memset(cr, 0, sizeof(*cr));        cr->acr_magic   = LNET_PROTO_ACCEPTOR_MAGIC;        cr->acr_version = LNET_PROTO_ACCEPTOR_VERSION;        cr->acr_nid     = peer_nid;                hello = (ksock_hello_msg_t *)((char *)cr + sizeof(*cr));        usocklnd_init_hello_msg(hello, ni, type, peer_nid);                tx->tx_iova[0].iov_base = (void *)cr;        tx->tx_iova[0].iov_len = tx->tx_resid = tx->tx_nob =                sizeof(lnet_acceptor_connreq_t) +                offsetof(ksock_hello_msg_t, kshm_ips);        tx->tx_iov = tx->tx_iova;        tx->tx_niov = 1;        return tx;}voidusocklnd_destroy_tx(lnet_ni_t *ni, usock_tx_t *tx){        lnet_msg_t  *lnetmsg = tx->tx_lnetmsg;        int          rc = (tx->tx_resid == 0) ? 0 : -EIO;        LASSERT (ni != NULL || lnetmsg == NULL);        LIBCFS_FREE (tx, tx->tx_size);                if (lnetmsg != NULL) /* NOOP and hello go without lnetmsg */                lnet_finalize(ni, lnetmsg, rc);}voidusocklnd_destroy_txlist(lnet_ni_t *ni, struct list_head *txlist){        usock_tx_t *tx;        while (!list_empty(txlist)) {                tx = list_entry(txlist->next, usock_tx_t, tx_list);                list_del(&tx->tx_list);                                usocklnd_destroy_tx(ni, tx);        }}voidusocklnd_destroy_zcack_list(struct list_head *zcack_list){        usock_zc_ack_t *zcack;        while (!list_empty(zcack_list)) {                zcack = list_entry(zcack_list->next, usock_zc_ack_t, zc_list);                list_del(&zcack->zc_list);                                LIBCFS_FREE (zcack, sizeof(*zcack));        }}voidusocklnd_destroy_peer(usock_peer_t *peer){        usock_net_t *net = peer->up_ni->ni_data;        int          i;        for (i = 0; i < N_CONN_TYPES; i++)                LASSERT (peer->up_conns[i] == NULL);        LIBCFS_FREE (peer, sizeof (*peer));        pthread_mutex_lock(&net->un_lock);        if(--net->un_peercount == 0)                                pthread_cond_signal(&net->un_cond);        pthread_mutex_unlock(&net->un_lock);}voidusocklnd_destroy_conn(usock_conn_t *conn){        LASSERT (conn->uc_peer == NULL || conn->uc_ni == NULL);        if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {                LASSERT (conn->uc_peer != NULL);                lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, -EIO);        }        if (!list_empty(&conn->uc_tx_list)) {                LASSERT (conn->uc_peer != NULL);                                usocklnd_destroy_txlist(conn->uc_peer->up_ni, &conn->uc_tx_list);        }        usocklnd_destroy_zcack_list(&conn->uc_zcack_list);                if (conn->uc_peer != NULL)                usocklnd_peer_decref(conn->uc_peer);        if (conn->uc_ni != NULL)                lnet_ni_decref(conn->uc_ni);        if (conn->uc_tx_hello)                usocklnd_destroy_tx(NULL, conn->uc_tx_hello);        usocklnd_conn_free(conn);}intusocklnd_get_conn_type(lnet_msg_t *lntmsg){        int nob;        if (the_lnet.ln_pid & LNET_PID_USERFLAG)                return SOCKLND_CONN_ANY;        nob = offsetof(ksock_msg_t, ksm_u.lnetmsg.ksnm_payload) +                lntmsg->msg_len;                if (nob >= usock_tuns.ut_min_bulk)                return SOCKLND_CONN_BULK_OUT;        else                return SOCKLND_CONN_CONTROL;}int usocklnd_type2idx(int type){        switch (type) {        case SOCKLND_CONN_ANY:        case SOCKLND_CONN_CONTROL:                return 0;        case SOCKLND_CONN_BULK_IN:                return 1;        case SOCKLND_CONN_BULK_OUT:                return 2;        default:                LBUG();        }}usock_peer_t *usocklnd_find_peer_locked(lnet_ni_t *ni, lnet_process_id_t id){        struct list_head *peer_list = usocklnd_nid2peerlist(id.nid);        struct list_head *tmp;        usock_peer_t     *peer;        list_for_each (tmp, peer_list) {                peer = list_entry (tmp, usock_peer_t, up_list);                if (peer->up_ni != ni)                        continue;                if (peer->up_peerid.nid != id.nid ||                    peer->up_peerid.pid != id.pid)                        continue;                usocklnd_peer_addref(peer);                return peer;        }        return (NULL);}intusocklnd_create_peer(lnet_ni_t *ni, lnet_process_id_t id,                     usock_peer_t **peerp){        usock_net_t  *net = ni->ni_data;        usock_peer_t *peer;        int           i;        LIBCFS_ALLOC (peer, sizeof (*peer));        if (peer == NULL)                return -ENOMEM;        for (i = 0; i < N_CONN_TYPES; i++)                peer->up_conns[i] = NULL;        peer->up_peerid       = id;        peer->up_ni           = ni;        peer->up_incrn_is_set = 0;        peer->up_errored      = 0;        peer->up_last_alive   = 0;        cfs_atomic_set (&peer->up_refcount, 1); /* 1 ref for caller */        pthread_mutex_init(&peer->up_lock, NULL);                pthread_mutex_lock(&net->un_lock);        net->un_peercount++;                pthread_mutex_unlock(&net->un_lock);        *peerp = peer;        return 0;}/* Safely create new peer if needed. Save result in *peerp. * Returns 0 on success, <0 else */intusocklnd_find_or_create_peer(lnet_ni_t *ni, lnet_process_id_t id,                             usock_peer_t **peerp){        int           rc;        usock_peer_t *peer;        usock_peer_t *peer2;        usock_net_t  *net = ni->ni_data;        pthread_rwlock_rdlock(&usock_data.ud_peers_lock);        peer = usocklnd_find_peer_locked(ni, id);        pthread_rwlock_unlock(&usock_data.ud_peers_lock);        if (peer != NULL)                goto find_or_create_peer_done;        rc = usocklnd_create_peer(ni, id, &peer);        if (rc)                return rc;                pthread_rwlock_wrlock(&usock_data.ud_peers_lock);        peer2 = usocklnd_find_peer_locked(ni, id);        if (peer2 == NULL) {                if (net->un_shutdown) {                        pthread_rwlock_unlock(&usock_data.ud_peers_lock);                        usocklnd_peer_decref(peer); /* should destroy peer */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -