📄 ndbapi_async.cpp
字号:
/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/**
 * ndbapi_async.cpp:
 * Illustrates how to use callbacks and error handling using the asynchronous
 * part of the NDBAPI.
 *
 * Classes and methods in NDBAPI used in this example:
 *
 *  Ndb_cluster_connection
 *       connect()
 *       wait_until_ready()
 *
 *  Ndb
 *       init()
 *       startTransaction()
 *       closeTransaction()
 *       sendPollNdb()
 *       getNdbError()
 *
 *  NdbTransaction
 *       getNdbOperation()
 *       executeAsynchPrepare()
 *       getNdbError()
 *
 *  NdbOperation
 *       insertTuple()
 *       equal()
 *       setValue()
 *
 */

#include <mysql.h>
#include <mysqld_error.h>
#include <NdbApi.hpp>

#include <cstdlib>      // Used for exit
#include <iostream>     // Used for cout
#include <sys/select.h> // Used for select
#include <sys/time.h>   // Used for struct timeval

/**
 * Helper sleep function.
 *
 * Blocks the calling thread for (approximately) the given number of
 * milliseconds by calling select() with no file descriptors and a timeout.
 * Non-positive values return immediately.
 */
static void
milliSleep(int milliseconds){
  if (milliseconds <= 0)
    return;

  struct timeval sleeptime;
  sleeptime.tv_sec  = milliseconds / 1000;
  // The remainder is in milliseconds; 1 ms == 1000 microseconds.
  sleeptime.tv_usec = (milliseconds % 1000) * 1000;
  select(0, 0, 0, 0, &sleeptime);
}


/**
 * error printout macro
 */
#define PRINT_ERROR(code,msg) \
  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
            << ", code: " << code \
            << ", msg: " << msg << "." << std::endl

/** Print the last error of a MYSQL handle and terminate the process. */
#define MYSQLERROR(mysql) { \
  PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \
  exit(-1); }

/** Print an NdbError (code and message) and terminate the process. */
#define APIERROR(error) { \
  PRINT_ERROR(error.code,error.message); \
  exit(-1); }

/**
 * Size of the transaction free list below; also passed to Ndb::init()
 * as the maximum number of parallel transactions in main().
 */
static const int MAX_TRANSACTIONS = 1024;

#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/**
 * callback struct.
 * ndb         : handle to the Ndb object, used by the callback to close
 *               or retry the transaction
 * transaction : index of the transaction in transaction[] array below
 * data        : the data that the transaction was modifying.
 * retries     : counter for how many times the trans. has been retried
 */
typedef struct  {
  Ndb * ndb;
  int    transaction;
  int    data;
  int    retries;
} async_callback_t;

/**
 * Structure used in "free list" to a NdbTransaction
 */
typedef struct  {
  NdbTransaction*  conn;
  int used;
} transaction_t;

/**
 * Free list holding transactions
 */
transaction_t   transaction[MAX_TRANSACTIONS];  // max number of outstanding
                                                // transaction in one Ndb object

#endif

/**
 * prototypes
 */

/**
 * Prepare and send transaction
 */
int  populate(Ndb * myNdb, int data, async_callback_t * cbData);

/**
 * Error handler.
 */
bool asynchErrorHandler(NdbTransaction * trans, Ndb* ndb);

/**
 * Exit function
 */
void asynchExitHandler(Ndb * m_ndb) ;

/**
 * Helper function used in callback(...)
 */
void closeTransaction(Ndb * ndb , async_callback_t * cb);

/**
 * Function to create table
 */
int create_table(MYSQL &mysql);

/**
 * stat. variables
 */
int tempErrors = 0;
int permErrors = 0;

/**
 * Close the transaction referenced by cb, hand its slot in transaction[]
 * back to the free list and count one more retry on the callback data.
 */
void
closeTransaction(Ndb * ndb , async_callback_t * cb)
{
  ndb->closeTransaction(transaction[cb->transaction].conn);
  transaction[cb->transaction].conn = 0;
  transaction[cb->transaction].used = 0;
  cb->retries++;
}

/**
 * Callback executed when transaction has return from NDB
 *
 * result  : < 0 signals that the transaction failed
 * trans   : the transaction that completed
 * aObject : the async_callback_t registered in populate()
 */
static void
callback(int result, NdbTransaction* trans, void* aObject)
{
  async_callback_t * cbData = static_cast<async_callback_t *>(aObject);
  // Keep our own copy of the handle: cbData may be deleted below while the
  // Ndb object is still needed.
  Ndb * ndb = cbData->ndb;

  if (result<0)
  {
    /**
     * Error: Temporary or permanent?
     */
    if (asynchErrorHandler(trans, ndb))
    {
      // Temporary: release the slot and prepare the same insert again.
      closeTransaction(ndb, cbData);
      while(populate(ndb, cbData->data, cbData) < 0)
        milliSleep(10);
    }
    else
    {
      std::cout << "Restore: Failed to restore data "
                << "due to a unrecoverable error. Exiting..." << std::endl;
      delete cbData;
      asynchExitHandler(ndb);
    }
  }
  else
  {
    /**
     * OK! close transaction
     */
    closeTransaction(ndb, cbData);
    delete cbData;
  }
}


/**
 * Create table "GARAGE"
 *
 * If the table already exists it is dropped and the CREATE is retried.
 * Any other MySQL error terminates the process. Returns 1 on success.
 */
int create_table(MYSQL &mysql)
{
  while (mysql_query(&mysql,
                     "CREATE TABLE"
                     "  GARAGE"
                     "    (REG_NO INT UNSIGNED NOT NULL,"
                     "     BRAND CHAR(20) NOT NULL,"
                     "     COLOR CHAR(20) NOT NULL,"
                     "     PRIMARY KEY USING HASH (REG_NO))"
                     "  ENGINE=NDB"))
  {
    if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR)
      MYSQLERROR(mysql);
    std::cout << "MySQL Cluster already has example table: GARAGE. "
              << "Dropping it..." << std::endl;
    /**************
     * Drop table *
     **************/
    if (mysql_query(&mysql, "DROP TABLE GARAGE"))
      MYSQLERROR(mysql);
  }
  return 1;
}

/**
 * Delete the Ndb object (if any) and terminate the process with status -1.
 */
void asynchExitHandler(Ndb * m_ndb)
{
  delete m_ndb;   // deleting a null pointer is a no-op
  exit(-1);
}

/* returns true if is recoverable (temporary),
 * false if it is an error that is permanent.
 *
 * trans may be NULL (e.g. when Ndb::startTransaction() itself failed); in
 * that case the error is read from the Ndb object instead.
 */
bool asynchErrorHandler(NdbTransaction * trans, Ndb* ndb)
{
  if (trans == NULL && ndb == NULL)
    return false;   // nothing to query; treat as unrecoverable

  const NdbError error = (trans != NULL) ? trans->getNdbError()
                                         : ndb->getNdbError();
  switch(error.status)
  {
  case NdbError::Success:
    return false;

  case NdbError::TemporaryError:
    /**
     * The error code indicates a temporary error.
     * The application should typically retry.
     * (Includes classifications: NdbError::InsufficientSpace,
     *  NdbError::TemporaryResourceError, NdbError::NodeRecoveryError,
     *  NdbError::OverloadError, NdbError::NodeShutdown
     *  and NdbError::TimeoutExpired.)
     *
     * We should sleep for a while and retry, except for insufficient space
     */
    if(error.classification == NdbError::InsufficientSpace)
      return false;
    milliSleep(10);
    tempErrors++;
    return true;

  case NdbError::UnknownResult:
    std::cout << error.message << std::endl;
    return false;

  default:
  case NdbError::PermanentError:
    switch (error.code)
    {
    case 499:
    case 250:
      milliSleep(10);
      return true; // SCAN errors that can be retried. Requires restart of scan.
    default:
      break;
    }
    //ERROR
    std::cout << error.message << std::endl;
    return false;
  }
  return false;
}

static int nPreparedTransactions = 0;
static const int MAX_RETRIES = 10;
static const int parallelism = 100;


/************************************************************************
 * populate()
 * 1. Prepare 'parallelism' number of insert transactions.
 * 2. Send transactions to NDB and wait for callbacks to execute
 *
 * myNdb  : Ndb object to run the transaction on
 * data   : value inserted as REG_NO
 * cbData : 0 for a brand new insert, or the callback data of a
 *          transaction that is being retried
 *
 * Returns 1 when the transaction has been prepared, -1 when there is no
 * free slot in transaction[] (nothing is modified in that case, so the
 * caller can simply sleep and call again).
 */
int populate(Ndb * myNdb, int data, async_callback_t * cbData)
{
  NdbOperation*   myNdbOperation;       // For operations
  const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
  const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");
  if (myTable == NULL)
    APIERROR(myDict->getNdbError());

  /**
   * Look for a free slot. -1 means "none found"; index 0 is a perfectly
   * valid slot and must not be used as the sentinel.
   */
  int current = -1;
  for(int i=0; i<MAX_TRANSACTIONS; i++)
  {
    if(transaction[i].used == 0)
    {
      current = i;
      break;
    }
  }
  if(current < 0)
    return -1;

  async_callback_t * cb;
  int retries = 0;
  if (cbData == 0)
  {
    /**
     * We don't have a callback yet
     * This is an absolutely new transaction
     */
    cb = new async_callback_t;
    cb->retries = 0;
  }
  else
  {
    /**
     * We already have a callback
     */
    cb = cbData;
    retries = cbData->retries;
  }
  /**
   * Set data used by the callback
   */
  cb->ndb = myNdb;           //handle to Ndb object so that we can close transaction
                             // in the callback (alt. make myNdb global).
  cb->data = data;           //this is the data we want to insert
  cb->transaction = current; //This is the number (id) of this transaction
  transaction[current].used = 1 ; //Mark the transaction as used

  while(retries < MAX_RETRIES)
  {
    transaction[current].conn = myNdb->startTransaction();
    if (transaction[current].conn == NULL) {
      /**
       * No transaction object exists, so the error handler reads the
       * error from the Ndb object (trans == NULL).
       */
      if (asynchErrorHandler(transaction[current].conn, myNdb))
      {
        /**
         * no transaction to close since conn == null
         */
        milliSleep(10);
        retries++;
        continue;
      }
      asynchExitHandler(myNdb);
    }

    myNdbOperation = transaction[current].conn->getNdbOperation(myTable);
    if (myNdbOperation == NULL)
    {
      if (asynchErrorHandler(transaction[current].conn, myNdb))
      {
        myNdb->closeTransaction(transaction[current].conn);
        transaction[current].conn = 0;
        milliSleep(10);
        retries++;
        continue;
      }
      asynchExitHandler(myNdb);
    } // if

    if(myNdbOperation->insertTuple() < 0  ||
       myNdbOperation->equal("REG_NO", data) < 0 ||
       myNdbOperation->setValue("BRAND", "Mercedes") <0 ||
       myNdbOperation->setValue("COLOR", "Blue") < 0)
    {
      if (asynchErrorHandler(transaction[current].conn, myNdb))
      {
        myNdb->closeTransaction(transaction[current].conn);
        transaction[current].conn = 0;
        retries++;
        milliSleep(10);
        continue;
      }
      asynchExitHandler(myNdb);
    }

    /*Prepare transaction (the transaction is NOT yet sent to NDB)*/
    transaction[current].conn->executeAsynchPrepare(NdbTransaction::Commit,
                                                    &callback,
                                                    cb);
    /**
     * When we have prepared parallelism number of transactions ->
     * send the transaction to ndb.
     * Next time we will deal with the transactions are in the
     * callback. There we will see which ones that were successful
     * and which ones to retry.
     */
    if (nPreparedTransactions == parallelism-1)
    {
      // send-poll all transactions
      // close transaction is done in callback
      myNdb->sendPollNdb(3000, parallelism );
      nPreparedTransactions=0;
    }
    else
      nPreparedTransactions++;
    return 1;
  }
  std::cout << "Unable to recover from errors. Exiting..." << std::endl;
  asynchExitHandler(myNdb);
  return -1;
}

/**
 * Connects to the MySQL server and creates the example table, connects to
 * the NDB cluster and then runs 20000 asynchronous insert transactions.
 */
int main()
{
  ndb_init();
  MYSQL mysql;

  /**************************************************************
   * Connect to mysql server and create table                   *
   **************************************************************/
  {
    if ( !mysql_init(&mysql) ) {
      std::cout << "mysql_init failed\n";
      exit(-1);
    }
    if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
                             3306, "/tmp/mysql.sock", 0) )
      MYSQLERROR(mysql);

    // Result deliberately ignored; the USE below reports a real failure.
    mysql_query(&mysql, "CREATE DATABASE TEST_DB");
    if (mysql_query(&mysql, "USE TEST_DB") != 0) MYSQLERROR(mysql);

    create_table(mysql);
  }

  /**************************************************************
   * Connect to ndb cluster                                     *
   **************************************************************/
  Ndb_cluster_connection cluster_connection;
  if (cluster_connection.connect(4, 5, 1))
  {
    std::cout << "Unable to connect to cluster within 30 secs." << std::endl;
    exit(-1);
  }
  // Optionally connect and wait for the storage nodes (ndbd's)
  if (cluster_connection.wait_until_ready(30,0) < 0)
  {
    std::cout << "Cluster was not ready within 30 secs.\n";
    exit(-1);
  }

  Ndb* myNdb = new Ndb( &cluster_connection,
                        "TEST_DB" );  // Object representing the database
  if (myNdb->init(MAX_TRANSACTIONS) == -1) {  // Set max parallel transactions
    APIERROR(myNdb->getNdbError());
  }

  /**
   * Initialise transaction array
   */
  for(int i = 0 ; i < MAX_TRANSACTIONS ; i++)
  {
    transaction[i].used = 0;
    transaction[i].conn = 0;
  }

  int i=0;
  /**
   * Do 20000 insert transactions.
   */
  while(i < 20000)
  {
    while(populate(myNdb,i,0)<0)  // <0, no space on free list. Sleep and try again.
      milliSleep(10);

    i++;
  }
  // NOTE(review): transactions re-prepared by a retrying callback after the
  // last full batch of 'parallelism' look like they are never sent before
  // the Ndb object is deleted -- confirm whether a final sendPollNdb() is
  // wanted here.
  std::cout << "Number of temporary errors: " << tempErrors << std::endl;
  delete myNdb;
  return 0;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -