📄 mstate.cc
字号:
//// mstate.cc//// Copyright (C) 1996 Limit Point Systems, Inc.//// Author: Curtis Janssen <cljanss@limitpt.com>// Maintainer: LPS//// This file is part of the SC Toolkit.//// The SC Toolkit is free software; you can redistribute it and/or modify// it under the terms of the GNU Library General Public License as published by// the Free Software Foundation; either version 2, or (at your option)// any later version.//// The SC Toolkit is distributed in the hope that it will be useful,// but WITHOUT ANY WARRANTY; without even the implied warranty of// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the// GNU Library General Public License for more details.//// You should have received a copy of the GNU Library General Public License// along with the SC Toolkit; see the file COPYING.LIB. If not, write to// the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.//// The U.S. Government is granted a limited license as per AL 91-7.//#ifdef __GNUC__#pragma implementation#endif#include <util/misc/bug.h>#include <util/misc/formio.h>#include <util/group/mstate.h>#include <util/state/translate.h>using namespace std;using namespace sc;#define DEBUG 0// This sets up a communication buffer. It is made up of a of// an integer that gives the number of bytes used in the buffer// by the data region of size bufsize.staticvoidobtain_buffer(int*& nbuf_buffer, char*& send_buffer, int& nheader, char*& buffer, int& bufsize, int size){ if (size == bufsize) return; if (send_buffer) delete[] (int*) send_buffer; bufsize = size; int min_bytes_to_allocate = bufsize + sizeof(int); int ints_to_allocate = min_bytes_to_allocate/sizeof(int); if (min_bytes_to_allocate%sizeof(int)) ints_to_allocate++; nheader = sizeof(int); int * isend_buffer = new int[ints_to_allocate]; send_buffer = (char*) isend_buffer; buffer = (char*) & isend_buffer[1]; nbuf_buffer = isend_buffer;}staticvoidrelease_buffer(char* send_buffer){ if (send_buffer) delete[] (int*)send_buffer;}///////////////////////////////////////////////////////////////////////////// MsgStateSend member functionsMsgStateSend::MsgStateSend(const Ref<MessageGrp>&grp_): grp(grp_){ nbuf = 0; bufsize = 0; send_buffer = 0; node_to_node_ = 1; obtain_buffer(nbuf_buffer,send_buffer,nheader,buffer,bufsize,8192);}MsgStateSend::~MsgStateSend(){ release_buffer(send_buffer);}voidMsgStateSend::set_buffer_size(int size){ flush(); obtain_buffer(nbuf_buffer,send_buffer,nheader,buffer,bufsize,size);}intMsgStateSend::put_array_void(const void* vd, int n){ const char* d = (const char*) vd; int remaining = n; while (remaining) { if (nbuf == bufsize) flush(); int ncurrent; if (bufsize - nbuf < remaining) { ncurrent = bufsize - nbuf; } else { ncurrent = remaining; } memcpy(&buffer[nbuf],d,ncurrent); remaining -= ncurrent; nbuf += ncurrent; d = &d[ncurrent]; } return n;}intMsgStateSend::put(const ClassDesc*cd){ int index = grp->classdesc_to_index(cd); return StateOut::put(index);}intMsgStateSend::put(char d){ return StateOut::put(d);}intMsgStateSend::put(unsigned int d){ return StateOut::put(d);}intMsgStateSend::put(int d){ return StateOut::put(d);}intMsgStateSend::put(float d){ return StateOut::put(d);}intMsgStateSend::put(double d){ return StateOut::put(d);}intMsgStateSend::put(const char* d, int n){ return StateOut::put(d, n);}intMsgStateSend::put(const unsigned int* d, int n){ return StateOut::put(d, n);}intMsgStateSend::put(const int* d, int n){ return StateOut::put(d, n);}intMsgStateSend::put(const float* d, int n){ return StateOut::put(d, n);}intMsgStateSend::put(const double* d, int n){ return StateOut::put(d, n);}///////////////////////////////////////////////////////////////////////////// MsgStateBufRecv member functionsstatic ClassDesc MsgStateBufRecv_cd( typeid(MsgStateBufRecv),"MsgStateBufRecv",1,"public StateIn", 0, 0, 0);MsgStateBufRecv::MsgStateBufRecv(){ grp = MessageGrp::get_default_messagegrp(); nbuf = 0; ibuf = 0; send_buffer = 0; bufsize = 0; obtain_buffer(nbuf_buffer,send_buffer,nheader,buffer,bufsize,8192);}MsgStateBufRecv::MsgStateBufRecv(const Ref<MessageGrp>&grp_): grp(grp_){ nbuf = 0; ibuf = 0; send_buffer = 0; bufsize = 0; obtain_buffer(nbuf_buffer,send_buffer,nheader,buffer,bufsize,8192);}MsgStateBufRecv::~MsgStateBufRecv(){ if (ibuf && (nbuf != ibuf)) { ExEnv::errn() << scprintf("MsgStateBufRecv::~MsgStateBufRecv(): buffer still has" " %d bytes of data on %d\n", nbuf - ibuf, grp->me()); } release_buffer(send_buffer);}voidMsgStateBufRecv::set_buffer_size(int size){ if (ibuf && (nbuf != ibuf)) { ExEnv::errn() << "MsgStateBufRecv::set_buffer_size(): old buffer has data" << endl; } obtain_buffer(nbuf_buffer, send_buffer, nheader, buffer, bufsize, size);}intMsgStateBufRecv::get_array_void(void* vd, int n){ char* d = (char*) vd; int remaining = n; while (remaining) { if (ibuf == nbuf) next_buffer(); int ncurrent; if (nbuf - ibuf < remaining) { ncurrent = nbuf - ibuf; } else { ncurrent = remaining; } memcpy(d,&buffer[ibuf],ncurrent); remaining -= ncurrent; ibuf += ncurrent; d = &d[ncurrent]; } return n;}///////////////////////////////////////////////////////////////////////////// MsgStateRecv member functionsMsgStateRecv::MsgStateRecv(const Ref<MessageGrp>&grp_): MsgStateBufRecv(grp_){ node_to_node_ = 1;}MsgStateRecv::~MsgStateRecv(){}intMsgStateRecv::version(const ClassDesc* cd){ if (!cd) return -1; return cd->version();}intMsgStateRecv::get(const ClassDesc**cd){ int index; int r = StateIn::get(index); *cd = grp->index_to_classdesc(index); if (!*cd) { ExEnv::errn() << "MsgStateRecvt::get(const ClassDesc**cd): " << "class not available on this processor:" << endl; ExEnv::errn() << " index = " << index << endl; abort(); } return r;}intMsgStateRecv::get(char& d, const char *key){ return StateIn::get(d,key);}intMsgStateRecv::get(int& d, const char *key){ return StateIn::get(d,key);}intMsgStateRecv::get(unsigned int& d, const char *key){ return StateIn::get(d,key);}intMsgStateRecv::get(float& d, const char *key){ return StateIn::get(d,key);}intMsgStateRecv::get(double& d, const char *key){ return StateIn::get(d,key);}intMsgStateRecv::get(char*& d){ return StateIn::get(d);}intMsgStateRecv::get(unsigned int*& d){ return StateIn::get(d);}intMsgStateRecv::get(int*& d){ return StateIn::get(d);}intMsgStateRecv::get(float*& d){ return StateIn::get(d);}intMsgStateRecv::get(double*& d){ return StateIn::get(d);}///////////////////////////////////////////////////////////////////////////// StateSend member functionsStateSend::StateSend(const Ref<MessageGrp>&grp_): MsgStateSend(grp_), target_(0){}StateSend::~StateSend(){ flush();}voidStateSend::flush(){ if (nbuf == 0) return; *nbuf_buffer = nbuf; translate_->translator()->to_external(nbuf_buffer,1); grp->raw_send(target_, send_buffer, nbuf + nheader); nbuf = 0;}voidStateSend::target(int t){ target_ = t; ps_.clear();}///////////////////////////////////////////////////////////////////////////// StateRecv member functionsStateRecv::StateRecv(const Ref<MessageGrp>&grp_): MsgStateRecv(grp_), source_(0){}voidStateRecv::next_buffer(){ grp->raw_recv(source_, send_buffer, bufsize+nheader); translate_->translator()->to_native(nbuf_buffer,1); nbuf = *nbuf_buffer; ibuf = 0;}voidStateRecv::source(int s){ source_ = s; ps_.clear();}///////////////////////////////////////////////////////////////////////////// BcastStateSend member functionsBcastStateSend::BcastStateSend(const Ref<MessageGrp>&grp_): MsgStateSend(grp_){}BcastStateSend::~BcastStateSend(){ flush();}voidBcastStateSend::flush(){ if (nbuf == 0) return; *nbuf_buffer = nbuf; translate_->translator()->to_external(nbuf_buffer,1); grp->raw_bcast(send_buffer, nbuf + nheader, grp->me()); nbuf = 0;}///////////////////////////////////////////////////////////////////////////// BcastStateRecv member functionsBcastStateRecv::BcastStateRecv(const Ref<MessageGrp>&grp_, int s): MsgStateRecv(grp_){ source(s);}voidBcastStateRecv::source(int s){ if (s == grp->me()) { ExEnv::errn() << scprintf("BcastStateRecv::source(%d): cannot receive my own" " broadcast\n", s); abort(); } source_ = s; ps_.clear();}voidBcastStateRecv::next_buffer(){ grp->raw_bcast(send_buffer, bufsize+nheader, source_); translate_->translator()->to_native(nbuf_buffer,1); nbuf = *nbuf_buffer; ibuf = 0;}///////////////////////////////////////////////////////////////////////////// BcastState member functionsBcastState::BcastState(const Ref<MessageGrp> &grp, int source){ if (grp->n() == 1) { recv_ = 0; send_ = 0; } else if (grp->me() == source) { recv_ = 0; send_ = new BcastStateSend(grp); } else { recv_ = new BcastStateRecv(grp,source); send_ = 0; }}BcastState::~BcastState(){ delete recv_; delete send_;}voidBcastState::bcast(int &a){ if (recv_) recv_->get(a); else if (send_) send_->put(a);}voidBcastState::bcast(double &a){ if (recv_) recv_->get(a); else if (send_) send_->put(a);}voidBcastState::bcast(int *&a, int n){ if (recv_) recv_->get(a); else if (send_) send_->put(a,n);}voidBcastState::bcast(double *&a, int n){ if (recv_) recv_->get(a); else if (send_) send_->put(a,n);}voidBcastState::flush(){ if (send_) send_->flush();}voidBcastState::set_buffer_size(int n){ if (send_) send_->set_buffer_size(n); if (recv_) recv_->set_buffer_size(n);}voidBcastState::forget_references(){ if (send_) send_->forget_references();}///////////////////////////////////////////////////////////////////////////// BcastStateRecv member functionsstatic ClassDesc BcastStateInBin_cd( typeid(BcastStateInBin),"BcastStateInBin",1,"public MsgStateBufRecv", 0, create<BcastStateInBin>, 0);BcastStateInBin::BcastStateInBin(const Ref<MessageGrp>&grp_, const char *filename): MsgStateBufRecv(grp_){ opened_ = 0; open(filename);}BcastStateInBin::BcastStateInBin(const Ref<KeyVal> &keyval){ char *path = keyval->pcharvalue("file"); if (!path) { ExEnv::errn() << "StateInBin(const Ref<KeyVal>&): no path given" << endl; } opened_ = 0; open(path); delete[] path;}BcastStateInBin::~BcastStateInBin(){ close();}voidBcastStateInBin::next_buffer(){ if (grp->me() == 0) { // fill the buffer#if HAVE_SGETN *nbuf_buffer = buf_->sgetn(buffer,bufsize);#else *nbuf_buffer = buf_->xsgetn(buffer,bufsize);#endif if (*nbuf_buffer == 0) { ExEnv::errn() << "BcastStateInBin: read failed" << endl; abort(); } translate_->translator()->to_external(nbuf_buffer,1); } grp->raw_bcast(send_buffer, bufsize+nheader); translate_->translator()->to_native(nbuf_buffer,1); nbuf = *nbuf_buffer; ibuf = 0;}voidBcastStateInBin::close(){ if(opened_) delete buf_; opened_=0; buf_=0; nbuf = 0; ibuf = 0; classidmap_.clear(); nextclassid_ = 0; classdatamap_.clear(); ps_.clear();}intBcastStateInBin::open(const char *path){ file_position_ = 0; if (grp->me() == 0) { if (opened_) close(); filebuf *fbuf = new filebuf(); fbuf->open(path, ios::in); if (!fbuf->is_open()) { ExEnv::errn() << "ERROR: BcastStateInBin: problems opening " << path << endl; abort(); } buf_ = fbuf; opened_ = 1; } nbuf = 0; ibuf = 0; get_header(); find_and_get_directory(); return 0;}intBcastStateInBin::tell(){ return file_position_;}voidBcastStateInBin::seek(int loc){ file_position_ = loc;#if defined(HAVE_PUBSEEKOFF) if (grp->me() == 0) { buf_->pubseekoff(loc,ios::beg,ios::in);# if DEBUG ExEnv::outn() << "pubseekoff to " << loc << endl;# endif }#elif defined(HAVE_SEEKOFF) if (grp->me() == 0) { buf_->seekoff(loc,ios::beg,ios::in);# if DEBUG ExEnv::outn() << "seekoff to " << loc << endl;# endif }#endif nbuf = 0; ibuf = 0;}intBcastStateInBin::seekable(){#if defined(HAVE_PUBSEEKOFF) || defined(HAVE_SEEKOFF) return 1;#else return 0;#endif}intBcastStateInBin::use_directory(){ return seekable();}intBcastStateInBin::get_array_void(void* vd, int n){ MsgStateBufRecv::get_array_void(vd, n); file_position_ += n;#if DEBUG ExEnv::outn() << "Read " << n << " bytes:"; for (int i=0; i<n; i++) { ExEnv::outn() << " " << (int) ((unsigned char*)vd)[i]; } ExEnv::outn() << endl;#endif return n;}/////////////////////////////////////////////////////////////////////////////// Local Variables:// mode: c++// c-file-style: "CLJ"// End:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -