⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xdev_mxdev_mxdevice.c

📁 MPI for java for Distributed Programming
💻 C
📖 第 1 页 / 共 3 页
字号:
  jobject staticBuffer;  jbyteArray directbuffer;  //int * sbuf_length;   int capacity ;  /* dynamic buffer declarations */  jbyteArray darr;  jbyte* dBuffer;  /* MX related declarations */  mx_return_t rc ;   mx_request_t recv_handle ;  mx_segment_t buffer_desc[1];  uint64_t match_recv, match_recv2, match_mask , proc_mask, tag_mask ;   mx_status_t recv_status;  uint32_t result;  if(tag == ANY_TAG) {    tag_mask = EMPTY ;   }  else {    tag_mask = MASK_TAG ;   }  if(any_src == 1) {      src_id = 0 ;    proc_mask = EMPTY;   }  else {    proc_mask = MASK_SRC ;   }  match_mask =   MATCH_CONTEXT | proc_mask | tag_mask ;    if(tag == ANY_TAG) { 	      match_recv = PRI_MATCH(context, src_id, 0);	      match_recv2 = SEC_MATCH(context, src_id, 0);  } else {    match_recv = PRI_MATCH(context, src_id, tag);	      match_recv2 = SEC_MATCH(context, src_id, tag);  }  //printf("src_id <%d> \n", src_id );   //printf("context <%d> \n", context);   //printf("match_mask U32 <%x> \n",MX_U32(match_mask));  //printf("match_recv U32 <%x> \n",MX_U32(match_recv));  //printf("match_mask L32 <%x> \n",MX_L32(match_mask));   //printf("match_recv L32 <%x> \n",MX_L32(match_recv));   //fflush(stdout);     /* get static buffer related stuff */  staticBuffer =           (*env)->GetObjectField(env,buf,FID_mpjbuf_Buffer_staticBuffer);  directbuffer =           (jbyteArray) (*env)->GetObjectField(env,                                          staticBuffer,                                          FID_mpjbuf_NIOBuffer_buffer);  buffer_address = (char *)(*env)->GetDirectBufferAddress(env,                            (jobject)directbuffer);  capacity = (unsigned int)                (*env)->GetIntField(env,buf,FID_mpjbuf_Buffer_capacity);   /* compose message ,sort out tag/context*/  buffer_desc[0].segment_ptr = buffer_address;   buffer_desc[0].segment_length = capacity+8 ;//+offset ..    //JRequestStruct *jreq = NULL;   //jreq = (JRequestStruct *) malloc(sizeof(JRequestStruct)) ;  //jobject globalRequestObject = (*env)->NewGlobalRef(env, req);   //jreq->jrequest = globalRequestObject ;   /* recv the message */  rc = mx_irecv(local_endpoint, buffer_desc, 1, match_recv, match_mask,		  NULL, &recv_handle);  //jreq instead of NULL   if(rc != MX_SUCCESS) {     printf("error in irecv"); fflush(stdout); 	    }  (*env)->SetLongField(env,req,reqhandleID,(jlong)recv_handle);  (*env)->SetLongField(env,req,lepID,(jlong)local_endpoint);  (*env)->SetObjectField(env,req,bufferhandleID,buf);  (*env)->SetLongField(env,req,matchrecvhandleID,(jlong)match_recv2);  (*env)->SetLongField(env,req,matchrecvmaskhandleID,(jlong)match_mask);  (*env)->SetLongField(env,req,bufferaddresshandleID,(jlong)buffer_address);  }/* * Class:     xdev_mxdev_MXDevice * Method:    nativeIprobe * Signature: (Lxdev/ProcessID;IILmpjdev/Status;II)I */JNIEXPORT jint JNICALL Java_xdev_mxdev_MXDevice_nativeIprobe  (JNIEnv *env, jobject jthis, jobject srcID, jint tag, jint context,    jobject status, jint any_src, jint isCompleted ) {  //printf("native probe method \n");	    mx_return_t rc;   mx_status_t c_status;  uint64_t result = EMPTY ;  uint64_t match_recv, proc_mask, tag_mask, match_mask ;  int src_id = ((*env)->GetIntField(env, srcID, processidID )) ;    if(tag == ANY_TAG) {    tag_mask = EMPTY ;   }  else {    tag_mask = MASK_TAG ;   }  if(any_src == 1) {      src_id = 0 ;    proc_mask = EMPTY;   }  else {    proc_mask = MASK_SRC ;   }  match_mask =   MATCH_CONTEXT | proc_mask | tag_mask ;    if(tag == ANY_TAG) { 	      match_recv = PRI_MATCH(context, src_id, 0);	    } else {    match_recv = PRI_MATCH(context, src_id, tag);	    }  //printf(" result upper (b)<%d> \n", MX_U32(result));  //printf(" result lower (b)<%d> \n", MX_L32(result));  rc = mx_iprobe(local_endpoint, 		match_recv,		match_mask,		&c_status,		&result);  //printf(" result upper (a)<%d> \n", MX_U32(result));  //printf(" result lower (a)<%d> \n", MX_L32(result));    if(result == EMPTY) {     //printf(" aint any message "); fflush(stdout);  }  else {    //printf(" yes message "); fflush(stdout);    isCompleted = 1; 	      (*env)->SetIntField(env,status, status_src_ID, 		      GET_SRC(c_status.match_info) );    (*env)->SetIntField(env,status, status_tag_ID, 		      GET_TAG(c_status.match_info) );    (*env)->SetIntField(env,status, countInBytesID , 		      c_status.msg_length - 16); //-8 is offset ...  }  return isCompleted ; }/* * Class:     xdev_mxdev_MXDevice * Method:    nativeProbe * Signature: (Lxdev/ProcessID;IILmpjdev/Status;I)V */JNIEXPORT void JNICALL Java_xdev_mxdev_MXDevice_nativeProbe  (JNIEnv *env, jobject jthis, jobject srcID, jint tag, jint context,    jobject status, jint any_src) {  //printf("native iprobe method \n");	    mx_return_t rc;   mx_status_t c_status;  uint64_t result ;  uint64_t match_recv, proc_mask, tag_mask, match_mask ;  int src_id = ((*env)->GetIntField(env, srcID, processidID )) ;    if(tag == ANY_TAG) {    tag_mask = EMPTY ;   }  else {    tag_mask = MASK_TAG ;   }  if(any_src == 1) {      src_id = 0 ;    proc_mask = EMPTY;   }  else {    proc_mask = MASK_SRC ;   }  match_mask =   MATCH_CONTEXT | proc_mask | tag_mask ;    if(tag == ANY_TAG) { 	      match_recv = PRI_MATCH(context, src_id, 0);	      //match_recv2 = SEC_MATCH(context, src_id, 0);  } else {    match_recv = PRI_MATCH(context, src_id, tag);	      //match_recv2 = SEC_MATCH(context, src_id, tag);  }  rc = mx_probe(local_endpoint, 		MX_INFINITE,   		match_recv,		match_mask,		&c_status,		&result);  (*env)->SetIntField(env,status, status_src_ID, 		      GET_SRC(c_status.match_info) );  (*env)->SetIntField(env,status, status_tag_ID, 		      GET_TAG(c_status.match_info) );  (*env)->SetIntField(env,status, countInBytesID , 		      c_status.msg_length - 16); //-8 is offset ...} /* * Class:     xdev_mxdev_MXDevice * Method:    nativePeek * Signature: (Lmpjdev/Status;)J */JNIEXPORT jlong JNICALL Java_xdev_mxdev_MXDevice_nativePeek  (JNIEnv *env, jclass jthis, jobject status) {  //printf(" nativePeek \n");fflush(stdout);   mx_request_t peekedRequest ;   mx_return_t rc ;   mx_status_t status ;   uint32_t result ;   //JRequestStruct *jrequest ;   //jobject javaRequest, statusInRequest ;    rc = mx_peek(local_endpoint, MX_INFINITE, &peekedRequest, &result);   if(rc != MX_SUCCESS) {     printf(" error while peeking the message \n");   }  if(result) {    //printf(" message peeked successfully \n");  fflush(stdout);   }  /* commented because of hashtable approach ...  rc = mx_wait( local_endpoint, &peekedRequest,                   MX_INFINITE, &status, &result);   jrequest = (JRequestStruct *) status.context ;   javaRequest =  jrequest->jrequest ;   statusInRequest = (*env)->GetObjectField( env, 		  javaRequest, FID_xdev_mxdev_MXRequest_status );   //.. save javaRequest in Request's long field ..  (*env)->SetLongField(env,javaRequest, FID_xdev_mxdev_MXRequest_requestStruct, 		       (jlong)jrequest);  //printf(" returning from native peek \n"); fflush(stdout);    //jobject copiedJavaRequest = javaRequest ;   //(*env)->DeleteGlobalRef(env, javaRequest);   (*env)->SetIntField(env,statusInRequest, status_src_ID,                      GET_SRC(status.match_info) );  (*env)->SetIntField(env,statusInRequest, status_tag_ID,                      GET_TAG(status.match_info) );  (*env)->SetIntField(env,statusInRequest, countInBytesID,                      status.msg_length - 8);  printf(" length <%d> \n ",status.msg_length) ;  fflush(stdout);   return javaRequest ;   */   // .. returnthe address of request struct ..   return peekedRequest ; }/* * Class:     xdev_mxdev_MXDevice * Method:    deletePeekedRequest * Signature: (Lxdev/mxdev/MXRequest;J)V */JNIEXPORT void JNICALL Java_xdev_mxdev_MXDevice_deletePeekedRequest  (JNIEnv *env, jclass jthis, jobject peekedRequest, jlong requestStruct) {   (*env)->DeleteGlobalRef(env, peekedRequest);   //JRequestStruct *jrequest = (JRequestStruct *) requestStruct ;   //free(jrequest); }/* * Class:     xdev_mxdev_MXDevice * Method:    nativePeek * Signature: (Lmpjdev/Status;)VJNIEXPORT void JNICALL Java_xdev_mxdev_MXDevice_nativePeek  (JNIEnv *env, jobject jthis, jobject requestObject) {  printf(" nativePeek \n");   mx_request_t request ;   mx_return_t rc ;   uint32_t result ;   rc = mx_peek( local_endpoint, MX_INFINITE, &request, &result);   if(rc == MX_SUCCESS) {     //printf(" mx_peek successful \n"); fflush(stdout);             }  if(result) {     printf(" a message has been peeked in nativeIwaitany \n"); fflush(stdout);   }     // complete comms ...  mx_status_t recv_status ;   rc = mx_wait( local_endpoint, &request, MX_INFINITE, &recv_status, &result);   JRequestStruct *jrequest = (JRequestStruct *) recv_status.context ;   jobject javaRequest =  jrequest->jrequest ;     printf("1 \n"); fflush(stdout);   jclass req_class = (*env)->GetObjectClass(env, javaRequest );    //.. get the comms ..    if(rc != MX_SUCCESS) {    printf(" error while calling mx_wait \n"); 	    }    //mx_status_t recv_status;  mx_request_t reqhandle ;  mx_endpoint_t mlep ;  jobject buffer ;  uint64_t match_recv;  uint64_t match_mask;  char * buffer_address;  mx_segment_t buffer_desc[1];    reqhandle = (mx_request_t) ((*env)->GetLongField(env, javaRequest, 			  reqhandleID )) ;  mx_request_t dreq ;   mlep = (mx_endpoint_t) ((*env)->GetLongField(env, javaRequest, 			  m_local_endpoint ));      printf("3 \n"); fflush(stdout);   match_recv = (uint64_t) ((*env)->GetLongField(env,			  javaRequest, matchrecvhandleID)) ;  match_mask = (uint64_t) 	  ((*env)->GetLongField(env, javaRequest, matchrecvmaskhandleID)) ;  buffer_address = (char *) ((*env)->GetLongField(env, 			  javaRequest, bufferaddresshandleID)) ;  buffer = ((*env)->GetObjectField(env, javaRequest,                           bufferhandleID )) ;  jclass mpjbuf_class = (*env)->GetObjectClass(env, buffer);  jbyteArray darr;  jbyte* dBuffer;  jboolean isCopy = JNI_TRUE;  char encoding = 1;   int dbuf_length ;   //.. dbuf_length ..  encoding = buffer_address[0] ;  dbuf_length = 	  (((int)(unsigned char) buffer_address[4]) << 24) |	  (((int)(unsigned char) buffer_address[5]) << 16) |	  (((int)(unsigned char) buffer_address[6]) << 8) |	  (((int)(unsigned char) buffer_address[7]) );     printf("dbuf_length [after strange] <%d> \n",dbuf_length); fflush(stdout);     //int dbuf_length = byte22int(buffer_address,0);  int sbuf_length = recv_status.msg_length - 8 ; //- offset   if(dbuf_length > 0) {     darr = (*env)->NewByteArray (env, dbuf_length);    dBuffer = (*env)->GetByteArrayElements(env, darr, &isCopy);    buffer_desc[0].segment_ptr = dBuffer;     buffer_desc[0].segment_length = dbuf_length;    //printf(" calling mx_irecv in nativeIwait \n"); fflush(stdout);         rc = mx_irecv(mlep, buffer_desc, 1, match_recv, match_mask,  		  NULL, &dreq);    if(rc != MX_SUCCESS) {       printf(" return code is not successful \n"); fflush(stdout);  	        }        rc = mx_wait(mlep, & dreq, MX_INFINITE, 		  &recv_status, &result);     if(rc != MX_SUCCESS) {       printf(" return code is not successful \n"); fflush(stdout);  	        }    //printf(" received \n"); fflush(stdout);     (*env)->SetByteArrayRegion(env,darr,0,dbuf_length,dBuffer);           jmethodID setdbuf = (*env)->GetMethodID(env, mpjbuf_class,  		       "setDynamicBuffer", "([B)V");     (*env)->CallVoidMethod(env, buffer, setdbuf, darr);  } else {    //printf(" no dynamic message \n") ;   }   jmethodID set_size = (*env)->GetMethodID(env, mpjbuf_class,  		       "setSize", "(I)V");   (*env)->CallVoidMethod(env, buffer, set_size, sbuf_length );   //printf("recvRequest wait returns \n");   (*env)->SetIntField(env,javaRequest, testcalledID, 1);   //printf("recv:req calling wait for control message \n"); fflush(stdout);  //printf("status.msg_length <%d> \n", recv_status.msg_length );   //(*env)->SetIntField(env,status, status_src_ID,  //                    GET_SRC(recv_status.match_info) );  //(*env)->SetIntField(env,status, status_tag_ID,  //                    GET_TAG(recv_status.match_info) );  //(*env)->SetIntField(env,status, countInBytesID,   //		      recv_status.msg_length - 16);//-16  //.. extract JRequest ..  //.. figure out if it is send or recv request ..   //if(recvrequest) {   //.. copy paste code from iwait of recvrequest ..  //}    //if(sendrequest) {   //.. copy paste code from iwait of sendrequest ..	    //}}*//* * Class:     xdev_mxdev_MXDevice * Method:    nativeFinish * Signature: ()V */ JNIEXPORT void JNICALL Java_xdev_mxdev_MXDevice_nativeFinish  (JNIEnv *env, jobject obj1) {  //printf("native:finish process <%d> starting finish \n",myRank );   //fflush(stdout);  //sleep(5);  mx_return_t rc;  //printf("native:finish process <%d> will close endpoint \n",myRank );   //fflush(stdout);   //fflush(stdout);  rc = mx_close_endpoint(local_endpoint);   if(rc != MX_SUCCESS) {     //printf("error in nativeFinish "); fflush(stdout); 	    }  //printf("native:finish process <%d> closed endpoint \n",myRank );   //fflush(stdout);   //fflush(stdout);  free(peer_endpoints);   peer_endpoints = NULL;   if(rc == MX_SUCCESS) {     //printf("closed endpoint \n");  }  //printf("native:finish process <%d> calling finalize \n",myRank ) ;   //fflush(stdout);      rc = mx_finalize();    if(rc == MX_SUCCESS) {    //printf("native:finish process <%d> called finalize \n", myRank );     //fflush(stdout);   }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -