basictransportertest.cpp

来自「MySQL数据库开发源码 值得一看哦」· C++ 代码 · 共 536 行

CPP
536
字号
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/**
 * Basic transporter test.
 *
 * Each process plays one of the nodes 1..3 and creates a transporter to
 * one or two peers.  The exchange implemented below is:
 *   - reportConnect():  send a prio 0 signal to the newly connected node
 *   - execute():        on a prio 0 signal reply with a prio 1 signal,
 *                       on any other prio ask for a disconnect of that node
 *   - main():           loops until two signals per connection have been
 *                       counted in signalReceived[], then disconnects
 */

#include <ndb_global.h>

#include "TransporterRegistry.hpp"
#include "TransporterDefinitions.hpp"
#include "TransporterCallback.hpp"
#include <RefConvert.hpp>

#include <NdbTick.h>
#include <NdbMain.h>
#include <NdbOut.hpp>
#include <NdbSleep.h>

/** First TCP port used; each node pair gets basePortTCP + {0,1,2} */
int basePortTCP = 17000;

SCI_TransporterConfiguration sciTemplate = {
  8000,        // Packet size
  2500000,      // Buffer size
  2,           // number of adapters
  1,           // remote node id SCI
  2,           // Remote node Id SCI
  0,           // local ndb node id (server)
  0,           // remote ndb node id (client)
  0,              // byteOrder;
  false,          // compression;
  true,           // checksum;
  true            // signalId;
};

TCP_TransporterConfiguration tcpTemplate = {
  17000,          // port;
  "",             // remoteHostName;
  "",             // localhostname
  2,              // remoteNodeId;
  1,              // localNodeId;
  10000,          // sendBufferSize - Size of SendBuffer of priority B
  10000,          // maxReceiveSize - Maximum no of bytes to receive
  0,              // byteOrder;
  false,          // compression;
  true,           // checksum;
  true            // signalId;
};

OSE_TransporterConfiguration oseTemplate = {
  "",    // remoteHostName;
  "",    // localHostName;
  0,     // remoteNodeId;
  0,     // localNodeId;
  false, // compression;
  true,  // checksum;
  true,  // signalId;
  0,     // byteOrder;

  2000,  // prioASignalSize;
  1000,  // prioBSignalSize;
  10
};

SHM_TransporterConfiguration shmTemplate = {
  0,      //remoteNodeId
  0,      //localNodeId;
  false,  //compression
  true,   //checksum;
  true,   //signalId;
  0,      //byteOrder;
  123,    //shmKey;
  2500000 //shmSize;
};

/** The registry under test; created and destroyed in main() */
TransporterRegistry *tReg = 0;

#ifndef OSE_DELTA
#include <signal.h>
#endif

/**
 * Handler for signal number 13; it re-installs itself on every call and
 * prints the signal number it was called with.
 *
 * main() calls it once directly with signo 0 to perform the first
 * installation.
 *
 * NOTE(review): 13 is presumably SIGPIPE - confirm for the target platforms
 */
extern "C"
void
signalHandler(int signo){
#ifndef OSE_DELTA
  ::signal(13, signalHandler);
#endif
  char buf[255];
  sprintf(buf,"Signal: %d\n", signo);
  ndbout << buf << endl;
}

/**
 * Print the command line synopsis
 *
 * @param progName  name to show as the program (argv[0])
 */
void
usage(const char * progName){
  ndbout << "Usage: " << progName << " <type> localNodeId localHostName"
	 << " remoteHostName1 remoteHostName2" << endl;
  ndbout << "  type = shm tcp ose sci" << endl;
  ndbout << "  localNodeId - 1 to 3" << endl;
}

/**
 * Common signature of the create<XXX>Transporter functions so that main()
 * can select one by transporter type
 */
typedef void (* CreateTransporterFunc)(void * conf,
				       NodeId localNodeId,
				       NodeId remoteNodeId,
				       const char * localHostName,
				       const char * remoteHostName);

void createOSETransporter(void *, NodeId, NodeId, const char *, const char *);
void createSCITransporter(void *, NodeId, NodeId, const char *, const char *);
void createTCPTransporter(void *, NodeId, NodeId, const char *, const char *);
void createSHMTransporter(void *, NodeId, NodeId, const char *, const char *);

/** Number of slots in signalReceived (node ids 0..3) */
static const unsigned MaxTestNodes = 4;

/** No of signals received so far, indexed by the sending node id */
int signalReceived[MaxTestNodes];

/**
 * Program entry
 *
 * Arguments: <type> localNodeId localHostName remoteHostName1
 *            [remoteHostName2]
 *
 * @return always 0 (also when the arguments are rejected)
 */
int
main(int argc, const char **argv){

  signalHandler(0);

  for(unsigned i = 0; i<MaxTestNodes; i++)
    signalReceived[i] = 0;

  if(argc < 5){
    usage(argv[0]);
    return 0;
  }

  Uint32 noOfConnections     = 0;
  const char * progName      = argv[0];
  const char * type          = argv[1];
  const NodeId localNodeId   = atoi(argv[2]);
  const char * localHostName = argv[3];
  const char * remoteHost1   = argv[4];
  const char * remoteHost2   = NULL;

  if(argc == 5)
    noOfConnections = 1;
  else {
    noOfConnections = 2;
    remoteHost2 = argv[5];
  }

  if(localNodeId < 1 || localNodeId > 3){
    ndbout << "localNodeId = " << localNodeId << endl << endl;
    usage(progName);
    return 0;
  }

  /**
   * The first remote host is node 1 (node 2 if we are node 1),
   * the second remote host is node 3 (node 2 if we are node 3)
   */
  const NodeId peer1 = (localNodeId == 1 ? 2 : 1);
  const NodeId peer2 = (localNodeId == 3 ? 2 : 3);

  ndbout << "-----------------" << endl;
  ndbout << "localNodeId:           " << localNodeId << endl;
  ndbout << "localHostName:         " << localHostName << endl;
  ndbout << "remoteHost1 (node " << peer1 << "): "
	 << remoteHost1 << endl;
  if(noOfConnections == 2){
    ndbout << "remoteHost2 (node " << peer2 << "): "
	   << remoteHost2 << endl;
  }
  ndbout << "-----------------" << endl;

  void * confTemplate = 0;
  CreateTransporterFunc func = 0;

  if(strcasecmp(type, "tcp") == 0){
    func = createTCPTransporter;
    confTemplate = &tcpTemplate;
  } else if(strcasecmp(type, "ose") == 0){
    func = createOSETransporter;
    confTemplate = &oseTemplate;
  } else if(strcasecmp(type, "sci") == 0){
    func = createSCITransporter;
    confTemplate = &sciTemplate;
  } else if(strcasecmp(type, "shm") == 0){
    func = createSHMTransporter;
    confTemplate = &shmTemplate;
  } else {
    ndbout << "Unsupported transporter type" << endl;
    return 0;
  }

  ndbout << "Creating transporter registry" << endl;
  tReg = new TransporterRegistry;
  tReg->init(localNodeId);

  (* func)(confTemplate, localNodeId, peer1, localHostName, remoteHost1);
  if(noOfConnections == 2)
    (* func)(confTemplate, localNodeId, peer2, localHostName, remoteHost2);

  ndbout << "Doing startSending/startReceiving" << endl;
  tReg->startSending();
  tReg->startReceiving();

  ndbout << "Connecting" << endl;
  tReg->setPerformState(PerformConnect);
  tReg->checkConnections();

  /**
   * Two signals (one prio 0 and one prio 1) are expected per connection.
   * Use '<' so that the loop also ends if additional signals arrive,
   * e.g. a second prio 0 signal after a reconnect.
   */
  unsigned sum = 0;
  do {
    sum = 0;
    for(unsigned i = 0; i<MaxTestNodes; i++)
      sum += signalReceived[i];

    tReg->checkConnections();

    tReg->external_IO(500);
    NdbSleep_MilliSleep(500);

    ndbout << "In main loop" << endl;
  } while(sum < 2*noOfConnections);

  ndbout << "Doing setPerformState(Disconnect)" << endl;
  tReg->setPerformState(PerformDisconnect);

  ndbout << "Doing checkConnections()" << endl;
  tReg->checkConnections();

  ndbout << "Sleeping 3 secs" << endl;
  NdbSleep_SecSleep(3);

  ndbout << "Deleting transporter registry" << endl;
  delete tReg; tReg = 0;

  return 0;
}

/**
 * Verify that a received signal matches what sendSignalTo() builds for
 * the given prio.  Calls abort() on the first mismatch.
 *
 * @param header   header of the received signal
 * @param prio     0 => 17 data words and no section,
 *                 otherwise 19 data words and one section
 * @param theData  the signal data, word i is expected to hold i
 * @param ptr      sections; only ptr[0] is inspected and only if prio != 0
 *
 * NOTE(review): sendSignalTo() sets theSendersSignalId = length + 4 and
 * theSignalId = length + 5 while theSendersSignalId == length + 5 is
 * expected here.  Presumably the transporter delivers the sender's
 * theSignalId as theSendersSignalId - confirm.
 */
void
checkData(SignalHeader * const header, Uint8 prio, Uint32 * const theData,
	  LinearSectionPtr ptr[3]){
  Uint32 expectedLength = 0;
  if(prio == 0)
    expectedLength = 17;
  else
    expectedLength = 19;

  if(header->theLength != expectedLength){
    ndbout << "Unexpected signal length: " << header->theLength
	   << " expected: " << expectedLength << endl;
    abort();
  }

  if(header->theVerId_signalNumber != expectedLength + 1)
    abort();

  if(header->theReceiversBlockNumber != expectedLength + 2)
    abort();

  if(refToBlock(header->theSendersBlockRef) != expectedLength + 3)
    abort();

  if(header->theSendersSignalId != expectedLength + 5)
    abort();

  if(header->theTrace != expectedLength + 6)
    abort();

  if(header->m_noOfSections != (prio == 0 ? 0 : 1))
    abort();

  if(header->m_fragmentInfo != (prio + 1))
    abort();

  for(unsigned i = 0; i<header->theLength; i++){
    if(theData[i] != i){
      ndbout << "data corrupt!\n" << endl;
      abort();
    }
  }

  if(prio != 0){
    /**
     * The section is sent as a copy of the signal data
     */
    ndbout_c("Found section");
    if(ptr[0].sz != header->theLength)
      abort();

    if(memcmp(ptr[0].p, theData, (ptr[0].sz * sizeof(Uint32))) != 0)
      abort();
  }
}

/**
 * Build a test signal and hand it to the registry for sending
 *
 * The signal has 17 (prio 0) or 19 (other prio) data words holding
 * 0,1,2,... and header fields derived from that length.  For prio != 0
 * the data is also attached as section 0.
 *
 * @param nodeId  node to send to
 * @param prio    priority; also determines the signal layout
 */
void
sendSignalTo(NodeId nodeId, int prio){
  SignalHeader sh;
  sh.theLength = (prio == 0 ? 17 : 19);
  sh.theVerId_signalNumber   = sh.theLength + 1;
  sh.theReceiversBlockNumber = sh.theLength + 2;
  sh.theSendersBlockRef      = sh.theLength + 3;
  sh.theSendersSignalId      = sh.theLength + 4;
  sh.theSignalId             = sh.theLength + 5;
  sh.theTrace                = sh.theLength + 6;
  sh.m_noOfSections          = (prio == 0 ? 0 : 1);
  sh.m_fragmentInfo          = prio + 1;

  Uint32 theData[25];
  for(unsigned i = 0; i<sh.theLength; i++)
    theData[i] = i;

  ndbout << "Sending prio " << (int)prio << " signal to node: "
	 << nodeId
	 << " gsn = " << sh.theVerId_signalNumber << endl;

  /**
   * Only section 0 is used, but don't pass uninitialized
   * entries to the registry
   */
  LinearSectionPtr ptr[3];
  for(unsigned i = 0; i<3; i++){
    ptr[i].p  = 0;
    ptr[i].sz = 0;
  }
  ptr[0].p = &theData[0];
  ptr[0].sz = sh.theLength;

  SendStatus s = tReg->prepareSend(&sh, prio, theData, nodeId, ptr);
  if(s != SEND_OK){
    ndbout << "Send was not ok. Send was: " << s << endl;
  }
}

/**
 * Handle a received signal
 * (presumably called by the transporter registry through the interface
 *  declared in TransporterCallback.hpp - confirm)
 *
 * The signal is verified with checkData() and counted for the sending
 * node.  A prio 0 signal is answered with a prio 1 signal, any other
 * prio requests a disconnect of the sending node.
 */
void
execute(void* callbackObj,
	SignalHeader * const header, Uint8 prio, Uint32 * const theData,
	LinearSectionPtr ptr[3]){
  const NodeId nodeId = refToNode(header->theSendersBlockRef);

  ndbout << "Recieved prio " << (int)prio << " signal from node: "
	 << nodeId
	 << " gsn = " << header->theVerId_signalNumber << endl;
  checkData(header, prio, theData, ptr);
  ndbout << " Data is ok!\n" << endl;

  /**
   * The node id is taken from received data,
   * never index outside signalReceived
   */
  if(nodeId >= MaxTestNodes){
    ndbout << "Unexpected sender node id: " << nodeId << endl;
    abort();
  }
  signalReceived[nodeId]++;

  if(prio == 0)
    sendSignalTo(nodeId, 1);
  else
    tReg->setPerformState(nodeId, PerformDisconnect);
}

/**
 * Not supported by this test, aborts if called
 */
void
copy(Uint32 * & insertPtr,
     class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
  abort();
}

/**
 * Print a reported transporter error.  If bit 0x8000 is set in the
 * error code, a disconnect of the node is requested and the program
 * is aborted.
 */
void
reportError(void* callbackObj, NodeId nodeId, TransporterError errorCode){
  char buf[255];
  sprintf(buf, "reportError (%d, %x)",
	  static_cast<int>(nodeId), static_cast<unsigned>(errorCode));
  ndbout << buf << endl;
  if(errorCode & 0x8000){
    tReg->setPerformState(nodeId, PerformDisconnect);
    abort();
  }
}

/**
 * Report average send theLength in bytes (4096 last sends)
 *
 * The average is printed as 0 if count is 0
 */
void
reportSendLen(void* callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
  const Uint32 average = (count == 0 ? 0 : (Uint32)(bytes/count));
  char buf[255];
  sprintf(buf, "reportSendLen(%d, %u)",
	  static_cast<int>(nodeId), static_cast<unsigned>(average));
  ndbout << buf << endl;
}

/**
 * Report average receive theLength in bytes (4096 last receives)
 *
 * The average is printed as 0 if count is 0
 */
void
reportReceiveLen(void* callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
  const Uint32 average = (count == 0 ? 0 : (Uint32)(bytes/count));
  char buf[255];
  sprintf(buf, "reportReceiveLen(%d, %u)",
	  static_cast<int>(nodeId), static_cast<unsigned>(average));
  ndbout << buf << endl;
}

/**
 * Report connection established
 *
 * Sets the node to PerformIO and sends the initial prio 0 signal to it
 */
void
reportConnect(void* callbackObj, NodeId nodeId){
  char buf[255];
  sprintf(buf, "reportConnect(%d)", static_cast<int>(nodeId));
  ndbout << buf << endl;
  tReg->setPerformState(nodeId, PerformIO);

  sendSignalTo(nodeId, 0);
}

/**
 * Report connection broken
 *
 * Requests a new connect as long as less than two signals have been
 * received from the node.  Node ids outside signalReceived are only
 * printed.
 */
void
reportDisconnect(void* callbackObj, NodeId nodeId, Uint32 errNo){
  char buf[255];
  sprintf(buf, "reportDisconnect(%d)", static_cast<int>(nodeId));
  ndbout << buf << endl;
  if(nodeId < MaxTestNodes && signalReceived[nodeId] < 2)
    tReg->setPerformState(nodeId, PerformConnect);
}

int
checkJobBuffer() {
  /**
   * Check to see if jobbbuffers are starting to get full
   * and if so call doJob
   *
   * This test has no job buffers, always returns 0
   */
  return 0;
}

/**
 * Create an OSE transporter
 *
 * @param _conf           points to an OSE_TransporterConfiguration which is
 *                        updated with the node ids and host names
 * @param localNodeId     own node id
 * @param remoteNodeId    node id of the peer
 * @param localHostName   own host name
 * @param remoteHostName  host name of the peer
 */
void
createOSETransporter(void * _conf,
		     NodeId localNodeId,
		     NodeId remoteNodeId,
		     const char * localHostName,
		     const char * remoteHostName){
  ndbout << "Creating OSE transporter from node "
	 << localNodeId << "(" << localHostName << ") to "
	 << remoteNodeId << "(" << remoteHostName << ")..." << endl;

  OSE_TransporterConfiguration * conf =
    static_cast<OSE_TransporterConfiguration*>(_conf);

  conf->localNodeId    = localNodeId;
  conf->localHostName  = localHostName;
  conf->remoteNodeId   = remoteNodeId;
  conf->remoteHostName = remoteHostName;
  bool res = tReg->createTransporter(conf);
  if(res)
    ndbout << "... -- Success " << endl;
  else
    ndbout << "... -- Failure " << endl;
}

/**
 * Get the TCP port shared by a pair of nodes
 *
 * Pair {1,2} uses basePortTCP + 0, {1,3} basePortTCP + 1 and
 * {2,3} basePortTCP + 2, independent of which one is the local node.
 *
 * @return the port or -1 if the pair is not one of the above
 */
static int
tcpPortForNodes(NodeId localNodeId, NodeId remoteNodeId){
  const NodeId lo = (localNodeId < remoteNodeId ? localNodeId : remoteNodeId);
  const NodeId hi = (localNodeId < remoteNodeId ? remoteNodeId : localNodeId);

  if(lo == 1 && hi == 2) return basePortTCP + 0;
  if(lo == 1 && hi == 3) return basePortTCP + 1;
  if(lo == 2 && hi == 3) return basePortTCP + 2;
  return -1;
}

/**
 * Create a TCP transporter
 *
 * @param _conf           points to a TCP_TransporterConfiguration which is
 *                        updated with the node ids, host names and port
 * @param localNodeId     own node id
 * @param remoteNodeId    node id of the peer
 * @param localHostName   own host name
 * @param remoteHostName  host name of the peer
 *
 * Failure is reported without touching the configuration if there is
 * no port assigned to the node pair.
 */
void
createTCPTransporter(void * _conf,
		     NodeId localNodeId,
		     NodeId remoteNodeId,
		     const char * localHostName,
		     const char * remoteHostName){
  ndbout << "Creating TCP transporter from node "
	 << localNodeId << "(" << localHostName << ") to "
	 << remoteNodeId << "(" << remoteHostName << ")..." << endl;

  TCP_TransporterConfiguration * conf =
    static_cast<TCP_TransporterConfiguration*>(_conf);

  const int port = tcpPortForNodes(localNodeId, remoteNodeId);
  if(port < 0){
    /**
     * Don't create a transporter on an undefined port
     */
    ndbout << "... -- Failure " << endl;
    return;
  }

  conf->localNodeId    = localNodeId;
  conf->localHostName  = localHostName;
  conf->remoteNodeId   = remoteNodeId;
  conf->remoteHostName = remoteHostName;
  conf->port           = port;
  bool res = tReg->createTransporter(conf);
  if(res)
    ndbout << "... -- Success " << endl;
  else
    ndbout << "... -- Failure " << endl;
}

/**
 * Create a SCI transporter
 *
 * @param _conf           points to a SCI_TransporterConfiguration which is
 *                        updated with the node ids and SCI node ids
 * @param localNodeId     own node id
 * @param remoteNodeId    node id of the peer
 * @param localHostName   parsed with atoi() into remoteSciNodeId0
 * @param remoteHostName  parsed with atoi() into remoteSciNodeId1
 *
 * NOTE(review): both "host names" end up in remote SCI node id fields,
 * presumably the ids of the two adapters of the peer - confirm
 */
void
createSCITransporter(void * _conf,
		     NodeId localNodeId,
		     NodeId remoteNodeId,
		     const char * localHostName,
		     const char * remoteHostName){

  ndbout << "Creating SCI transporter from node "
	 << localNodeId << "(" << localHostName << ") to "
	 << remoteNodeId << "(" << remoteHostName << ")..." << endl;

  SCI_TransporterConfiguration * conf =
    static_cast<SCI_TransporterConfiguration*>(_conf);

  conf->remoteSciNodeId0= (Uint16)atoi(localHostName);
  conf->remoteSciNodeId1= (Uint16)atoi(remoteHostName);

  conf->localNodeId    = localNodeId;
  conf->remoteNodeId   = remoteNodeId;

  bool res = tReg->createTransporter(conf);
  if(res)
    ndbout << "... -- Success " << endl;
  else
    ndbout << "... -- Failure " << endl;
}

/**
 * Create a SHM transporter
 *
 * @param _conf           points to a SHM_TransporterConfiguration which is
 *                        updated with the node ids
 * @param localNodeId     own node id
 * @param remoteNodeId    node id of the peer
 * @param localHostName   only printed
 * @param remoteHostName  only printed
 */
void
createSHMTransporter(void * _conf,
		     NodeId localNodeId,
		     NodeId remoteNodeId,
		     const char * localHostName,
		     const char * remoteHostName){

  ndbout << "Creating SHM transporter from node "
	 << localNodeId << "(" << localHostName << ") to "
	 << remoteNodeId << "(" << remoteHostName << ")..." << endl;

  SHM_TransporterConfiguration * conf =
    static_cast<SHM_TransporterConfiguration*>(_conf);

  conf->localNodeId    = localNodeId;
  conf->remoteNodeId   = remoteNodeId;

  bool res = tReg->createTransporter(conf);
  if(res)
    ndbout << "... -- Success " << endl;
  else
    ndbout << "... -- Failure " << endl;
}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?