⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 consumer_restore.cpp

📁 MySQL数据库开发源码 值得一看哦
💻 CPP
📖 第 1 页 / 共 2 页
字号:
/* Copyright (C) 2003 MySQL AB   This program is free software; you can redistribute it and/or modify   it under the terms of the GNU General Public License as published by   the Free Software Foundation; either version 2 of the License, or   (at your option) any later version.   This program is distributed in the hope that it will be useful,   but WITHOUT ANY WARRANTY; without even the implied warranty of   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the   GNU General Public License for more details.   You should have received a copy of the GNU General Public License   along with this program; if not, write to the Free Software   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */#include <NDBT_ReturnCodes.h>#include "consumer_restore.hpp"#include <NdbSleep.h>extern my_bool opt_core;extern FilteredNdbOut err;extern FilteredNdbOut info;extern FilteredNdbOut debug;static void callback(int, NdbTransaction*, void*);extern const char * g_connect_string;extern BaseString g_options;boolBackupRestore::init(){  release();  if (!m_restore && !m_restore_meta)    return true;  m_cluster_connection = new Ndb_cluster_connection(g_connect_string);  m_cluster_connection->set_name(g_options.c_str());  if(m_cluster_connection->connect(12, 5, 1) != 0)  {    return false;  }  m_ndb = new Ndb(m_cluster_connection);  if (m_ndb == NULL)    return false;    m_ndb->init(1024);  if (m_ndb->waitUntilReady(30) != 0)  {    err << "Failed to connect to ndb!!" << endl;    return false;  }  info << "Connected to ndb!!" << endl;  m_callback = new restore_callback_t[m_parallelism];  if (m_callback == 0)  {    err << "Failed to allocate callback structs" << endl;    return false;  }  m_free_callback= m_callback;  for (Uint32 i= 0; i < m_parallelism; i++) {    m_callback[i].restore= this;    m_callback[i].connection= 0;    if (i > 0)      m_callback[i-1].next= &(m_callback[i]);  }  m_callback[m_parallelism-1].next = 0;  return true;}void BackupRestore::release(){  if (m_ndb)  {    delete m_ndb;    m_ndb= 0;  }  if (m_callback)  {    delete [] m_callback;    m_callback= 0;  }  if (m_cluster_connection)  {    delete m_cluster_connection;    m_cluster_connection= 0;  }}BackupRestore::~BackupRestore(){  release();}staticint match_blob(const char * name){  int cnt, id1, id2;  char buf[256];  if((cnt = sscanf(name, "%[^/]/%[^/]/NDB$BLOB_%d_%d", buf, buf, &id1, &id2)) == 4){    return id1;  }    return -1;}const NdbDictionary::Table*BackupRestore::get_table(const NdbDictionary::Table* tab){  if(m_cache.m_old_table == tab)    return m_cache.m_new_table;  m_cache.m_old_table = tab;  int cnt, id1, id2;  char db[256], schema[256];  if((cnt = sscanf(tab->getName(), "%[^/]/%[^/]/NDB$BLOB_%d_%d", 		   db, schema, &id1, &id2)) == 4){    m_ndb->setDatabaseName(db);    m_ndb->setSchemaName(schema);        BaseString::snprintf(db, sizeof(db), "NDB$BLOB_%d_%d", 			 m_new_tables[id1]->getTableId(), id2);        m_cache.m_new_table = m_ndb->getDictionary()->getTable(db);      } else {    m_cache.m_new_table = m_new_tables[tab->getTableId()];  }  assert(m_cache.m_new_table);  return m_cache.m_new_table;}boolBackupRestore::finalize_table(const TableS & table){  bool ret= true;  if (!m_restore && !m_restore_meta)    return ret;  if (table.have_auto_inc())  {    Uint64 max_val= table.get_max_auto_val();    Uint64 auto_val;    int r= m_ndb->readAutoIncrementValue(get_table(table.m_dictTable), auto_val);    if (r == -1 && m_ndb->getNdbError().code != 626)      ret= false;    else if (r == -1 || max_val+1 > auto_val)      ret= m_ndb->setAutoIncrementValue(get_table(table.m_dictTable), max_val+1, false) != -1;  }  return ret;}boolBackupRestore::table(const TableS & table){  if (!m_restore && !m_restore_meta)    return true;  const char * name = table.getTableName();    /**   * Ignore blob tables   */  if(match_blob(name) >= 0)    return true;    const NdbTableImpl & tmptab = NdbTableImpl::getImpl(* table.m_dictTable);  if(tmptab.m_indexType != NdbDictionary::Index::Undefined){    m_indexes.push_back(table.m_dictTable);    return true;  }    BaseString tmp(name);  Vector<BaseString> split;  if(tmp.split(split, "/") != 3){    err << "Invalid table name format " << name << endl;    return false;  }  m_ndb->setDatabaseName(split[0].c_str());  m_ndb->setSchemaName(split[1].c_str());    NdbDictionary::Dictionary* dict = m_ndb->getDictionary();  if(m_restore_meta){    NdbDictionary::Table copy(*table.m_dictTable);    copy.setName(split[2].c_str());    /*      update min and max rows to reflect the table, this to      ensure that memory is allocated properly in the ndb kernel    */    copy.setMinRows(table.getNoOfRecords());    if (table.getNoOfRecords() > copy.getMaxRows())    {      copy.setMaxRows(table.getNoOfRecords());    }    if (dict->createTable(copy) == -1)     {      err << "Create table " << table.getTableName() << " failed: "	  << dict->getNdbError() << endl;      return false;    }    info << "Successfully restored table " << table.getTableName()<< endl ;  }      const NdbDictionary::Table* tab = dict->getTable(split[2].c_str());  if(tab == 0){    err << "Unable to find table: " << split[2].c_str() << endl;    return false;  }  if(m_restore_meta){    m_ndb->setAutoIncrementValue(tab, ~(Uint64)0, false);  }  const NdbDictionary::Table* null = 0;  m_new_tables.fill(table.m_dictTable->getTableId(), null);  m_new_tables[table.m_dictTable->getTableId()] = tab;  return true;}boolBackupRestore::endOfTables(){  if(!m_restore_meta)    return true;  NdbDictionary::Dictionary* dict = m_ndb->getDictionary();  for(size_t i = 0; i<m_indexes.size(); i++){    NdbTableImpl & indtab = NdbTableImpl::getImpl(* m_indexes[i]);    BaseString tmp(indtab.m_primaryTable.c_str());    Vector<BaseString> split;    if(tmp.split(split, "/") != 3){      err << "Invalid table name format " << indtab.m_primaryTable.c_str()	  << endl;      return false;    }        m_ndb->setDatabaseName(split[0].c_str());    m_ndb->setSchemaName(split[1].c_str());        const NdbDictionary::Table * prim = dict->getTable(split[2].c_str());    if(prim == 0){      err << "Unable to find base table \"" << split[2].c_str() 	  << "\" for index "	  << indtab.getName() << endl;      return false;    }    NdbTableImpl& base = NdbTableImpl::getImpl(*prim);    NdbIndexImpl* idx;    int id;    char idxName[255], buf[255];    if(sscanf(indtab.getName(), "%[^/]/%[^/]/%d/%s",	      buf, buf, &id, idxName) != 4){      err << "Invalid index name format " << indtab.getName() << endl;      return false;    }    if(NdbDictInterface::create_index_obj_from_table(&idx, &indtab, &base))    {      err << "Failed to create index " << idxName	  << " on " << split[2].c_str() << endl;	return false;    }    idx->setName(idxName);    if(dict->createIndex(* idx) != 0)    {      delete idx;      err << "Failed to create index " << idxName	  << " on " << split[2].c_str() << endl	  << dict->getNdbError() << endl;      return false;    }    delete idx;    info << "Successfully created index " << idxName	 << " on " << split[2].c_str() << endl;  }  return true;}void BackupRestore::tuple(const TupleS & tup){  if (!m_restore)     return;  while (m_free_callback == 0)  {    assert(m_transactions == m_parallelism);    // send-poll all transactions    // close transaction is done in callback    m_ndb->sendPollNdb(3000, 1);  }    restore_callback_t * cb = m_free_callback;    if (cb == 0)    assert(false);    m_free_callback = cb->next;  cb->retries = 0;  cb->tup = tup; // must do copy!  tuple_a(cb);}void BackupRestore::tuple_a(restore_callback_t *cb){  while (cb->retries < 10)   {    /**     * start transactions     */    cb->connection = m_ndb->startTransaction();    if (cb->connection == NULL)     {      if (errorHandler(cb))       {	m_ndb->sendPollNdb(3000, 1);	continue;      }      exitHandler();    } // if        const TupleS &tup = cb->tup;    const NdbDictionary::Table * table = get_table(tup.getTable()->m_dictTable);    NdbOperation * op = cb->connection->getNdbOperation(table);        if (op == NULL)     {      if (errorHandler(cb)) 	continue;      exitHandler();    } // if        if (op->writeTuple() == -1)     {      if (errorHandler(cb))	continue;      exitHandler();    } // if        int ret = 0;    for (int j = 0; j < 2; j++)    {      for (int i = 0; i < tup.getNoOfAttributes(); i++)       {	const AttributeDesc * attr_desc = tup.getDesc(i);	const AttributeData * attr_data = tup.getData(i);	int size = attr_desc->size;	int arraySize = attr_desc->arraySize;	char * dataPtr = attr_data->string_value;	Uint32 length = (size * arraySize) / 8;	if (j == 0 && tup.getTable()->have_auto_inc(i))	  tup.getTable()->update_max_auto_val(dataPtr,size);	if (attr_desc->m_column->getPrimaryKey())	{

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -