📄 socket_write_aggressive.c
字号:
(char*)(car_ptr->data.socket.buf.vec_write.vec[i].MPID_IOV_BUF) + car_ptr->data.socket.buf.vec_write.vec[i].MPID_IOV_LEN + num_left; car_ptr->data.socket.buf.vec_write.num_written_at_cur_index = car_ptr->data.socket.buf.vec_write.vec[i].MPID_IOV_LEN + num_left; car_ptr->data.socket.buf.vec_write.vec[i].MPID_IOV_LEN = -num_left; } } car_ptr->data.socket.buf.vec_write.cur_index = i; } /* if the entire mpi segment has been written, enqueue the car in the completion queue */ if (car_ptr->data.socket.buf.vec_write.total_num_written == buf_ptr->vec.segment_last) {#ifdef MPICH_DEV_BUILD if (car_ptr != car_ptr->vc_ptr->writeq_head) { err_printf("Error: socket_update_car_num_written not dequeueing the head write car.\n"); }#endif socket_car_dequeue_write(car_ptr->vc_ptr); /*printf("dec cc: written vec: %d\n", num_written);fflush(stdout);*/ mm_dec_cc_atomic(car_ptr->request_ptr); mm_car_free(car_ptr); } else { /*msg_printf("partial buffer written, total_num_written %d, segment_last %d\n", car_ptr->data.socket.buf.vec_write.total_num_written, buf_ptr->vec.segment_last);*/ } break; case MM_TMP_BUFFER: num_written = min( /* the amout of buffer space available to have been written */ buf_ptr->tmp.len - car_ptr->data.socket.buf.tmp.num_written, /* the actual amount written */ num_written); /*msg_printf("num_written tmp: %d\n", num_written);*/ /* update the amount written */ car_ptr->data.socket.buf.tmp.num_written += num_written; /* check to see if finished */ if (car_ptr->data.socket.buf.tmp.num_written == buf_ptr->tmp.len) { dbg_printf("num_written: %d\n", car_ptr->data.socket.buf.tmp.num_written); /* remove from write queue and insert in completion queue */#ifdef MPICH_DEV_BUILD if (car_ptr != car_ptr->vc_ptr->writeq_head) { err_printf("Error: socket_update_car_num_written not dequeueing the head write car.\n"); }#endif socket_car_dequeue_write(car_ptr->vc_ptr); /*printf("dec cc: written tmp buffer: %d\n", num_written);fflush(stdout);*/ mm_dec_cc_atomic(car_ptr->request_ptr); mm_car_free(car_ptr); } break; case MM_SIMPLE_BUFFER: num_written = min( /* the amout of buffer space available to have been written */ buf_ptr->simple.len - car_ptr->data.socket.buf.simple.num_written, /* the actual amount written */ num_written); /*msg_printf("num_written simple: %d\n", num_written);*/ /* update the amount written */ car_ptr->data.socket.buf.simple.num_written += num_written; /* check to see if finished */ if (car_ptr->data.socket.buf.simple.num_written == buf_ptr->simple.len) { dbg_printf("num_written: %d\n", car_ptr->data.socket.buf.simple.num_written); /* remove from write queue and insert in completion queue */#ifdef MPICH_DEV_BUILD if (car_ptr != car_ptr->vc_ptr->writeq_head) { err_printf("Error: socket_update_car_num_written not dequeueing the head write car.\n"); }#endif socket_car_dequeue_write(car_ptr->vc_ptr); /*printf("dec cc: written simple buffer: %d\n", num_written);fflush(stdout);*/ mm_dec_cc_atomic(car_ptr->request_ptr); mm_car_free(car_ptr); } break;#ifdef WITH_METHOD_IB case MM_IB_BUFFER: break;#endif#ifdef WITH_METHOD_NEW case MM_NEW_METHOD_BUFFER: break;#endif case MM_NULL_BUFFER: err_printf("Error: socket_update_car_num_written called on a null buffer\n"); break; default: err_printf("Error: socket_update_car_num_written: unknown or unsupported buffer type: %d\n", buf_ptr->type); break; } /* update num_written */ (*num_written_ptr) -= num_written; MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_UPDATE_CAR_NUM_WRITTEN); return MPI_SUCCESS;}/*@ socket_write_aggressive - write as many cars as possible Parameters:+ MPIDI_VC *vc_ptr - vc Notes:@*/int socket_write_aggressive(MPIDI_VC *vc_ptr){ int error; MM_Car *car_ptr; MM_Segment_buffer *buf_ptr; /* This needs to be available until the posted write finishes */ /* A quick fix was to copy the array into the sock structure */ /* Maybe this array can be kept in the VC */ MPID_IOV vec[MPID_IOV_LIMIT]; int cur_pos = 0; BOOL stop = FALSE; MPIDI_STATE_DECL(MPID_STATE_SOCKET_WRITE_AGGRESSIVE); MPIDI_FUNC_ENTER(MPID_STATE_SOCKET_WRITE_AGGRESSIVE); MPIU_dbg_printf("socket_write_aggressive\n"); if (!(vc_ptr->data.socket.state & SOCKET_CONNECTED)) { MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE_AGGRESSIVE); return MPI_SUCCESS; } if (vc_ptr->writeq_head == NULL) { /*msg_printf("socket_write_aggressive: write signalled with no car's in the write queue.\n");*/ MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE_AGGRESSIVE); return MPI_SUCCESS; } car_ptr = vc_ptr->writeq_head; /* pack as many cars into a vector as possible */ do { buf_ptr = car_ptr->buf_ptr; switch (buf_ptr->type) {#ifdef WITH_METHOD_SHM case MM_SHM_BUFFER: stop = !socket_stuff_vector_shm(vec, &cur_pos, car_ptr, buf_ptr); break;#endif#ifdef WITH_METHOD_VIA case MM_VIA_BUFFER: stop = !socket_stuff_vector_via(vec, &cur_pos, car_ptr, buf_ptr); break;#endif#ifdef WITH_METHOD_VIA_RDMA case MM_VIA_RDMA_BUFFER: stop = !socket_stuff_vector_via_rdma(vec, &cur_pos, car_ptr, buf_ptr); break;#endif case MM_VEC_BUFFER: if (buf_ptr->vec.num_cars_outstanding > 0) { /* num_cars_outstanding > 0 means that the reader has provided data and is waiting for the writers to complete */ stop = !socket_stuff_vector_vec(vec, &cur_pos, car_ptr, buf_ptr); } else { /* the reader hasn't read any data yet, so we can't write anything. */ stop = TRUE; } break; case MM_TMP_BUFFER: stop = !socket_stuff_vector_tmp(vec, &cur_pos, car_ptr, buf_ptr); break; case MM_SIMPLE_BUFFER: stop = !socket_stuff_vector_simple(vec, &cur_pos, car_ptr, buf_ptr); break;#ifdef WITH_METHOD_IB case MM_IB_BUFFER: stop = !socket_stuff_vector_ib(vec, &cur_pos, car_ptr, buf_ptr); break;#endif#ifdef WITH_METHOD_NEW case MM_NEW_METHOD_BUFFER: stop = !socket_stuff_vector_new(vec, &cur_pos, car_ptr, buf_ptr); break;#endif case MM_NULL_BUFFER: err_printf("Error: socket_write_aggressive called on a null buffer\n"); break; default: err_printf("Error: socket_write_aggressive: unknown or unsupported buffer type: %d\n", buf_ptr->type); break; } if (stop) break;#define REALLY_AGGRESSIVE_WRITE#ifdef REALLY_AGGRESSIVE_WRITE car_ptr = (car_ptr->next_ptr) ? car_ptr->next_ptr : /* go to the next car in the list */ car_ptr->vcqnext_ptr; /* else go to the next enqueued list */#else car_ptr = car_ptr->next_ptr;#endif } while (car_ptr); if (cur_pos > 0) { /* post a write of the data */ if (cur_pos == 1) { MPIU_dbg_printf("sock_post_write(%d), %d bytes\n", sock_getid(vc_ptr->data.socket.sock), vec[0].MPID_IOV_LEN); if ((error = sock_post_write(vc_ptr->data.socket.sock, vec[0].MPID_IOV_BUF, vec[0].MPID_IOV_LEN, NULL)) != SOCK_SUCCESS) { socket_print_sock_error(error, "socket_write_aggressive: sock_post_write failed."); MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE_AGGRESSIVE); return -1; } } else { /*** debugging printout */ { char str[1024], *s = str; int i, n=0; s += sprintf(s, "sock_post_writev(%d) ", sock_getid(vc_ptr->data.socket.sock)); for (i=0; i<cur_pos; i++) { s += sprintf(s, "%d+", vec[i].SOCK_IOV_LEN); n += vec[i].MPID_IOV_LEN; } s--; sprintf(s, "=%d bytes\n", n); MPIU_dbg_printf("%s", str); } /*** end debugging printout */ if ((error = sock_post_writev(vc_ptr->data.socket.sock, vec, cur_pos, NULL)) != SOCK_SUCCESS) { socket_print_sock_error(error, "socket_write_aggressive: sock_writev failed."); MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE_AGGRESSIVE); return -1; } } } MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_WRITE_AGGRESSIVE); return MPI_SUCCESS;}int socket_handle_written(MPIDI_VC *vc_ptr, int num_written){ MPIDI_STATE_DECL(MPID_STATE_SOCKET_HANDLE_WRITTEN); MPIDI_FUNC_ENTER(MPID_STATE_SOCKET_HANDLE_WRITTEN); if (vc_ptr == NULL) { MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_HANDLE_WRITTEN); return MPI_SUCCESS; } MPIU_dbg_printf("socket_handle_written(%d) - %d bytes\n", sock_getid(vc_ptr->data.socket.sock), num_written); if (!(vc_ptr->data.socket.state & SOCKET_CONNECTED)) { if (vc_ptr->data.socket.state & SOCKET_WRITING_ACK) { socket_handle_written_ack(vc_ptr, num_written); MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_HANDLE_WRITTEN); return MPI_SUCCESS; } else if (vc_ptr->data.socket.state & SOCKET_WRITING_CONTEXT_PKT) { socket_handle_written_context_pkt(vc_ptr, num_written); MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_HANDLE_WRITTEN); return MPI_SUCCESS; } else { err_printf("socket_handle_written: unknown connecting state %d.\n", vc_ptr->data.socket.state); } MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_HANDLE_WRITTEN); return MPI_SUCCESS; } /* update all the cars and buffers affected by the sock_write action */ while (num_written) { /* socket_update_car_num_written causes the head to get dequeued and the next car to take its place * until all the data is accounted for or there is an error. */ if (socket_update_car_num_written(vc_ptr->writeq_head, &num_written) != MPI_SUCCESS) { err_printf("socket_write_aggressive:socket_update_car_num_written failed.\n"); MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_HANDLE_WRITTEN); return -1; } /*msg_printf("write_aggressive: num_written updated %d\n", num_written);*/ } /* if there are more cars in the queue, post the next sock_write action */ if (vc_ptr->writeq_head != NULL) socket_write_aggressive(vc_ptr); MPIDI_FUNC_EXIT(MPID_STATE_SOCKET_HANDLE_WRITTEN); return MPI_SUCCESS;}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -