📄 ibu.c
字号:
completion_data.status, iba_compstr(completion_data.status)); return IBU_FAIL; } //if (GetQueuedCompletionStatus(set, &num_bytes, (DWORD*)&ibu, &ovl, millisecond_timeout)) ibu = (ibu_t)(((ibu_work_id_handle_t*)&completion_data.work_req_id)->data.ptr); mem_ptr = (void*)(((ibu_work_id_handle_t*)&completion_data.work_req_id)->data.mem); switch (completion_data.op_type) { case OP_SEND: /*ib_handle_written(vc_ptr, mem_ptr, ibu_next_num_written());*/ /* put the send packet back in the pool */ /*BlockFree(allocator, mem_ptr);*/ break; case OP_RECEIVE: /*ib_handle_read(vc_ptr, mem_ptr, completion_data.bytes_num);*/ /* put the receive packet back in the pool */ /*BlockFree(m_allocator, mem_ptr);*/ /* post another receive to replace the consumed one */ /*ibu_post_receive(vc_ptr);*/ break; default: printf("unknown ib op_type: %d\n", completion_data.op_type); break; }#if 0 if (ibu->closing && ibu->pending_operations == 0) { out->num_bytes = 0; out->error = 0; out->op_type = IBU_OP_CLOSE; out->user_ptr = ibu->user_ptr; /*BlockFree(g_ibu_allocator, ibu);*/ MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return IBU_SUCCESS; } if (ovl == &ibu->read.ovl) { ibu->read.total += num_bytes; if (ibu->read.use_iov) { while (num_bytes) { if (ibu->read.iov[ibu->read.index].IBU_IOV_LEN <= num_bytes) { num_bytes -= ibu->read.iov[ibu->read.index].IBU_IOV_LEN; ibu->read.index++; ibu->read.iovlen--; } else { ibu->read.iov[ibu->read.index].IBU_IOV_LEN -= num_bytes; ibu->read.iov[ibu->read.index].IBU_IOV_BUF = (char*)(ibu->read.iov[ibu->read.index].IBU_IOV_BUF) + num_bytes; num_bytes = 0; } } if (ibu->read.iovlen == 0) { out->num_bytes = ibu->read.total; out->op_type = IBU_OP_READ; out->user_ptr = ibu->user_ptr; ibu->pending_operations--; if (ibu->closing && ibu->pending_operations == 0) { printf("ibu_wait: closing ibuet(%d) after iov read completed.\n", ibu_getid(ibu)); shutdown(ibu->ibu, SD_BOTH); closeibuet(ibu->ibu); ibu->ibu = IBU_INVALID_QP; } MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return IBU_SUCCESS; } /* make the user upcall */ if (ibu->read.progress_update != NULL) ibu->read.progress_update(num_bytes, ibu->user_ptr); /* post a read of the remaining data */ WSARecv(ibu->ibu, ibu->read.iov, ibu->read.iovlen, &ibu->read.num_bytes, &dwFlags, &ibu->read.ovl, NULL); } else { ibu->read.buffer = (char*)(ibu->read.buffer) + num_bytes; ibu->read.bufflen -= num_bytes; if (ibu->read.bufflen == 0) { out->num_bytes = ibu->read.total; out->op_type = IBU_OP_READ; out->user_ptr = ibu->user_ptr; ibu->pending_operations--; if (ibu->closing && ibu->pending_operations == 0) { printf("ibu_wait: closing ibuet(%d) after simple read completed.\n", ibu_getid(ibu)); shutdown(ibu->ibu, SD_BOTH); closesocket(ibu->ibu); ibu->ibu = IBU_INVALID_QP; } MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return IBU_SUCCESS; } /* make the user upcall */ if (ibu->read.progress_update != NULL) ibu->read.progress_update(num_bytes, ibu->user_ptr); /* post a read of the remaining data */ ReadFile((HANDLE)(ibu->ibu), ibu->read.buffer, ibu->read.bufflen, &ibu->read.num_bytes, &ibu->read.ovl); } } else if (ovl == &ibu->write.ovl) { if (ibu->state & IBU_CONNECTING) { /* insert code here to determine that the connect succeeded */ /* ... */ ibu->state ^= IBU_CONNECTING; /* remove the IBU_CONNECTING bit */ out->op_type = IBU_OP_CONNECT; out->user_ptr = ibu->user_ptr; ibu->pending_operations--; if (ibu->closing && ibu->pending_operations == 0) { printf("ibu_wait: closing ibuet(%d) after connect completed.\n", ibu_getid(ibu)); shutdown(ibu->ibu, SD_BOTH); closesocket(ibu->ibu); ibu->ibu = IBU_INVALID_QP; } MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return IBU_SUCCESS; } else { /*printf("ibu_wait: write update, total = %d + %d = %d\n", ibu->write.total, num_bytes, ibu->write.total + num_bytes);*/ ibu->write.total += num_bytes; if (ibu->write.use_iov) { while (num_bytes) { if (ibu->write.iov[ibu->write.index].IBU_IOV_LEN <= num_bytes) { /*printf("ibu_wait: write.index %d, len %d\n", ibu->write.index, ibu->write.iov[ibu->write.index].IBU_IOV_LEN);*/ num_bytes -= ibu->write.iov[ibu->write.index].IBU_IOV_LEN; ibu->write.index++; ibu->write.iovlen--; } else { /*printf("ibu_wait: partial data written [%d].len = %d, num_bytes = %d\n", ibu->write.index, ibu->write.iov[ibu->write.index].IBU_IOV_LEN, num_bytes);*/ ibu->write.iov[ibu->write.index].IBU_IOV_LEN -= num_bytes; ibu->write.iov[ibu->write.index].IBU_IOV_BUF = (char*)(ibu->write.iov[ibu->write.index].IBU_IOV_BUF) + num_bytes; num_bytes = 0; } } if (ibu->write.iovlen == 0) { out->num_bytes = ibu->write.total; out->op_type = IBU_OP_WRITE; out->user_ptr = ibu->user_ptr; ibu->pending_operations--; if (ibu->closing && ibu->pending_operations == 0) { printf("ibu_wait: closing ibuet(%d) after iov write completed.\n", ibu_getid(ibu)); shutdown(ibu->ibu, SD_BOTH); closeibuet(ibu->ibu); ibu->ibu = IBU_INVALID_QP; } MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return IBU_SUCCESS; } /* make the user upcall */ if (ibu->write.progress_update != NULL) ibu->write.progress_update(num_bytes, ibu->user_ptr); /* post a write of the remaining data */ printf("ibu_wait: posting write of the remaining data, vec size %d\n", ibu->write.iovlen); WSASend(ibu->ibu, ibu->write.iov, ibu->write.iovlen, &ibu->write.num_bytes, 0, &ibu->write.ovl, NULL); } else { ibu->write.buffer = (char*)(ibu->write.buffer) + num_bytes; ibu->write.bufflen -= num_bytes; if (ibu->write.bufflen == 0) { out->num_bytes = ibu->write.total; out->op_type = IBU_OP_WRITE; out->user_ptr = ibu->user_ptr; ibu->pending_operations--; if (ibu->closing && ibu->pending_operations == 0) { printf("ibu_wait: closing ibuet(%d) after simple write completed.\n", ibu_getid(ibu)); shutdown(ibu->ibu, SD_BOTH); closeibuet(ibu->ibu); ibu->ibu = IBU_INVALID_QP; } MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return IBU_SUCCESS; } /* make the user upcall */ if (ibu->write.progress_update != NULL) ibu->write.progress_update(num_bytes, ibu->user_ptr); /* post a write of the remaining data */ WriteFile((HANDLE)(ibu->ibu), ibu->write.buffer, ibu->write.bufflen, &ibu->write.num_bytes, &ibu->write.ovl); } } } else { MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT); return IBU_FAIL; }#endif } MPIDI_FUNC_EXIT(MPID_STATE_IBU_WAIT);}int ibu_set_user_ptr(ibu_t ibu, void *user_ptr){ MPIDI_STATE_DECL(MPID_STATE_IBU_SET_USER_PTR); MPIDI_FUNC_ENTER(MPID_STATE_IBU_SET_USER_PTR); if (ibu == IBU_INVALID_QP) { MPIDI_FUNC_EXIT(MPID_STATE_IBU_SET_USER_PTR); return IBU_FAIL; } ibu->user_ptr = user_ptr; MPIDI_FUNC_EXIT(MPID_STATE_IBU_SET_USER_PTR); return IBU_SUCCESS;}/* immediate functions *//* infiniband has no immediate functions *//*int ibu_read(ibu_t ibu, void *buf, int len, int *num_read){ MPIDI_STATE_DECL(MPID_STATE_IBU_READ); MPIDI_FUNC_ENTER(MPID_STATE_IBU_READ); *num_read = 0; MPIDI_FUNC_EXIT(MPID_STATE_IBU_READ); return IBU_SUCCESS;}int ibu_readv(ibu_t ibu, IBU_IOV *iov, int n, int *num_read){ DWORD nFlags = 0; MPIDI_STATE_DECL(MPID_STATE_IBU_READV); MPIDI_FUNC_ENTER(MPID_STATE_IBU_READV); *num_read = 0; MPIDI_FUNC_EXIT(MPID_STATE_IBU_READV); return IBU_SUCCESS;}int ibu_write(ibu_t ibu, void *buf, int len, int *num_written){ MPIDI_STATE_DECL(MPID_STATE_IBU_WRITE); MPIDI_FUNC_ENTER(MPID_STATE_IBU_WRITE); *num_written = 0; MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITE); return IBU_SUCCESS;}int ibu_writev(ibu_t ibu, IBU_IOV *iov, int n, int *num_written){ MPIDI_STATE_DECL(MPID_STATE_IBU_WRITEV); MPIDI_FUNC_ENTER(MPID_STATE_IBU_WRITEV); *num_written = 0; MPIDI_FUNC_EXIT(MPID_STATE_IBU_WRITEV); return IBU_SUCCESS;}*//* non-blocking functions */int ibu_post_read(ibu_t ibu, void *buf, int len, int (*rfn)(int, void*)){ MPIDI_STATE_DECL(MPID_STATE_IBU_POST_READ); MPIDI_FUNC_ENTER(MPID_STATE_IBU_POST_READ); ibu->read.total = 0; ibu->read.buffer = buf; ibu->read.bufflen = len; ibu->read.use_iov = FALSE; ibu->read.progress_update = rfn; ibu->state |= IBU_READING; ibu->pending_operations++; /*ReadFile((HANDLE)(ibu->ibu), buf, len, &ibu->read.num_bytes, &ibu->read.ovl);*/ MPIDI_FUNC_EXIT(MPID_STATE_IBU_POST_READ); return IBU_SUCCESS;}int ibu_post_readv(ibu_t ibu, IBU_IOV *iov, int n, int (*rfn)(int, void*)){ DWORD flags = 0; MPIDI_STATE_DECL(MPID_STATE_IBU_POST_READV); MPIDI_FUNC_ENTER(MPID_STATE_IBU_POST_READV); ibu->read.total = 0; /*ibu->read.iov = iov;*/ memcpy(ibu->read.iov, iov, sizeof(IBU_IOV) * n); ibu->read.iovlen = n; ibu->read.index = 0; ibu->read.use_iov = TRUE; ibu->read.progress_update = rfn; ibu->state |= IBU_READING; ibu->pending_operations++; /*WSARecv(ibu->ibu, ibu->read.iov, n, &ibu->read.num_bytes, &flags, &ibu->read.ovl, NULL);*/ MPIDI_FUNC_EXIT(MPID_STATE_IBU_POST_READV); return IBU_SUCCESS;}int ibu_post_write(ibu_t ibu, void *buf, int len, int (*wfn)(int, void*)){ MPIDI_STATE_DECL(MPID_STATE_IBU_POST_WRITE); MPIDI_FUNC_ENTER(MPID_STATE_IBU_POST_WRITE); ibu->write.total = 0; ibu->write.buffer = buf; ibu->write.bufflen = len; ibu->write.use_iov = FALSE; ibu->write.progress_update = wfn; ibu->state |= IBU_WRITING; ibu->pending_operations++; /*WriteFile((HANDLE)(ibu->ibu), buf, len, &ibu->write.num_bytes, &ibu->write.ovl);*/ ibui_post_write(ibu, buf, len, wfn); MPIDI_FUNC_EXIT(MPID_STATE_IBU_POST_WRITE); return IBU_SUCCESS;}int ibu_post_writev(ibu_t ibu, IBU_IOV *iov, int n, int (*wfn)(int, void*)){ MPIDI_STATE_DECL(MPID_STATE_IBU_POST_WRITEV); MPIDI_FUNC_ENTER(MPID_STATE_IBU_POST_WRITEV); ibu->write.total = 0; /*ibu->write.iov = iov;*/ memcpy(ibu->write.iov, iov, sizeof(IBU_IOV) * n); ibu->write.iovlen = n; ibu->write.index = 0; ibu->write.use_iov = TRUE; ibu->write.progress_update = wfn; ibu->state |= IBU_WRITING; ibu->pending_operations++; /* { char str[1024], *s = str; int i; s += sprintf(s, "ibu_post_writev("); for (i=0; i<n; i++) s += sprintf(s, "%d,", iov[i].IBU_IOV_LEN); sprintf(s, ")\n"); printf("%s", str); } */ /*WSASend(ibu->ibu, ibu->write.iov, n, &ibu->write.num_bytes, 0, &ibu->write.ovl, NULL);*/ ibui_post_writev(ibu, ibu->write.iov, n, wfn); MPIDI_FUNC_EXIT(MPID_STATE_IBU_POST_WRITEV); return IBU_SUCCESS;}/* extended functions */int ibu_get_lid(){ return IBU_Process.lid;}#if 0int ibu_getid(ibu_t ibu){ return 0;}int ibu_easy_receive(ibu_t ibu, void *buf, int len, int *num_read){ int error; int n; int total = 0; MPIDI_STATE_DECL(MPID_STATE_IBU_EASY_RECEIVE); MPIDI_FUNC_ENTER(MPID_STATE_IBU_EASY_RECEIVE); while (len) { error = ibu_read(ibu, buf, len, &n); if (error != IBU_SUCCESS) { *num_read = total; MPIDI_FUNC_EXIT(MPID_STATE_IBU_EASY_RECEIVE); return error; } total += n; buf = (char*)buf + n; len -= n; } MPIDI_FUNC_EXIT(MPID_STATE_IBU_EASY_RECEIVE); return IBU_SUCCESS;}int ibu_easy_send(ibu_t ibu, void *buf, int len, int *num_written){ int error; int n; int total = 0; MPIDI_STATE_DECL(MPID_STATE_IBU_EASY_SEND); MPIDI_FUNC_ENTER(MPID_STATE_IBU_EASY_SEND); while (len) { error = ibui_write(ibu, buf, len, &n); if (error != IBU_SUCCESS) { *num_written = total; MPIDI_FUNC_EXIT(MPID_STATE_IBU_EASY_SEND); return error; } total += n; buf = (char*)buf + n; len -= n; } MPIDI_FUNC_EXIT(MPID_STATE_IBU_EASY_SEND); return IBU_SUCCESS;}#endif
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -