📄 fmgm.c
字号:
{ inodeid = j; break; } } MYASSERT( inodeid >= 0, ("Can't find host #%d \"%s\" in GM tables!", i,peer->hostname) );if(gmfmdbg>=1){printf("GMInterfaceID=%3d Node[%d]=\"%s\"\n",inodeid,i,peer->hostname);fflush(stdout);} peer->inodeid = inodeid; peer->iportnum = 2; /*XXX*/ } gm->maxmsglen = sizeof(GMMsg); gm->maxmsgszlog = gm_min_size_for_length(gm->maxmsglen);if(gmfmdbg>=1){printf("GM: Min size for msg len %d(0x%x) is %d(0x%x)\n",gm->maxmsglen,gm->maxmsglen,gm->maxmsgszlog,gm->maxmsgszlog);fflush(stdout);} status=gm_set_acceptable_sizes(gm->port,GM_LOW_PRIORITY,1<<gm->maxmsgszlog); ON_ERR( ("Couldn't set msg acceptable sizes") ); gm->orig.nst = gm->curr.nst = gm_num_send_tokens( gm->port );if(gmfmdbg>=1){printf("gm_num_send_tokens()=%d\n",gm->orig.nst);fflush(stdout);} gm->orig.nrt = gm->curr.nrt = gm_num_receive_tokens( gm->port );if(gmfmdbg>=1){printf("gm_num_receive_tokens()=%d\n",gm->orig.nrt);fflush(stdout);} gm->sendbufs = 0; for( i = 0; i < gm->orig.nst; i++ ) { GMMsg *sbuf = 0; sbuf = gm_dma_malloc( gm->port, gm->maxmsglen ); status = !!sbuf ? GM_SUCCESS : GM_FAILURE; ON_ERR( ("Couldn't allocate dma send buffer #%d",i) ); sbuf->next = gm->sendbufs; gm->sendbufs = sbuf; } for( i = 0; i < gm->orig.nrt; i++ ) { GMMsg *rbuf = 0; rbuf = gm_dma_malloc( gm->port, gm->maxmsglen ); status = !!rbuf ? GM_SUCCESS : GM_FAILURE; ON_ERR( ("Couldn't allocate dma receive buffer #%d",i) ); gm_provide_receive_buffer_with_tag( gm->port, rbuf, gm->maxmsgszlog, GM_LOW_PRIORITY, 0 ); }if(gmfmdbg>=1){printf("Registered %d receive bufs\n",gm->orig.nrt);fflush(stdout);} gm->bar.nrecd = 0; GM_barrier();#undef ON_ERR}/*---------------------------------------------------------------------------*/void GM_finalize( void ){ gm_close( gm->port );if(gmfmdbg>=1){printf("gm_close() done\n");fflush(stdout);} gm_finalize();if(gmfmdbg>=1){printf("gm_finalize() done\n");fflush(stdout);}}/*---------------------------------------------------------------------------*/static void send_callback( struct gm_port *port, void *context, gm_status_t status ){ MYASSERT( status == GM_SUCCESS, ("Fatal error on send");gm_perror("",status)); MYASSERT( port == gm->port, ("ports must match") ); MYASSERT( gm->curr.nst < gm->orig.nst, ("%u %u",gm->curr.nst,gm->orig.nst) ); { GMMsg *sbuf = (GMMsg *)context; sbuf->next = gm->sendbufs; gm->sendbufs = sbuf; gm->curr.nst++; }}/*---------------------------------------------------------------------------*/GM_stream *GM_begin_message( int recipient, int length, int handler, int src_id, int dest_id ){ GM_stream *sendstream = 0; MYASSERT( length <= GMMAXDATALEN, ("GM msg size %d can't exceed compiled %d",length,GMMAXDATALEN)); MYASSERT( gm->curr.nst <= gm->orig.nst, ("%u",gm->curr.nst) ); while( gm->curr.nst <= 0 ) { MYASSERT(0,("To be completed")); } MYASSERT( gm->sendbufs, ("%u",gm->curr.nst) ); { GMMsg *sbuf = gm->sendbufs; gm->sendbufs = sbuf->next; --gm->curr.nst; sbuf->internal = 0; sbuf->src_id = src_id; sbuf->dest_id = dest_id; sbuf->handler = handler; sbuf->src_pe = gm->nodeid; sbuf->dest_pe = recipient; sbuf->pieces.npieces = 0; sendstream = (GM_stream *)sbuf; } return sendstream;}/*---------------------------------------------------------------------------*/void GM_send_piece( GM_stream *sendstream, void *buffer, int length ){ GMMsg *msg = (GMMsg *)sendstream; GMMsgPieces *pcs = 0; int offset = 0; MYASSERT( msg && msg->src_pe == GM_nodeid, ("!" ) ); pcs = &msg->pieces; if( pcs->npieces > 0 ) { GMMsgPieceSpec *lastpc = &pcs->pspec[pcs->npieces-1]; offset = lastpc->offset + lastpc->length; offset = ((offset-1)/16 + 1) * 16; } MYASSERT( offset+length <= GMMAXDATALEN, ("GM msg size %d can't exceed compiled %d", offset+length, GMMAXDATALEN) ); MYASSERT( pcs->npieces < GMMAXPIECES, ("GM msg pieces can't exceed compiled %d pieces", GMMAXPIECES) ); { GMMsgPieceSpec *newspec = &pcs->pspec[pcs->npieces++]; newspec->offset = offset; newspec->length = length; memcpy( &pcs->pdata[offset], (char *)buffer, length ); }}/*---------------------------------------------------------------------------*/void GM_end_message( GM_stream *sendstream ){ GMMsg *sbuf = (GMMsg *)sendstream; unsigned dest_gmid = gm->peers[sbuf->dest_pe].inodeid; unsigned dest_gmport = gm->peers[sbuf->dest_pe].iportnum; int msglen = gm->maxmsglen - GMMAXDATALEN; GMMsgPieces *pcs = &sbuf->pieces; int databytes = 0; if( pcs->npieces > 0 ) { GMMsgPieceSpec *lastpc = &pcs->pspec[pcs->npieces-1]; databytes = lastpc->offset + lastpc->length; } msglen += databytes; (void) gm_send_with_callback( gm->port, sbuf, gm->maxmsgszlog, msglen, GM_LOW_PRIORITY, dest_gmid, dest_gmport, send_callback, sbuf );}/*---------------------------------------------------------------------------*/static int npieces_recd; /*#pieces received by user from current message*//*---------------------------------------------------------------------------*/void GM_receive( void *buffer, GM_stream *receivestream, unsigned int length ){ GMMsg *msg = (GMMsg *)receivestream; int next_piece = npieces_recd++; GMMsgPieces *pcs = 0; MYASSERT( msg && msg->dest_pe == GM_nodeid, ("!" ) ); pcs = &msg->pieces; MYASSERT( next_piece < pcs->npieces, ("Only %d pieces exist", pcs->npieces) ); { GMMsgPieceSpec *next_spec = &pcs->pspec[next_piece]; int offset = next_spec->offset; MYASSERT( length <= next_spec->length, ("Only %d < %u bytes in piece", next_spec->length, length) ); memcpy( (char *)buffer, &pcs->pdata[offset], length ); }}/*---------------------------------------------------------------------------*/int GM_extract( unsigned int maxbytes ){ int nbytes = 0;if(gmfmdbg>=5){printf("GM_extract() started\n");fflush(stdout);} while( nbytes < maxbytes && gm_receive_pending( gm->port ) ) { union gm_recv_event *e = 0; struct gm_recv *recv = 0;if(gmfmdbg>=2){printf("Doing blocking receive...\n");fflush(stdout);} e = gm_receive( gm->port );if(gmfmdbg>=2){printf("Done receive.\n");fflush(stdout);} recv = &e->recv; switch( gm_ntohc(recv->type) ) { case GM_RECV_EVENT: { int len = gm_ntohl(recv->length); GMMsg *msg = (GMMsg *)gm_ntohp(recv->buffer);if(gmfmdbg>=2){printf("Recd msg %p len %d!\n",msg,len);fflush(stdout);print_gmmsg(msg);} MYASSERT( 0 <= len && len <= gm->maxmsglen, ("%d",len) ); { GM_stream *gm_stream = (GM_stream *)msg; int handler = msg->handler, gm_sender = msg->src_pe; int src_fm_id = msg->src_id, dest_fm_id = msg->dest_id; npieces_recd = 0;if(gmfmdbg>=3){printf("Recd %sternal msg\n", msg->internal ? "in" : "ex");}; if( msg->internal ) { MYASSERT( fmprivcb, ("!") ); fmprivcb( handler, gm_stream, gm_sender, src_fm_id, dest_fm_id); } else { MYASSERT( fmcb, ("!") ); fmcb( handler, gm_stream, gm_sender, src_fm_id, dest_fm_id); } } nbytes += len; /*Return buffer to GM*/ gm_provide_receive_buffer_with_tag( gm->port, gm_ntohp(recv->buffer), gm_ntohc(recv->size), GM_LOW_PRIORITY, gm_ntohc(recv->tag) );if(gmfmdbg>=2){printf("Returned event %p to GM!\n",msg);fflush(stdout);} break; } default: {if(gmfmdbg>=2){printf("Recd unknown event %d!\n",recv->type);fflush(stdout);} (void) gm_unknown( gm->port, e );if(gmfmdbg>=2){printf("gm_unknown done!\n");fflush(stdout);} break; } }if(gmfmdbg>=2){printf("Retrying gm_receive_pending %d\n",nbytes);fflush(stdout);} }if(gmfmdbg>=5){printf("GM_extract()=%d\n",nbytes);fflush(stdout);} return nbytes;}/*---------------------------------------------------------------------------*/int GM_numpieces( GM_stream *gm_stream ){ GMMsg *msg = (GMMsg *)gm_stream; GMMsgPieces *pcs = &msg->pieces; return pcs->npieces;}/*---------------------------------------------------------------------------*/int GM_piecelen( GM_stream *gm_stream, int i ){ GMMsg *msg = (GMMsg *)gm_stream; GMMsgPieces *pcs = &msg->pieces; GMMsgPieceSpec *spec = 0; MYASSERT( 0 <= i && i < pcs->npieces, ("Only %d pieces", pcs->npieces) ); spec = &pcs->pspec[i]; return spec->length;}/*---------------------------------------------------------------------------*/int GM_debug_level( int level ){ int old = gmfmdbg; gmfmdbg = level; return old;}/*---------------------------------------------------------------------------*/
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -