📄 ib_read.c
字号:
MPIDI_FUNC_EXIT(MPID_STATE_IB_HANDLE_READ_IB); return MPI_SUCCESS;}#endif#ifdef WITH_METHOD_NEWint ib_read_new(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr){ MPIDI_STATE_DECL(MPID_STATE_IB_READ_IB); MPIDI_FUNC_ENTER(MPID_STATE_IB_READ_IB); MPIDI_FUNC_EXIT(MPID_STATE_IB_READ_IB); return MPI_SUCCESS;}int ib_handle_read_new(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr, void *mem_ptr, int num_read){ MPIDI_STATE_DECL(MPID_STATE_IB_HANDLE_READ_IB); MPIDI_FUNC_ENTER(MPID_STATE_IB_HANDLE_READ_IB); MPIDI_FUNC_EXIT(MPID_STATE_IB_HANDLE_READ_IB); return MPI_SUCCESS;}#endifint ib_read_vec(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr){#if 0 int error;#endif MPIDI_STATE_DECL(MPID_STATE_IB_READ_VEC); MPIDI_FUNC_ENTER(MPID_STATE_IB_READ_VEC); if (buf_ptr->vec.num_cars_outstanding == 0) { /* get more buffers */ car_ptr->request_ptr->mm.get_buffers(car_ptr->request_ptr); /* reset the progress structures in the car */ car_ptr->data.ib.buf.vec_write.cur_index = 0; car_ptr->data.ib.buf.vec_write.num_read_copy = 0; car_ptr->data.ib.buf.vec_write.cur_num_written = 0; car_ptr->data.ib.buf.vec_write.num_written_at_cur_index = 0; car_ptr->data.ib.buf.vec_write.vec_size = 0; /* copy the vector from the buffer to the car */ memcpy(car_ptr->data.ib.buf.vec_read.vec, buf_ptr->vec.vec, buf_ptr->vec.vec_size * sizeof(MPID_IOV)); car_ptr->data.ib.buf.vec_read.vec_size = buf_ptr->vec.vec_size; buf_ptr->vec.num_read = 0; /* reset the number of outstanding write cars */ buf_ptr->vec.num_cars_outstanding = buf_ptr->vec.num_cars; }#if 0 if (car_ptr->data.ib.buf.vec_read.cur_num_read < buf_ptr->vec.buf_size) { /* read */ if (car_ptr->data.ib.buf.vec_read.vec_size == 1) /* optimization for single buffer reads */ { if ((error = sock_post_read(vc_ptr->data.ib.sock, car_ptr->data.ib.buf.vec_read.vec[car_ptr->data.ib.buf.vec_read.cur_index].MPID_IOV_BUF, car_ptr->data.ib.buf.vec_read.vec[car_ptr->data.ib.buf.vec_read.cur_index].MPID_IOV_LEN, NULL)) != SOCK_SUCCESS) { IB_Process.error = error; ib_print_sock_error(error, "ib_read_vec: sock_read failed."); MPIDI_FUNC_EXIT(MPID_STATE_IB_READ_VEC); return -1; } } else { if ((error = sock_post_readv(vc_ptr->data.ib.sock, &car_ptr->data.ib.buf.vec_read.vec[car_ptr->data.ib.buf.vec_read.cur_index], car_ptr->data.ib.buf.vec_read.vec_size, NULL)) != SOCK_SUCCESS) { IB_Process.error = error; ib_print_sock_error(error, "ib_read_vec: sock_readv failed."); MPIDI_FUNC_EXIT(MPID_STATE_IB_READ_VEC); return -1; } } MPIDI_FUNC_EXIT(MPID_STATE_IB_READ_VEC); return MPI_SUCCESS; }#endif MPIDI_FUNC_EXIT(MPID_STATE_IB_READ_VEC); return MPI_SUCCESS;}int ib_handle_read_vec(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr, void *mem_ptr, int num_read){ int num_left; int i, count; MPIDI_STATE_DECL(MPID_STATE_IB_HANDLE_READ_VEC); MPIDI_FUNC_ENTER(MPID_STATE_IB_HANDLE_READ_VEC); MPIU_dbg_printf("ib_handle_read_vec: received %d bytes total\n", num_read); num_left = num_read; i = car_ptr->data.ib.buf.vec_read.cur_index; while (num_left > 0) { count = min(car_ptr->data.ib.buf.vec_read.vec[i].MPID_IOV_LEN, (unsigned int)num_left); MPIU_dbg_printf("ib_handle_read_vec: copying %d bytes from index %d\n", count, i); memcpy(car_ptr->data.ib.buf.vec_read.vec[i].MPID_IOV_BUF, mem_ptr, count); mem_ptr = (char*)mem_ptr + count; num_left -= car_ptr->data.ib.buf.vec_read.vec[i].MPID_IOV_LEN; if (num_left > 0) { i++; } else { car_ptr->data.ib.buf.vec_read.vec[i].MPID_IOV_BUF = (char*)(car_ptr->data.ib.buf.vec_read.vec[i].MPID_IOV_BUF) + car_ptr->data.ib.buf.vec_read.vec[i].MPID_IOV_LEN + num_left; car_ptr->data.ib.buf.vec_read.vec[i].MPID_IOV_LEN = -num_left; } } car_ptr->data.ib.buf.vec_read.cur_index = i; buf_ptr->vec.num_read += num_read; car_ptr->data.ib.buf.vec_read.cur_num_read += num_read; car_ptr->data.ib.buf.vec_read.total_num_read += num_read; if (car_ptr->data.ib.buf.vec_read.cur_num_read == buf_ptr->vec.buf_size) { /* reset the car */ car_ptr->data.ib.buf.vec_read.cur_index = 0; car_ptr->data.ib.buf.vec_read.cur_num_read = 0; car_ptr->data.ib.buf.vec_read.vec_size = 0; } if (car_ptr->data.ib.buf.vec_read.total_num_read == buf_ptr->vec.segment_last) { ib_car_dequeue_read(vc_ptr, car_ptr); mm_cq_enqueue(car_ptr); } else { /* post read of the rest of the data*/ /*ib_read_data(vc_ptr);*/ /* How do I know if there are more reads posted or if I need to post them here? */ } MPIDI_FUNC_EXIT(MPID_STATE_IB_HANDLE_READ_VEC); return MPI_SUCCESS;}int ib_read_tmp(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr){#if 0 int error;#endif MPIDI_STATE_DECL(MPID_STATE_IB_READ_TMP); MPIDI_FUNC_ENTER(MPID_STATE_IB_READ_TMP); if (buf_ptr->tmp.buf == NULL) { /* get the tmp buffer */ car_ptr->request_ptr->mm.get_buffers(car_ptr->request_ptr); }#if 0 if ((error = sock_post_read(vc_ptr->data.ib.sock, (char*)(buf_ptr->tmp.buf) + buf_ptr->tmp.num_read, buf_ptr->tmp.len - buf_ptr->tmp.num_read, NULL)) != SOCK_SUCCESS) { IB_Process.error = error; ib_print_sock_error(error, "ib_read_tmp: sock_read failed."); MPIDI_FUNC_EXIT(MPID_STATE_IB_READ_TMP); return -1; }#endif MPIDI_FUNC_EXIT(MPID_STATE_IB_READ_TMP); return MPI_SUCCESS;}int ib_handle_read_tmp(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr, void *mem_ptr, int num_read){ MPIDI_STATE_DECL(MPID_STATE_IB_HANDLE_READ_TMP); MPIDI_FUNC_ENTER(MPID_STATE_IB_HANDLE_READ_TMP); /*msg_printf("num_read tmp: %d\n", num_read);*/ memcpy(buf_ptr->tmp.buf, mem_ptr, num_read); /* update the amount read */ buf_ptr->tmp.num_read += num_read; /* check to see if finished */ if (buf_ptr->tmp.num_read == buf_ptr->tmp.len) { dbg_printf("num_read: %d\n", buf_ptr->tmp.num_read); /* remove from read queue and insert in completion queue */ ib_car_dequeue_read(vc_ptr, car_ptr); /* If I put it in the completion queue, the car will be freed. This is bad because it needs to stay in the unexpected queue */ /*mm_cq_enqueue(car_ptr);*/ /* So I put the completion queue logic here, minus the freeing of the car */ ib_post_read_pkt(vc_ptr); mm_dec_cc_atomic(car_ptr->request_ptr); } else { /* post more read*/ /*ib_read_data(vc_ptr);*/ /* How do I know if there are more reads posted or if I need to post them here? */ } MPIDI_FUNC_EXIT(MPID_STATE_IB_HANDLE_READ_TMP); return MPI_SUCCESS;}int ib_read_simple(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr){#if 0 int error;#endif MPIDI_STATE_DECL(MPID_STATE_IB_READ_SIMPLE); MPIDI_FUNC_ENTER(MPID_STATE_IB_READ_SIMPLE); if (buf_ptr->simple.buf == NULL) { /* get the simple buffer */ /*car_ptr->request_ptr->mm.get_buffers(car_ptr->request_ptr);*/ err_printf("Error: ib_read_simple called with NULL simple buffer.\n"); return -1; }#if 0 if ((error = sock_post_read(vc_ptr->data.ib.sock, (char*)(buf_ptr->simple.buf) + buf_ptr->simple.num_read, buf_ptr->simple.len - buf_ptr->simple.num_read, NULL)) != SOCK_SUCCESS) { IB_Process.error = error; ib_print_sock_error(error, "ib_read_tmp: sock_read failed."); MPIDI_FUNC_EXIT(MPID_STATE_IB_READ_SIMPLE); return -1; }#endif MPIDI_FUNC_EXIT(MPID_STATE_IB_READ_SIMPLE); return MPI_SUCCESS;}int ib_handle_read_simple(MPIDI_VC *vc_ptr, MM_Car *car_ptr, MM_Segment_buffer *buf_ptr, void *mem_ptr, int num_read){ MPIDI_STATE_DECL(MPID_STATE_IB_HANDLE_READ_SIMPLE); MPIDI_FUNC_ENTER(MPID_STATE_IB_HANDLE_READ_SIMPLE); MPIU_dbg_printf("num_read simple: %d\n", num_read); memcpy(buf_ptr->simple.buf, mem_ptr, num_read); /* update the amount read */ buf_ptr->simple.num_read += num_read; /* check to see if finished */ if (buf_ptr->simple.num_read == buf_ptr->simple.len) { dbg_printf("num_read: %d\n", buf_ptr->simple.num_read); /* remove from read queue and insert in completion queue */ ib_car_dequeue_read(vc_ptr, car_ptr); mm_cq_enqueue(car_ptr); } else { /* post a read of the rest of the data*/ /*ib_read_data(vc_ptr);*/ /* How do I know if there are more reads posted or if I need to post them here? */ } MPIDI_FUNC_EXIT(MPID_STATE_IB_HANDLE_READ_SIMPLE); return MPI_SUCCESS;}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -