⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ndbapi_async.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
字号:
/* Copyright (C) 2003 MySQL AB   This program is free software; you can redistribute it and/or modify   it under the terms of the GNU General Public License as published by   the Free Software Foundation; either version 2 of the License, or   (at your option) any later version.   This program is distributed in the hope that it will be useful,   but WITHOUT ANY WARRANTY; without even the implied warranty of   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the   GNU General Public License for more details.   You should have received a copy of the GNU General Public License   along with this program; if not, write to the Free Software   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA *//** * ndbapi_async.cpp:  * Illustrates how to use callbacks and error handling using the asynchronous * part of the NDBAPI. * * Classes and methods in NDBAPI used in this example: * *  Ndb_cluster_connection *       connect() *       wait_until_ready() * *  Ndb *       init() *       startTransaction() *       closeTransaction() *       sendPollNdb() *       getNdbError() * *  NdbConnection *       getNdbOperation() *       executeAsynchPrepare() *       getNdbError() * *  NdbOperation *       insertTuple() *       equal() *       setValue() *        */#include <mysql.h>#include <mysqld_error.h>#include <NdbApi.hpp>#include <iostream> // Used for cout/** * Helper sleep function */static voidmilliSleep(int milliseconds){  struct timeval sleeptime;  sleeptime.tv_sec = milliseconds / 1000;  sleeptime.tv_usec = (milliseconds - (sleeptime.tv_sec * 1000)) * 1000000;  select(0, 0, 0, 0, &sleeptime);}/** * error printout macro */#define PRINT_ERROR(code,msg) \  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \            << ", code: " << code \            << ", msg: " << msg << "." << std::endl#define MYSQLERROR(mysql) { \  PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \  exit(-1); }#define APIERROR(error) { \  PRINT_ERROR(error.code,error.message); \  exit(-1); }#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL/** * callback struct. * transaction :  index of the transaction in transaction[] array below * data : the data that the transaction was modifying. * retries : counter for how many times the trans. has been retried */typedef struct  {  Ndb * ndb;  int    transaction;    int    data;  int    retries;} async_callback_t;/** * Structure used in "free list" to a NdbTransaction */typedef struct  {  NdbTransaction*  conn;     int used; } transaction_t;/** * Free list holding transactions */transaction_t   transaction[1024];  //1024 - max number of outstanding                                    //transaction in one Ndb object#endif /** * prototypes *//** * Prepare and send transaction */int  populate(Ndb * myNdb, int data, async_callback_t * cbData);/** * Error handler. */bool asynchErrorHandler(NdbTransaction * trans, Ndb* ndb);/** * Exit function */void asynchExitHandler(Ndb * m_ndb) ;/** * Helper function used in callback(...) */void closeTransaction(Ndb * ndb , async_callback_t * cb);/** * Function to create table */int create_table(Ndb * myNdb);/** * stat. variables */int tempErrors = 0;int permErrors = 0;voidcloseTransaction(Ndb * ndb , async_callback_t * cb){  ndb->closeTransaction(transaction[cb->transaction].conn);  transaction[cb->transaction].conn = 0;  transaction[cb->transaction].used = 0;  cb->retries++;  }/** * Callback executed when transaction has return from NDB */static voidcallback(int result, NdbTransaction* trans, void* aObject){  async_callback_t * cbData = (async_callback_t *)aObject;  if (result<0)  {    /**     * Error: Temporary or permanent?     */    if (asynchErrorHandler(trans,  (Ndb*)cbData->ndb))     {      closeTransaction((Ndb*)cbData->ndb, cbData);      while(populate((Ndb*)cbData->ndb, cbData->data, cbData) < 0)	milliSleep(10);    }    else    {      std::cout << "Restore: Failed to restore data " 		<< "due to a unrecoverable error. Exiting..." << std::endl;      delete cbData;      asynchExitHandler((Ndb*)cbData->ndb);    }  }   else   {    /**     * OK! close transaction     */    closeTransaction((Ndb*)cbData->ndb, cbData);    delete cbData;  }}/** * Create table "GARAGE" */int create_table(MYSQL &mysql) {  while (mysql_query(&mysql, 		  "CREATE TABLE"		  "  GARAGE"		  "    (REG_NO INT UNSIGNED NOT NULL,"		  "     BRAND CHAR(20) NOT NULL,"		  "     COLOR CHAR(20) NOT NULL,"		  "     PRIMARY KEY USING HASH (REG_NO))"		  "  ENGINE=NDB"))  {    if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR)      MYSQLERROR(mysql);    std::cout << "MySQL Cluster already has example table: GARAGE. "	      << "Dropping it..." << std::endl;     /**************     * Drop table *     **************/    if (mysql_query(&mysql, "DROP TABLE GARAGE"))      MYSQLERROR(mysql);  }  return 1;}void asynchExitHandler(Ndb * m_ndb) {  if (m_ndb != NULL)    delete m_ndb;  exit(-1);}/* returns true if is recoverable (temporary), *  false if it is an  error that is permanent. */bool asynchErrorHandler(NdbTransaction * trans, Ndb* ndb) {    NdbError error = trans->getNdbError();  switch(error.status)  {  case NdbError::Success:    return false;    break;      case NdbError::TemporaryError:    /**     * The error code indicates a temporary error.     * The application should typically retry.     * (Includes classifications: NdbError::InsufficientSpace,      *  NdbError::TemporaryResourceError, NdbError::NodeRecoveryError,     *  NdbError::OverloadError, NdbError::NodeShutdown      *  and NdbError::TimeoutExpired.)     *          * We should sleep for a while and retry, except for insufficient space     */    if(error.classification == NdbError::InsufficientSpace)      return false;    milliSleep(10);      tempErrors++;      return true;    break;      case NdbError::UnknownResult:    std::cout << error.message << std::endl;    return false;    break;  default:  case NdbError::PermanentError:    switch (error.code)    {    case 499:    case 250:      milliSleep(10);          return true; // SCAN errors that can be retried. Requires restart of scan.    default:      break;    }    //ERROR    std::cout << error.message << std::endl;    return false;    break;  }  return false;}static int nPreparedTransactions = 0;static int MAX_RETRIES = 10;static int parallelism = 100;/************************************************************************ * populate() * 1. Prepare 'parallelism' number of insert transactions.  * 2. Send transactions to NDB and wait for callbacks to execute */int populate(Ndb * myNdb, int data, async_callback_t * cbData){  NdbOperation*   myNdbOperation;       // For operations  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();  const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");  if (myTable == NULL)     APIERROR(myDict->getNdbError());  async_callback_t * cb;  int retries = 0;  int current = 0;  for(int i=0; i<1024; i++)  {    if(transaction[i].used == 0)    {      current = i;      if (cbData == 0)       {       /**        * We already have a callback	* This is an absolutely new transaction        */	cb = new async_callback_t;	cb->retries = 0;      }      else       {        /**        * We already have a callback        */	cb =cbData;	retries = cbData->retries;      }      /**       * Set data used by the callback       */      cb->ndb = myNdb;  //handle to Ndb object so that we can close transaction                        // in the callback (alt. make myNdb global).      cb->data =  data; //this is the data we want to insert      cb->transaction = current; //This is the number (id)  of this transaction      transaction[current].used = 1 ; //Mark the transaction as used      break;    }  }  if(!current)    return -1;  while(retries < MAX_RETRIES)     {      transaction[current].conn = myNdb->startTransaction();      if (transaction[current].conn == NULL) {	if (asynchErrorHandler(transaction[current].conn, myNdb)) 	{          /**           * no transaction to close since conn == null           */	  milliSleep(10);	  retries++;	  continue;	}	asynchExitHandler(myNdb);	      }      myNdbOperation = transaction[current].conn->getNdbOperation(myTable);      if (myNdbOperation == NULL)       {	if (asynchErrorHandler(transaction[current].conn, myNdb)) 	{	  myNdb->closeTransaction(transaction[current].conn);	  transaction[current].conn = 0;	  milliSleep(10);	  retries++;	  continue;	}	asynchExitHandler(myNdb);      } // if      if(myNdbOperation->insertTuple() < 0  ||	 myNdbOperation->equal("REG_NO", data) < 0 ||	 myNdbOperation->setValue("BRAND", "Mercedes") <0 ||	 myNdbOperation->setValue("COLOR", "Blue") < 0)      {	if (asynchErrorHandler(transaction[current].conn, myNdb)) 	{	  myNdb->closeTransaction(transaction[current].conn);	  transaction[current].conn = 0;	  retries++;	  milliSleep(10);	  continue;	}	asynchExitHandler(myNdb);      }           /*Prepare transaction (the transaction is NOT yet sent to NDB)*/      transaction[current].conn->executeAsynchPrepare(NdbTransaction::Commit, 						       &callback,						       cb);      /**       * When we have prepared parallelism number of transactions ->       * send the transaction to ndb.        * Next time we will deal with the transactions are in the        * callback. There we will see which ones that were successful       * and which ones to retry.       */      if (nPreparedTransactions == parallelism-1)       {	// send-poll all transactions	// close transaction is done in callback	myNdb->sendPollNdb(3000, parallelism );	nPreparedTransactions=0;      }       else	nPreparedTransactions++;      return 1;    }    std::cout << "Unable to recover from errors. Exiting..." << std::endl;    asynchExitHandler(myNdb);    return -1;}int main(){  ndb_init();  MYSQL mysql;  /**************************************************************   * Connect to mysql server and create table                   *   **************************************************************/  {    if ( !mysql_init(&mysql) ) {      std::cout << "mysql_init failed\n";      exit(-1);    }    if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",			     3306, "/tmp/mysql.sock", 0) )      MYSQLERROR(mysql);    mysql_query(&mysql, "CREATE DATABASE TEST_DB");    if (mysql_query(&mysql, "USE TEST_DB") != 0) MYSQLERROR(mysql);    create_table(mysql);  }  /**************************************************************   * Connect to ndb cluster                                     *   **************************************************************/  Ndb_cluster_connection cluster_connection;  if (cluster_connection.connect(4, 5, 1))  {    std::cout << "Unable to connect to cluster within 30 secs." << std::endl;    exit(-1);  }  // Optionally connect and wait for the storage nodes (ndbd's)  if (cluster_connection.wait_until_ready(30,0) < 0)  {    std::cout << "Cluster was not ready within 30 secs.\n";    exit(-1);  }  Ndb* myNdb = new Ndb( &cluster_connection,			"TEST_DB" );  // Object representing the database  if (myNdb->init(1024) == -1) {      // Set max 1024  parallel transactions    APIERROR(myNdb->getNdbError());  }  /**   * Initialise transaction array   */  for(int i = 0 ; i < 1024 ; i++)   {    transaction[i].used = 0;    transaction[i].conn = 0;      }  int i=0;  /**   * Do 20000 insert transactions.   */  while(i < 20000)   {    while(populate(myNdb,i,0)<0)  // <0, no space on free list. Sleep and try again.      milliSleep(10);          i++;  }  std::cout << "Number of temporary errors: " << tempErrors << std::endl;  delete myNdb; }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -