⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xdev_mxdev_mxdevice.c

📁 MPI for java for Distributed Programming
💻 C
📖 第 1 页 / 共 3 页
字号:
/** The MIT License 	Copyright (c) 2005 - 2007   1. Distributed Systems Group, University of Portsmouth (2005)   2. Aamir Shafi (2005 - 2007)   3. Bryan Carpenter (2005 - 2007)   4. Mark Baker (2005 - 2007)*/	#include "myriexpress.h"#include <stdio.h>#include "xdev_mxdev_MXDevice.h"#include "jni.h"#include "mxdev_const.h" //***********************************************************//** block diagram for the tag ..it is a 64 bit number     **//** <---16-----><------16------><--1--><--------31------->** //** ------------------------------------------------------**//** |  context  |     src      |  PRI  |      tag        |**//** ------------------------------------------------------**//***********************************************************static JavaVM *jvm;int myRank ;  int procs ;jclass   CL_mpjdev_Status ;jclass   CL_mpjbuf_Buffer;jclass   CL_xdev_mxdev_MXProcessID;jclass   CL_mpjbuf_NIOBuffer;jclass   CL_mpjbuf_Type;jclass   CL_xdev_ProcessID; jclass   CL_xdev_mxdev_MXRequest ; jfieldID FID_mpjbuf_Buffer_staticBuffer;jfieldID FID_mpjbuf_Buffer_dynamicBuffer;jfieldID FID_mpjbuf_Buffer_size;jfieldID FID_mpjbuf_Buffer_capacity;jfieldID FID_mpjbuf_NIOBuffer_buffer;jfieldID FID_mpjbuf_Type_code;jfieldID FID_xdev_ProcessID_uuid;jfieldID FID_xdev_mxdev_MXRequest_status ; jfieldID FID_xdev_mxdev_MXRequest_requestStruct ; jfieldID processhandleID; jfieldID processidID; jfieldID status_src_ID ; jfieldID status_tag_ID ;jfieldID countInBytesID;/* Caching of JNI stuff ..*/jint JNI_OnLoad(JavaVM *vm, void *reserved) {  JNIEnv *env;          jvm=vm;   if (JNI_OK!=(*vm)->GetEnv(vm,(void **)&env,JNI_VERSION_1_4)) {    exit(1);  }  // why am i not deleting these two global references ... (?)  CL_mpjbuf_Buffer =  (*env)->NewGlobalRef(env, 		  (*env)->FindClass(env,"mpjbuf/Buffer"));  CL_xdev_mxdev_MXProcessID =  (*env)->NewGlobalRef(env, 		  (*env)->FindClass(env,"xdev/mxdev/MXProcessID"));  CL_xdev_mxdev_MXRequest = (*env)->FindClass(env,"xdev/mxdev/MXRequest");  CL_mpjbuf_NIOBuffer =  (*env)->NewGlobalRef(env, 		  (*env)->FindClass(env,"mpjbuf/NIOBuffer"));  CL_mpjbuf_Type = (*env)->FindClass(env,"mpjbuf/Type");  CL_xdev_ProcessID = (*env)->FindClass(env,"xdev/ProcessID");  CL_mpjdev_Status = (*env)->FindClass(env,"mpjdev/Status");  FID_mpjbuf_Buffer_size = (*env)->GetFieldID(env,CL_mpjbuf_Buffer,"size","I");  FID_mpjbuf_Buffer_capacity = 	  (*env)->GetFieldID(env,CL_mpjbuf_Buffer,"capacity","I");  FID_mpjbuf_Buffer_staticBuffer =    (*env)->GetFieldID(env,CL_mpjbuf_Buffer,"staticBuffer","Lmpjbuf/RawBuffer;");  FID_mpjbuf_Buffer_dynamicBuffer =     (*env)->GetFieldID(env,CL_mpjbuf_Buffer,"dynamicBuffer","[B");  FID_mpjbuf_NIOBuffer_buffer =    (*env)->GetFieldID(env,CL_mpjbuf_NIOBuffer,"buffer","Ljava/nio/ByteBuffer;");  FID_mpjbuf_Type_code = (*env)->GetFieldID(env,CL_mpjbuf_Type,"code","I");  FID_xdev_ProcessID_uuid =      (*env)->GetFieldID(env,CL_xdev_ProcessID,"uuid","Ljava/util/UUID;");  processhandleID =     (*env)->GetFieldID(env, CL_xdev_mxdev_MXProcessID,"processHandle", "J") ;  processidID = (*env)->GetFieldID(env, CL_xdev_mxdev_MXProcessID, 		    "id", "I");  status_src_ID = (*env)->GetFieldID(env, CL_mpjdev_Status, 		    "source", "I");  status_tag_ID = (*env)->GetFieldID(env, CL_mpjdev_Status, 		    "tag", "I");  countInBytesID = (*env)->GetFieldID(env, CL_mpjdev_Status, 		    "countInBytes", "I");  FID_xdev_mxdev_MXRequest_status = (*env)->GetFieldID(env, 		  CL_xdev_mxdev_MXRequest,"status","Lmpjdev/Status;");   FID_xdev_mxdev_MXRequest_requestStruct = (*env)->GetFieldID(env, 		  CL_xdev_mxdev_MXRequest,"requestStruct","J");     if (FID_mpjbuf_Buffer_staticBuffer && FID_mpjbuf_Buffer_size \      && FID_mpjbuf_NIOBuffer_buffer && FID_mpjbuf_Buffer_capacity \      && FID_xdev_ProcessID_uuid) {    return JNI_VERSION_1_4;   } else {    {fprintf(stderr,"\n Fatal error getting FIDs"); exit(3);}  }}/* Caching of JNI stuff (done)..*/mx_endpoint_t local_endpoint; uint32_t filter = 0xcafebabe; //extern mx_endpoint_t local_endpoint;static mx_endpoint_addr_t * peer_endpoints = NULL;   /* * Class:     xdev_mxdev_MXDevice * Method:    nativeInit * Signature: ([Ljava/lang/String;I[Ljava/lang/String;[II[Lxdev/mxdev/MXProcessID;JJ)V */JNIEXPORT void JNICALL Java_xdev_mxdev_MXDevice_nativeInit  (JNIEnv *env, jobject jthis, jobjectArray argv, jint rank,     jobjectArray processNames, jintArray ranks, jint nprocs,      jobjectArray pids , jlong msb, jlong lsb) {  mx_return_t rc;  rc = mx_init();  mx_set_error_handler( MX_ERRORS_RETURN );  if(rc == MX_SUCCESS) {    //printf("mx_init called \n");  }  rc = mx_open_endpoint(MX_ANY_NIC, MX_ANY_ENDPOINT, filter, 0, 0,                       &local_endpoint);    //printf(" nic id upper <%d> \n", MX_U32(nic_id));    //printf(" nic id lower <%d> \n", MX_L32(nic_id));  if(rc == MX_SUCCESS) {     //printf("opened a local end-point \n");   }  //sleep(5);   jclass CL_java_util_UUID = (*env)->FindClass(env, "java/util/UUID");  jmethodID uuid_c =  (*env)->GetMethodID(env, CL_java_util_UUID,  		       "<init>", "(JJ)V") ;   jmethodID pid_c =  (*env)->GetMethodID(env, CL_xdev_mxdev_MXProcessID,  		       "<init>", "(Ljava/util/UUID;)V") ;   myRank = rank ;   procs = nprocs ;  peer_endpoints = (mx_endpoint_addr_t *)  	  malloc(procs*sizeof(mx_endpoint_addr_t));   //.. .. 	  uint64_t nic_id;  //printf("native init method \n"); 	      jobject pid;  int len = (*env)->GetArrayLength(env,processNames);  char** pNames = (char**)calloc(len, sizeof(char*));  int i=0;  char *pName;    for (i=0; i<len; i++) {     pName =(jstring)(*env)->GetObjectArrayElement(env,processNames,i);     pNames[i] = (*env)->GetStringUTFChars(env,pName,0);  }	  /* connect loop */   for(i=0 ; i<nprocs ; i++) {    //printf(" connecting to <%s> \n", pNames[i]);    rc = mx_hostname_to_nic_id( pNames[i] , &nic_id);    if(rc == MX_SUCCESS) {       //printf("getting nic_id from hostname \n");     }        NC:     //printf("calling connect() \n");     //printf(" nic id upper <%d> \n", MX_U32(nic_id));    //printf(" nic id lower <%d> \n", MX_L32(nic_id));    rc = mx_connect(local_endpoint, nic_id, 0, filter,   		  MX_INFINITE, &peer_endpoints [i]);    //printf("called connect() \n");    if(rc == MX_SUCCESS) {       //printf("connected to remote host <%s> \n", pNames[i]);    } else {      //printf("could not connect to <%s> \n", pNames[i]);       //printf("trying again ...");      goto NC; //need to get rid of this ..we just need a do while loop.     }      }  //fflush(stdout);  // ids set up could be done here ...  //printf("nativeIdsSetup rank <%d> of <%d> \n", myRank, procs); 	      // sending accessories  mx_segment_t buffer_desc[3];  mx_request_t send_handle[procs] ;  uint64_t send_tag ;  mx_status_t send_status ;  uint32_t result ;    // irecv accessories   mx_segment_t recv_buffer[3];  mx_request_t recv_handle ;  uint64_t match_tag;  uint64_t match_mask;  mx_status_t recv_status;  uint32_t recv_result;  buffer_desc[0].segment_ptr = &msb;   buffer_desc[0].segment_length = 64;   buffer_desc[1].segment_ptr = &lsb;   buffer_desc[1].segment_length = 64;    buffer_desc[2].segment_ptr = &rank;   buffer_desc[2].segment_length = 4;    //printf("rank <%d> \n",rank);   //this needs to be fixed ..basically use the tag macros once I have   //defined them ...  send_tag = myRank << 32; //| UINT64_C(0x0000000000000000);     for(i=0 ; i<procs ; i++) {    if( i == myRank) {       continue;     	        }     //printf("pro <%d> calling send to pro <%d> \n",     //		    myRank, i); fflush(stdout);    rc = mx_isend(local_endpoint, buffer_desc, 3, peer_endpoints[i], 		    send_tag, NULL, &send_handle[i]);     //if( rc == MX_SUCCESS ) {     //  printf("pro <%d> called isend to pro <%d> \n",     //	    myRank, i); fflush(stdout);    //}  }  //printf("pro <%d> reached A \n",myRank ); fflush(stdout);    jlong _msb, _lsb;   jint _rank ;   match_mask = UINT64_C(0x00000000ffffffff);     for(i=0 ; i<procs ; i++) {     if( i == myRank) {       continue;     	        }    recv_buffer[0].segment_ptr = &_msb;     recv_buffer[0].segment_length = 64;     recv_buffer[1].segment_ptr = &_lsb;     recv_buffer[1].segment_length = 64;      recv_buffer[2].segment_ptr = &_rank;     recv_buffer[2].segment_length = 4;          match_tag = i << 32 ;        //printf("pro <%d> receiving from process <%d> \n",     //		    myRank, i); fflush(stdout);    rc = mx_irecv(local_endpoint, recv_buffer, 3, match_tag, match_mask,		 NULL , &recv_handle);    //printf(" calling wait \n"); fflush(stdout);     rc = mx_wait(local_endpoint, & recv_handle, MX_INFINITE, 		  &recv_status, &result);    if(rc == MX_SUCCESS) {     //  printf("pro <%d> received from process <%d> \n",     //		    myRank, i); fflush(stdout);    }else {      printf("error \n");  	        }        jobject uid = (*env)->NewObject(env, CL_java_util_UUID, 	     uuid_c, _msb, _lsb);    pid = (*env)->NewObject(env, CL_xdev_mxdev_MXProcessID,  	     pid_c, uid);     (*env)->SetLongField(env,pid, processhandleID, (jlong)&peer_endpoints[i]);    (*env)->SetIntField(env,pid, processidID, _rank);    (*env)->SetObjectArrayElement(env,pids,i,pid);  }    //printf("process <%d> reached B \n",myRank ); fflush(stdout);  for(i=0 ; i<procs ; i++) {     if( i == myRank) {       continue;     	        }    //printf("pro <%d> calling send_wait to process <%d> \n",     //		    myRank, i); fflush(stdout);    mx_wait(local_endpoint, &send_handle[i], MX_INFINITE,     		  &recv_status, &recv_result);      //printf("pro <%d> called send_wait to process <%d> \n",     //		    myRank, i); fflush(stdout);  }  //a. <get objectarrayelement for this process>  pid = (*env)->GetObjectArrayElement(env,pids,myRank);  //b. <set long field for it ...>  (*env)->SetLongField(env,pid, processhandleID, 		       (jlong)&peer_endpoints[myRank]);  //c. <set integer field for it ...>  (*env)->SetIntField(env,pid, processidID, myRank);   //printf("process <%d> reached C \n",myRank ); fflush(stdout) ;}/* * Class:     xdev_mxdev_MXDevice * Method:    nativeSsend * Signature: (Lmpjbuf/Buffer;Lxdev/ProcessID;IIII)V */JNIEXPORT void JNICALL Java_xdev_mxdev_MXDevice_nativeSsend  (JNIEnv *env, jobject this, jobject buf, jobject dstID,    jint tag, jint context, jint sbuf_length, jint dbuf_length) {	    /* MX accessories for calling mx_isend */	    mx_return_t rc ;   mx_request_t send_handle;  mx_status_t status;  mx_segment_t buffer_desc[1];  uint64_t match_send, dbuf_tag ;     uint32_t result;  mx_endpoint_addr_t * dest;  dest = (mx_endpoint_addr_t *)     ((*env)->GetLongField(env, dstID, processhandleID )) ;    match_send = PRI_MATCH(context, myRank, tag);  //printf("send_recv U32 <%x> \n",MX_U32(match_send));fflush(stdout);  //printf("send_recv L32 <%x> \n",MX_L32(match_send));fflush(stdout);   /* static buffer related declarations */  char *buffer_address=NULL;  jobject staticBuffer;  jbyteArray directbuffer;  /* dynamic buffer related declarations */  jboolean isCopy=JNI_TRUE;  jbyteArray dynamicBuffer ;  jbyte* dBuffer;  /* get static buffer related stuff */  staticBuffer = 	  (*env)->GetObjectField(env,buf,FID_mpjbuf_Buffer_staticBuffer);  directbuffer =           (jbyteArray) (*env)->GetObjectField(env,                       staticBuffer, FID_mpjbuf_NIOBuffer_buffer);  buffer_address = (char *)(*env)->GetDirectBufferAddress(env,                            (jobject)directbuffer);  /* get dynamic buffer related stuff */  dynamicBuffer =     (jbyteArray) (*env)->GetObjectField(env,buf,                         FID_mpjbuf_Buffer_dynamicBuffer);  if(dbuf_length > 0) {    dBuffer = (*env)->GetByteArrayElements(env, dynamicBuffer, &isCopy);  }  //.. write the first eight bytes ..  //   _________________________  	    //   | E | X | X | X | DSIZE |  //   -------------------------  char encoding = 1;   buffer_address[0] = encoding ;  buffer_address[4] = (((unsigned int) dbuf_length) >> 24) & 0xFF ;   buffer_address[5] = (((unsigned int) dbuf_length) >> 16) & 0xFF;   buffer_address[6] = (((unsigned int) dbuf_length) >> 8) & 0xFF ;   buffer_address[7] = ((unsigned int) dbuf_length) & 0xFF; 	    /* compose message, sort out tag/context and remote endpoints */  buffer_desc[0].segment_ptr = buffer_address ;  buffer_desc[0].segment_length = sbuf_length+8; //+offset;  /* send message */  //printf("native:send sending \n");   rc = mx_issend(local_endpoint, buffer_desc, 1, * dest, match_send, 		  NULL, &send_handle);   //printf("native:send sent \n");   mx_segment_t dbuf_desc [1] ;  dbuf_tag = SEC_MATCH(context, myRank, tag);   mx_request_t dbufsend_handle;    dbuf_desc[0].segment_ptr = dBuffer ;   dbuf_desc[0].segment_length = dbuf_length;    if(dbuf_length > 0) {     rc = mx_issend(local_endpoint, dbuf_desc, 1, * dest, dbuf_tag, 		  NULL, &dbufsend_handle);   }  // so which one should be called here first?   rc = mx_wait(local_endpoint, &send_handle, MX_INFINITE, 		  &status, &result);    if(dbuf_length > 0) {     rc = mx_wait(local_endpoint, &dbufsend_handle, MX_INFINITE, 		  &status, &result);    }}/* * Class:     xdev_mxdev_MXDevice * Method:    nativeSend * Signature: (Lmpjbuf/Buffer;Lxdev/ProcessID;IIII)V */  JNIEXPORT void JNICALL Java_xdev_mxdev_MXDevice_nativeSend  (JNIEnv *env, jobject this, jobject buf, jobject dstID,    jint tag, jint context, jint sbuf_length, jint dbuf_length) {  //printf(" nativeSend first statement \n"); fflush(stdout);   /* MX accessories for calling mx_isend */	    mx_return_t rc ;   mx_request_t send_handle;  mx_status_t status;  mx_segment_t buffer_desc[1];  uint64_t match_send, dbuf_tag ;     uint32_t result;  mx_endpoint_addr_t * dest;  dest = (mx_endpoint_addr_t *)     ((*env)->GetLongField(env, dstID, processhandleID )) ;    match_send = PRI_MATCH(context, myRank, tag);  //printf("send_recv U32 <%x> \n",MX_U32(match_send));fflush(stdout);  //printf("send_recv L32 <%x> \n",MX_L32(match_send));fflush(stdout);   /* static buffer related declarations */  char *buffer_address=NULL;  jobject staticBuffer;  jbyteArray directbuffer;  /* dynamic buffer related declarations */  jboolean isCopy=JNI_TRUE;  jbyteArray dynamicBuffer ;  jbyte* dBuffer;  /* get static buffer related stuff */  staticBuffer = 	  (*env)->GetObjectField(env,buf,FID_mpjbuf_Buffer_staticBuffer);  directbuffer =           (jbyteArray) (*env)->GetObjectField(env,                       staticBuffer, FID_mpjbuf_NIOBuffer_buffer);  buffer_address = (char *)(*env)->GetDirectBufferAddress(env,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -