⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xdev_gmdev_gm.c

📁 MPI for java for Distributed Programming
💻 C
📖 第 1 页 / 共 2 页
字号:
/* ******************************************************************** * KaRMI * * Copyright (C) 1998-2002 The JavaParty Team, University of Karlsruhe * * Permission is hereby granted to use and modify this software. * The software, or modifications thereof, may be redistributed only * if the source code is also provided and this copyright notice stays  * attached. **********************************************************************//* * $Revision: 1.2 $ * $Date: 2005/03/24 10:49:51 $ */#include <xdev_gmdev_GM.h>#include <gm.h>#include <stdlib.h>#include <string.h>#define debugprint(...) // #define debugprint printf/* * if something goes wrong, we abort */void err_exit(char * s, gm_status_t status){  fprintf(stderr,"%s", s);  if (status != GM_SUCCESS)    fprintf(stderr,": %s.\n", gm_strerror(status));  else    fprintf(stderr,".\n");  exit(EXIT_FAILURE);}jclass IOException = 0;jclass InternalError = 0;#define sizesCnt 13#define supportedSizesCnt 4//                                     0,   1,  2 , 3 , 4,  5,  6,  7,  8,  9, 10, 11, 12const unsigned int supportedSizes[] = {12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12};struct hportInfo {  /** array of booleans, true if a receive request for vport could not      be fulfilled and the corresponding thread starts waiting */  unsigned char waiting[xdev_gmdev_GM_VPORTS];  /** list of buffers already filled in receive events per port */  struct receiveBuffer * receivedBuffers[xdev_gmdev_GM_VPORTS];  /** pointer to the last buffer in the receivedBuffers list per port */  struct receiveBuffer * lastReceivedBuffer[xdev_gmdev_GM_VPORTS];  struct receiveBuffer * firstReceivedBufferHead;  struct receiveBuffer * lastReceivedBufferHead;  /** amount of buffers passed to the GM library for receive per port */  unsigned int providedBuffersCnt[sizesCnt];  /** list of free send buffers per port */  struct sendBuffer * sendBuffer[sizesCnt];  /** per port */  unsigned int receiveToken[sizesCnt];  /** per port */  unsigned int sendToken[sizesCnt];  struct gm_port * gm_port;  /** portnr */  unsigned int hportNr;};jint findSignal(struct hportInfo * port);#define hportsCnt 8struct hportInfo hports[hportsCnt];/** guaranteed amount of threads supported per port */unsigned int threads; long long rticks = 0, sticks = 0, gticks = 0;long rcnt = 0, scnt = 0;int maxQueueLen = 0;int unknownCnt = 0;struct transmit {  jint vportFrom;  jint vportTo;  char data[0];};struct sendBuffer {  // memory management  struct sendBuffer * next;  unsigned int size;  jint hport;  struct transmit tx;};const unsigned int MAGIC = 0x474D5445;struct receiveBuffer {  unsigned int magic;  // memory management  /** next message for the same vport */  struct receiveBuffer * next;  struct receiveBuffer * nextHead;  struct receiveBuffer * prevHead;  unsigned int size;  // calculated from event  /** length of user message */  jint length;  jint nodeFrom;  struct transmit tx;};// There is a list of reveived buffers for each// vport. receivedBuffers[vport] points to the head of this list and// lastReceivedBuffer[vport] points to the last buffer in this list.// The heads of these lists are linked in another list used to keep// track of all vports where buffers are waiting. This list is a// double linked list to be able to remove an element anywhere in the// list. firstReceivedBufferHead points to the first element and// lastReceivedBuffer points to the last element in this list.// In contrast to the following picture the list of the heads is not// required to be sorted for vports. This list keeps the buffers in// the order they are received.//                             firstReceivedBufferHead//                               |// receivedBuffers[vport=0] -|   |                        |- lastReceivedBuffer[vport=0]//                               |//                               |// receivedBuffers[vport=1] -|   |                        |- lastReceivedBuffer[vport=1]//                               |//                               v// receivedBuffers[vport=2] -> buffer -> buffer -> buffer <- lastReceivedBuffer[vport=2]//                               |//                               |// receivedBuffers[vport=3] -|   |                        |- lastReceivedBuffer[vport=3]//                               |//                               v// receivedBuffers[vport=4] -> buffer <--------------------- lastReceivedBuffer[vport=4]//                               |//                               v// receivedBuffers[vport=5] -> buffer -> buffer <----------- lastReceivedBuffer[vport=5]//                               ^//                               |//                             lastReceivedBufferHead// prevHead |//          v    next//     -> buffer -> //          |//          v nextHeadinline void queueBuffer(struct receiveBuffer * buffer, struct hportInfo * port, jint vport) {  if (port->lastReceivedBuffer[vport]) {    // append to non-empty list for vport    port->lastReceivedBuffer[vport]->next = buffer;    port->lastReceivedBuffer[vport] = buffer;    buffer->prevHead = 0;    buffer->nextHead = 0;      } else {    // empty list for vport    port->lastReceivedBuffer[vport] = port->receivedBuffers[vport] = buffer;    // append to heads    if (port->lastReceivedBufferHead) {      // append to non-empty list      buffer->prevHead = port->lastReceivedBufferHead;      buffer->nextHead = 0;      port->lastReceivedBufferHead->nextHead = buffer;      port->lastReceivedBufferHead = buffer;    } else {      // empty list      port->lastReceivedBufferHead = port->firstReceivedBufferHead = buffer;      buffer->prevHead = 0;      buffer->nextHead = 0;    }  }  buffer->next = 0;}inline struct receiveBuffer * unqueueBuffer(struct hportInfo * port, jint vport) {  // buffer to be removed from the structure  struct receiveBuffer * buffer = port->receivedBuffers[vport];  // remove buffer from the list of waiting buffers for vport  port->receivedBuffers[vport] = buffer->next;  buffer->next = 0;  if (! port->receivedBuffers[vport]) {    // last buffer removed for vport    port->lastReceivedBuffer[vport] = 0;    if (buffer->prevHead) {      buffer->prevHead->nextHead = buffer->nextHead;    } else {      port->firstReceivedBufferHead = buffer->nextHead;    }    if (buffer->nextHead) {      buffer->nextHead->prevHead = buffer->prevHead;    } else {      port->lastReceivedBufferHead = buffer->prevHead;    }  } else {    // there are more buffers for vport waiting    if (buffer->prevHead) {      buffer->prevHead->nextHead = port->receivedBuffers[vport];    } else {      port->firstReceivedBufferHead = port->receivedBuffers[vport];    }    if (buffer->nextHead) {      buffer->nextHead->prevHead = port->receivedBuffers[vport];    } else {      port->lastReceivedBufferHead = port->receivedBuffers[vport];    }    //Initialize the prevHead and nextHead pointers of the next    //waiting buffer.    port->receivedBuffers[vport]->prevHead = buffer->prevHead;    port->receivedBuffers[vport]->nextHead = buffer->nextHead;      }  buffer->nextHead = 0;  buffer->prevHead = 0;  return buffer;}inline unsigned int getSupportedSize(gm_size_t length) {  unsigned int size = gm_min_size_for_length(length);  return supportedSizes[size];}inline jlong createSignalAndLength(jint signal, jint length) {  return ((((jlong) signal) & 0xFFFFFFFF) << 32) | (((jlong) length) & 0xFFFFFFFF);}static void debugPrintMsgQueue(struct hportInfo * port, jint vport) {  struct receiveBuffer  *buffer;  buffer = port->receivedBuffers[vport];  while (buffer) {    jint vportTo = buffer->tx.vportTo;    debugprint("findSignal() waiting message for vport=%d, size=%d, length=%d\n", vportTo, buffer->size, buffer->length);    buffer = buffer->next;  }}/** character buffer for assembling strings using sprintf */char cbuf[1024];/* * Class:     xdev_gmdev_GM * Method:    gmInit * Signature: ()V */JNIEXPORT void JNICALL Java_xdev_gmdev_GM_gmInit  (JNIEnv * env, jclass clazz){  gm_status_t status;  debugprint("gmInit()\n");  // load exception classes  IOException = (*env)->FindClass(env, "java/io/IOException");  InternalError = (*env)->FindClass(env, "java/lang/InternalError");  IOException = (*env)->NewGlobalRef(env, IOException);  InternalError = (*env)->NewGlobalRef(env, InternalError);  if ((status = gm_init()) != GM_SUCCESS) {    sprintf(cbuf, "gm_init() failed, status=%d", status);    (*env)->ThrowNew(env, IOException, cbuf);    return;  }  debugprint("finished gmInit()\n");}/* * Class:     xdev_gmdev_GM * Method:    gmFinalize * Signature: ()V */JNIEXPORT void JNICALL Java_xdev_gmdev_GM_gmFinalize  (JNIEnv * env, jclass clazz){  gm_finalize();}/* * Class:     xdev_gmdev_GM * Method:    gmOpen * Signature: (IILjava/lang/String */JNIEXPORT void JNICALL Java_xdev_gmdev_GM_gmOpen  (JNIEnv * env, jclass clazz, jint device, jint hport, jstring name){  const char * _name = (*env)->GetStringUTFChars(env, name, 0);  gm_status_t status;  int n;  struct hportInfo * port = &(hports[hport]);  debugprint("gmOpen()\n");  if ((status = gm_open(&(port->gm_port), device, hport, _name, GM_API_VERSION)) != GM_SUCCESS) {    sprintf(cbuf, "gm_open() failed, status=%d", status);    (*env)->ThrowNew(env, IOException, cbuf);    return;  }  (*env)->ReleaseStringUTFChars(env, name, _name);  port->hportNr = hport;  for (n = 0; n < sizesCnt; n++) {    port->sendBuffer[n] = 0;    port->receiveToken[n] = 0;    port->sendToken[n] = 0;    port->providedBuffersCnt[n] = 0;  }  // allocate receive buffers  {    int rxtokens = gm_num_receive_tokens(port->gm_port);    debugprint("gmOpen() rxtokens=%d\n", rxtokens);    rxtokens /= sizesCnt;    // spread tokens for supported sizes    for (n = 0; n < sizesCnt; n++) {      port->receiveToken[supportedSizes[n]] += rxtokens;    }    {      unsigned int rxthreads = rxtokens * sizesCnt;      for (n = 0; n < sizesCnt; n++) {	if (port->receiveToken[supportedSizes[n]] < rxthreads) {	  rxthreads = port->receiveToken[supportedSizes[n]];	}      }      threads = rxthreads;    }  }  port->firstReceivedBufferHead = 0;  port->lastReceivedBufferHead = 0;  for (n = 0; n < xdev_gmdev_GM_VPORTS; n++) {    port->waiting[n] = 0;     port->receivedBuffers[n] = 0;    port->lastReceivedBuffer[n] = 0;  }  for (n = 0; n < sizesCnt; n++) {    while (port->receiveToken[n] > 0) {	struct receiveBuffer * newbuffer = gm_dma_malloc(port->gm_port, gm_max_length_for_size(n) + sizeof(struct receiveBuffer));	if (! newbuffer) {	  debugprint("gmOpen() panic: can not allocate receive buffer for size=%d\n", n);	}	debugprint("gmOpen() creating new buffer size=%d\n", n);	newbuffer->magic = MAGIC;	newbuffer->next = 0;	newbuffer->nextHead = 0;	newbuffer->prevHead = 0;	newbuffer->size = n;  	gm_provide_receive_buffer(port->gm_port, &(newbuffer->tx), n, GM_LOW_PRIORITY);	port->providedBuffersCnt[n]++;	port->receiveToken[n]--;    }  }  {    int txtokens = gm_num_send_tokens(port->gm_port);    debugprint("gmOpen() txtokens=%d\n", txtokens);    txtokens /= sizesCnt;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -