📄 cluster.c
字号:
/* Reserve memory in low memory */ addr = (unsigned long)ZMALLOC(size + pagesize,"Cluster"); addr = (addr + pagesize-1) & ~(pagesize-1); clusterArea = (ClusterArea *) mmap((void*)addr,size,PROT_READ|PROT_WRITE,MAP_SHARED|MAP_FIXED,fd,0); if( (unsigned long)clusterArea != addr ) CPUError("CLusterInit: mmap of file %s failed \n",filename); ASSERT( CPUVec.CycleCount && CPUVec.CycleCount(0) == 0 ); CPUPrint("ClusterInit: initialization of PID %i\n",pid); LOCK_CLUSTER(); if( !clusterArea->numInit ) { /* * first one here, init common data structures */ clusterArea->numParticipants = barrierNumSimul; clusterArea->freeList = -1; clusterArea->numMsgsUsed = 0; clusterArea->msgCounter = 1000; bzero(clusterArea->barrierArea,sizeof(clusterArea->barrierArea)); for(i=0;i<barrierNumSimul;i++) { clusterArea->simul[i].hostCPU = -1; } clusterArea->barrierSync = barrierP; } else { ASSERT( barrierNumSimul == clusterArea->numParticipants) ; } barrierMyNum = clusterArea->numInit++; CPUWarning("Cluster, PID %i initialized with ID %i\n",pid,barrierMyNum); if( clusterArea->numInit > clusterArea->numParticipants ) CPUError("Too many participants in this cluster simulation. \n"); /* * Detect if 2 processors are on the same real processor and * move to the next best processor. */ /* XXX This is totally bogus and should be fixed. */ myHostCPU = -1;; for(i=0;i<barrierNumSimul;i++) { if( myHostCPU == clusterArea->simul[i].hostCPU ) break; } if (i!=barrierNumSimul) { #ifdef notdef extern int simcp0MustRun[];#endif#ifdef sgi int nProcs = sysmp(MP_NPROCS,0);#else int nProcs = 1;#endif if (nProcs >= barrierNumSimul) { int max_idle=(myHostCPU ? 0 : 1); int j; for(j = 0; j < nProcs; j++ ) { if( j == myHostCPU ) continue;#ifdef gone if( hostCPUIdleStats[j] > hostCPUIdleStats[max_idle] ) { max_idle = j; }#endif }#ifdef gone hostCPUIdleStats[max_idle] = 0; /* no reuse */#endif#ifdef notdef simcp0MustRun[0] = max_idle;#endif#ifdef sgi (void) sysmp(MP_MUSTRUN, max_idle);#endif myHostCPU = max_idle; CPUWarning("Cluster: moved to real cpu %i\n",max_idle); } else { CPUWarning("Cluster: too many hosts for this machine. Expect poor performance\n"); } } /* * Initialize local state in shared area */ myArea = &clusterArea->simul[barrierMyNum]; myArea->hostCPU = myHostCPU; myArea->pid = pid; myArea->currentBarrier = 0; myArea->nextBarrier = 0; myArea->head = -1; myArea->cpuClock = CPU_CLOCK; myArea->barrierInterval = NanoSecsToCycles(barrierInterval); UNLOCK_CLUSTER(); bzero(myArea->privateLock,sizeof(myArea->privateLock)); SimetherInitCluster(); BarrierCallback(0,&barrierCallback,(void*)0);}void ClusterClose(void){ int i; if( !clusterArea ) return; CPUPrint("CloseMipsyBarrier at %lld close=%lld\n", (uint64)CPUVec.CycleCount(0), (uint64)myArea->close); if( myArea->close ) return; for (i=0;i<barrierNumSimul;i++) { clusterArea->simul[i].close = clusterArea->simul[i].nextBarrier; }}/* *********************************************************** * ClusterCheckpoint: called from Embra/Driver.c * Called when one of the hosts checkpoints. Triggers a checkpoint * on all other hosts. * Delay the checkpoint so that all participants take it at the * next barrier. * return 1 to delay it. * **********************************************************/int ClusterCheckpoint(void) { int i; if( !clusterArea ) return 0 ; /* do the checkpoint now */ if( slaveCheckpoint ) return 0; /* do the checkpoint now */ /* set it up */ for(i=0;i<barrierNumSimul;i++) { clusterArea->simul[i].doCheckpoint = 1; } return 1;} /* ************************************************************************ * ClusterSetEtherAddr * Exported to simether. * ************************************************************************/void ClusterSetEtherAddr( char * addr , int if_num){ int i; if( !myArea ) return; ASSERT(if_num < ETHER_MAX_CONTROLLERS ); for(i=0;i<6;i++) { myArea->etherAddr[if_num].etheraddr[i] = addr[i]; }}/* ************************************************************************** * ClusterSendPacket * Exported to simether. Returns 1 if the packet was sent (the etheraddr * is known and 0 if it could not be sent (unknown receiver) * **************************************************************************/int ClusterSendPacket( int cpuNum, struct iovec *iov, int io_len) {#ifdef sgi struct ether_header* etherhdr = (struct ether_header*)iov[0].iov_base; char *destaddr = (char *)ðerhdr->ether_dhost; Message *msg; int rcv,i,j,k,len=0; if( !clusterArea ) return 0 ; CPUPrint("CLUSTER: looking for dest etheraddr = %i:%i:%i:%i:%i:%i \n", destaddr[0],destaddr[1],destaddr[2], destaddr[3],destaddr[4],destaddr[5]); ASSERT( iov[0].iov_len >= sizeof(struct ether_header)); for(rcv=0;rcv<barrierNumSimul;rcv++) { for(j=0;j<ETHER_MAX_CONTROLLERS;j++) { for(k=0;k<6;k++) { if( destaddr[k] != clusterArea->simul[rcv].etherAddr[j].etheraddr[k] ) break; } if( k == 6 ) break; /* found */ } if( j!=ETHER_MAX_CONTROLLERS ) break; } if( rcv == barrierNumSimul ) { CPUPrint("CLUSTER: Could not find etheraddr = %i:%i:%i:%i:%i:%i, going through ethersim \n", destaddr[0],destaddr[1],destaddr[2], destaddr[3],destaddr[4],destaddr[5]); return 0; /* can't find the etheraddr */ } /* destination is node i */ CPUPrint("CLUSTER: found at ID=%i contr=%i etheraddr = %i:%i:%i:%i:%i:%i \n", rcv,j, clusterArea->simul[rcv].etherAddr[j].etheraddr[0], clusterArea->simul[rcv].etherAddr[j].etheraddr[1], clusterArea->simul[rcv].etherAddr[j].etheraddr[2], clusterArea->simul[rcv].etherAddr[j].etheraddr[3], clusterArea->simul[rcv].etherAddr[j].etheraddr[4], clusterArea->simul[rcv].etherAddr[j].etheraddr[5]); msg = ClusterAllocMsg(); for(i=0;i<io_len;i++) { bcopy(iov[i].iov_base,msg->data+len,iov[i].iov_len); len += iov[i].iov_len; } msg->len = len; msg->if_num = j; ASSERT( len < SIMETHER_MAX_TRANSFER_SIZE ); SendMsg(cpuNum,msg,rcv); return 1;#else CPUPrint("SPARC can't send cluster packets\n"); return 0;#endif}/* ************************************************************************** * Utility functions * **************************************************************************/#ifdef sgistatic Message *ClusterAllocMsg(void){ Message *retval; LOCK_CLUSTER(); if( clusterArea->freeList >= 0 ) { retval = &clusterArea->msgPool[clusterArea->freeList]; clusterArea->freeList = retval->next; retval->next = -1; } else { if( clusterArea->numMsgsUsed >= CLUSTER_MAX_MSGS ) { CPUError("Cluster: Message buffer overflow\n"); } retval = &clusterArea->msgPool[clusterArea->numMsgsUsed]; retval->msgId = clusterArea->numMsgsUsed++; retval->next = -1; } retval->id = clusterArea->msgCounter++; UNLOCK_CLUSTER(); return retval;}#endifstatic void ClusterFreeMsg(Message *msg){ LOCK_CLUSTER(); msg->next = clusterArea->freeList; clusterArea->freeList = msg->msgId; UNLOCK_CLUSTER();}/* ****************************************************************** * SendMsg * The send and receive time is expressed in receiver clock * cycles and not in local cycle count. This is important * if we do not use barriers or if the clock rates are different * on the clients and on the servers * *****************************************************************/#ifdef sgistatic void SendMsg(int cpuNum,Message *msg, int receiver){ /* Propagation delays and preable latencies are ignored. * We model the latency as a linear function of the size * of the packet. */ static int repeatWarning = 0; struct HostArea *destArea = &clusterArea->simul[receiver]; SimTime sendTime = destArea->currentBarrier + (( CPUVec.CycleCount(cpuNum) - myArea->currentBarrier ) * destArea->cpuClock ) / myArea->cpuClock; SimTime latencyNs = clusterLatency * msg->len * 8; SimTime latency = (latencyNs * destArea->cpuClock)/1000; SimTime deliverTime = sendTime + latency; if( latency < destArea->barrierInterval && !repeatWarning) { CPUWarning("Cluster: Simulation is not repeatable: \n"); CPUWarning(" id=%i interval=%lld cycles) msg_len =%i lat=%lld (cycles) \n", receiver,(uint64)destArea->barrierInterval, msg->len, (uint64)latency); repeatWarning = 1; } msg->sendTime = sendTime; msg->deliverTime = deliverTime; msg->source = barrierMyNum; msg->dest = receiver; ASSERT( (uint)msg < (uint)&clusterArea->msgPool[clusterArea->numMsgsUsed]); LOCK_HOST(receiver); if( destArea->head < 0 || deliverTime < clusterArea->msgPool[destArea->head].deliverTime ) { msg->next = destArea->head; destArea->head = msg->msgId; } else { MessageId tId = destArea->head; Message *t = &clusterArea->msgPool[tId]; while( t->next>=0 && deliverTime >= clusterArea->msgPool[t->next].deliverTime ) { tId = t->next; t = &clusterArea->msgPool[tId]; } msg->next = t->next; t->next = msg->msgId; } UNLOCK_HOST(receiver); LogEntry("CLUSTER-snd",cpuNum,"id=%i dest=%i len=%i \n", msg->id,msg->dest,msg->len); }#endif /* sgi */static Message *GetMsg(SimTime now) { Message *retval = &clusterArea->msgPool[myArea->head]; LOCK_HOST(barrierMyNum); if( myArea->head>=0 && retval->deliverTime <= now ) { myArea->head = retval->next; } else { retval = 0; } UNLOCK_HOST(barrierMyNum); return retval;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -