⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ibwrapper.c

📁 samba最新软件
💻 C
📖 第 1 页 / 共 3 页
字号:
	opts->max_send_wr = IBW_MAX_SEND_WR;	opts->max_recv_wr = IBW_MAX_RECV_WR;	opts->recv_bufsize = IBW_RECV_BUFSIZE;	opts->recv_threshold = IBW_RECV_THRESHOLD;	for(i=0; i<nattr; i++) {		name = attr[i].name;		value = attr[i].value;		assert(name!=NULL && value!=NULL);		if (strcmp(name, "max_send_wr")==0)			opts->max_send_wr = atoi(value);		else if (strcmp(name, "max_recv_wr")==0)			opts->max_recv_wr = atoi(value);		else if (strcmp(name, "recv_bufsize")==0)			opts->recv_bufsize = atoi(value);		else if (strcmp(name, "recv_threshold")==0)			opts->recv_threshold = atoi(value);		else {			sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name);			return -1;		}	}	return 0;}struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr,	void *ctx_userdata,	ibw_connstate_fn_t ibw_connstate,	ibw_receive_fn_t ibw_receive,	struct event_context *ectx){	struct ibw_ctx *ctx = talloc_zero(NULL, struct ibw_ctx);	struct ibw_ctx_priv *pctx;	int	rc;	DEBUG(10, ("ibw_init(ctx_userdata: %p, ectx: %p)\n", ctx_userdata, ectx));	/* initialize basic data structures */	memset(ibw_lasterr, 0, IBW_LASTERR_BUFSIZE);	assert(ctx!=NULL);	ibw_lasterr[0] = '\0';	talloc_set_destructor(ctx, ibw_ctx_destruct);	ctx->ctx_userdata = ctx_userdata;	pctx = talloc_zero(ctx, struct ibw_ctx_priv);	talloc_set_destructor(pctx, ibw_ctx_priv_destruct);	ctx->internal = (void *)pctx;	assert(pctx!=NULL);	pctx->connstate_func = ibw_connstate;	pctx->receive_func = ibw_receive;	pctx->ectx = ectx;	/* process attributes */	if (ibw_process_init_attrs(attr, nattr, &pctx->opts))		goto cleanup;	/* init cm */	pctx->cm_channel = rdma_create_event_channel();	if (!pctx->cm_channel) {		sprintf(ibw_lasterr, "rdma_create_event_channel error %d\n", errno);		goto cleanup;	}	pctx->cm_channel_event = event_add_fd(pctx->ectx, pctx,		pctx->cm_channel->fd, EVENT_FD_READ, ibw_event_handler_cm, ctx);#if RDMA_USER_CM_MAX_ABI_VERSION >= 2	rc = rdma_create_id(pctx->cm_channel, &pctx->cm_id, ctx, RDMA_PS_TCP);#else	rc = rdma_create_id(pctx->cm_channel, &pctx->cm_id, ctx);#endif	if (rc) {		rc = errno;		sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc);		goto cleanup;	}	DEBUG(10, ("created cm_id %p\n", pctx->cm_id));	pctx->pagesize = sysconf(_SC_PAGESIZE);	return ctx;	/* don't put code here */cleanup:	DEBUG(0, (ibw_lasterr));	if (ctx)		talloc_free(ctx);	return NULL;}int ibw_stop(struct ibw_ctx *ctx){	struct ibw_ctx_priv *pctx = (struct ibw_ctx_priv *)ctx->internal;	struct ibw_conn *p;	DEBUG(10, ("ibw_stop\n"));	for(p=ctx->conn_list; p!=NULL; p=p->next) {		if (ctx->state==IBWC_ERROR || ctx->state==IBWC_CONNECTED) {			if (ibw_disconnect(p))				return -1;		}	}	ctx->state = IBWS_STOPPED;	pctx->connstate_func(ctx, NULL);	return 0;}int ibw_bind(struct ibw_ctx *ctx, struct sockaddr_in *my_addr){	struct ibw_ctx_priv *pctx = (struct ibw_ctx_priv *)ctx->internal;	int	rc;	DEBUG(10, ("ibw_bind: addr=%s, port=%u\n",		inet_ntoa(my_addr->sin_addr), ntohs(my_addr->sin_port)));	rc = rdma_bind_addr(pctx->cm_id, (struct sockaddr *) my_addr);	if (rc) {		sprintf(ibw_lasterr, "rdma_bind_addr error %d\n", rc);		DEBUG(0, (ibw_lasterr));		return rc;	}	DEBUG(10, ("rdma_bind_addr successful\n"));	return 0;}int ibw_listen(struct ibw_ctx *ctx, int backlog){	struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);	int	rc;	DEBUG(10, ("ibw_listen\n"));	rc = rdma_listen(pctx->cm_id, backlog);	if (rc) {		sprintf(ibw_lasterr, "rdma_listen failed: %d\n", rc);		DEBUG(0, (ibw_lasterr));		return rc;	}	return 0;}int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata){	struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);	struct rdma_conn_param	conn_param;	int	rc;	DEBUG(10, ("ibw_accept: cmid=%p\n", pconn->cm_id));	conn->conn_userdata = conn_userdata;	memset(&conn_param, 0, sizeof(struct rdma_conn_param));	conn_param.responder_resources = 1;	conn_param.initiator_depth = 1;	rc = rdma_accept(pconn->cm_id, &conn_param);	if (rc) {		sprintf(ibw_lasterr, "rdma_accept failed %d\n", rc);		DEBUG(0, (ibw_lasterr));		return -1;;	}	pconn->is_accepted = 1;	/* continued at RDMA_CM_EVENT_ESTABLISHED */	return 0;}int ibw_connect(struct ibw_conn *conn, struct sockaddr_in *serv_addr, void *conn_userdata){	struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);	struct ibw_conn_priv *pconn = NULL;	int	rc;	assert(conn!=NULL);	conn->conn_userdata = conn_userdata;	pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);	DEBUG(10, ("ibw_connect: addr=%s, port=%u\n", inet_ntoa(serv_addr->sin_addr),		ntohs(serv_addr->sin_port)));	/* clean previous - probably half - initialization */	if (ibw_conn_priv_destruct(pconn)) {		DEBUG(0, ("ibw_connect/ibw_pconn_destruct failed for cm_id=%p\n", pconn->cm_id));		return -1;	}	/* init cm */#if RDMA_USER_CM_MAX_ABI_VERSION >= 2	rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn, RDMA_PS_TCP);#else	rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn);#endif	if (rc) {		rc = errno;		sprintf(ibw_lasterr, "ibw_connect/rdma_create_id error %d\n", rc);		talloc_free(conn);		return -1;	}	DEBUG(10, ("ibw_connect: rdma_create_id succeeded, cm_id=%p\n", pconn->cm_id));	rc = rdma_resolve_addr(pconn->cm_id, NULL, (struct sockaddr *) serv_addr, 2000);	if (rc) {		sprintf(ibw_lasterr, "rdma_resolve_addr error %d\n", rc);		DEBUG(0, (ibw_lasterr));		talloc_free(conn);		return -1;	}	/* continued at RDMA_CM_EVENT_ADDR_RESOLVED */	return 0;}int ibw_disconnect(struct ibw_conn *conn){	int	rc;	struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);	DEBUG(10, ("ibw_disconnect: cmid=%p\n", pconn->cm_id));	assert(pconn!=NULL);	switch(conn->state) {	case IBWC_ERROR:		ibw_conn_priv_destruct(pconn); /* do this here right now */		break;	case IBWC_CONNECTED:		rc = rdma_disconnect(pconn->cm_id);		if (rc) {			sprintf(ibw_lasterr, "ibw_disconnect failed with %d\n", rc);			DEBUG(0, (ibw_lasterr));			return rc;		}		break;	default:		DEBUG(9, ("invalid state for disconnect: %d\n", conn->state));		break;	}	return 0;}int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, uint32_t len){	struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);	struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);	struct ibw_wr *p = pconn->wr_list_avail;	if (p!=NULL) {		DEBUG(10, ("ibw_alloc_send_buf#1: cmid=%p, len=%d\n", pconn->cm_id, len));		DLIST_REMOVE(pconn->wr_list_avail, p);		DLIST_ADD(pconn->wr_list_used, p);		if (len <= pctx->opts.recv_bufsize) {			*buf = (void *)p->buf;		} else {			p->buf_large = ibw_alloc_mr(pctx, pconn, len, &p->mr_large);			if (p->buf_large==NULL) {				sprintf(ibw_lasterr, "ibw_alloc_mr#1 failed\n");				goto error;			}			*buf = (void *)p->buf_large;		}		/* p->wr_id is already filled in ibw_init_memory */	} else {		DEBUG(10, ("ibw_alloc_send_buf#2: cmid=%p, len=%d\n", pconn->cm_id, len));		/* not optimized */		p = pconn->extra_avail;		if (!p) {			p = pconn->extra_avail = talloc_zero(pconn, struct ibw_wr);			talloc_set_destructor(p, ibw_wr_destruct);			if (p==NULL) {				sprintf(ibw_lasterr, "talloc_zero failed (emax: %u)\n", pconn->extra_max);				goto error;			}			p->wr_id = pctx->opts.max_send_wr + pconn->extra_max;			pconn->extra_max++;			switch(pconn->extra_max) {				case 1: DEBUG(2, ("warning: queue performed\n")); break;				case 10: DEBUG(0, ("warning: queue reached 10\n")); break;				case 100: DEBUG(0, ("warning: queue reached 100\n")); break;				case 1000: DEBUG(0, ("warning: queue reached 1000\n")); break;				default: break;			}		}		p->buf_large = ibw_alloc_mr(pctx, pconn, len, &p->mr_large);		if (p->buf_large==NULL) {			sprintf(ibw_lasterr, "ibw_alloc_mr#2 failed\n");			goto error;		}		*buf = (void *)p->buf_large;		DLIST_REMOVE(pconn->extra_avail, p);		/* we don't have prepared index for this, so that		 * we will have to find this by wr_id later on */		DLIST_ADD(pconn->extra_sent, p);	}	*key = (void *)p;	return 0;error:	DEBUG(0, ("ibw_alloc_send_buf error: %s", ibw_lasterr));	return -1;}static int ibw_send_packet(struct ibw_conn *conn, void *buf, struct ibw_wr *p, uint32_t len){	struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);	struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);	int	rc;	/* can we send it right now? */	if (pconn->wr_sent<pctx->opts.max_send_wr) {		struct ibv_send_wr *bad_wr;		struct ibv_sge list = {			.addr 	= (uintptr_t)buf,			.length = len,			.lkey 	= pconn->mr_send->lkey		};		struct ibv_send_wr wr = {			.wr_id 	    = p->wr_id + pctx->opts.max_recv_wr,			.sg_list    = &list,			.num_sge    = 1,			.opcode     = IBV_WR_SEND,			.send_flags = IBV_SEND_SIGNALED,		};		if (p->buf_large==NULL) {			DEBUG(10, ("ibw_send#normal(cmid: %p, wrid: %u, n: %d)\n",				pconn->cm_id, (uint32_t)wr.wr_id, len));		} else {			DEBUG(10, ("ibw_send#large(cmid: %p, wrid: %u, n: %d)\n",				pconn->cm_id, (uint32_t)wr.wr_id, len));			list.lkey = p->mr_large->lkey;		}		rc = ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr);		if (rc) {			sprintf(ibw_lasterr, "ibv_post_send error %d (%d)\n",				rc, pconn->wr_sent);			goto error;		}		pconn->wr_sent++;		return rc;	} /* else put the request into our own queue: */	DEBUG(10, ("ibw_send#queued(cmid: %p, len: %u)\n", pconn->cm_id, len));	/* TODO: clarify how to continue when state==IBWC_STOPPED */	/* to be sent by ibw_wc_send */	/* regardless "normal" or [a part of] "large" packet */	if (!p->queued_ref_cnt) {		DLIST_ADD_END2(pconn->queue, p, struct ibw_wr *,			qprev, qnext); /* TODO: optimize */		p->queued_msg = buf;	}	p->queued_ref_cnt++;	p->queued_rlen = len; /* last wins; see ibw_wc_send */	return 0;error:	DEBUG(0, (ibw_lasterr));	return -1;}int ibw_send(struct ibw_conn *conn, void *buf, void *key, uint32_t len){	struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);	struct ibw_wr *p = talloc_get_type(key, struct ibw_wr);	int	rc;	assert(len>=sizeof(uint32_t));	assert((*((uint32_t *)buf)==len)); /* TODO: htonl */	if (len > pctx->opts.recv_bufsize) {		struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);		int	rlen = len;		char	*packet = (char *)buf;		uint32_t	recv_bufsize = pctx->opts.recv_bufsize;		DEBUG(10, ("ibw_send#frag(cmid: %p, buf: %p, len: %u)\n",			pconn->cm_id, buf, len));		/* single threaded => no race here: */		assert(p->ref_cnt==0);		while(rlen > recv_bufsize) {			rc = ibw_send_packet(conn, packet, p, recv_bufsize);			if (rc)				return rc;			packet += recv_bufsize;			rlen -= recv_bufsize;			p->ref_cnt++; /* not good to have it in ibw_send_packet */		}		if (rlen) {			rc = ibw_send_packet(conn, packet, p, rlen);			p->ref_cnt++; /* not good to have it in ibw_send_packet */		}		p->ref_cnt--; /* for the same handling */	} else {		assert(p->ref_cnt==0);		assert(p->queued_ref_cnt==0);		rc = ibw_send_packet(conn, buf, p, len);	}	return rc;}int ibw_cancel_send_buf(struct ibw_conn *conn, void *buf, void *key){	struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);	struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);	struct ibw_wr *p = talloc_get_type(key, struct ibw_wr);	assert(p!=NULL);	assert(buf!=NULL);	assert(conn!=NULL);	if (p->buf_large!=NULL)		ibw_free_mr(&p->buf_large, &p->mr_large);	/* parallel case */	if (p->wr_id < pctx->opts.max_send_wr) {		DEBUG(10, ("ibw_cancel_send_buf#1 %u", (int)p->wr_id));		DLIST_REMOVE(pconn->wr_list_used, p);		DLIST_ADD(pconn->wr_list_avail, p);	} else { /* "extra" packet */		DEBUG(10, ("ibw_cancel_send_buf#2 %u", (int)p->wr_id));		DLIST_REMOVE(pconn->extra_sent, p);		DLIST_ADD(pconn->extra_avail, p);	}	return 0;}const char *ibw_getLastError(void){	return ibw_lasterr;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -