📄 ha_memcache.cc
字号:
/* Copyright (C) 2006 Brian Aker, TangentOrg This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA *//* The memcache engine allows you to expose simple functions as tables to the storage engine interface. Happy coding! -Brian*/#ifdef USE_PRAGMA_IMPLEMENTATION#pragma implementation // gcc: Class implementation#endif#define MYSQL_SERVER 1#include "mysql_priv.h"#include "ha_memcache.h"#include <my_dir.h>#include <mysql/plugin.h>#include <mysql.h>/* Global variables */static char* row_format= NULL;/* prototypes */static handler* memcache_create_handler(handlerton *hton, TABLE_SHARE *table, MEM_ROOT *mem_root);static int init_func(void *p);bool memcache_show_status(handlerton *hton, THD* thd, stat_print_fn *stat_print, enum ha_stat_type stat_type);bool schema_table_store_record(THD *thd, TABLE *table);int memcache_discover(handlerton *hton, THD* thd, const char *db, const char *name, unsigned char** frmblob, size_t* frmlen);/* Variables for memcache share methods */static HASH memcache_open_tables; // Hash used to track open tablespthread_mutex_t memcache_mutex; // This is the mutex we use to init the hash/* Variables for memcache server table */static HASH memcache_server_hash; // Hash used to track open tablespthread_mutex_t memcache_server_mutex; // This is the mutex we use to init the hash/* Function we use in the creation of our hash to get key.*/static unsigned char* memcache_get_key(MEMCACHE_SHARE *share, uint *length, my_bool not_used __attribute__((unused))){ *length=share->table_name_length; return (unsigned char*) share->table_name;}static unsigned char* memcache_server_get_key(memcache_server_st *server, uint *length, my_bool not_used __attribute__((unused))){ *length= server->server_name_length; return (unsigned char*) server->server_name;}static int init_func(void *p){ DBUG_ENTER("init_func"); handlerton *memcache_hton= (handlerton *)p; xml_row_startup(); VOID(pthread_mutex_init(&memcache_mutex,MY_MUTEX_INIT_FAST)); (void) hash_init(&memcache_open_tables,system_charset_info,32,0,0, (hash_get_key) memcache_get_key,0,0); VOID(pthread_mutex_init(&memcache_server_mutex,MY_MUTEX_INIT_FAST)); (void) hash_init(&memcache_server_hash,system_charset_info,32,0,0, (hash_get_key) memcache_server_get_key,0,0); memcache_hton->state= SHOW_OPTION_YES; memcache_hton->create= memcache_create_handler; memcache_hton->discover= memcache_discover; memcache_hton->flags= HTON_CAN_RECREATE; DBUG_RETURN(0);}static int done_func(void *p){ int error= 0; DBUG_ENTER("done_func"); if (memcache_open_tables.records) error= 1; hash_free(&memcache_open_tables); pthread_mutex_destroy(&memcache_mutex); hash_free(&memcache_server_hash); pthread_mutex_destroy(&memcache_server_mutex); xml_row_shutdown(); DBUG_RETURN(0);}/* Example of simple lock controls. The "share" it creates is structure we will pass to each memcache handler. Do you have to have one of these? Well, you have pieces that are used for locking, and they are needed to function.*/static MEMCACHE_SHARE *get_share(const char *table_name, TABLE *table){ MEMCACHE_SHARE *share; uint length; char *tmp_name; pthread_mutex_lock(&memcache_mutex); length=(uint) strlen(table_name); if (!(share=(MEMCACHE_SHARE*) hash_search(&memcache_open_tables, (unsigned char*) table_name, length))) { if (!(share=(MEMCACHE_SHARE *) my_multi_malloc(MYF(MY_WME | MY_ZEROFILL), &share, sizeof(*share), &tmp_name, length+1, NullS))) { pthread_mutex_unlock(&memcache_mutex); return NULL; } share->use_count=0; if (!memcmp("NATIVE", table->s->comment.str, sizeof("NATIVE"))) share->native= TRUE; share->use_count=0; share->table_name_length=length; share->table_name=tmp_name; strmov(share->table_name,table_name); if (my_hash_insert(&memcache_open_tables, (unsigned char*) share)) goto error; thr_lock_init(&share->lock); pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST); } share->use_count++; pthread_mutex_unlock(&memcache_mutex); return share;error: pthread_mutex_destroy(&share->mutex); my_free((unsigned char *) share, MYF(0)); return NULL;}/* Free lock controls. We call this whenever we close a table. If the table had the last reference to the share then we free memory associated with it.*/static int free_share(MEMCACHE_SHARE *share){ pthread_mutex_lock(&memcache_mutex); if (!--share->use_count) { hash_delete(&memcache_open_tables, (unsigned char*) share); thr_lock_delete(&share->lock); pthread_mutex_destroy(&share->mutex); my_free((unsigned char *) share, MYF(0)); } pthread_mutex_unlock(&memcache_mutex); return 0;}memcache_server_st *get_server(char *server_name, uint server_name_length){ memcache_server_st *server; pthread_mutex_lock(&memcache_server_mutex); if (!(server=(memcache_server_st *) hash_search(&memcache_server_hash, (unsigned char*) server_name, server_name_length))) { server= (memcache_server_st *) my_malloc(sizeof(memcache_server_st), MYF(MY_WME | MY_ZEROFILL)); server->server_name= my_strndup(server_name, server_name_length, MYF(0)); server->server_name_length= server_name_length; if (my_hash_insert(&memcache_server_hash, (unsigned char*) server)) goto error; } server->use_count++; pthread_mutex_unlock(&memcache_server_mutex); return server;error: my_free((unsigned char *) server, MYF(0)); return NULL;}static int free_server(memcache_server_st *server){ pthread_mutex_lock(&memcache_server_mutex); if (!--server->use_count) { hash_delete(&memcache_server_hash, (unsigned char*) server); my_free((unsigned char *) server, MYF(0)); } pthread_mutex_unlock(&memcache_server_mutex); return 0;}static handler* memcache_create_handler(handlerton *hton, TABLE_SHARE *table, MEM_ROOT *mem_root){ return new (mem_root) ha_memcache(hton, table);}ha_memcache::ha_memcache(handlerton *hton, TABLE_SHARE *table_arg) :handler(hton, table_arg){ rec_buff= NULL; alloced_rec_buff_length= 0;}/* If frm_error() is called then we will use this to to find out what file extentions exist for the storage engine. This is also used by the default rename_table and delete_table method in handler.cc.*/static const char *ha_memcache_exts[] = { NullS};const char **ha_memcache::bas_ext() const{ return ha_memcache_exts;}/* Used for opening tables. The name will be the name of the file. A table is opened when it needs to be opened. For instance when a request comes in for a select on the table (tables are not open and closed for each request, they are cached). Called from handler.cc by handler::ha_open(). The server opens all tables by calling ha_open() which then calls the handler specific open().*/int ha_memcache::open(const char *name, int mode, uint test_if_locked){ char buffer[1024]; char *begin, *end, *ptr; DBUG_ENTER("ha_memcache::open"); if (open_connection(table->s->connect_string.str, table->s->connect_string.length)) DBUG_RETURN(ER_CONNECT_TO_FOREIGN_DATA_SOURCE); if (!(share = get_share(name, table))) DBUG_RETURN(1); thr_lock_data_init(&share->lock,&lock,NULL); ref_length= find_primary_key_length() + 4; DBUG_RETURN(0);}void ha_memcache::close_connection(void){ DBUG_ENTER("ha_memcache::close_connection"); memcached_free(memc); DBUG_VOID_RETURN;}int ha_memcache::open_connection(char *connect_string, unsigned int length){ DBUG_ENTER("ha_memcache::open_connection"); if (!length) DBUG_RETURN(1); memc= memcached_create(NULL);#ifdef NOTDONE FOREIGN_SERVER *server; if ((server= get_server_by_name(connect_string))) { ptr=begin= server->host; end= begin + strlen(server->host); } else#endif { memcached_return rc; memcached_server_st *servers; printf("servers %s\n", connect_string); servers= memcached_servers_parse(connect_string); unsigned int x; for (x= 0; x < memcached_server_list_count(servers); x++) { printf("\t%s : %u\n", servers[x].hostname, servers[x].port); } printf("\n"); rc= memcached_server_push(memc, servers); assert(rc == MEMCACHED_SUCCESS); memcached_server_list_free(servers); } DBUG_RETURN(0);}/* Key lookup*/ha_rows ha_memcache::records_in_range(uint inx, key_range *min_key, key_range *max_key){ DBUG_ENTER("ha_memcache::records_in_range"); DBUG_RETURN((ha_rows)1); }int ha_memcache::index_read(unsigned char *buf, const unsigned char *key, uint key_len, enum ha_rkey_function find_flag){ DBUG_ENTER("ha_memcache::index_read"); unsigned int rc= 0; ha_statistic_increment(&SSV::ha_read_key_count); rc= find_row(buf, key, key_len); DBUG_RETURN(rc);}int ha_memcache::read_range_first(const key_range *start_key, const key_range *end_key, bool eq_range, bool sorted){ DBUG_ENTER("ha_memcache::read_range_first"); int ret; /* We are not fetching a row, we are checking for existence */ ret= find_row(table->record[0], start_key->key, start_key->length); DBUG_RETURN(ret); }int ha_memcache::read_range_next(){ DBUG_ENTER("ha_memcache::read_range_next"); DBUG_RETURN(HA_ERR_END_OF_FILE); }#ifdef NOT_DONEconst COND* ha_memcache::cond_push(const COND *cond) { DBUG_ENTER("cond_push"); DBUG_RETURN(0);}#endifint ha_memcache::update_row(const unsigned char *old_data, unsigned char *new_data){ memcached_return rc; char prev_key[2048]; size_t prev_key_length; DBUG_ENTER("ha_memcache::update_row"); Field *key_field= table->key_info[table->s->primary_key].key_part->field; my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set); if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE) table->timestamp_field->set_time(); ha_statistic_increment(&SSV::ha_update_count); pack_row(new_data); /* We need to save our old key to make sure that the primary key has not been updated if it has we will need to delete first. */ memcpy(prev_key, current_key, current_key_length); prev_key_length= current_key_length; make_key(key_field); if (current_key_length != prev_key_length || memcmp(current_key, prev_key, current_key_length)) { /* This can fail if a concurrent delete happens */ rc= memcached_add(memc, current_key, current_key_length, (char *)rec_buff, rec_buff_length, 0, 0); if (rc == MEMCACHED_SUCCESS) /* If we fail at this, it means someone has already updated the key */ rc= memcached_delete(memc, prev_key, prev_key_length, (time_t)0); } else { /* This can fail if a concurrent delete happens */ rc= memcached_replace(memc, current_key, current_key_length, (char *)rec_buff, rec_buff_length, 0, 0); } dbug_tmp_restore_column_map(table->read_set, org_bitmap); /* If rc is anything other then success then something went wrong, how to handle this needs to be thought out. */ DBUG_RETURN((rc == MEMCACHED_SUCCESS) ? 0 : -1);}int ha_memcache::delete_row(const unsigned char *buf){ memcached_return rc; DBUG_ENTER("ha_memcache::delete_row"); Field *key_field= table->key_info[table->s->primary_key].key_part->field; make_key(key_field); rc= memcached_delete(memc, current_key, current_key_length, (time_t)0); DBUG_RETURN((rc == MEMCACHED_SUCCESS) ? 0 : -1);}/* Closes a table. We call the free_share() function to free any resources that we have allocated in the "shared" structure. Called from sql_base.cc, sql_select.cc, and table.cc. In sql_select.cc it is only used to close up temporary tables or during the process where a temporary table is converted over to being a myisam table. For sql_base.cc look at close_data_tables().*/int ha_memcache::close(void){ DBUG_ENTER("ha_memcache::close"); close_connection(); DBUG_RETURN(free_share(share));}int ha_memcache::rnd_init(bool scan){ DBUG_ENTER("ha_memcache::rnd_init"); DBUG_RETURN(0);}int ha_memcache::rnd_end(){ DBUG_ENTER("ha_memcache::rnd_end"); DBUG_RETURN(0);}/* Reallocate buffer if needed */bool ha_memcache::fix_rec_buff(size_t length){ DBUG_ENTER("ha_memcache::fix_rec_buff"); if (! rec_buff || length > alloced_rec_buff_length) { unsigned char *newptr; if (!(newptr= (unsigned char*)my_realloc((unsigned char *) rec_buff, length, MYF(MY_ALLOW_ZERO_PTR)))) DBUG_RETURN(1); /* purecov: inspected */ rec_buff= newptr; alloced_rec_buff_length= length; } DBUG_RETURN(0);}/* Calculate max length needed for row *//* Memory optimization would be to calculate Field completely */size_t ha_memcache::max_row_length(void){ size_t length= table->s->reclength + table->s->fields*2; uint *ptr, *end; DBUG_ENTER("ha_memcache::max_row_length"); for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ; ptr != end ; ptr++) { length += 2 + ((Field_blob*)table->field[*ptr])->get_length(); } DBUG_RETURN(length);}size_t ha_memcache::pack_row(unsigned char *record){ DBUG_ENTER("ha_memcache::pack_row"); if (!row_format) { unsigned char *ptr; if (fix_rec_buff(max_row_length())) DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */ /* Copy null bits */ memcpy(rec_buff, record, table->s->null_bytes); ptr= rec_buff + table->s->null_bytes;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -