📄 ibu.c
字号:
attr_rec[1].data = (int)set; attr_rec[2].data = (int)set; attr_rec[3].data = 255; attr_rec[4].data = 255; attrList.attr_num = sizeof(attr_rec)/sizeof(attr_rec[0]); attrList.attr_rec_p = &attr_rec[0]; status = ib_qp_create_us( IBU_Process.hca_handle, IBU_Process.pd_handle, &attrList, &ibu->qp_handle, &ibu->dest_qp_num, NULL); if (status != IBA_OK) return status; return IBU_SUCCESS;}static ib_mr_handle_t s_mr_handle;static ib_uint32_t s_lkey;static void *ib_malloc_register(size_t size){ ib_uint32_t status; void *ptr; ib_uint32_t rkey; ptr = malloc(size); if (ptr == NULL) { printf("malloc(%d) failed.\n", size); return NULL; } status = ib_mr_register_us( IBU_Process.hca_handle, (ib_uint8_t*)ptr, size, IBU_Process.pd_handle, IB_ACCESS_LOCAL_WRITE, &s_mr_handle, &s_lkey, &rkey); if (status != IBU_SUCCESS) { printf("ib_mr_register_us failed, error %d\n", status); return NULL; } return ptr;}static void ib_free_deregister(void *p){ /*ib_mr_deregister_us(IBU_Process.hca_handle, s_mr_handle);*/ free(p);}#ifndef min#define min(a, b) ((a) < (b) ? (a) : (b))#endif/*static int s_cur_receive = 0;static int s_cur_send = 0;int g_num_receive_posted = 0;int g_num_send_posted = 0;*/static int ibui_post_receive(ibu_t ibu){ ib_uint32_t status; ib_scatter_gather_list_t sg_list; ib_data_segment_t data; ib_work_req_rcv_t work_req; void *mem_ptr; mem_ptr = BlockAlloc(ibu->allocator); sg_list.data_seg_p = &data; sg_list.data_seg_num = 1; data.length = IBU_PACKET_SIZE; data.va = (ib_uint64_t)(ib_uint32_t)mem_ptr; data.l_key = ibu->lkey; work_req.op_type = OP_RECEIVE; work_req.sg_list = sg_list; /* store the VC ptr and the mem ptr in the work id */ ((ibu_work_id_handle_t*)&work_req.work_req_id)->data.ptr = (ib_uint32_t)ibu; ((ibu_work_id_handle_t*)&work_req.work_req_id)->data.mem = (ib_uint32_t)mem_ptr; /* printf("ib_post_rcv_req_us %d\n", s_cur_receive++); g_num_receive_posted++; */ status = ib_post_rcv_req_us(IBU_Process.hca_handle, ibu->qp_handle, &work_req); if (status != IBU_SUCCESS) { printf("Error: failed to post ib receive, status = %d\n", status); return status; } return IBU_SUCCESS;}typedef struct ibu_num_written_node_t{ int num_bytes; struct ibu_num_written_node_t *next;} ibu_num_written_node_t;static ibu_num_written_node_t *g_write_list_head = NULL;static ibu_num_written_node_t *g_write_list_tail = NULL;static int ibui_next_num_written(){ ibu_num_written_node_t *p; int num_bytes; p = g_write_list_head; g_write_list_head = g_write_list_head->next; if (g_write_list_head == NULL) g_write_list_tail = NULL; num_bytes = p->num_bytes; free(p); return num_bytes;}static int ibui_post_write(ibu_t ibu, void *buf, int len, int (*write_progress_update)(int, void*)){ ib_uint32_t status; ib_scatter_gather_list_t sg_list; ib_data_segment_t data; ib_work_req_send_t work_req; void *mem_ptr; ibu_num_written_node_t *p; int length; while (len) { length = min(len, IBU_PACKET_SIZE); len -= length; p = malloc(sizeof(ibu_num_written_node_t)); p->next = NULL; p->num_bytes = length; if (g_write_list_tail) { g_write_list_tail->next = p; } else { g_write_list_head = p; } g_write_list_tail = p; mem_ptr = BlockAlloc(ibu->allocator); memcpy(mem_ptr, buf, length); sg_list.data_seg_p = &data; sg_list.data_seg_num = 1; data.length = length; data.va = (ib_uint64_t)(ib_uint32_t)mem_ptr; data.l_key = ibu->lkey; work_req.dest_address = 0; work_req.dest_q_key = 0; work_req.dest_qpn = 0; /*var.m_dest_qp_num; // not needed */ work_req.eecn = 0; work_req.ethertype = 0; work_req.fence_f = 0; work_req.immediate_data = 0; work_req.immediate_data_f = 0; work_req.op_type = OP_SEND; work_req.remote_addr.va = 0; work_req.remote_addr.key = 0; work_req.se_f = 0; work_req.sg_list = sg_list; work_req.signaled_f = 0; /* store the VC ptr and the mem ptr in the work id */ ((ibu_work_id_handle_t*)&work_req.work_req_id)->data.ptr = (ib_uint32_t)ibu; ((ibu_work_id_handle_t*)&work_req.work_req_id)->data.mem = (ib_uint32_t)mem_ptr; /* printf("ib_post_send_req_us %d\n", s_cur_send++); g_num_send_posted++; */ status = ib_post_send_req_us( IBU_Process.hca_handle, ibu->qp_handle, &work_req); if (status != IBU_SUCCESS) { printf("Error: failed to post ib send, status = %d, %s\n", status, iba_errstr(status)); return status; } buf = (char*)buf + length; } return IBU_SUCCESS;}static int ibui_post_writev(ibu_t ibu, IBU_IOV *iov, int n, int (*write_progress_update)(int, void*)){ int i; for (i=0; i<n; i++) { ibui_post_write(ibu, iov[i].IBU_IOV_BUF, iov[i].IBU_IOV_LEN, NULL); } return IBU_SUCCESS;}static inline void init_state_struct(ibu_state_t *p){ /*p->set = 0;*/ p->user_ptr = NULL; p->state = 0; p->closing = FALSE; p->pending_operations = 0; p->read.total = 0; p->read.num_bytes = 0; p->read.buffer = NULL; /*p->read.iov = NULL;*/ p->read.iovlen = 0; p->read.progress_update = NULL; p->write.total = 0; p->write.num_bytes = 0; p->write.buffer = NULL; /*p->write.iov = NULL;*/ p->write.iovlen = 0; p->write.progress_update = NULL;}/* ibu functions */static BlockAllocator g_StateAllocator;int ibu_init(){ ib_uint32_t status; ib_uint32_t max_cq_entries = IBU_MAX_CQ_ENTRIES+1; ib_uint32_t attr_size; MPIDI_STATE_DECL(MPID_STATE_IBU_INIT); MPIDI_FUNC_ENTER(MPID_STATE_IBU_INIT); /*ib_init_us();*/ /* Initialize globals */ /* get a handle to the host channel adapter */ status = ib_hca_open_us(0 , &IBU_Process.hca_handle); if (status != IBU_SUCCESS) { printf("ibu_init: ib_hca_open_us failed, status %d\n", status); MPIDI_FUNC_EXIT(MPID_STATE_IBU_INIT); return status; } /* get a protection domain handle */ status = ib_pd_allocate_us(IBU_Process.hca_handle, &IBU_Process.pd_handle); if (status != IBU_SUCCESS) { printf("ibu_init: ib_pd_allocate_us failed, status %d\n", status); MPIDI_FUNC_EXIT(MPID_STATE_IBU_INIT); return status; } /* get a completion queue domain handle */ status = ib_cqd_create_us(IB_Process.hca_handle, &IB_Process.cqd_handle);#if 0 /* for some reason this function fails when it really is ok */ if (status != IBU_SUCCESS) { printf("ib_init: ib_cqd_create_us failed, status %d\n", status); MPIDI_FUNC_EXIT(MPID_STATE_IBU_INIT); return status; }#endif#if 0 /* create the completion queue */ status = ib_cq_create_us(IBU_Process.hca_handle, IBU_Process.cqd_handle, &max_cq_entries, &IBU_Process.cq_handle, NULL); if (status != IBU_SUCCESS) { printf("ibu_init: ib_cq_create_us failed, error %d\n", status); MPIDI_FUNC_EXIT(MPID_STATE_IBU_INIT); return -1; }#endif /* get the lid */ attr_size = 0; status = ib_hca_query_us(IBU_Process.hca_handle, NULL, HCA_QUERY_HCA_STATIC | HCA_QUERY_PORT_INFO_DYNAMIC, &attr_size); IBU_Process.attr_p = calloc(attr_size, sizeof(ib_uint8_t)); status = ib_hca_query_us(IBU_Process.hca_handle, IBU_Process.attr_p, HCA_QUERY_HCA_STATIC | HCA_QUERY_PORT_INFO_DYNAMIC, &attr_size); if (status != IBU_SUCCESS) { printf("ibu_init: ib_hca_query_us(HCA_QUERY_HCA_STATIC) failed, status %d\n", status); MPIDI_FUNC_EXIT(MPID_STATE_IBU_INIT); return status; } IBU_Process.lid = IBU_Process.attr_p->port_dynamic_info_p->lid; /* non infiniband initialization */ g_StateAllocator = BlockAllocInit(sizeof(ibu_state_t), 1000, 500, malloc, free); MPIDI_FUNC_EXIT(MPID_STATE_IBU_INIT); return IBU_SUCCESS;}int ibu_finalize(){ MPIDI_STATE_DECL(MPID_STATE_IBU_FINALIZE); MPIDI_FUNC_ENTER(MPID_STATE_IBU_FINALIZE); /*ib_release_us();*/ BlockAllocFinalize(&g_StateAllocator); MPIDI_FUNC_EXIT(MPID_STATE_IBU_FINALIZE); return IBU_SUCCESS;}int ibu_create_set(ibu_set_t *set){ ib_uint32_t status; ib_uint32_t max_cq_entries = IBU_MAX_CQ_ENTRIES+1; MPIDI_STATE_DECL(MPID_STATE_IBU_CREATE_SET); MPIDI_FUNC_ENTER(MPID_STATE_IBU_CREATE_SET); /* create the completion queue */ status = ib_cq_create_us( IBU_Process.hca_handle, IBU_Process.cqd_handle, &max_cq_entries, set, NULL); if (status != IBU_SUCCESS) { printf("ibu_init: ib_cq_create_us failed, error %d\n", status); } MPIDI_FUNC_EXIT(MPID_STATE_IBU_CREATE_SET); return status;}int ibu_destroy_set(ibu_set_t set){ ib_uint32_t status; MPIDI_STATE_DECL(MPID_STATE_IBU_DESTROY_SET); MPIDI_FUNC_ENTER(MPID_STATE_IBU_DESTROY_SET); status = ib_cq_destroy_us(IBU_Process.hca_handle, set); MPIDI_FUNC_EXIT(MPID_STATE_IBU_DESTROY_SET); return status;}/*int ibu_listen(ibu_set_t set, void * user_ptr, int *port, ibu_t *listener){ MPIDI_STATE_DECL(MPID_STATE_IBU_LISTEN); MPIDI_FUNC_ENTER(MPID_STATE_IBU_LISTEN); MPIDI_FUNC_EXIT(MPID_STATE_IBU_LISTEN); return IBU_SUCCESS;}int ibu_post_connect(ibu_set_t set, void * user_ptr, char *host, int port, ibu_t *connected){ MPIDI_STATE_DECL(MPID_STATE_IBU_POST_CONNECT); MPIDI_FUNC_ENTER(MPID_STATE_IBU_POST_CONNECT); MPIDI_FUNC_EXIT(MPID_STATE_IBU_POST_CONNECT); return IBU_SUCCESS;}int ibu_accept(ibu_set_t set, void * user_ptr, ibu_t listener, ibu_t *accepted){ MPIDI_STATE_DECL(MPID_STATE_IBU_ACCEPT); MPIDI_FUNC_ENTER(MPID_STATE_IBU_ACCEPT); MPIDI_FUNC_EXIT(MPID_STATE_IBU_ACCEPT); return IBU_SUCCESS;}int ibu_post_close(ibu_t ibu){ MPIDI_STATE_DECL(MPID_STATE_IBU_POST_CLOSE); MPIDI_FUNC_ENTER(MPID_STATE_IBU_POST_CLOSE); ibu->closing = TRUE; if (ibu->pending_operations == 0) { } MPIDI_FUNC_EXIT(MPID_STATE_IBU_POST_CLOSE); return IBU_SUCCESS;}*/int ibu_wait(ibu_set_t set, int millisecond_timeout, ibu_wait_t *out){ ib_uint32_t status; ib_work_completion_t completion_data; void *mem_ptr; ibu_t ibu; /* int error; DWORD num_bytes; ibu_state_t *ibu; */ MPIDI_STATE_DECL(MPID_STATE_IBU_WAIT); MPIDI_FUNC_ENTER(MPID_STATE_IBU_WAIT); for (;;) { status = ib_completion_poll_us( IB_Process.hca_handle, set, &completion_data); if (status == IBA_CQ_EMPTY) { /* ibu_wait polls until there is something in the queue */ /* or the timeout has expired */ continue; } if (status != IBA_OK) { printf("error: ib_completion_poll_us did not return IBA_OK\n"); return IBU_FAIL; } if (completion_data.status != IB_COMP_ST_SUCCESS) { printf("error: status = %d != IB_COMP_ST_SUCCESS, %s\n",
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -