📄 tcp_write_aggressive.c
字号:
/* update vector */ car_ptr->data.tcp.buf.vec_write.cur_num_written += num_written; car_ptr->data.tcp.buf.vec_write.total_num_written += num_written; if (car_ptr->data.tcp.buf.vec_write.cur_num_written == buf_ptr->vec.buf_size) { /* reset this car */ car_ptr->data.tcp.buf.vec_write.cur_index = 0; car_ptr->data.tcp.buf.vec_write.num_read_copy = 0; car_ptr->data.tcp.buf.vec_write.cur_num_written = 0; car_ptr->data.tcp.buf.vec_write.num_written_at_cur_index = 0; car_ptr->data.tcp.buf.vec_write.vec_size = 0; /* signal that we have finished writing the current vector */ mm_dec_atomic(&(buf_ptr->vec.num_cars_outstanding)); } else { num_left = num_written; i = car_ptr->data.tcp.buf.vec_write.cur_index; while (num_left > 0) { /* subtract the length of the current vector */ num_left -= car_ptr->data.tcp.buf.vec_write.vec[i].MPID_IOV_LEN; if (num_left > 0) { /* the entire vector was written so move to the next index */ i++; } else { /* this vector was only partially written, so update the buf and len fields */ car_ptr->data.tcp.buf.vec_write.vec[i].MPID_IOV_BUF = (char*)(car_ptr->data.tcp.buf.vec_write.vec[i].MPID_IOV_BUF) + car_ptr->data.tcp.buf.vec_write.vec[i].MPID_IOV_LEN + num_left; car_ptr->data.tcp.buf.vec_write.num_written_at_cur_index = car_ptr->data.tcp.buf.vec_write.vec[i].MPID_IOV_LEN + num_left; car_ptr->data.tcp.buf.vec_write.vec[i].MPID_IOV_LEN = -num_left; } } car_ptr->data.tcp.buf.vec_write.cur_index = i; } /* if the entire mpi segment has been written, enqueue the car in the completion queue */ if (car_ptr->data.tcp.buf.vec_write.total_num_written == buf_ptr->vec.segment_last) {#ifdef MPICH_DEV_BUILD if (car_ptr != car_ptr->vc_ptr->writeq_head) { err_printf("Error: tcp_update_car_num_written not dequeueing the head write car.\n"); }#endif tcp_car_dequeue_write(car_ptr->vc_ptr); /*printf("dec cc: written vec: %d\n", num_written);fflush(stdout);*/ mm_dec_cc_atomic(car_ptr->request_ptr); mm_car_free(car_ptr); } else { /*msg_printf("partial buffer written, total_num_written %d, segment_last %d\n", car_ptr->data.tcp.buf.vec_write.total_num_written, buf_ptr->vec.segment_last);*/ } break; case MM_TMP_BUFFER: num_written = min( /* the amout of buffer space available to have been written */ buf_ptr->tmp.len - car_ptr->data.tcp.buf.tmp.num_written, /* the actual amount written */ num_written); /*msg_printf("num_written tmp: %d\n", num_written);*/ /* update the amount written */ car_ptr->data.tcp.buf.tmp.num_written += num_written; /* check to see if finished */ if (car_ptr->data.tcp.buf.tmp.num_written == buf_ptr->tmp.len) { dbg_printf("num_written: %d\n", car_ptr->data.tcp.buf.tmp.num_written); /* remove from write queue and insert in completion queue */#ifdef MPICH_DEV_BUILD if (car_ptr != car_ptr->vc_ptr->writeq_head) { err_printf("Error: tcp_update_car_num_written not dequeueing the head write car.\n"); }#endif tcp_car_dequeue_write(car_ptr->vc_ptr); /*printf("dec cc: written tmp buffer: %d\n", num_written);fflush(stdout);*/ mm_dec_cc_atomic(car_ptr->request_ptr); mm_car_free(car_ptr); } break; case MM_SIMPLE_BUFFER: num_written = min( /* the amout of buffer space available to have been written */ buf_ptr->simple.len - car_ptr->data.tcp.buf.simple.num_written, /* the actual amount written */ num_written); /*msg_printf("num_written simple: %d\n", num_written);*/ /* update the amount written */ car_ptr->data.tcp.buf.simple.num_written += num_written; /* check to see if finished */ if (car_ptr->data.tcp.buf.simple.num_written == buf_ptr->simple.len) { dbg_printf("num_written: %d\n", car_ptr->data.tcp.buf.simple.num_written); /* remove from write queue and insert in completion queue */#ifdef MPICH_DEV_BUILD if (car_ptr != car_ptr->vc_ptr->writeq_head) { err_printf("Error: tcp_update_car_num_written not dequeueing the head write car.\n"); }#endif tcp_car_dequeue_write(car_ptr->vc_ptr); /*printf("dec cc: written simple buffer: %d\n", num_written);fflush(stdout);*/ mm_dec_cc_atomic(car_ptr->request_ptr); mm_car_free(car_ptr); } break;#ifdef WITH_METHOD_IB case MM_IB_BUFFER: break;#endif#ifdef WITH_METHOD_NEW case MM_NEW_METHOD_BUFFER: break;#endif case MM_NULL_BUFFER: err_printf("Error: tcp_update_car_num_written called on a null buffer\n"); break; default: err_printf("Error: tcp_update_car_num_written: unknown or unsupported buffer type: %d\n", buf_ptr->type); break; } /* update num_written */ (*num_written_ptr) -= num_written; MPIDI_FUNC_EXIT(MPID_STATE_TCP_UPDATE_CAR_NUM_WRITTEN); return MPI_SUCCESS;}/*@ tcp_write_aggressive - write as many cars as possible Parameters:+ MPIDI_VC *vc_ptr - vc Notes:@*/int tcp_write_aggressive(MPIDI_VC *vc_ptr){ MM_Car *car_ptr; MM_Segment_buffer *buf_ptr; MPID_IOV vec[MPID_IOV_LIMIT]; int cur_pos = 0; BOOL stop = FALSE; int num_written; MPIDI_STATE_DECL(MPID_STATE_TCP_WRITE_AGGRESSIVE); MPIDI_FUNC_ENTER(MPID_STATE_TCP_WRITE_AGGRESSIVE); if (!vc_ptr->data.tcp.connected) { MPIDI_FUNC_EXIT(MPID_STATE_TCP_WRITE_AGGRESSIVE); return MPI_SUCCESS; } if (vc_ptr->writeq_head == NULL) { msg_printf("tcp_write_aggressive: write signalled with no vc's in the write queue.\n"); MPIDI_FUNC_EXIT(MPID_STATE_TCP_WRITE_AGGRESSIVE); return MPI_SUCCESS; } car_ptr = vc_ptr->writeq_head; /* pack as many cars into a vector as possible */ do { buf_ptr = car_ptr->buf_ptr; switch (buf_ptr->type) {#ifdef WITH_METHOD_SHM case MM_SHM_BUFFER: stop = !tcp_stuff_vector_shm(vec, &cur_pos, car_ptr, buf_ptr); break;#endif#ifdef WITH_METHOD_VIA case MM_VIA_BUFFER: stop = !tcp_stuff_vector_via(vec, &cur_pos, car_ptr, buf_ptr); break;#endif#ifdef WITH_METHOD_VIA_RDMA case MM_VIA_RDMA_BUFFER: stop = !tcp_stuff_vector_via_rdma(vec, &cur_pos, car_ptr, buf_ptr); break;#endif case MM_VEC_BUFFER: if (buf_ptr->vec.num_cars_outstanding > 0) { /* num_cars_outstanding means that the reader has provided data and is waiting for the writers to complete */ stop = !tcp_stuff_vector_vec(vec, &cur_pos, car_ptr, buf_ptr); } else { /* the reader hasn't read any data yet, so we can't write anything. */ stop = TRUE; } break; case MM_TMP_BUFFER: stop = !tcp_stuff_vector_tmp(vec, &cur_pos, car_ptr, buf_ptr); break; case MM_SIMPLE_BUFFER: stop = !tcp_stuff_vector_simple(vec, &cur_pos, car_ptr, buf_ptr); break;#ifdef WITH_METHOD_IB case MM_IB_BUFFER: stop = !tcp_stuff_vector_ib(vec, &cur_pos, car_ptr, buf_ptr); break;#endif#ifdef WITH_METHOD_NEW case MM_NEW_METHOD_BUFFER: stop = !tcp_stuff_vector_new(vec, &cur_pos, car_ptr, buf_ptr); break;#endif case MM_NULL_BUFFER: err_printf("Error: tcp_write_aggressive called on a null buffer\n"); break; default: err_printf("Error: tcp_write_aggressive: unknown or unsupported buffer type: %d\n", buf_ptr->type); break; } if (stop) break;#define REALLY_AGGRESSIVE_WRITE#ifdef REALLY_AGGRESSIVE_WRITE car_ptr = (car_ptr->next_ptr) ? car_ptr->next_ptr : /* go to the next car in the list */ car_ptr->vcqnext_ptr; /* else go to the next enqueued list */#else car_ptr = car_ptr->next_ptr;#endif } while (car_ptr); if (cur_pos > 0) { /* write the data */ if (cur_pos == 1) { num_written = bwrite(vc_ptr->data.tcp.bfd, vec[0].MPID_IOV_BUF, vec[0].MPID_IOV_LEN); if (num_written == SOCKET_ERROR) { TCP_Process.error = beasy_getlasterror(); beasy_error_to_string(TCP_Process.error, TCP_Process.err_msg, TCP_ERROR_MSG_LENGTH); err_printf("tcp_write_aggressive: bwrite failed, error %d: %s\n", TCP_Process.error, TCP_Process.err_msg); MPIDI_FUNC_EXIT(MPID_STATE_TCP_WRITE_AGGRESSIVE); return -1; } } else { num_written = bwritev(vc_ptr->data.tcp.bfd, vec, cur_pos); if (num_written == SOCKET_ERROR) { TCP_Process.error = beasy_getlasterror(); beasy_error_to_string(TCP_Process.error, TCP_Process.err_msg, TCP_ERROR_MSG_LENGTH); err_printf("tcp_write_aggressive: bwritev failed, error %d: %s\n", TCP_Process.error, TCP_Process.err_msg); MPIDI_FUNC_EXIT(MPID_STATE_TCP_WRITE_AGGRESSIVE); return -1; } } /* update all the cars and buffers affected by the bwrite(v) action */ /*msg_printf("write_aggressive: num_written %d\n", num_written);*/ while (num_written) { /* tcp_update_car_num_written causes the head to get dequeued and the next car to take its place * until all the data is accounted for or there is an error. */ if (tcp_update_car_num_written(vc_ptr->writeq_head, &num_written) != MPI_SUCCESS) { err_printf("tcp_write_aggressive:tcp_update_car_num_written failed.\n"); return -1; } /*msg_printf("write_aggressive: num_written updated %d\n", num_written);*/ } } MPIDI_FUNC_EXIT(MPID_STATE_TCP_WRITE_AGGRESSIVE); return MPI_SUCCESS;}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -