⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tcp_module_lmt.c

📁 fortran并行计算包
💻 C
📖 第 1 页 / 共 2 页
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* *  (C) 2006 by Argonne National Laboratory. *      See COPYRIGHT in top-level directory. */#include "tcp_module_impl.h"extern int h_errno;static int create_s_cookie (int data_sz, char **cookie, int *len);static int read_s_cookie (MPID_IOV cookie, int *data_sz);static int create_r_cookie (char *hostname, int port, int data_sz, char **cookie, int *len);static int read_r_cookie (MPID_IOV cookie, char **hostname, int *port, int *data_sz);static void free_cookie (void *c);static int set_sockopts (int fd);//#define TESTING_CHUNKING#ifdef TESTING_CHUNKING#define CHUNK 6299651//(32*1024)#endif#undef FUNCNAME#define FUNCNAME MPID_nem_tcp_module_lmt_pre_send#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPID_nem_tcp_module_lmt_pre_send (MPIDI_VC_t *vc, MPID_Request *req, MPID_IOV *cookie){    int mpi_errno = MPI_SUCCESS;    int len;    MPIDI_msg_sz_t data_sz;    int dt_contig;    MPI_Aint dt_true_lb;    MPID_Datatype * dt_ptr;    MPIDI_CH3I_VC *vc_ch = (MPIDI_CH3I_VC *)vc->channel_private;    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_MODULE_LMT_PRE_SEND);    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_MODULE_LMT_PRE_SEND);    MPIDI_Datatype_get_info (req->dev.user_count, req->dev.datatype, dt_contig, data_sz, dt_ptr, dt_true_lb);    mpi_errno = create_s_cookie (data_sz, &vc_ch->net.tcp.lmt_cookie, &len);    if (mpi_errno) MPIU_ERR_POP (mpi_errno);    cookie->MPID_IOV_BUF = vc_ch->net.tcp.lmt_cookie;    cookie->MPID_IOV_LEN = len; fn_exit:    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_MODULE_LMT_PRE_SEND);    return mpi_errno; fn_fail:    goto fn_exit;}#undef FUNCNAME#define FUNCNAME MPID_nem_tcp_module_lmt_pre_recv#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPID_nem_tcp_module_lmt_pre_recv (MPIDI_VC_t *vc, MPID_Request *req, MPID_IOV s_cookie, MPID_IOV *r_cookie, int *send_cts){    int mpi_errno = MPI_SUCCESS;    int ret;    unsigned int len;    struct sockaddr_in saddr;    MPIDI_msg_sz_t data_sz;    int dt_contig;    MPI_Aint dt_true_lb;    MPID_Datatype * dt_ptr;    MPIDI_CH3I_VC *vc_ch = (MPIDI_CH3I_VC *)vc->channel_private;    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_MODULE_LMT_PRE_RECV);    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_MODULE_LMT_PRE_RECV);    mpi_errno = read_s_cookie (s_cookie, &vc_ch->net.tcp.lmt_s_len);    if (mpi_errno) MPIU_ERR_POP (mpi_errno);    memset (&saddr, sizeof(saddr), 0);    if (!vc_ch->net.tcp.lmt_connected)    {        vc_ch->net.tcp.lmt_desc = socket (AF_INET, SOCK_STREAM, 0);        MPIU_ERR_CHKANDJUMP2 (vc_ch->net.tcp.lmt_desc == -1, mpi_errno, MPI_ERR_OTHER, "**sock_create", "**sock_create %s %d", strerror (errno), errno);        //        ret = fcntl (vc_ch->net.tcp.lmt_desc, F_SETFL, O_NONBLOCK);        //        MPIU_ERR_CHKANDJUMP2 (ret == -1, mpi_errno, MPI_ERR_OTHER, "**fail", "**fail %s %d", strerror (errno), errno);        saddr.sin_family      = AF_INET;        saddr.sin_addr.s_addr = htonl (INADDR_ANY);        saddr.sin_port        = htons (0);        ret = bind (vc_ch->net.tcp.lmt_desc, (struct sockaddr *)&saddr, sizeof (saddr));        MPIU_ERR_CHKANDJUMP3 (ret == -1, mpi_errno, MPI_ERR_OTHER, "**sock|poll|bind", "**sock|poll|bind %d %d %s", ntohs (saddr.sin_port), errno, strerror (errno));        len = sizeof (saddr);        ret = getsockname (vc_ch->net.tcp.lmt_desc, (struct sockaddr *)&saddr, &len);        MPIU_ERR_CHKANDJUMP2 (ret == -1, mpi_errno, MPI_ERR_OTHER, "**fail", "**fail %s %d", strerror (errno), errno);        set_sockopts (vc_ch->net.tcp.lmt_desc);        ret = listen (vc_ch->net.tcp.lmt_desc, SOMAXCONN);        MPIU_ERR_CHKANDJUMP2 (ret == -1, mpi_errno, MPI_ERR_OTHER, "**listen", "**listen %s %d", errno, strerror (errno));    }    MPIDI_Datatype_get_info (req->dev.user_count, req->dev.datatype, dt_contig, data_sz, dt_ptr, dt_true_lb);    mpi_errno = create_r_cookie (MPID_nem_hostname, ntohs (saddr.sin_port), data_sz, &vc_ch->net.tcp.lmt_cookie, &len);    if (mpi_errno) MPIU_ERR_POP (mpi_errno);    r_cookie->MPID_IOV_BUF = vc_ch->net.tcp.lmt_cookie;    r_cookie->MPID_IOV_LEN = len;    *send_cts = 1; fn_exit:    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_MODULE_LMT_PRE_RECV);    return mpi_errno; fn_fail:    goto fn_exit;}#undef FUNCNAME#define FUNCNAME MPID_nem_tcp_module_lmt_start_send#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPID_nem_tcp_module_lmt_start_send (MPIDI_VC_t *vc, MPID_Request *req, MPID_IOV r_cookie){    int mpi_errno = MPI_SUCCESS;    int ret;    MPIDI_msg_sz_t data_sz;    int dt_contig;    MPI_Aint dt_true_lb;    MPID_Datatype * dt_ptr;    MPIDI_msg_sz_t last;    int nb;    int s_len = 0;    int r_len;    int r_port;    char *r_hostname;    MPIDI_CH3I_VC *vc_ch = (MPIDI_CH3I_VC *)vc->channel_private;    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_MODULE_LMT_START_SEND);    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_MODULE_LMT_START_SEND);    mpi_errno = read_r_cookie (r_cookie, &r_hostname, &r_port, &r_len);    if (mpi_errno) MPIU_ERR_POP (mpi_errno);    free_cookie (vc_ch->net.tcp.lmt_cookie);    if (!vc_ch->net.tcp.lmt_connected)    {        struct sockaddr_in saddr;        struct hostent *hp;        vc_ch->net.tcp.lmt_desc = socket (AF_INET, SOCK_STREAM, 0);        MPIU_ERR_CHKANDJUMP2 (vc_ch->net.tcp.lmt_desc == -1, mpi_errno, MPI_ERR_OTHER, "**sock_create", "**sock_create %s %d", strerror (errno), errno);        //        ret = fcntl (vc_ch->net.tcp.lmt_desc, F_SETFL, O_NONBLOCK);        //        MPIU_ERR_CHKANDJUMP2 (ret == -1, mpi_errno, MPI_ERR_OTHER, "**fail", "**fail %s %d", strerror (errno), errno);        hp = gethostbyname (r_hostname);        MPIU_ERR_CHKANDJUMP2 (hp == NULL, mpi_errno, MPI_ERR_OTHER, "**gethostbyname", "**gethostbyname %s %d", hstrerror (h_errno), h_errno);        memset (&saddr, sizeof(saddr), 0);        saddr.sin_family = AF_INET;        saddr.sin_port   = htons (r_port);        MPID_NEM_MEMCPY (&saddr.sin_addr, hp->h_addr, hp->h_length);        set_sockopts (vc_ch->net.tcp.lmt_desc);        ret = connect (vc_ch->net.tcp.lmt_desc, (struct sockaddr *)&saddr, sizeof(saddr));        MPIU_ERR_CHKANDJUMP2 (ret == -1, mpi_errno, MPI_ERR_OTHER, "**fail", "**fail %s %d", strerror (errno), errno);        vc_ch->net.tcp.lmt_connected = 1;    }    MPIDI_Datatype_get_info (req->dev.user_count, req->dev.datatype, dt_contig, data_sz, dt_ptr, dt_true_lb);    if (r_len < data_sz)    {        /* message will be truncated */        s_len = data_sz;        data_sz = r_len; 	req->status.MPI_ERROR = MPIU_ERR_SET2 (mpi_errno, MPI_ERR_TRUNCATE, "**truncate", "**truncate %d %d", s_len, r_len);    }    MPID_Segment_init (req->dev.user_buf, req->dev.user_count, req->dev.datatype, &req->dev.segment, 0);    req->dev.segment_first = 0;    req->dev.segment_size = data_sz;    req->dev.iov_count = MPID_IOV_LIMIT;    req->dev.iov_offset = 0;    last = data_sz;    do    {        int iov_offset;        int left_to_send;        MPID_Segment_pack_vector (&req->dev.segment, req->dev.segment_first, &last, req->dev.iov, &req->dev.iov_count);        left_to_send = last - req->dev.segment_first;        iov_offset = 0;#ifdef TESTING_CHUNKING        {            char *buf = req->dev.iov[0].MPID_IOV_BUF;            int l;            while (left_to_send)            {                if (left_to_send > CHUNK)                    l = CHUNK;                else                    l = left_to_send;                do                    nb = write (vc_ch->net.tcp.lmt_desc, buf, l);                while (nb == -1 && errno == EINTR);                MPIU_ERR_CHKANDJUMP (nb == -1, mpi_errno, MPI_ERR_OTHER, "**sock_writev");                left_to_send -= nb;                buf += nb;            }            MPIDI_CH3U_Request_complete (req);            goto fn_exit;        }#endif        do            nb = writev (vc_ch->net.tcp.lmt_desc, &req->dev.iov[iov_offset], req->dev.iov_count - iov_offset);        while (nb == -1 && errno == EINTR);        MPIU_ERR_CHKANDJUMP (nb == -1, mpi_errno, MPI_ERR_OTHER, "**sock_writev");        left_to_send -= nb;        while (left_to_send)        { /* send rest of iov */            while (nb >= req->dev.iov[iov_offset].MPID_IOV_LEN)            { /* update iov to reflect sent bytes */                nb -= req->dev.iov[iov_offset].MPID_IOV_LEN;                ++iov_offset;            }            req->dev.iov[iov_offset].MPID_IOV_BUF = (char *)req->dev.iov[iov_offset].MPID_IOV_BUF + nb;            req->dev.iov[iov_offset].MPID_IOV_LEN -= nb;            do                nb = writev (vc_ch->net.tcp.lmt_desc, &req->dev.iov[iov_offset], req->dev.iov_count - iov_offset);            while (nb == -1 && errno == EINTR);            MPIU_ERR_CHKANDJUMP (nb == -1, mpi_errno, MPI_ERR_OTHER, "**sock_writev");            left_to_send -= nb;        }    }    while (last < data_sz);    MPIDI_CH3U_Request_complete (req); fn_exit:    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_TCP_MODULE_LMT_START_SEND);    return mpi_errno; fn_fail:    goto fn_exit;}#undef FUNCNAME#define FUNCNAME MPID_nem_tcp_module_lmt_start_recv#undef FCNAME#define FCNAME MPIDI_QUOTE(FUNCNAME)int MPID_nem_tcp_module_lmt_start_recv (MPIDI_VC_t *vc, MPID_Request *req){    int mpi_errno = MPI_SUCCESS;    int ret;    MPIDI_msg_sz_t data_sz;    int dt_contig;    MPI_Aint dt_true_lb;    MPID_Datatype * dt_ptr;    MPIDI_msg_sz_t last;    int nb;    int r_len;    MPIDI_CH3I_VC *vc_ch = (MPIDI_CH3I_VC *)vc->channel_private;    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_TCP_MODULE_LMT_START_RECV);    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_TCP_MODULE_LMT_START_RECV);    free_cookie (vc_ch->net.tcp.lmt_cookie);    if (!vc_ch->net.tcp.lmt_connected)    {

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -