⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socket_write.c

📁 刚才是说明 现在是安装程序在 LINUX环境下进行编程的MPICH安装文件
💻 C
字号:
/* -*- Mode: C; c-basic-offset:4 ; -*- *//* *  (C) 2001 by Argonne National Laboratory. *      See COPYRIGHT in top-level directory. */#include "socketimpl.h"#ifdef WITH_METHOD_SHMint socket_write_shm(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr);#endif#ifdef WITH_METHOD_VIAint socket_write_via(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr);#endif#ifdef WITH_METHOD_VIA_RDMAint socket_write_via_rdma(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr);#endifint socket_write_vec(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr);int socket_write_tmp(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr);int socket_write_simple(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr);int socket_write(MPIDI_VC *vc_ptr){    MM_Car *car_ptr;    MM_Segment_buffer *buf_ptr;    int ret_val;    MPIDI_STATE_DECL(MPID_STATE_SOCKET_WRITE);    MPIDI_FUNC_ENTER(MPID_STATE_SOCKET_WRITE);    if (!vc_ptr->data.socket.connected)    {	MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE);	return MPI_SUCCESS;    }    if (vc_ptr->writeq_head == NULL)    {	msg_printf("socket_write: write called with no vc's in the write queue.\n");	MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE);	return MPI_SUCCESS;    }    car_ptr = vc_ptr->writeq_head;    buf_ptr = car_ptr->buf_ptr;    switch (buf_ptr->type)    {#ifdef WITH_METHOD_SHM    case MM_SHM_BUFFER:	ret_val = socket_write_shm(vc_ptr, car_ptr, buf_ptr);	MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE);	return ret_val;	break;#endif#ifdef WITH_METHOD_VIA    case MM_VIA_BUFFER:	ret_val = socket_write_via(vc_ptr, car_ptr, buf_ptr);	MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE);	return ret_val;	break;#endif#ifdef WITH_METHOD_VIA_RDMA    case MM_VIA_RDMA_BUFFER:	ret_val = socket_write_via_rdma(vc_ptr, car_ptr, buf_ptr);	MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE);	return ret_val;	break;#endif    case MM_VEC_BUFFER:	ret_val = (buf_ptr->vec.num_cars_outstanding > 0) ? socket_write_vec(vc_ptr, car_ptr, buf_ptr) : MPI_SUCCESS;	MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE);	return ret_val;	break;    case MM_SIMPLE_BUFFER:	ret_val = socket_write_simple(vc_ptr, car_ptr, buf_ptr);	MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE);	return ret_val;	break;    case MM_TMP_BUFFER:	ret_val = socket_write_tmp(vc_ptr, car_ptr, buf_ptr);	MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE);	return ret_val;	break;#ifdef WITH_METHOD_IB    case MM_IB_BUFFER:	ret_val = socket_write_ib(vc_ptr, car_ptr, buf_ptr);	MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE);	return ret_val;	break;#endif#ifdef WITH_METHOD_NEW    case MM_NEW_METHOD_BUFFER:	ret_val = socket_write_new(vc_ptr, car_ptr, buf_ptr);	MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE);	return ret_val;	break;#endif    case MM_NULL_BUFFER:	err_printf("Error: socket_write called on a null buffer\n");	break;    default:	err_printf("Error: socket_write: unknown or unsupported buffer type: %d\n", buf_ptr->type);	break;    }    MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE);    return -1;}#ifdef WITH_METHOD_SHMint socket_write_shm(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr){    MPIDI_STATE_DECL(MPID_STATE_SOCKET_WRITE_SHM);    MPIDI_FUNC_ENTER(MPID_STATE_SOCKET_WRITE_SHM);    MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE_SHM);    return MPI_SUCCESS;}#endif#ifdef WITH_METHOD_VIAint socket_write_via(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr){    MPIDI_STATE_DECL(MPID_STATE_SOCKET_WRITE_VIA);    MPIDI_FUNC_ENTER(MPID_STATE_SOCKET_WRITE_VIA);    MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE_VIA);    return MPI_SUCCESS;}#endif#ifdef WITH_METHOD_VIA_RDMAint socket_write_via_rdma(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr){    MPIDI_STATE_DECL(MPID_STATE_SOCKET_WRITE_VIA_RDMA);    MPIDI_FUNC_ENTER(MPID_STATE_SOCKET_WRITE_VIA_RDMA);    MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE_VIA_RDMA);    return MPI_SUCCESS;}#endifint socket_write_vec(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr){    int error;    int num_written;    int cur_index;    MPID_IOV *car_vec, *buf_vec;    int num_left, i;    MPIDI_STATE_DECL(MPID_STATE_SOCKET_WRITE_VEC);        MPIDI_FUNC_ENTER(MPID_STATE_SOCKET_WRITE_VEC);#ifdef MPICH_DEV_BUILD    /* this function assumes that buf_ptr->vec.num_cars_outstanding > 0 */    if (buf_ptr->vec.num_cars_outstanding == 0)    {	err_printf("Error: socket_write_vec called when num_cars_outstanding == 0.\n");	return -1;    }#endif    /* num_read_copy is the amount of data described by the writer's vector in car_ptr->data.socket.buf.vec_write */    /* num_read is the amount of data described by the reader's vector in buf_ptr->vec */    /* If they are not equal then the writer's car needs to be updated */    if (car_ptr->data.socket.buf.vec_write.num_read_copy != buf_ptr->vec.num_read)    {	/* update vector */	cur_index = car_ptr->data.socket.buf.vec_write.cur_index;	car_vec = car_ptr->data.socket.buf.vec_write.vec;	buf_vec = buf_ptr->vec.vec;		/* update num_read_copy */	car_ptr->data.socket.buf.vec_write.num_read_copy = buf_ptr->vec.num_read;		/* copy the buf vector into the car vector from the current index to the end */	memcpy(&car_vec[cur_index], &buf_vec[cur_index], 	    (buf_ptr->vec.vec_size - cur_index) * sizeof(MPID_IOV));	car_vec[cur_index].MPID_IOV_BUF = 	    (char*)car_vec[cur_index].MPID_IOV_BUF + car_ptr->data.socket.buf.vec_write.num_written_at_cur_index;	car_vec[cur_index].MPID_IOV_LEN = car_vec[cur_index].MPID_IOV_LEN - car_ptr->data.socket.buf.vec_write.num_written_at_cur_index;	/* modify the vector copied from buf_ptr->vec to represent only the data that has been read 	 * This is done by traversing the vector, subtracting the lengths of each buffer until all the read	 * data is accounted for.	 */	/* set the size of the car vector to zero */	car_ptr->data.socket.buf.vec_write.vec_size = 0;		/* add vector elements to the size until all the read data is accounted for */	num_left = car_ptr->data.socket.buf.vec_write.num_read_copy - car_ptr->data.socket.buf.vec_write.cur_num_written;	i = cur_index;	while (num_left > 0)	{	    car_ptr->data.socket.buf.vec_write.vec_size++;	    num_left -= car_vec[i].MPID_IOV_LEN;	    i++;	}	/* if the last vector buffer is larger than the amount of data read into that buffer,	update the length field in the car's copy of the vector to represent only the read data */	if (num_left < 0)	{	    car_vec[i].MPID_IOV_LEN += num_left;	}		/* at this point the vec in the writer's car describes all the currently read data */    }    /* num_read_copy is the amount of data described by the writer's vector in car_ptr->data.socket.buf.vec_write */    /* cur_num_written is the amount of data in this car that has already been written */    /* If they are not equal then there is data available to be written */    if (car_ptr->data.socket.buf.vec_write.cur_num_written < car_ptr->data.socket.buf.vec_write.num_read_copy)    {	/* write */	if (car_ptr->data.socket.buf.vec_write.vec_size == 1) /* optimization for single buffer writes */	{	    /*num_written = bwrite(vc_ptr->data.socket.bfd, 		car_ptr->data.socket.buf.vec_write.vec[car_ptr->data.socket.buf.vec_write.cur_index].MPID_IOV_BUF,		car_ptr->data.socket.buf.vec_write.vec[car_ptr->data.socket.buf.vec_write.cur_index].MPID_IOV_LEN);	    if (num_written == SOCKET_ERROR)		*/	    if ((error = sock_write(vc_ptr->data.socket.sock,		car_ptr->data.socket.buf.vec_write.vec[car_ptr->data.socket.buf.vec_write.cur_index].MPID_IOV_BUF,		car_ptr->data.socket.buf.vec_write.vec[car_ptr->data.socket.buf.vec_write.cur_index].MPID_IOV_LEN,		&num_written)) != SOCK_SUCCESS)	    {		socket_print_sock_error(error, "socket_write: sock_write failed");		MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE_VEC);		return -1;	    }	}	else	{	    /*num_written = bwritev(		vc_ptr->data.socket.bfd, 		&car_ptr->data.socket.buf.vec_write.vec[car_ptr->data.socket.buf.vec_write.cur_index], 		car_ptr->data.socket.buf.vec_write.vec_size);	    if (num_written == SOCKET_ERROR)		*/	    if ((error = sock_writev(		vc_ptr->data.socket.sock,		&car_ptr->data.socket.buf.vec_write.vec[car_ptr->data.socket.buf.vec_write.cur_index], 		car_ptr->data.socket.buf.vec_write.vec_size, &num_written)) != SOCK_SUCCESS)	    {		socket_print_sock_error(error, "socket_write: sock_writev failed");		MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE_VEC);		return -1;	    }	}	/* update vector */	car_ptr->data.socket.buf.vec_write.cur_num_written += num_written;	car_ptr->data.socket.buf.vec_write.total_num_written += num_written;	if (car_ptr->data.socket.buf.vec_write.cur_num_written == buf_ptr->vec.buf_size)	{	    /* reset this car */	    car_ptr->data.socket.buf.vec_write.cur_index = 0;	    car_ptr->data.socket.buf.vec_write.num_read_copy = 0;	    car_ptr->data.socket.buf.vec_write.cur_num_written = 0;	    car_ptr->data.socket.buf.vec_write.num_written_at_cur_index = 0;	    car_ptr->data.socket.buf.vec_write.vec_size = 0;	    /* signal that we have finished writing the current vector */	    mm_dec_atomic(&(buf_ptr->vec.num_cars_outstanding));	}	else	{	    num_left = num_written;	    i = car_ptr->data.socket.buf.vec_write.cur_index;	    while (num_left > 0)	    {		num_left -= car_ptr->data.socket.buf.vec_write.vec[i].MPID_IOV_LEN;		if (num_left > 0)		{		    i++;		}		else		{		    car_ptr->data.socket.buf.vec_write.vec[i].MPID_IOV_BUF = 			car_ptr->data.socket.buf.vec_write.vec[i].MPID_IOV_BUF +			car_ptr->data.socket.buf.vec_write.vec[i].MPID_IOV_LEN +			num_left;		    car_ptr->data.socket.buf.vec_write.num_written_at_cur_index = 			car_ptr->data.socket.buf.vec_write.vec[i].MPID_IOV_LEN + num_left;		    car_ptr->data.socket.buf.vec_write.vec[i].MPID_IOV_LEN = -num_left;		}	    }	    car_ptr->data.socket.buf.vec_write.cur_index = i;	}    }        /* if the entire mpi segment has been written, enqueue the car in the completion queue */    if (car_ptr->data.socket.buf.vec_write.total_num_written == buf_ptr->vec.segment_last)    {#ifdef MPICH_DEV_BUILD	if (car_ptr != car_ptr->vc_ptr->writeq_head)	{	    err_printf("Error: socket_write_vec not dequeueing the head write car.\n");	}#endif	socket_car_dequeue_write(car_ptr->vc_ptr);	car_ptr->next_ptr = NULL; /* prevent the next car from being enqueued by mm_cq_handle_write_car() */	mm_cq_enqueue(car_ptr);    }    MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE_VEC);    return MPI_SUCCESS;}int socket_write_tmp(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr){    int error;    int num_written;    MPIDI_STATE_DECL(MPID_STATE_SOCKET_WRITE_TMP);    MPIDI_FUNC_ENTER(MPID_STATE_SOCKET_WRITE_TMP);    if ((car_ptr->data.socket.buf.tmp.num_written == buf_ptr->tmp.num_read) || (buf_ptr->tmp.num_read == 0))    {	MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE_TMP);	return MPI_SUCCESS;    }    /* write as much as possible */    /*num_written = bwrite(vc_ptr->data.socket.bfd, 	(char*)(buf_ptr->tmp.buf) + car_ptr->data.socket.buf.tmp.num_written,	buf_ptr->tmp.num_read - car_ptr->data.socket.buf.tmp.num_written);    if (num_written == SOCKET_ERROR)	*/    if ((error = sock_write(vc_ptr->data.socket.sock, 	(char*)(buf_ptr->tmp.buf) + car_ptr->data.socket.buf.tmp.num_written,	buf_ptr->tmp.num_read - car_ptr->data.socket.buf.tmp.num_written, &num_written)) != SOCK_SUCCESS)    {	socket_print_sock_error(error, "socket_write_tmp: sock_write failed");    }    /* update the amount written */    car_ptr->data.socket.buf.tmp.num_written += num_written;    /* check to see if finished */    if (car_ptr->data.socket.buf.tmp.num_written == buf_ptr->tmp.len)    {	dbg_printf("num_written: %d\n", car_ptr->data.socket.buf.tmp.num_written);	/* remove from write queue and insert in completion queue */#ifdef MPICH_DEV_BUILD	if (car_ptr != vc_ptr->writeq_head)	{	    err_printf("Error: socket_write_tmp not dequeueing the head write car.\n");	}#endif	socket_car_dequeue_write(vc_ptr);	car_ptr->next_ptr = NULL; /* prevent the next car from being enqueued by mm_cq_handle_write_car() */	mm_cq_enqueue(car_ptr);    }    MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE_TMP);    return MPI_SUCCESS;}int socket_write_simple(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr){    int error;    int num_written;    MPIDI_STATE_DECL(MPID_STATE_SOCKET_WRITE_SIMPLE);    MPIDI_FUNC_ENTER(MPID_STATE_SOCKET_WRITE_SIMPLE);    if ((car_ptr->data.socket.buf.simple.num_written == buf_ptr->simple.num_read) || (buf_ptr->simple.num_read == 0))    {	MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE_SIMPLE);	return MPI_SUCCESS;    }    /* write as much as possible */    /*    num_written = bwrite(vc_ptr->data.socket.bfd, 	(char*)(buf_ptr->simple.buf) + car_ptr->data.socket.buf.simple.num_written,	buf_ptr->simple.num_read - car_ptr->data.socket.buf.simple.num_written);    if (num_written == SOCKET_ERROR)	*/    if ((error = sock_write(vc_ptr->data.socket.sock, 	(char*)(buf_ptr->simple.buf) + car_ptr->data.socket.buf.simple.num_written,	buf_ptr->simple.num_read - car_ptr->data.socket.buf.simple.num_written, &num_written)) != SOCK_SUCCESS)    {	socket_print_sock_error(error, "socket_write_tmp: sock_write failed.");    }    /* update the amount written */    car_ptr->data.socket.buf.simple.num_written += num_written;    /* check to see if finished */    if (car_ptr->data.socket.buf.simple.num_written == buf_ptr->simple.len)    {	dbg_printf("num_written: %d\n", car_ptr->data.socket.buf.simple.num_written);	/* remove from write queue and insert in completion queue */#ifdef MPICH_DEV_BUILD	if (car_ptr != vc_ptr->writeq_head)	{	    err_printf("Error: socket_write_tmp not dequeueing the head write car.\n");	}#endif	socket_car_dequeue_write(vc_ptr);	car_ptr->next_ptr = NULL; /* prevent the next car from being enqueued by mm_cq_handle_write_car() */	mm_cq_enqueue(car_ptr);    }    MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE_SIMPLE);    return MPI_SUCCESS;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -