📄 ibwrapper.c
字号:
opts->max_send_wr = IBW_MAX_SEND_WR; opts->max_recv_wr = IBW_MAX_RECV_WR; opts->recv_bufsize = IBW_RECV_BUFSIZE; opts->recv_threshold = IBW_RECV_THRESHOLD; for(i=0; i<nattr; i++) { name = attr[i].name; value = attr[i].value; assert(name!=NULL && value!=NULL); if (strcmp(name, "max_send_wr")==0) opts->max_send_wr = atoi(value); else if (strcmp(name, "max_recv_wr")==0) opts->max_recv_wr = atoi(value); else if (strcmp(name, "recv_bufsize")==0) opts->recv_bufsize = atoi(value); else if (strcmp(name, "recv_threshold")==0) opts->recv_threshold = atoi(value); else { sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name); return -1; } } return 0;}struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr, void *ctx_userdata, ibw_connstate_fn_t ibw_connstate, ibw_receive_fn_t ibw_receive, struct event_context *ectx){ struct ibw_ctx *ctx = talloc_zero(NULL, struct ibw_ctx); struct ibw_ctx_priv *pctx; int rc; DEBUG(10, ("ibw_init(ctx_userdata: %p, ectx: %p)\n", ctx_userdata, ectx)); /* initialize basic data structures */ memset(ibw_lasterr, 0, IBW_LASTERR_BUFSIZE); assert(ctx!=NULL); ibw_lasterr[0] = '\0'; talloc_set_destructor(ctx, ibw_ctx_destruct); ctx->ctx_userdata = ctx_userdata; pctx = talloc_zero(ctx, struct ibw_ctx_priv); talloc_set_destructor(pctx, ibw_ctx_priv_destruct); ctx->internal = (void *)pctx; assert(pctx!=NULL); pctx->connstate_func = ibw_connstate; pctx->receive_func = ibw_receive; pctx->ectx = ectx; /* process attributes */ if (ibw_process_init_attrs(attr, nattr, &pctx->opts)) goto cleanup; /* init cm */ pctx->cm_channel = rdma_create_event_channel(); if (!pctx->cm_channel) { sprintf(ibw_lasterr, "rdma_create_event_channel error %d\n", errno); goto cleanup; } pctx->cm_channel_event = event_add_fd(pctx->ectx, pctx, pctx->cm_channel->fd, EVENT_FD_READ, ibw_event_handler_cm, ctx);#if RDMA_USER_CM_MAX_ABI_VERSION >= 2 rc = rdma_create_id(pctx->cm_channel, &pctx->cm_id, ctx, RDMA_PS_TCP);#else rc = rdma_create_id(pctx->cm_channel, &pctx->cm_id, ctx);#endif if (rc) { rc = errno; sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc); goto cleanup; } DEBUG(10, ("created cm_id %p\n", pctx->cm_id)); pctx->pagesize = sysconf(_SC_PAGESIZE); return ctx; /* don't put code here */cleanup: DEBUG(0, (ibw_lasterr)); if (ctx) talloc_free(ctx); return NULL;}int ibw_stop(struct ibw_ctx *ctx){ struct ibw_ctx_priv *pctx = (struct ibw_ctx_priv *)ctx->internal; struct ibw_conn *p; DEBUG(10, ("ibw_stop\n")); for(p=ctx->conn_list; p!=NULL; p=p->next) { if (ctx->state==IBWC_ERROR || ctx->state==IBWC_CONNECTED) { if (ibw_disconnect(p)) return -1; } } ctx->state = IBWS_STOPPED; pctx->connstate_func(ctx, NULL); return 0;}int ibw_bind(struct ibw_ctx *ctx, struct sockaddr_in *my_addr){ struct ibw_ctx_priv *pctx = (struct ibw_ctx_priv *)ctx->internal; int rc; DEBUG(10, ("ibw_bind: addr=%s, port=%u\n", inet_ntoa(my_addr->sin_addr), ntohs(my_addr->sin_port))); rc = rdma_bind_addr(pctx->cm_id, (struct sockaddr *) my_addr); if (rc) { sprintf(ibw_lasterr, "rdma_bind_addr error %d\n", rc); DEBUG(0, (ibw_lasterr)); return rc; } DEBUG(10, ("rdma_bind_addr successful\n")); return 0;}int ibw_listen(struct ibw_ctx *ctx, int backlog){ struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv); int rc; DEBUG(10, ("ibw_listen\n")); rc = rdma_listen(pctx->cm_id, backlog); if (rc) { sprintf(ibw_lasterr, "rdma_listen failed: %d\n", rc); DEBUG(0, (ibw_lasterr)); return rc; } return 0;}int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata){ struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); struct rdma_conn_param conn_param; int rc; DEBUG(10, ("ibw_accept: cmid=%p\n", pconn->cm_id)); conn->conn_userdata = conn_userdata; memset(&conn_param, 0, sizeof(struct rdma_conn_param)); conn_param.responder_resources = 1; conn_param.initiator_depth = 1; rc = rdma_accept(pconn->cm_id, &conn_param); if (rc) { sprintf(ibw_lasterr, "rdma_accept failed %d\n", rc); DEBUG(0, (ibw_lasterr)); return -1;; } pconn->is_accepted = 1; /* continued at RDMA_CM_EVENT_ESTABLISHED */ return 0;}int ibw_connect(struct ibw_conn *conn, struct sockaddr_in *serv_addr, void *conn_userdata){ struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); struct ibw_conn_priv *pconn = NULL; int rc; assert(conn!=NULL); conn->conn_userdata = conn_userdata; pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); DEBUG(10, ("ibw_connect: addr=%s, port=%u\n", inet_ntoa(serv_addr->sin_addr), ntohs(serv_addr->sin_port))); /* clean previous - probably half - initialization */ if (ibw_conn_priv_destruct(pconn)) { DEBUG(0, ("ibw_connect/ibw_pconn_destruct failed for cm_id=%p\n", pconn->cm_id)); return -1; } /* init cm */#if RDMA_USER_CM_MAX_ABI_VERSION >= 2 rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn, RDMA_PS_TCP);#else rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn);#endif if (rc) { rc = errno; sprintf(ibw_lasterr, "ibw_connect/rdma_create_id error %d\n", rc); talloc_free(conn); return -1; } DEBUG(10, ("ibw_connect: rdma_create_id succeeded, cm_id=%p\n", pconn->cm_id)); rc = rdma_resolve_addr(pconn->cm_id, NULL, (struct sockaddr *) serv_addr, 2000); if (rc) { sprintf(ibw_lasterr, "rdma_resolve_addr error %d\n", rc); DEBUG(0, (ibw_lasterr)); talloc_free(conn); return -1; } /* continued at RDMA_CM_EVENT_ADDR_RESOLVED */ return 0;}int ibw_disconnect(struct ibw_conn *conn){ int rc; struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); DEBUG(10, ("ibw_disconnect: cmid=%p\n", pconn->cm_id)); assert(pconn!=NULL); switch(conn->state) { case IBWC_ERROR: ibw_conn_priv_destruct(pconn); /* do this here right now */ break; case IBWC_CONNECTED: rc = rdma_disconnect(pconn->cm_id); if (rc) { sprintf(ibw_lasterr, "ibw_disconnect failed with %d\n", rc); DEBUG(0, (ibw_lasterr)); return rc; } break; default: DEBUG(9, ("invalid state for disconnect: %d\n", conn->state)); break; } return 0;}int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, uint32_t len){ struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); struct ibw_wr *p = pconn->wr_list_avail; if (p!=NULL) { DEBUG(10, ("ibw_alloc_send_buf#1: cmid=%p, len=%d\n", pconn->cm_id, len)); DLIST_REMOVE(pconn->wr_list_avail, p); DLIST_ADD(pconn->wr_list_used, p); if (len <= pctx->opts.recv_bufsize) { *buf = (void *)p->buf; } else { p->buf_large = ibw_alloc_mr(pctx, pconn, len, &p->mr_large); if (p->buf_large==NULL) { sprintf(ibw_lasterr, "ibw_alloc_mr#1 failed\n"); goto error; } *buf = (void *)p->buf_large; } /* p->wr_id is already filled in ibw_init_memory */ } else { DEBUG(10, ("ibw_alloc_send_buf#2: cmid=%p, len=%d\n", pconn->cm_id, len)); /* not optimized */ p = pconn->extra_avail; if (!p) { p = pconn->extra_avail = talloc_zero(pconn, struct ibw_wr); talloc_set_destructor(p, ibw_wr_destruct); if (p==NULL) { sprintf(ibw_lasterr, "talloc_zero failed (emax: %u)\n", pconn->extra_max); goto error; } p->wr_id = pctx->opts.max_send_wr + pconn->extra_max; pconn->extra_max++; switch(pconn->extra_max) { case 1: DEBUG(2, ("warning: queue performed\n")); break; case 10: DEBUG(0, ("warning: queue reached 10\n")); break; case 100: DEBUG(0, ("warning: queue reached 100\n")); break; case 1000: DEBUG(0, ("warning: queue reached 1000\n")); break; default: break; } } p->buf_large = ibw_alloc_mr(pctx, pconn, len, &p->mr_large); if (p->buf_large==NULL) { sprintf(ibw_lasterr, "ibw_alloc_mr#2 failed\n"); goto error; } *buf = (void *)p->buf_large; DLIST_REMOVE(pconn->extra_avail, p); /* we don't have prepared index for this, so that * we will have to find this by wr_id later on */ DLIST_ADD(pconn->extra_sent, p); } *key = (void *)p; return 0;error: DEBUG(0, ("ibw_alloc_send_buf error: %s", ibw_lasterr)); return -1;}static int ibw_send_packet(struct ibw_conn *conn, void *buf, struct ibw_wr *p, uint32_t len){ struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); int rc; /* can we send it right now? */ if (pconn->wr_sent<pctx->opts.max_send_wr) { struct ibv_send_wr *bad_wr; struct ibv_sge list = { .addr = (uintptr_t)buf, .length = len, .lkey = pconn->mr_send->lkey }; struct ibv_send_wr wr = { .wr_id = p->wr_id + pctx->opts.max_recv_wr, .sg_list = &list, .num_sge = 1, .opcode = IBV_WR_SEND, .send_flags = IBV_SEND_SIGNALED, }; if (p->buf_large==NULL) { DEBUG(10, ("ibw_send#normal(cmid: %p, wrid: %u, n: %d)\n", pconn->cm_id, (uint32_t)wr.wr_id, len)); } else { DEBUG(10, ("ibw_send#large(cmid: %p, wrid: %u, n: %d)\n", pconn->cm_id, (uint32_t)wr.wr_id, len)); list.lkey = p->mr_large->lkey; } rc = ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr); if (rc) { sprintf(ibw_lasterr, "ibv_post_send error %d (%d)\n", rc, pconn->wr_sent); goto error; } pconn->wr_sent++; return rc; } /* else put the request into our own queue: */ DEBUG(10, ("ibw_send#queued(cmid: %p, len: %u)\n", pconn->cm_id, len)); /* TODO: clarify how to continue when state==IBWC_STOPPED */ /* to be sent by ibw_wc_send */ /* regardless "normal" or [a part of] "large" packet */ if (!p->queued_ref_cnt) { DLIST_ADD_END2(pconn->queue, p, struct ibw_wr *, qprev, qnext); /* TODO: optimize */ p->queued_msg = buf; } p->queued_ref_cnt++; p->queued_rlen = len; /* last wins; see ibw_wc_send */ return 0;error: DEBUG(0, (ibw_lasterr)); return -1;}int ibw_send(struct ibw_conn *conn, void *buf, void *key, uint32_t len){ struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); struct ibw_wr *p = talloc_get_type(key, struct ibw_wr); int rc; assert(len>=sizeof(uint32_t)); assert((*((uint32_t *)buf)==len)); /* TODO: htonl */ if (len > pctx->opts.recv_bufsize) { struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); int rlen = len; char *packet = (char *)buf; uint32_t recv_bufsize = pctx->opts.recv_bufsize; DEBUG(10, ("ibw_send#frag(cmid: %p, buf: %p, len: %u)\n", pconn->cm_id, buf, len)); /* single threaded => no race here: */ assert(p->ref_cnt==0); while(rlen > recv_bufsize) { rc = ibw_send_packet(conn, packet, p, recv_bufsize); if (rc) return rc; packet += recv_bufsize; rlen -= recv_bufsize; p->ref_cnt++; /* not good to have it in ibw_send_packet */ } if (rlen) { rc = ibw_send_packet(conn, packet, p, rlen); p->ref_cnt++; /* not good to have it in ibw_send_packet */ } p->ref_cnt--; /* for the same handling */ } else { assert(p->ref_cnt==0); assert(p->queued_ref_cnt==0); rc = ibw_send_packet(conn, buf, p, len); } return rc;}int ibw_cancel_send_buf(struct ibw_conn *conn, void *buf, void *key){ struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); struct ibw_wr *p = talloc_get_type(key, struct ibw_wr); assert(p!=NULL); assert(buf!=NULL); assert(conn!=NULL); if (p->buf_large!=NULL) ibw_free_mr(&p->buf_large, &p->mr_large); /* parallel case */ if (p->wr_id < pctx->opts.max_send_wr) { DEBUG(10, ("ibw_cancel_send_buf#1 %u", (int)p->wr_id)); DLIST_REMOVE(pconn->wr_list_used, p); DLIST_ADD(pconn->wr_list_avail, p); } else { /* "extra" packet */ DEBUG(10, ("ibw_cancel_send_buf#2 %u", (int)p->wr_id)); DLIST_REMOVE(pconn->extra_sent, p); DLIST_ADD(pconn->extra_avail, p); } return 0;}const char *ibw_getLastError(void){ return ibw_lasterr;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -