⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 shm_transporter.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
字号:
/* Copyright (C) 2003 MySQL AB   This program is free software; you can redistribute it and/or modify   it under the terms of the GNU General Public License as published by   the Free Software Foundation; either version 2 of the License, or   (at your option) any later version.   This program is distributed in the hope that it will be useful,   but WITHOUT ANY WARRANTY; without even the implied warranty of   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the   GNU General Public License for more details.   You should have received a copy of the GNU General Public License   along with this program; if not, write to the Free Software   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */#include <ndb_global.h>#include "SHM_Transporter.hpp"#include "TransporterInternalDefinitions.hpp"#include <TransporterCallback.hpp>#include <NdbSleep.h>#include <NdbOut.hpp>#include <InputStream.hpp>#include <OutputStream.hpp>extern int g_ndb_shm_signum;SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,				 const char *lHostName,				 const char *rHostName, 				 int r_port,				 bool isMgmConnection,				 NodeId lNodeId,				 NodeId rNodeId,				 NodeId serverNodeId,				 bool checksum, 				 bool signalId,				 key_t _shmKey,				 Uint32 _shmSize) :  Transporter(t_reg, tt_SHM_TRANSPORTER,	      lHostName, rHostName, r_port, isMgmConnection,	      lNodeId, rNodeId, serverNodeId,	      0, false, checksum, signalId),  shmKey(_shmKey),  shmSize(_shmSize){#ifndef NDB_WIN32  shmId= 0;#endif  _shmSegCreated = false;  _attached = false;  shmBuf = 0;  reader = 0;  writer = 0;    setupBuffersDone=false;#ifdef DEBUG_TRANSPORTER  printf("shm key (%d - %d) = %d\n", lNodeId, rNodeId, shmKey);#endif  m_signal_threshold = 4096;}SHM_Transporter::~SHM_Transporter(){  doDisconnect();}bool SHM_Transporter::initTransporter(){  if (g_ndb_shm_signum)    return true;  return false;}    voidSHM_Transporter::setupBuffers(){  Uint32 sharedSize = 0;  sharedSize += 28; //SHM_Reader::getSharedSize();  sharedSize += 28; //SHM_Writer::getSharedSize();  const Uint32 slack = MAX_MESSAGE_SIZE;  /**   *  NOTE: There is 7th shared variable in Win2k (sharedCountAttached).   */  Uint32 sizeOfBuffer = shmSize;  sizeOfBuffer -= 2*sharedSize;  sizeOfBuffer /= 2;  Uint32 * base1 = (Uint32*)shmBuf;  Uint32 * sharedReadIndex1 = base1;  Uint32 * sharedWriteIndex1 = base1 + 1;  serverStatusFlag = base1 + 4;  char * startOfBuf1 = shmBuf+sharedSize;  Uint32 * base2 = (Uint32*)(shmBuf + sizeOfBuffer + sharedSize);  Uint32 * sharedReadIndex2 = base2;  Uint32 * sharedWriteIndex2 = base2 + 1;  clientStatusFlag = base2 + 4;  char * startOfBuf2 = ((char *)base2)+sharedSize;    if(isServer){    * serverStatusFlag = 0;    reader = new SHM_Reader(startOfBuf1, 			    sizeOfBuffer,			    slack,			    sharedReadIndex1,			    sharedWriteIndex1);    writer = new SHM_Writer(startOfBuf2, 			    sizeOfBuffer,			    slack,			    sharedReadIndex2,			    sharedWriteIndex2);    * sharedReadIndex1 = 0;    * sharedWriteIndex1 = 0;    * sharedReadIndex2 = 0;    * sharedWriteIndex2 = 0;        reader->clear();    writer->clear();        * serverStatusFlag = 1;#ifdef DEBUG_TRANSPORTER     printf("-- (%d - %d) - Server -\n", localNodeId, remoteNodeId);    printf("Reader at: %d (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);    printf("sharedReadIndex1 at %d (%p) = %d\n", 	   (char*)sharedReadIndex1-shmBuf, 	   sharedReadIndex1, *sharedReadIndex1);    printf("sharedWriteIndex1 at %d (%p) = %d\n", 	   (char*)sharedWriteIndex1-shmBuf, 	   sharedWriteIndex1, *sharedWriteIndex1);    printf("Writer at: %d (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);    printf("sharedReadIndex2 at %d (%p) = %d\n", 	   (char*)sharedReadIndex2-shmBuf, 	   sharedReadIndex2, *sharedReadIndex2);    printf("sharedWriteIndex2 at %d (%p) = %d\n", 	   (char*)sharedWriteIndex2-shmBuf, 	   sharedWriteIndex2, *sharedWriteIndex2);    printf("sizeOfBuffer = %d\n", sizeOfBuffer);#endif  } else {    * clientStatusFlag = 0;    reader = new SHM_Reader(startOfBuf2, 			    sizeOfBuffer,			    slack,			    sharedReadIndex2,			    sharedWriteIndex2);        writer = new SHM_Writer(startOfBuf1, 			    sizeOfBuffer,			    slack,			    sharedReadIndex1,			    sharedWriteIndex1);        * sharedReadIndex2 = 0;    * sharedWriteIndex1 = 0;        reader->clear();    writer->clear();    * clientStatusFlag = 1;#ifdef DEBUG_TRANSPORTER    printf("-- (%d - %d) - Client -\n", localNodeId, remoteNodeId);    printf("Reader at: %d (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);    printf("sharedReadIndex2 at %d (%p) = %d\n", 	   (char*)sharedReadIndex2-shmBuf, 	   sharedReadIndex2, *sharedReadIndex2);    printf("sharedWriteIndex2 at %d (%p) = %d\n", 	   (char*)sharedWriteIndex2-shmBuf, 	   sharedWriteIndex2, *sharedWriteIndex2);    printf("Writer at: %d (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);    printf("sharedReadIndex1 at %d (%p) = %d\n", 	   (char*)sharedReadIndex1-shmBuf, 	   sharedReadIndex1, *sharedReadIndex1);    printf("sharedWriteIndex1 at %d (%p) = %d\n", 	   (char*)sharedWriteIndex1-shmBuf, 	   sharedWriteIndex1, *sharedWriteIndex1);        printf("sizeOfBuffer = %d\n", sizeOfBuffer);#endif  }#ifdef DEBUG_TRANSPORTER  printf("Mapping from %p to %p\n", shmBuf, shmBuf+shmSize);#endif}boolSHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd){  DBUG_ENTER("SHM_Transporter::connect_server_impl");  SocketOutputStream s_output(sockfd);  SocketInputStream s_input(sockfd);  char buf[256];  // Create  if(!_shmSegCreated){    if (!ndb_shm_create()) {      make_error_info(buf, sizeof(buf));      report_error(TE_SHM_UNABLE_TO_CREATE_SEGMENT, buf);      NDB_CLOSE_SOCKET(sockfd);      DBUG_RETURN(false);    }    _shmSegCreated = true;  }  // Attach  if(!_attached){    if (!ndb_shm_attach()) {      make_error_info(buf, sizeof(buf));      report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT, buf);      NDB_CLOSE_SOCKET(sockfd);      DBUG_RETURN(false);    }    _attached = true;  }  // Send ok to client  s_output.println("shm server 1 ok: %d", 		   m_transporter_registry.m_shm_own_pid);    // Wait for ok from client  DBUG_PRINT("info", ("Wait for ok from client"));  if (s_input.gets(buf, sizeof(buf)) == 0)   {    NDB_CLOSE_SOCKET(sockfd);    DBUG_RETURN(false);  }  if(sscanf(buf, "shm client 1 ok: %d", &m_remote_pid) != 1)  {    NDB_CLOSE_SOCKET(sockfd);    DBUG_RETURN(false);  }  int r= connect_common(sockfd);  if (r) {    // Send ok to client    s_output.println("shm server 2 ok");    // Wait for ok from client    if (s_input.gets(buf, 256) == 0) {      NDB_CLOSE_SOCKET(sockfd);      DBUG_RETURN(false);    }    DBUG_PRINT("info", ("Successfully connected server to node %d",                remoteNodeId));   }  NDB_CLOSE_SOCKET(sockfd);  DBUG_RETURN(r);}boolSHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd){  DBUG_ENTER("SHM_Transporter::connect_client_impl");  SocketInputStream s_input(sockfd);  SocketOutputStream s_output(sockfd);  char buf[256];  // Wait for server to create and attach  DBUG_PRINT("info", ("Wait for server to create and attach"));  if (s_input.gets(buf, 256) == 0) {    NDB_CLOSE_SOCKET(sockfd);    DBUG_PRINT("error", ("Server id %d did not attach",                remoteNodeId));    DBUG_RETURN(false);  }  if(sscanf(buf, "shm server 1 ok: %d", &m_remote_pid) != 1)  {    NDB_CLOSE_SOCKET(sockfd);    DBUG_RETURN(false);  }    // Create  if(!_shmSegCreated){    if (!ndb_shm_get()) {      NDB_CLOSE_SOCKET(sockfd);      DBUG_PRINT("error", ("Failed create of shm seg to node %d",                  remoteNodeId));      DBUG_RETURN(false);    }    _shmSegCreated = true;  }  // Attach  if(!_attached){    if (!ndb_shm_attach()) {      make_error_info(buf, sizeof(buf));      report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT, buf);      NDB_CLOSE_SOCKET(sockfd);      DBUG_PRINT("error", ("Failed attach of shm seg to node %d",                  remoteNodeId));      DBUG_RETURN(false);    }    _attached = true;  }  // Send ok to server  s_output.println("shm client 1 ok: %d", 		   m_transporter_registry.m_shm_own_pid);    int r= connect_common(sockfd);    if (r) {    // Wait for ok from server    DBUG_PRINT("info", ("Wait for ok from server"));    if (s_input.gets(buf, 256) == 0) {      NDB_CLOSE_SOCKET(sockfd);      DBUG_PRINT("error", ("No ok from server node %d",                  remoteNodeId));      DBUG_RETURN(false);    }    // Send ok to server    s_output.println("shm client 2 ok");    DBUG_PRINT("info", ("Successfully connected client to node %d",                remoteNodeId));   }  NDB_CLOSE_SOCKET(sockfd);  DBUG_RETURN(r);}boolSHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd){  if (!checkConnected()) {    return false;  }    if(!setupBuffersDone)   {    setupBuffers();    setupBuffersDone=true;  }  if(setupBuffersDone)   {    NdbSleep_MilliSleep(m_timeOutMillis);    if(*serverStatusFlag == 1 && *clientStatusFlag == 1)    {      m_last_signal = 0;      return true;    }  }  DBUG_PRINT("error", ("Failed to set up buffers to node %d",              remoteNodeId));  return false;}voidSHM_Transporter::doSend(){  if(m_last_signal)  {    m_last_signal = 0;    kill(m_remote_pid, g_ndb_shm_signum);  }}Uint32SHM_Transporter::get_free_buffer() const {  return writer->get_free_buffer();}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -