📄 ibwrapper.c
字号:
assert(cma_id->context!=NULL); conn = talloc_get_type(cma_id->context, struct ibw_conn); rc = ibw_manage_connect(conn); if (rc) goto error; break; case RDMA_CM_EVENT_CONNECT_REQUEST: DEBUG(11, ("RDMA_CM_EVENT_CONNECT_REQUEST\n")); ctx->state = IBWS_CONNECT_REQUEST; conn = ibw_conn_new(ctx, ctx); pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); pconn->cm_id = cma_id; /* !!! event will be freed but id not */ cma_id->context = (void *)conn; DEBUG(10, ("pconn->cm_id %p\n", pconn->cm_id)); if (ibw_setup_cq_qp(conn)) goto error; conn->state = IBWC_INIT; pctx->connstate_func(ctx, conn); /* continued at ibw_accept when invoked by the func above */ if (!pconn->is_accepted) { rc = rdma_reject(cma_id, NULL, 0); if (rc) DEBUG(0, ("rdma_reject failed with rc=%d\n", rc)); talloc_free(conn); DEBUG(10, ("pconn->cm_id %p wasn't accepted\n", pconn->cm_id)); } /* TODO: clarify whether if it's needed by upper layer: */ ctx->state = IBWS_READY; pctx->connstate_func(ctx, NULL); /* NOTE: more requests can arrive until RDMA_CM_EVENT_ESTABLISHED ! */ break; case RDMA_CM_EVENT_ESTABLISHED: /* expected after ibw_accept and ibw_connect[not directly] */ DEBUG(1, ("ESTABLISHED (conn: %p)\n", cma_id->context)); conn = talloc_get_type(cma_id->context, struct ibw_conn); assert(conn!=NULL); /* important assumption */ DEBUG(10, ("ibw_setup_cq_qp succeeded (cmid=%p)\n", cma_id)); /* client conn is up */ conn->state = IBWC_CONNECTED; /* both ctx and conn have changed */ pctx->connstate_func(ctx, conn); break; case RDMA_CM_EVENT_ADDR_ERROR: sprintf(ibw_lasterr, "RDMA_CM_EVENT_ADDR_ERROR, error %d\n", event->status); case RDMA_CM_EVENT_ROUTE_ERROR: sprintf(ibw_lasterr, "RDMA_CM_EVENT_ROUTE_ERROR, error %d\n", event->status); case RDMA_CM_EVENT_CONNECT_ERROR: sprintf(ibw_lasterr, "RDMA_CM_EVENT_CONNECT_ERROR, error %d\n", event->status); case RDMA_CM_EVENT_UNREACHABLE: sprintf(ibw_lasterr, "RDMA_CM_EVENT_UNREACHABLE, error %d\n", event->status); goto error; case RDMA_CM_EVENT_REJECTED: sprintf(ibw_lasterr, "RDMA_CM_EVENT_REJECTED, error %d\n", event->status); DEBUG(1, ("cm event handler: %s", ibw_lasterr)); conn = talloc_get_type(cma_id->context, struct ibw_conn); if (conn) { /* must be done BEFORE connstate */ if ((rc=rdma_ack_cm_event(event))) DEBUG(0, ("reject/rdma_ack_cm_event failed with %d\n", rc)); event = NULL; /* not to touch cma_id or conn */ conn->state = IBWC_ERROR; /* it should free the conn */ pctx->connstate_func(NULL, conn); } break; /* this is not strictly an error */ case RDMA_CM_EVENT_DISCONNECTED: DEBUG(11, ("RDMA_CM_EVENT_DISCONNECTED\n")); if ((rc=rdma_ack_cm_event(event))) DEBUG(0, ("disc/rdma_ack_cm_event failed with %d\n", rc)); event = NULL; /* don't ack more */ if (cma_id!=pctx->cm_id) { DEBUG(0, ("client DISCONNECT event cm_id=%p\n", cma_id)); conn = talloc_get_type(cma_id->context, struct ibw_conn); conn->state = IBWC_DISCONNECTED; pctx->connstate_func(NULL, conn); } break; case RDMA_CM_EVENT_DEVICE_REMOVAL: sprintf(ibw_lasterr, "cma detected device removal!\n"); goto error; default: sprintf(ibw_lasterr, "unknown event %d\n", event->event); goto error; } if (event!=NULL && (rc=rdma_ack_cm_event(event))) { sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc); goto error; } return;error: DEBUG(0, ("cm event handler: %s", ibw_lasterr)); if (event!=NULL) { if (cma_id!=NULL && cma_id!=pctx->cm_id) { conn = talloc_get_type(cma_id->context, struct ibw_conn); if (conn) { conn->state = IBWC_ERROR; pctx->connstate_func(NULL, conn); } } else { ctx->state = IBWS_ERROR; pctx->connstate_func(ctx, NULL); } if ((rc=rdma_ack_cm_event(event))!=0) { DEBUG(0, ("rdma_ack_cm_event failed with %d\n", rc)); } } return;}static void ibw_event_handler_verbs(struct event_context *ev, struct fd_event *fde, uint16_t flags, void *private_data){ struct ibw_conn *conn = talloc_get_type(private_data, struct ibw_conn); struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); struct ibv_wc wc; int rc; struct ibv_cq *ev_cq; void *ev_ctx; DEBUG(10, ("ibw_event_handler_verbs(%u)\n", (uint32_t)flags)); /* TODO: check whether if it's good to have more channels here... */ rc = ibv_get_cq_event(pconn->verbs_channel, &ev_cq, &ev_ctx); if (rc) { sprintf(ibw_lasterr, "Failed to get cq_event with %d\n", rc); goto error; } if (ev_cq != pconn->cq) { sprintf(ibw_lasterr, "ev_cq(%p) != pconn->cq(%p)\n", ev_cq, pconn->cq); goto error; } rc = ibv_req_notify_cq(pconn->cq, 0); if (rc) { sprintf(ibw_lasterr, "Couldn't request CQ notification (%d)\n", rc); goto error; } while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) { if (wc.status) { sprintf(ibw_lasterr, "cq completion failed status=%d, opcode=%d, rc=%d\n", wc.status, wc.opcode, rc); goto error; } switch(wc.opcode) { case IBV_WC_SEND: DEBUG(10, ("send completion\n")); if (ibw_wc_send(conn, &wc)) goto error; break; case IBV_WC_RDMA_WRITE: DEBUG(10, ("rdma write completion\n")); break; case IBV_WC_RDMA_READ: DEBUG(10, ("rdma read completion\n")); break; case IBV_WC_RECV: DEBUG(10, ("recv completion\n")); if (ibw_wc_recv(conn, &wc)) goto error; break; default: sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode); goto error; } } if (rc!=0) { sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc); goto error; } ibv_ack_cq_events(pconn->cq, 1); return;error: ibv_ack_cq_events(pconn->cq, 1); DEBUG(0, (ibw_lasterr)); if (conn->state!=IBWC_ERROR) { conn->state = IBWC_ERROR; pctx->connstate_func(NULL, conn); }}static int ibw_process_queue(struct ibw_conn *conn){ struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); struct ibw_ctx_priv *pctx; struct ibw_wr *p; int rc; uint32_t msg_size; if (pconn->queue==NULL) return 0; /* NOP */ p = pconn->queue; /* we must have at least 1 fragment to send */ assert(p->queued_ref_cnt>0); p->queued_ref_cnt--; pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); msg_size = (p->queued_ref_cnt) ? pctx->opts.recv_bufsize : p->queued_rlen; assert(p->queued_msg!=NULL); assert(msg_size!=0); DEBUG(10, ("ibw_process_queue refcnt=%d msgsize=%u\n", p->queued_ref_cnt, msg_size)); rc = ibw_send_packet(conn, p->queued_msg, p, msg_size); /* was this the last fragment? */ if (p->queued_ref_cnt) { p->queued_msg += pctx->opts.recv_bufsize; } else { DLIST_REMOVE2(pconn->queue, p, qprev, qnext); p->queued_msg = NULL; } return rc;}static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc){ struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); struct ibw_wr *p; int send_index; DEBUG(10, ("ibw_wc_send(cmid: %p, wr_id: %u, bl: %u)\n", pconn->cm_id, (uint32_t)wc->wr_id, (uint32_t)wc->byte_len)); assert(pconn->cm_id->qp->qp_num==wc->qp_num); assert(wc->wr_id >= pctx->opts.max_recv_wr); send_index = wc->wr_id - pctx->opts.max_recv_wr; pconn->wr_sent--; if (send_index < pctx->opts.max_send_wr) { DEBUG(10, ("ibw_wc_send#1 %u\n", (int)wc->wr_id)); p = pconn->wr_index[send_index]; if (p->buf_large!=NULL) { if (p->ref_cnt) { /* awaiting more of it... */ p->ref_cnt--; } else { ibw_free_mr(&p->buf_large, &p->mr_large); DLIST_REMOVE(pconn->wr_list_used, p); DLIST_ADD(pconn->wr_list_avail, p); } } else { /* nasty - but necessary */ DLIST_REMOVE(pconn->wr_list_used, p); DLIST_ADD(pconn->wr_list_avail, p); } } else { /* "extra" request - not optimized */ DEBUG(10, ("ibw_wc_send#2 %u\n", (int)wc->wr_id)); for(p=pconn->extra_sent; p!=NULL; p=p->next) if ((p->wr_id + pctx->opts.max_recv_wr)==(int)wc->wr_id) break; if (p==NULL) { sprintf(ibw_lasterr, "failed to find wr_id %d\n", (int)wc->wr_id); return -1; } if (p->ref_cnt) { p->ref_cnt--; } else { ibw_free_mr(&p->buf_large, &p->mr_large); DLIST_REMOVE(pconn->extra_sent, p); DLIST_ADD(pconn->extra_avail, p); } } return ibw_process_queue(conn);}static int ibw_append_to_part(struct ibw_conn_priv *pconn, struct ibw_part *part, char **pp, uint32_t add_len, int info){ DEBUG(10, ("ibw_append_to_part: cmid=%p, (bs=%u, len=%u, tr=%u), al=%u, i=%u\n", pconn->cm_id, part->bufsize, part->len, part->to_read, add_len, info)); /* allocate more if necessary - it's an "evergrowing" buffer... */ if (part->len + add_len > part->bufsize) { if (part->buf==NULL) { assert(part->len==0); part->buf = talloc_size(pconn, add_len); if (part->buf==NULL) { sprintf(ibw_lasterr, "recv talloc_size error (%u) #%d\n", add_len, info); return -1; } part->bufsize = add_len; } else { part->buf = talloc_realloc_size(pconn, part->buf, part->len + add_len); if (part->buf==NULL) { sprintf(ibw_lasterr, "recv realloc error (%u + %u) #%d\n", part->len, add_len, info); return -1; } } part->bufsize = part->len + add_len; } /* consume pp */ memcpy(part->buf + part->len, *pp, add_len); *pp += add_len; part->len += add_len; part->to_read -= add_len; return 0;}static int ibw_wc_mem_threshold(struct ibw_conn_priv *pconn, struct ibw_part *part, uint32_t threshold){ DEBUG(10, ("ibw_wc_mem_threshold: cmid=%p, (bs=%u, len=%u, tr=%u), thr=%u\n", pconn->cm_id, part->bufsize, part->len, part->to_read, threshold)); if (part->bufsize > threshold) { DEBUG(3, ("ibw_wc_mem_threshold: cmid=%p, %u > %u\n", pconn->cm_id, part->bufsize, threshold)); talloc_free(part->buf); part->buf = talloc_size(pconn, threshold); if (part->buf==NULL) { sprintf(ibw_lasterr, "talloc_size failed\n"); return -1; } part->bufsize = threshold; } return 0;}static int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc){ struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); struct ibw_part *part = &pconn->part; char *p; uint32_t remain = wc->byte_len; DEBUG(10, ("ibw_wc_recv: cmid=%p, wr_id: %u, bl: %u\n", pconn->cm_id, (uint32_t)wc->wr_id, remain)); assert(pconn->cm_id->qp->qp_num==wc->qp_num); assert((int)wc->wr_id < pctx->opts.max_recv_wr); assert(wc->byte_len <= pctx->opts.recv_bufsize); p = pconn->buf_recv + ((int)wc->wr_id * pctx->opts.recv_bufsize); while(remain) { /* here always true: (part->len!=0 && part->to_read!=0) || (part->len==0 && part->to_read==0) */ if (part->len) { /* is there a partial msg to be continued? */ int read_len = (part->to_read<=remain) ? part->to_read : remain; if (ibw_append_to_part(pconn, part, &p, read_len, 421)) goto error; remain -= read_len; if (part->len<=sizeof(uint32_t) && part->to_read==0) { assert(part->len==sizeof(uint32_t)); /* set it again now... */ part->to_read = *((uint32_t *)(part->buf)); /* TODO: ntohl */ if (part->to_read<sizeof(uint32_t)) { sprintf(ibw_lasterr, "got msglen=%u #2\n", part->to_read); goto error; } part->to_read -= sizeof(uint32_t); /* it's already read */ } if (part->to_read==0) { pctx->receive_func(conn, part->buf, part->len); part->len = 0; /* tells not having partial data (any more) */ if (ibw_wc_mem_threshold(pconn, part, pctx->opts.recv_threshold)) goto error; } } else { if (remain>=sizeof(uint32_t)) { uint32_t msglen = *(uint32_t *)p; /* TODO: ntohl */ if (msglen<sizeof(uint32_t)) { sprintf(ibw_lasterr, "got msglen=%u\n", msglen); goto error; } /* mostly awaited case: */ if (msglen<=remain) { pctx->receive_func(conn, p, msglen); p += msglen; remain -= msglen; } else { part->to_read = msglen; /* part->len is already 0 */ if (ibw_append_to_part(pconn, part, &p, remain, 422)) goto error; remain = 0; /* to be continued ... */ /* part->to_read > 0 here */ } } else { /* edge case: */ part->to_read = sizeof(uint32_t); /* part->len is already 0 */ if (ibw_append_to_part(pconn, part, &p, remain, 423)) goto error; remain = 0; /* part->to_read > 0 here */ } } } /* <remain> is always decreased at least by 1 */ if (ibw_refill_cq_recv(conn)) goto error; return 0;error: DEBUG(0, ("ibw_wc_recv error: %s", ibw_lasterr)); return -1;}static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct ibw_opts *opts){ int i; const char *name, *value; DEBUG(10, ("ibw_process_init_attrs: nattr: %d\n", nattr));
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -