⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ndbpoolimpl.cpp

📁 mysql-5.0.22.tar.gz源码包
💻 CPP
字号:
/* Copyright (C) 2003 MySQL AB   This program is free software; you can redistribute it and/or modify   it under the terms of the GNU General Public License as published by   the Free Software Foundation; either version 2 of the License, or   (at your option) any later version.   This program is distributed in the hope that it will be useful,   but WITHOUT ANY WARRANTY; without even the implied warranty of   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the   GNU General Public License for more details.   You should have received a copy of the GNU General Public License   along with this program; if not, write to the Free Software   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */#include "NdbPoolImpl.hpp"NdbMutex *NdbPool::pool_mutex = NULL;NdbPool *the_pool = NULL;NdbPool*NdbPool::create_instance(Ndb_cluster_connection* cc,			 Uint32 max_ndb_obj,                         Uint32 no_conn_obj,                         Uint32 init_no_ndb_objects){  if (!initPoolMutex()) {    return NULL;  }  NdbMutex_Lock(pool_mutex);  NdbPool* a_pool;  if (the_pool != NULL) {    a_pool = NULL;  } else {    the_pool = new NdbPool(cc, max_ndb_obj, no_conn_obj);    if (!the_pool->init(init_no_ndb_objects)) {      delete the_pool;      the_pool = NULL;    }    a_pool = the_pool;  }  NdbMutex* temp = pool_mutex;  if (a_pool == NULL) {    pool_mutex = NULL;  }  NdbMutex_Unlock(pool_mutex);  if (a_pool == NULL) {    NdbMutex_Destroy(temp);  }  return a_pool;}voidNdbPool::drop_instance(){  if (pool_mutex == NULL) {    return;  }  NdbMutex_Lock(pool_mutex);  the_pool->release_all();  delete the_pool;  the_pool = NULL;  NdbMutex* temp = pool_mutex;  NdbMutex_Unlock(temp);  NdbMutex_Destroy(temp);}boolNdbPool::initPoolMutex(){  bool ret_result = false;  if (pool_mutex == NULL) {    pool_mutex = NdbMutex_Create();    ret_result = ((pool_mutex == NULL) ? false : true);  }  return ret_result;}NdbPool::NdbPool(Ndb_cluster_connection* cc,		 Uint32 max_no_objects,                 Uint32 no_conn_objects){  if (no_conn_objects > 1024) {    no_conn_objects = 1024;  }  if (max_no_objects > MAX_NDB_OBJECTS) {    max_no_objects = MAX_NDB_OBJECTS;  } else if (max_no_objects == 0) {    max_no_objects = 1;  }  m_max_ndb_objects = max_no_objects;  m_no_of_conn_objects = no_conn_objects;  m_no_of_objects = 0;  m_waiting = 0;  m_pool_reference = NULL;  m_hash_entry = NULL;  m_first_free = NULL_POOL;  m_first_not_in_use = NULL_POOL;  m_last_free = NULL_POOL;  input_pool_cond = NULL;  output_pool_cond = NULL;  m_output_queue = 0;  m_input_queue = 0;  m_signal_count = 0;  m_cluster_connection = cc;}NdbPool::~NdbPool(){  NdbCondition_Destroy(input_pool_cond);  NdbCondition_Destroy(output_pool_cond);}voidNdbPool::release_all(){  int i;  for (i = 0; i < m_max_ndb_objects + 1; i++) {    if (m_pool_reference[i].ndb_reference != NULL) {      assert(m_pool_reference[i].in_use);      assert(m_pool_reference[i].free_entry);      delete m_pool_reference[i].ndb_reference;    }  }  delete [] m_pool_reference;  delete [] m_hash_entry;  m_pool_reference = NULL;  m_hash_entry = NULL;}boolNdbPool::init(Uint32 init_no_objects){  bool ret_result = false;  int i;  do {    input_pool_cond = NdbCondition_Create();    output_pool_cond = NdbCondition_Create();    if (input_pool_cond == NULL || output_pool_cond == NULL) {      break;    }    if (init_no_objects > m_max_ndb_objects) {      init_no_objects = m_max_ndb_objects;    }    if (init_no_objects == 0) {      init_no_objects = 1;    }    m_pool_reference = new NdbPool::POOL_STRUCT[m_max_ndb_objects + 1];    m_hash_entry     = new Uint8[POOL_HASH_TABLE_SIZE];    if ((m_pool_reference == NULL) || (m_hash_entry == NULL)) {      delete [] m_pool_reference;      delete [] m_hash_entry;      break;    }    for (i = 0; i < m_max_ndb_objects + 1; i++) {      m_pool_reference[i].ndb_reference = NULL;      m_pool_reference[i].in_use = false;      m_pool_reference[i].next_free_object = i+1;      m_pool_reference[i].prev_free_object = i-1;      m_pool_reference[i].next_db_object = NULL_POOL;      m_pool_reference[i].prev_db_object = NULL_POOL;    }    for (i = 0; i < POOL_HASH_TABLE_SIZE; i++) {      m_hash_entry[i] = NULL_HASH;    }    m_pool_reference[m_max_ndb_objects].next_free_object = NULL_POOL;    m_pool_reference[1].prev_free_object = NULL_POOL;    m_first_not_in_use = 1;    m_no_of_objects = init_no_objects;    for (i = init_no_objects; i > 0 ; i--) {      Uint32 fake_id;      if (!allocate_ndb(fake_id, (const char*)NULL, (const char*)NULL)) {        release_all();        break;      }    }    ret_result = true;    break;  } while (1);  return ret_result;}/*Get an Ndb object.Input:hint_id: 0 = no hint, otherwise a hint of which Ndb object the thread         used the last time.a_db_name: NULL = don't check for database specific Ndb  object, otherwise           a hint of which database is preferred.Output:hint_id: Returns id of Ndb object returnedReturn value: Ndb object pointer*/Ndb*NdbPool::get_ndb_object(Uint32 &hint_id,                        const char* a_catalog_name,                        const char* a_schema_name){  Ndb* ret_ndb = NULL;  Uint32 hash_entry = compute_hash(a_schema_name);  NdbMutex_Lock(pool_mutex);  while (1) {    /*    We start by checking if we can use the hinted Ndb object.    */    if ((ret_ndb = get_hint_ndb(hint_id, hash_entry)) != NULL) {      break;    }    /*    The hinted Ndb object was not free. We need to allocate another object.    We start by checking for a free Ndb object connected to the same database.    */    if (a_schema_name && (ret_ndb = get_db_hash(hint_id,                                                hash_entry,                                                a_catalog_name,                                                a_schema_name))) {      break;    }    /*    No Ndb object connected to the preferred database was found.    We look for a free Ndb object in general.    */    if ((ret_ndb = get_free_list(hint_id, hash_entry)) != NULL) {      break;    }    /*    No free Ndb object was found. If we haven't allocated objects up until the    maximum number yet then we can allocate a new Ndb object here.    */    if (m_no_of_objects < m_max_ndb_objects) {      if (allocate_ndb(hint_id, a_catalog_name, a_schema_name)) {        assert((ret_ndb = get_hint_ndb(hint_id, hash_entry)) != NULL);        break;      }    }    /*    We need to wait until an Ndb object becomes    available.    */    if ((ret_ndb = wait_free_ndb(hint_id)) != NULL) {      break;    }    /*    Not even after waiting were we able to get hold of an Ndb object. We     return NULL to indicate this problem.    */    ret_ndb = NULL;    break;  }  NdbMutex_Unlock(pool_mutex);  if (ret_ndb != NULL) {    /*    We need to set the catalog and schema name of the Ndb object before    returning it to the caller.    */    ret_ndb->setCatalogName(a_catalog_name);    ret_ndb->setSchemaName(a_schema_name);  }  return ret_ndb;}voidNdbPool::return_ndb_object(Ndb* returned_ndb, Uint32 id){  NdbMutex_Lock(pool_mutex);  assert(id <= m_max_ndb_objects);  assert(id != 0);  assert(returned_ndb == m_pool_reference[id].ndb_reference);  bool wait_cond = m_waiting;  if (wait_cond) {    NdbCondition* pool_cond;    if (m_signal_count > 0) {      pool_cond = output_pool_cond;      m_signal_count--;    } else {      pool_cond = input_pool_cond;    }    add_wait_list(id);    NdbMutex_Unlock(pool_mutex);    NdbCondition_Signal(pool_cond);  } else {    add_free_list(id);    add_db_hash(id);    NdbMutex_Unlock(pool_mutex);  }}boolNdbPool::allocate_ndb(Uint32 &id,                      const char* a_catalog_name,                      const char* a_schema_name){  Ndb* a_ndb;  if (m_first_not_in_use == NULL_POOL) {    return false;  }  if (a_schema_name) {    a_ndb = new Ndb(m_cluster_connection, a_schema_name, a_catalog_name);  } else {    a_ndb = new Ndb(m_cluster_connection, "");  }  if (a_ndb == NULL) {    return false;  }  a_ndb->init(m_no_of_conn_objects);  m_no_of_objects++;  id = m_first_not_in_use;  Uint32 allocated_id = m_first_not_in_use;  m_first_not_in_use = m_pool_reference[allocated_id].next_free_object;  m_pool_reference[allocated_id].ndb_reference = a_ndb;  m_pool_reference[allocated_id].in_use = true;  m_pool_reference[allocated_id].free_entry = false;  add_free_list(allocated_id);  add_db_hash(allocated_id);  return true;}voidNdbPool::add_free_list(Uint32 id){  assert(!m_pool_reference[id].free_entry);  assert(m_pool_reference[id].in_use);  m_pool_reference[id].free_entry = true;  m_pool_reference[id].next_free_object = m_first_free;  m_pool_reference[id].prev_free_object = (Uint8)NULL_POOL;  m_first_free = (Uint8)id;  if (m_last_free == (Uint8)NULL_POOL) {    m_last_free = (Uint8)id;  }}voidNdbPool::add_db_hash(Uint32 id){  Ndb* t_ndb = m_pool_reference[id].ndb_reference;  const char* schema_name = t_ndb->getSchemaName();  Uint32 hash_entry = compute_hash(schema_name);  Uint8 next_db_entry = m_hash_entry[hash_entry];  m_pool_reference[id].next_db_object = next_db_entry;  m_pool_reference[id].prev_db_object = (Uint8)NULL_HASH;  m_hash_entry[hash_entry] = (Uint8)id;}Ndb*NdbPool::get_free_list(Uint32 &id, Uint32 hash_entry){  if (m_first_free == NULL_POOL) {    return NULL;  }  id = m_first_free;  Ndb* ret_ndb = get_hint_ndb(m_first_free, hash_entry);  assert(ret_ndb != NULL);  return ret_ndb;}Ndb*NdbPool::get_db_hash(Uint32 &id,                     Uint32 hash_entry,                     const char *a_catalog_name,                     const char *a_schema_name){  Uint32 entry_id = m_hash_entry[hash_entry];  bool found = false;  while (entry_id != NULL_HASH) {    Ndb* t_ndb = m_pool_reference[entry_id].ndb_reference;    const char *a_ndb_catalog_name = t_ndb->getCatalogName();    if (strcmp(a_catalog_name, a_ndb_catalog_name) == 0) {      const char *a_ndb_schema_name = t_ndb->getSchemaName();      if (strcmp(a_schema_name, a_ndb_schema_name) == 0) {        found = true;        break;      }    }    entry_id = m_pool_reference[entry_id].next_db_object;  }  if (found) {    id = entry_id;    Ndb* ret_ndb = get_hint_ndb(entry_id, hash_entry);    assert(ret_ndb != NULL);    return ret_ndb;  }  return NULL;}Ndb*NdbPool::get_hint_ndb(Uint32 hint_id, Uint32 hash_entry){  Ndb* ret_ndb = NULL;  do {    if ((hint_id != 0) &&        (hint_id <= m_max_ndb_objects) &&        (m_pool_reference[hint_id].in_use) &&        (m_pool_reference[hint_id].free_entry)) {      ret_ndb = m_pool_reference[hint_id].ndb_reference;      if (ret_ndb != NULL) {        break;      } else {        assert(false);      }    }    return NULL;  } while (1);  /*  This is where we remove the entry from the free list and from the db hash  table.  */  remove_free_list(hint_id);  remove_db_hash(hint_id, hash_entry);  return ret_ndb;}voidNdbPool::remove_free_list(Uint32 id){  Uint8 next_free_entry = m_pool_reference[id].next_free_object;  Uint8 prev_free_entry = m_pool_reference[id].prev_free_object;  if (prev_free_entry == (Uint8)NULL_POOL) {    m_first_free = next_free_entry;  } else {    m_pool_reference[prev_free_entry].next_free_object = next_free_entry;  }  if (next_free_entry == (Uint8)NULL_POOL) {    m_last_free = prev_free_entry;  } else {    m_pool_reference[next_free_entry].prev_free_object = prev_free_entry;  }  m_pool_reference[id].next_free_object = NULL_POOL;  m_pool_reference[id].prev_free_object = NULL_POOL;  m_pool_reference[id].free_entry = false;}voidNdbPool::remove_db_hash(Uint32 id, Uint32 hash_entry){  Uint8 next_free_entry = m_pool_reference[id].next_db_object;  Uint8 prev_free_entry = m_pool_reference[id].prev_db_object;  if (prev_free_entry == (Uint8)NULL_HASH) {    m_hash_entry[hash_entry] = next_free_entry;  } else {    m_pool_reference[prev_free_entry].next_db_object = next_free_entry;  }  if (next_free_entry == (Uint8)NULL_HASH) {    ;  } else {    m_pool_reference[next_free_entry].prev_db_object = prev_free_entry;  }  m_pool_reference[id].next_db_object = NULL_HASH;  m_pool_reference[id].prev_db_object = NULL_HASH;}Uint32NdbPool::compute_hash(const char *a_schema_name){  Uint32 len = strlen(a_schema_name);  Uint32 h = 147;  for (Uint32 i = 0; i < len; i++) {    Uint32 c = a_schema_name[i];    h = (h << 5) + h + c;  }  h &= (POOL_HASH_TABLE_SIZE - 1);  return h;}Ndb*NdbPool::wait_free_ndb(Uint32 &id){  int res;  int time_out = 3500;  do {    NdbCondition* tmp = input_pool_cond;    m_waiting++;    m_input_queue++;    time_out -= 500;    res = NdbCondition_WaitTimeout(input_pool_cond, pool_mutex, time_out);    if (tmp == input_pool_cond) {      m_input_queue--;    } else {      m_output_queue--;      if (m_output_queue == 0) {        switch_condition_queue();      }    }    m_waiting--;  } while (res == 0 && m_first_wait == NULL_POOL);  if (res != 0 && m_first_wait == NULL_POOL) {    return NULL;  }  id = m_first_wait;  remove_wait_list();  assert(m_waiting != 0 || m_first_wait == NULL_POOL);  return m_pool_reference[id].ndb_reference;}voidNdbPool::remove_wait_list(){  Uint32 id = m_first_wait;  m_first_wait = m_pool_reference[id].next_free_object;  m_pool_reference[id].next_free_object = NULL_POOL;  m_pool_reference[id].prev_free_object = NULL_POOL;  m_pool_reference[id].free_entry = false;}voidNdbPool::add_wait_list(Uint32 id){  m_pool_reference[id].next_free_object = m_first_wait;  m_first_wait = id;}voidNdbPool::switch_condition_queue(){  m_signal_count = m_input_queue;  Uint8 move_queue = m_input_queue;  m_input_queue = m_output_queue;  m_output_queue = move_queue;  NdbCondition* move_cond = input_pool_cond;  input_pool_cond = output_pool_cond;  output_pool_cond = move_cond;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -