⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 handlers.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 3 页
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
 *   Author: Maxim Patlasov <maxim@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 */

#include "usocklnd.h"
#include <unistd.h>
#include <syscall.h>

/* Consume one notification from 'fd'.
 * Reads sizeof(int) bytes into a scratch variable whose value is never
 * inspected, using the raw read syscall rather than the read() wrapper.
 * Returns the result of syscall(), narrowed to int. */
int
usocklnd_notifier_handler(int fd)
{
        int notification;

        return syscall(SYS_read, fd, &notification, sizeof(notification));
}

/* Exception handler for 'conn'.
 * Under uc_lock: if the conn is in UC_CONNECTING or UC_SENDING_HELLO
 * state it is killed; in any other state nothing is done. */
void
usocklnd_exception_handler(usock_conn_t *conn)
{
        pthread_mutex_lock(&conn->uc_lock);

        if (conn->uc_state == UC_CONNECTING ||
            conn->uc_state == UC_SENDING_HELLO)
                usocklnd_conn_kill_locked(conn);

        pthread_mutex_unlock(&conn->uc_lock);
}

/* Read handler for 'conn'.
 * Loops (via read_again) for as long as the per-state parser reports
 * that it is ready to continue reading.
 * Returns:
 *   - the result of usocklnd_add_pollrequest() in the UC_RX_PARSE
 *     special case;
 *   - 0 on a partial read or when the conn is UC_DEAD;
 *   - <0 if reading or parsing failed (the conn has been killed);
 *   - otherwise the last value returned by usocklnd_read_msg() or
 *     usocklnd_read_hello(). */
int
usocklnd_read_handler(usock_conn_t *conn)
{
        int rc;
        int continue_reading;
        int state;

  read_again:
        rc = 0;
        pthread_mutex_lock(&conn->uc_lock);
        state = conn->uc_state;

        /* process special case: LNET calls lnd_recv() asynchronously */
        if (state == UC_READY && conn->uc_rx_state == UC_RX_PARSE) {
                /* still don't have usocklnd_recv() called */
                rc = usocklnd_add_pollrequest(conn, POLL_RX_SET_REQUEST, 0);
                if (rc == 0)
                        conn->uc_rx_state = UC_RX_PARSE_WAIT;
                else
                        usocklnd_conn_kill_locked(conn);

                pthread_mutex_unlock(&conn->uc_lock);
                return rc;
        }
        pthread_mutex_unlock(&conn->uc_lock);

        /* From here and below the conn cannot be changed
         * asynchronously, except:
         * 1) usocklnd_send() can work with uc_tx_list and uc_zcack_list,
         * 2) usocklnd_shutdown() can change uc_state to UC_DEAD */

        switch (state) {

        case UC_RECEIVING_HELLO:
        case UC_READY:
                if (conn->uc_rx_nob_wanted != 0) {
                        /* read from conn fd as much wanted data as possible */
                        rc = usocklnd_read_data(conn);
                        if (rc == 0) /* partial read */
                                break;
                        if (rc < 0) {/* error happened or EOF */
                                usocklnd_conn_kill(conn);
                                break;
                        }
                }

                /* process incoming data */
                if (state == UC_READY )
                        rc = usocklnd_read_msg(conn, &continue_reading);
                else /* state == UC_RECEIVING_HELLO */
                        rc = usocklnd_read_hello(conn, &continue_reading);

                if (rc < 0) {
                        usocklnd_conn_kill(conn);
                        break;
                }

                if (continue_reading)
                        goto read_again;

                break;

        case UC_DEAD:
                break;

        default:
                LBUG();
        }

        return rc;
}

/* Switch on rx_state.
 * Return 0 on success, 1 if whole packet is read, else return <0
 * Always set cont_flag: 1 if we're ready to continue reading, else 0
 * NB: If whole packet is read, cont_flag will be set to zero to take
 * care of fairness */
int
usocklnd_read_msg(usock_conn_t *conn, int *cont_flag)
{
        int   rc = 0;
        __u64 cookie;

        *cont_flag = 0;

        /* something new emerged in RX part - let's process it */
        switch (conn->uc_rx_state) {
        case UC_RX_KSM_HEADER:
                if (conn->uc_flip) {
                        __swab32s(&conn->uc_rx_msg.ksm_type);
                        __swab32s(&conn->uc_rx_msg.ksm_csum);
                        __swab64s(&conn->uc_rx_msg.ksm_zc_req_cookie);
                        __swab64s(&conn->uc_rx_msg.ksm_zc_ack_cookie);
                }

                /* we never send packets for which zc-acking is required */
                if (conn->uc_rx_msg.ksm_type != KSOCK_MSG_LNET ||
                    conn->uc_rx_msg.ksm_zc_ack_cookie != 0) {
                        conn->uc_errored = 1;
                        return -EPROTO;
                }

                /* zc_req will be processed later, when
                   lnet payload will be received */

                usocklnd_rx_lnethdr_state_transition(conn);
                *cont_flag = 1;
                break;

        case UC_RX_LNET_HEADER:
                if (the_lnet.ln_pid & LNET_PID_USERFLAG) {
                        /* replace dest_nid,pid (ksocknal sets its own) */
                        conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_nid =
                                cpu_to_le64(conn->uc_peer->up_ni->ni_nid);
                        conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr.dest_pid =
                                cpu_to_le32(the_lnet.ln_pid);

                } else if (conn->uc_peer->up_peerid.pid & LNET_PID_USERFLAG) {
                        /* Userspace peer */
                        lnet_process_id_t *id = &conn->uc_peer->up_peerid;
                        lnet_hdr_t        *lhdr = &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr;

                        /* Substitute process ID assigned at connection time */
                        lhdr->src_pid = cpu_to_le32(id->pid);
                        lhdr->src_nid = cpu_to_le64(id->nid);
                }

                conn->uc_rx_state = UC_RX_PARSE;
                usocklnd_conn_addref(conn); /* ++ref while parsing */

                rc = lnet_parse(conn->uc_peer->up_ni,
                                &conn->uc_rx_msg.ksm_u.lnetmsg.ksnm_hdr,
                                conn->uc_peerid.nid, conn, 0);

                if (rc < 0) {
                        /* I just received garbage: give up on this conn */
                        conn->uc_errored = 1;
                        usocklnd_conn_decref(conn);
                        return -EPROTO;
                }

                /* Race with usocklnd_recv() is possible */
                pthread_mutex_lock(&conn->uc_lock);
                LASSERT (conn->uc_rx_state == UC_RX_PARSE ||
                         conn->uc_rx_state == UC_RX_LNET_PAYLOAD);

                /* check whether usocklnd_recv() got called */
                if (conn->uc_rx_state == UC_RX_LNET_PAYLOAD)
                        *cont_flag = 1;
                pthread_mutex_unlock(&conn->uc_lock);
                break;

        case UC_RX_PARSE:
                LBUG(); /* it's error to be here, because this special
                         * case is handled by caller */
                break;

        case UC_RX_PARSE_WAIT:
                LBUG(); /* it's error to be here, because the conn
                         * shouldn't wait for POLLIN event in this
                         * state */
                break;

        case UC_RX_LNET_PAYLOAD:
                /* payload all received */

                lnet_finalize(conn->uc_peer->up_ni, conn->uc_rx_lnetmsg, 0);

                cookie = conn->uc_rx_msg.ksm_zc_req_cookie;
                if (cookie != 0)
                        rc = usocklnd_handle_zc_req(conn->uc_peer, cookie);

                if (rc != 0) {
                        /* change state not to finalize twice */
                        conn->uc_rx_state = UC_RX_KSM_HEADER;
                        /* NB: any failure code from the zc-req handler is
                         * reported to the caller as -EPROTO */
                        return -EPROTO;
                }

                /* Fall through */

        case UC_RX_SKIPPING:
                if (conn->uc_rx_nob_left != 0) {
                        usocklnd_rx_skipping_state_transition(conn);
                        *cont_flag = 1;
                } else {
                        usocklnd_rx_ksmhdr_state_transition(conn);
                        rc = 1; /* whole packet is read */
                }

                break;

        default:
                LBUG(); /* unknown state */
        }

        return rc;
}

/* Handle incoming ZC request from sender.
 * NB: it's called only from read_handler, so we're sure that
 * the conn cannot become zombie in the middle of processing
 *
 * Allocates a zc_ack carrying 'cookie' and passes it to
 * usocklnd_find_or_create_conn(). Returns 0 on success, -ENOMEM if the
 * allocation fails, or the error from usocklnd_find_or_create_conn()
 * (in which case the zc_ack is freed here). On success the zc_ack is
 * not freed by this function - presumably the conn takes ownership;
 * confirm against usocklnd_find_or_create_conn(). */
int
usocklnd_handle_zc_req(usock_peer_t *peer, __u64 cookie)
{
        usock_conn_t   *conn;
        usock_zc_ack_t *zc_ack;
        int             type;
        int             rc;
        int             dummy;

        LIBCFS_ALLOC (zc_ack, sizeof(*zc_ack));
        if (zc_ack == NULL)
                return -ENOMEM;
        zc_ack->zc_cookie = cookie;

        /* Let's assume that CONTROL is the best type for zcack,
         * but userspace clients don't use typed connections */
        if (the_lnet.ln_pid & LNET_PID_USERFLAG)
                type = SOCKLND_CONN_ANY;
        else
                type = SOCKLND_CONN_CONTROL;

        rc = usocklnd_find_or_create_conn(peer, type, &conn, NULL, zc_ack,
                                          &dummy);
        if (rc != 0) {
                LIBCFS_FREE (zc_ack, sizeof(*zc_ack));
                return rc;
        }
        /* drop the reference obtained through 'conn' above */
        usocklnd_conn_decref(conn);

        return 0;
}

/* Switch on rx_state.
 * Return 0 on success, else return <0
 * Always set cont_flag: 1 if we're ready to continue reading, else 0
 */
int
usocklnd_read_hello(usock_conn_t *conn, int *cont_flag)
{
        int                rc = 0;
        ksock_hello_msg_t *hello = conn->uc_rx_hello;

        *cont_flag = 0;

        /* smth. 
new emerged in hello - let's process it */        switch (conn->uc_rx_state) {        case UC_RX_HELLO_MAGIC:                if (hello->kshm_magic == LNET_PROTO_MAGIC)                        conn->uc_flip = 0;                else if (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC))                        conn->uc_flip = 1;                else                        return -EPROTO;                usocklnd_rx_helloversion_state_transition(conn);                *cont_flag = 1;                break;                                case UC_RX_HELLO_VERSION:                if ((!conn->uc_flip &&                     (hello->kshm_version != KSOCK_PROTO_V2)) ||                    (conn->uc_flip &&                     (hello->kshm_version != __swab32(KSOCK_PROTO_V2))))                        return -EPROTO;                usocklnd_rx_hellobody_state_transition(conn);                *cont_flag = 1;                break;                        case UC_RX_HELLO_BODY:                if (conn->uc_flip) {                        ksock_hello_msg_t *hello = conn->uc_rx_hello;                        __swab32s(&hello->kshm_src_pid);                        __swab64s(&hello->kshm_src_nid);                        __swab32s(&hello->kshm_dst_pid);                        __swab64s(&hello->kshm_dst_nid);                        __swab64s(&hello->kshm_src_incarnation);                        __swab64s(&hello->kshm_dst_incarnation);                        __swab32s(&hello->kshm_ctype);                        __swab32s(&hello->kshm_nips);                }                if (conn->uc_rx_hello->kshm_nips > LNET_MAX_INTERFACES) {                        CERROR("Bad nips %d from ip %u.%u.%u.%u port %d\n",                               conn->uc_rx_hello->kshm_nips,                               HIPQUAD(conn->uc_peer_ip), conn->uc_peer_port);                        return -EPROTO;                }                                if (conn->uc_rx_hello->kshm_nips) {                                           
     usocklnd_rx_helloIPs_state_transition(conn);                        *cont_flag = 1;                        break;                }                /* fall through */        case UC_RX_HELLO_IPS:                if (conn->uc_activeflag == 1) /* active conn */                        rc = usocklnd_activeconn_hellorecv(conn);                else                          /* passive conn */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -