⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 conn.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 3 页
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (C) 2001, 2002 Cluster File Systems, Inc. *   Author: Maxim Patlasov <maxim@clusterfs.com> * *   This file is part of the Lustre file system, http://www.lustre.org *   Lustre is a trademark of Cluster File Systems, Inc. * */#include "usocklnd.h"/* Return 1 if the conn is timed out, 0 else */intusocklnd_conn_timed_out(usock_conn_t *conn, cfs_time_t current_time){        if (conn->uc_tx_flag && /* sending is in progress */            cfs_time_aftereq(current_time, conn->uc_tx_deadline))                return 1;        if (conn->uc_rx_flag && /* receiving is in progress */            cfs_time_aftereq(current_time, conn->uc_rx_deadline))                return 1;                return 0;}voidusocklnd_conn_kill(usock_conn_t *conn){        pthread_mutex_lock(&conn->uc_lock);        if (conn->uc_state != UC_DEAD)                usocklnd_conn_kill_locked(conn);        pthread_mutex_unlock(&conn->uc_lock);        }/* Mark the conn as DEAD and schedule its deletion */voidusocklnd_conn_kill_locked(usock_conn_t *conn){        conn->uc_rx_flag = conn->uc_tx_flag = 0;        conn->uc_state = UC_DEAD;        usocklnd_add_killrequest(conn);}usock_conn_t *usocklnd_conn_allocate(){        usock_conn_t        *conn;        usock_pollrequest_t *pr;        LIBCFS_ALLOC (pr, sizeof(*pr));        if (pr == NULL)                return NULL;                LIBCFS_ALLOC (conn, sizeof(*conn));        if (conn == NULL) {                LIBCFS_FREE (pr, sizeof(*pr));                return NULL;        }        memset(conn, 0, sizeof(*conn));        conn->uc_preq = pr;        LIBCFS_ALLOC (conn->uc_rx_hello,                      offsetof(ksock_hello_msg_t,                               kshm_ips[LNET_MAX_INTERFACES]));        if (conn->uc_rx_hello == NULL) {                LIBCFS_FREE (pr, sizeof(*pr));                LIBCFS_FREE (conn, sizeof(*conn));                return NULL;        }        return conn;}voidusocklnd_conn_free(usock_conn_t *conn){        usock_pollrequest_t *pr = conn->uc_preq;        if (pr != NULL)                LIBCFS_FREE (pr, sizeof(*pr));        if (conn->uc_rx_hello != NULL)                LIBCFS_FREE (conn->uc_rx_hello,                             offsetof(ksock_hello_msg_t,                                      kshm_ips[LNET_MAX_INTERFACES]));                LIBCFS_FREE (conn, sizeof(*conn));}voidusocklnd_tear_peer_conn(usock_conn_t *conn){        usock_peer_t     *peer = conn->uc_peer;        int               idx = usocklnd_type2idx(conn->uc_type);        lnet_ni_t        *ni;        lnet_process_id_t id;        int               decref_flag  = 0;        int               killall_flag = 0;                if (peer == NULL) /* nothing to tear */                return;                pthread_mutex_lock(&peer->up_lock);        pthread_mutex_lock(&conn->uc_lock);                ni = peer->up_ni;        id = peer->up_peerid;        if (peer->up_conns[idx] == conn) {                if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD) {                        /* change state not to finalize twice */                        conn->uc_rx_state = UC_RX_KSM_HEADER;                        lnet_finalize(peer->up_ni, conn->uc_rx_lnetmsg, -EIO);                                        }                                usocklnd_destroy_txlist(peer->up_ni,                                        &conn->uc_tx_list);                peer->up_conns[idx] = NULL;                conn->uc_peer = NULL;                decref_flag = 1;                if(conn->uc_errored && !peer->up_errored)                        peer->up_errored = killall_flag = 1;        }                pthread_mutex_unlock(&conn->uc_lock);        if (killall_flag)                usocklnd_del_conns_locked(peer);        pthread_mutex_unlock(&peer->up_lock);                if (!decref_flag)                return;        usocklnd_conn_decref(conn);        usocklnd_peer_decref(peer);        usocklnd_check_peer_stale(ni, id);}/* Remove peer from hash list if all up_conns[i] is NULL && * hash table is the only consumer of the peer */voidusocklnd_check_peer_stale(lnet_ni_t *ni, lnet_process_id_t id){        usock_peer_t *peer;                pthread_rwlock_wrlock(&usock_data.ud_peers_lock);        peer = usocklnd_find_peer_locked(ni, id);        if (peer == NULL) {                pthread_rwlock_unlock(&usock_data.ud_peers_lock);                return;        }        if (cfs_atomic_read(&peer->up_refcount) == 2) {                int i;                for (i = 0; i < N_CONN_TYPES; i++)                        LASSERT (peer->up_conns[i] == NULL);                list_del(&peer->up_list);                                                        if (peer->up_errored &&                    (peer->up_peerid.pid & LNET_PID_USERFLAG) == 0)                        lnet_notify (peer->up_ni, peer->up_peerid.nid, 0,                                     cfs_time_seconds(peer->up_last_alive));                                usocklnd_peer_decref(peer);        }        usocklnd_peer_decref(peer);        pthread_rwlock_unlock(&usock_data.ud_peers_lock);}/* Returns 0 on success, <0 else */intusocklnd_create_passive_conn(lnet_ni_t *ni, int fd, usock_conn_t **connp){        int           rc;        __u32         peer_ip;        __u16         peer_port;        usock_conn_t *conn;        rc = libcfs_getpeername(fd, &peer_ip, &peer_port);        if (rc)                return rc;        rc = usocklnd_set_sock_options(fd);        if (rc)                return rc;        conn = usocklnd_conn_allocate();        if (conn == NULL)                return -ENOMEM;        usocklnd_rx_hellomagic_state_transition(conn);                conn->uc_fd = fd;        conn->uc_peer_ip = peer_ip;        conn->uc_peer_port = peer_port;        conn->uc_state = UC_RECEIVING_HELLO;        conn->uc_pt_idx = usocklnd_ip2pt_idx(peer_ip);        conn->uc_ni = ni;        CFS_INIT_LIST_HEAD (&conn->uc_tx_list);        CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);        pthread_mutex_init(&conn->uc_lock, NULL);        cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */        *connp = conn;        return 0;}/* Returns 0 on success, <0 else */intusocklnd_create_active_conn(usock_peer_t *peer, int type,                            usock_conn_t **connp){        int           rc;        int           fd;        usock_conn_t *conn;        __u32         dst_ip   = LNET_NIDADDR(peer->up_peerid.nid);        __u16         dst_port = lnet_acceptor_port();                conn = usocklnd_conn_allocate();        if (conn == NULL)                return -ENOMEM;        conn->uc_tx_hello = usocklnd_create_cr_hello_tx(peer->up_ni, type,                                                        peer->up_peerid.nid);        if (conn->uc_tx_hello == NULL) {                usocklnd_conn_free(conn);                return -ENOMEM;        }                                if (the_lnet.ln_pid & LNET_PID_USERFLAG)                rc = usocklnd_connect_cli_mode(&fd, dst_ip, dst_port);        else                rc = usocklnd_connect_srv_mode(&fd, dst_ip, dst_port);                if (rc) {                usocklnd_destroy_tx(NULL, conn->uc_tx_hello);                usocklnd_conn_free(conn);                return rc;        }                conn->uc_tx_deadline = cfs_time_shift(usock_tuns.ut_timeout);        conn->uc_tx_flag = 1;                conn->uc_fd = fd;        conn->uc_peer_ip = dst_ip;        conn->uc_peer_port = dst_port;        conn->uc_type = type;        conn->uc_activeflag = 1;        conn->uc_state = UC_CONNECTING;        conn->uc_pt_idx = usocklnd_ip2pt_idx(dst_ip);        conn->uc_ni = NULL;        conn->uc_peerid = peer->up_peerid;        conn->uc_peer = peer;        usocklnd_peer_addref(peer);        CFS_INIT_LIST_HEAD (&conn->uc_tx_list);        CFS_INIT_LIST_HEAD (&conn->uc_zcack_list);        pthread_mutex_init(&conn->uc_lock, NULL);        cfs_atomic_set(&conn->uc_refcount, 1); /* 1 ref for me */        *connp = conn;        return 0;}/* Returns 0 on success, <0 else */intusocklnd_connect_srv_mode(int *fdp, __u32 dst_ip, __u16 dst_port){        __u16 port;        int   fd;        int   rc;        for (port = LNET_ACCEPTOR_MAX_RESERVED_PORT;              port >= LNET_ACCEPTOR_MIN_RESERVED_PORT;              port--) {                /* Iterate through reserved ports. */                rc = libcfs_sock_create(&fd);                if (rc)                        return rc;                                                                        rc = libcfs_sock_bind_to_port(fd, port);                if (rc) {                        close(fd);                        continue;                }                rc = usocklnd_set_sock_options(fd);                if (rc) {                        close(fd);                        return rc;                }                rc = libcfs_sock_connect(fd, dst_ip, dst_port);                if (rc == 0) {                        *fdp = fd;                        return 0;                }                                if (rc != -EADDRINUSE && rc != -EADDRNOTAVAIL) {                        close(fd);                        return rc;                }                close(fd);        }        CERROR("Can't bind to any reserved port\n");        return rc;}/* Returns 0 on success, <0 else */intusocklnd_connect_cli_mode(int *fdp, __u32 dst_ip, __u16 dst_port){        int fd;        int rc;        rc = libcfs_sock_create(&fd);        if (rc)                return rc;                rc = usocklnd_set_sock_options(fd);        if (rc) {                close(fd);                return rc;        }        rc = libcfs_sock_connect(fd, dst_ip, dst_port);        if (rc) {                close(fd);                return rc;        }        *fdp = fd;        return 0;}intusocklnd_set_sock_options(int fd){        int rc;        rc = libcfs_sock_set_nagle(fd, usock_tuns.ut_socknagle);        if (rc)                return rc;        if (usock_tuns.ut_sockbufsiz) {                rc = libcfs_sock_set_bufsiz(fd, usock_tuns.ut_sockbufsiz);                if (rc)                        return rc;                }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -