📄 ib_write_aggressive.c
字号:
i = car_ptr->data.ib.buf.vec_write.cur_index; while (num_left > 0) { /* subtract the length of the current vector */ num_left -= car_ptr->data.ib.buf.vec_write.vec[i].MPID_IOV_LEN; if (num_left > 0) { /* the entire vector was written so move to the next index */ i++; } else { /* this vector was only partially written, so update the buf and len fields */ car_ptr->data.ib.buf.vec_write.vec[i].MPID_IOV_BUF = (char*)(car_ptr->data.ib.buf.vec_write.vec[i].MPID_IOV_BUF) + car_ptr->data.ib.buf.vec_write.vec[i].MPID_IOV_LEN + num_left; car_ptr->data.ib.buf.vec_write.num_written_at_cur_index = car_ptr->data.ib.buf.vec_write.vec[i].MPID_IOV_LEN + num_left; car_ptr->data.ib.buf.vec_write.vec[i].MPID_IOV_LEN = -num_left; } } car_ptr->data.ib.buf.vec_write.cur_index = i; } /* if the entire mpi segment has been written, enqueue the car in the completion queue */ if (car_ptr->data.ib.buf.vec_write.total_num_written == buf_ptr->vec.segment_last) {#ifdef MPICH_DEV_BUILD if (car_ptr != car_ptr->vc_ptr->writeq_head) { err_printf("Error: ib_update_car_num_written not dequeueing the head write car.\n"); }#endif ib_car_dequeue_write(car_ptr->vc_ptr); /*printf("dec cc: written vec: %d\n", num_written);fflush(stdout);*/ mm_dec_cc_atomic(car_ptr->request_ptr); mm_car_free(car_ptr); } else { /*msg_printf("partial buffer written, total_num_written %d, segment_last %d\n", car_ptr->data.ib.buf.vec_write.total_num_written, buf_ptr->vec.segment_last);*/ } break; case MM_TMP_BUFFER: num_written = min( /* the amout of buffer space available to have been written */ buf_ptr->tmp.len - car_ptr->data.ib.buf.tmp.num_written, /* the actual amount written */ num_written); /*msg_printf("num_written tmp: %d\n", num_written);*/ /* update the amount written */ car_ptr->data.ib.buf.tmp.num_written += num_written; /* check to see if finished */ if (car_ptr->data.ib.buf.tmp.num_written == buf_ptr->tmp.len) { dbg_printf("num_written: %d\n", car_ptr->data.ib.buf.tmp.num_written); /* remove from write queue and insert in completion queue */#ifdef MPICH_DEV_BUILD if (car_ptr != car_ptr->vc_ptr->writeq_head) { err_printf("Error: ib_update_car_num_written not dequeueing the head write car.\n"); }#endif ib_car_dequeue_write(car_ptr->vc_ptr); /*printf("dec cc: written tmp buffer: %d\n", num_written);fflush(stdout);*/ mm_dec_cc_atomic(car_ptr->request_ptr); mm_car_free(car_ptr); } break; case MM_SIMPLE_BUFFER: num_written = min( /* the amout of buffer space available to have been written */ buf_ptr->simple.len - car_ptr->data.ib.buf.simple.num_written, /* the actual amount written */ num_written); /*msg_printf("num_written simple: %d\n", num_written);*/ /* update the amount written */ car_ptr->data.ib.buf.simple.num_written += num_written; /* check to see if finished */ if (car_ptr->data.ib.buf.simple.num_written == buf_ptr->simple.len) { dbg_printf("num_written: %d\n", car_ptr->data.ib.buf.simple.num_written); /* remove from write queue and insert in completion queue */#ifdef MPICH_DEV_BUILD if (car_ptr != car_ptr->vc_ptr->writeq_head) { err_printf("Error: ib_update_car_num_written not dequeueing the head write car.\n"); }#endif ib_car_dequeue_write(car_ptr->vc_ptr); /*printf("dec cc: written simple buffer: %d\n", num_written);fflush(stdout);*/ mm_dec_cc_atomic(car_ptr->request_ptr); mm_car_free(car_ptr); } break;#ifdef WITH_METHOD_IB case MM_IB_BUFFER: break;#endif#ifdef WITH_METHOD_NEW case MM_NEW_METHOD_BUFFER: break;#endif case MM_NULL_BUFFER: err_printf("Error: ib_update_car_num_written called on a null buffer\n"); break; default: err_printf("Error: ib_update_car_num_written: unknown or unsupported buffer type: %d\n", buf_ptr->type); break; } /* update num_written */ (*num_written_ptr) -= num_written; MPIDI_FUNC_EXIT(MPID_STATE_IB_UPDATE_CAR_NUM_WRITTEN); return MPI_SUCCESS;}/*@ ib_write_aggressive - write as many cars as possible Parameters:+ MPIDI_VC *vc_ptr - vc Notes:@*/int ib_write_aggressive(MPIDI_VC *vc_ptr){ int error; MM_Car *car_ptr; MM_Segment_buffer *buf_ptr; /* This needs to be available until the posted write finishes */ /* A quick fix was to copy the array into the ib structure */ /* Maybe this array can be kept in the VC */ MPID_IOV vec[MPID_IOV_LIMIT]; int cur_pos = 0; BOOL stop = FALSE; MPIDI_STATE_DECL(MPID_STATE_IB_WRITE_AGGRESSIVE); MPIDI_FUNC_ENTER(MPID_STATE_IB_WRITE_AGGRESSIVE); MPIU_dbg_printf("ib_write_aggressive\n"); if (vc_ptr->writeq_head == NULL) { /*msg_printf("ib_write_aggressive: write signalled with no car's in the write queue.\n");*/ MPIDI_FUNC_EXIT(MPID_STATE_IB_WRITE_AGGRESSIVE); return MPI_SUCCESS; } car_ptr = vc_ptr->writeq_head; /* pack as many cars into a vector as possible */ do { buf_ptr = car_ptr->buf_ptr; switch (buf_ptr->type) {#ifdef WITH_METHOD_SHM case MM_SHM_BUFFER: stop = !ib_stuff_vector_shm(vec, &cur_pos, car_ptr, buf_ptr); break;#endif#ifdef WITH_METHOD_VIA case MM_VIA_BUFFER: stop = !ib_stuff_vector_via(vec, &cur_pos, car_ptr, buf_ptr); break;#endif#ifdef WITH_METHOD_VIA_RDMA case MM_VIA_RDMA_BUFFER: stop = !ib_stuff_vector_via_rdma(vec, &cur_pos, car_ptr, buf_ptr); break;#endif case MM_VEC_BUFFER: if (buf_ptr->vec.num_cars_outstanding > 0) { /* num_cars_outstanding > 0 means that the reader has provided data and is waiting for the writers to complete */ stop = !ib_stuff_vector_vec(vec, &cur_pos, car_ptr, buf_ptr); } else { /* the reader hasn't read any data yet, so we can't write anything. */ stop = TRUE; } break; case MM_TMP_BUFFER: stop = !ib_stuff_vector_tmp(vec, &cur_pos, car_ptr, buf_ptr); break; case MM_SIMPLE_BUFFER: stop = !ib_stuff_vector_simple(vec, &cur_pos, car_ptr, buf_ptr); break;#ifdef WITH_METHOD_IB case MM_IB_BUFFER: stop = !ib_stuff_vector_ib(vec, &cur_pos, car_ptr, buf_ptr); break;#endif#ifdef WITH_METHOD_NEW case MM_NEW_METHOD_BUFFER: stop = !ib_stuff_vector_new(vec, &cur_pos, car_ptr, buf_ptr); break;#endif case MM_NULL_BUFFER: err_printf("Error: ib_write_aggressive called on a null buffer\n"); break; default: err_printf("Error: ib_write_aggressive: unknown or unsupported buffer type: %d\n", buf_ptr->type); break; } if (stop) break;#define REALLY_AGGRESSIVE_WRITE#ifdef REALLY_AGGRESSIVE_WRITE car_ptr = (car_ptr->next_ptr) ? car_ptr->next_ptr : /* go to the next car in the list */ car_ptr->vcqnext_ptr; /* else go to the next enqueued list */#else car_ptr = car_ptr->next_ptr;#endif } while (car_ptr); if (cur_pos > 0) { /* post a write of the data */ if (cur_pos == 1) { MPIU_dbg_printf("ibu_post_write, %d bytes\n", vec[0].MPID_IOV_LEN); if ((error = ibr_post_write(vc_ptr, vec[0].MPID_IOV_BUF, vec[0].MPID_IOV_LEN, NULL)) != IB_SUCCESS) { err_printf("ib_write_aggressive: ibu_post_write failed, error %d\n", error); MPIDI_FUNC_EXIT(MPID_STATE_IB_WRITE_AGGRESSIVE); return -1; } } else { /*** debugging printout */ { char str[1024], *s = str; int i, n=0; s += sprintf(s, "ibu_post_writev "); for (i=0; i<cur_pos; i++) { s += sprintf(s, "%d+", vec[i].MPID_IOV_LEN); n += vec[i].MPID_IOV_LEN; } s--; sprintf(s, "=%d bytes\n", n); MPIU_dbg_printf("%s", str); } /*** end debugging printout */ if ((error = ibr_post_writev(vc_ptr, vec, cur_pos, NULL)) != IB_SUCCESS) { err_printf("ib_write_aggressive: ib_writev failed, error %d\n", error); MPIDI_FUNC_EXIT(MPID_STATE_IB_WRITE_AGGRESSIVE); return -1; } } } MPIDI_FUNC_EXIT(MPID_STATE_IB_WRITE_AGGRESSIVE); return MPI_SUCCESS;}int ib_handle_written(MPIDI_VC *vc_ptr, void *mem_ptr, int num_written){ MPIDI_STATE_DECL(MPID_STATE_IB_HANDLE_WRITTEN); MPIDI_FUNC_ENTER(MPID_STATE_IB_HANDLE_WRITTEN); if (vc_ptr == NULL) { MPIDI_FUNC_EXIT(MPID_STATE_IB_HANDLE_WRITTEN); return MPI_SUCCESS; } MPIU_dbg_printf("ib_handle_written - %d bytes\n", num_written); /* update all the cars and buffers affected by the ib_write action */ while (num_written) { /* ib_update_car_num_written causes the head to get dequeued and the next car to take its place * until all the data is accounted for or there is an error. */ if (ib_update_car_num_written(vc_ptr->writeq_head, &num_written) != MPI_SUCCESS) { err_printf("ib_write_aggressive:ib_update_car_num_written failed.\n"); MPIDI_FUNC_EXIT(MPID_STATE_IB_HANDLE_WRITTEN); return -1; } /*msg_printf("write_aggressive: num_written updated %d\n", num_written);*/ } /* if there are more cars in the queue, post the next ib_write action */ if (vc_ptr->writeq_head != NULL) ib_write_aggressive(vc_ptr); MPIDI_FUNC_EXIT(MPID_STATE_IB_HANDLE_WRITTEN); return MPI_SUCCESS;}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -