📄 ha_memcache.cc
字号:
for (Field **field=table->field ; *field ; field++) { if (!((*field)->is_null())) ptr=(unsigned char*) (*field)->pack((unsigned char*) ptr, (unsigned char*) record + (*field)->offset(record)); } rec_buff_length= (size_t)(ptr - rec_buff); } else { xml_row_st engine; xml_row_st myrow, *rowptr; char *string; char content_buffer[1024]; String content(content_buffer, sizeof(content_buffer), &my_charset_bin); content.length(0); xml_row_create(&myrow); for (Field **field=table->field ; *field ; field++) { (*field)->val_str(&content); if ((*field)->is_null()) { xml_row_field_add(&myrow, (char *)(*field)->field_name, NULL, 0, XML_ENGINE_IS_NULL); } else { (*field)->val_str(&content); xml_row_field_add(&myrow, (char *)(*field)->field_name, content.c_ptr_safe(), content.length(), XML_ENGINE_NOT_NULL); } } string= xml_row_serialize(&myrow, &rec_buff_length); if (fix_rec_buff(rec_buff_length)) DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */ memcpy(rec_buff, string, rec_buff_length); xmlFree(string); xml_row_close(&myrow); } DBUG_RETURN(rec_buff_length);}void ha_memcache::unpack_row(unsigned char *record, char *reciever, size_t reciever_length){ /* Copy null bits */ if (!row_format) { const unsigned char *ptr= (const unsigned char *)reciever; memcpy(record, ptr, table->s->null_bytes); ptr+= table->s->null_bytes; for (Field **field=table->field ; *field ; field++) if (!((*field)->is_null())) { ptr= (*field)->unpack((unsigned char *)record + (*field)->offset(table->record[0]), ptr); } } else { xml_row_return rc; xml_row_st myrow; xml_row_field_st *row_ptr; xml_row_create(&myrow); if ((rc == xml_row_set(&myrow, reciever, reciever_length)) == XML_ROW_FAILURE) return; Field **field=table->field; row_ptr= xml_row_read(&myrow); for (; row_ptr; row_ptr= row_ptr->next) { (*field)->store((char *)row_ptr->str, row_ptr->length, &my_charset_bin); (*field)->set_notnull(); field++; } xml_row_close(&myrow); }}int ha_memcache::write_row(unsigned char *buf){ memcached_return rc; DBUG_ENTER("ha_memcache::write_row"); Field *key_field= table->key_info[table->s->primary_key].key_part->field; my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set); ha_statistic_increment(&SSV::ha_write_count); if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT) table->timestamp_field->set_time(); pack_row(buf); make_key(key_field); rc= memcached_add(memc, current_key, current_key_length, (char *)rec_buff, rec_buff_length, (time_t)0, (uint16_t)0); dbug_tmp_restore_column_map(table->read_set, org_bitmap); switch (rc) { case MEMCACHED_NOTSTORED: DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY); case MEMCACHED_SUCCESS: DBUG_RETURN(0); default: DBUG_RETURN(-1); }}int ha_memcache::rnd_next(unsigned char *buf){ DBUG_ENTER("ha_memcache::rnd_next"); DBUG_RETURN(HA_ERR_END_OF_FILE);}void ha_memcache::position(const unsigned char *record){ DBUG_ENTER("ha_memcache::position"); unsigned int key_length; Field *key_field= table->key_info[table->s->primary_key].key_part->field; key_length= key_field->data_length(); memcpy(ref, &key_length, sizeof(unsigned int)); memcpy(ref+sizeof(unsigned int), key_field->ptr, key_length); DBUG_VOID_RETURN;}void ha_memcache::make_key(Field *primary){ char *end_ptr= key_built_buffer; char attribute_buffer[1024]; String attribute(attribute_buffer, sizeof(attribute_buffer), &my_charset_bin); DBUG_ENTER("ha_memcache::make_key"); current_key= key_built_buffer; primary->val_str(&attribute, &attribute); if (!share->native) { memcpy(key_built_buffer, table->s->db.str, table->s->db.length); end_ptr+= table->s->db.length; memcpy(end_ptr, table->s->table_name.str, table->s->table_name.length); end_ptr+= table->s->table_name.length; } memcpy(end_ptr, attribute.ptr(), attribute.length()); end_ptr+= attribute.length(); current_key_length= (size_t)(end_ptr - key_built_buffer); DBUG_VOID_RETURN;}int ha_memcache::find_row(unsigned char *buf, const unsigned char *key, uint key_len){ DBUG_ENTER("ha_memcache::find_row"); size_t ret_length= 0; char *ret; uint16_t flags; memcached_return rc; Field *key_field= table->key_info[table->s->primary_key].key_part->field; my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set); my_bitmap_map *old_write_map= dbug_tmp_use_all_columns(table, table->write_set); key_field->set_key_image(key, key_len); key_field->set_notnull(); dbug_tmp_restore_column_map(table->write_set, old_write_map); /* Now we fetch the key we will use */ make_key(key_field); ret= (char *)memcached_get(memc, (char *)current_key, current_key_length, &ret_length, &flags, &rc); if (rc == MEMCACHED_SUCCESS) { unpack_row(buf, ret, ret_length); free(ret); } dbug_tmp_restore_column_map(table->read_set, old_map); DBUG_RETURN((rc == MEMCACHED_SUCCESS) ? 0 : HA_ERR_END_OF_FILE);}/* Used for ::open */unsigned int ha_memcache::find_primary_key_length(void){ DBUG_ENTER("ha_memcache::find_primary_key_length"); unsigned int key_length; Field *key_field= table->key_info[table->s->primary_key].key_part->field; key_length= key_field->max_packed_col_length(key_field->field_length); if (!share->native) { key_length+= table->s->db.length; key_length+= table->s->table_name.length; } DBUG_RETURN(key_length);}/* This is like rnd_next, but you are given a position to use to determine the row. The position will be of the type that you stored in ref. You can use ha_get_ptr(pos,ref_length) to retrieve whatever key or position you saved when position() was called. Called from filesort.cc records.cc sql_insert.cc sql_select.cc sql_update.cc.*/int ha_memcache::rnd_pos(unsigned char * buf, unsigned char *pos){ int ret; unsigned int key_length; DBUG_ENTER("ha_memcache::rnd_pos"); memcpy(&key_length, pos, sizeof(unsigned int)); ret= find_row(buf, pos + sizeof(unsigned int), key_length); DBUG_RETURN(ret); }/**/int ha_memcache::info(uint flag){ DBUG_ENTER("ha_memcache::info"); /* Without setting this to something high we won't get IN() to work */ stats.records= 300000; DBUG_RETURN(0);}/**/THR_LOCK_DATA **ha_memcache::store_lock(THD *thd, THR_LOCK_DATA **to, enum thr_lock_type lock_type){ if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) { /* If we are not doing a LOCK TABLE, then allow multiple writers */ /* Since MEMCACHED does not currently have table locks this is treated as a ordinary lock */ if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && lock_type <= TL_WRITE) && !(thd_in_lock_tables(thd))) lock_type= TL_WRITE_ALLOW_WRITE; /* In queries of type INSERT INTO t1 SELECT ... FROM t2 ... MySQL would use the lock TL_READ_NO_INSERT on t2, and that would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts to t2. Convert the lock to a normal read lock to allow concurrent inserts to t2. */ if (lock_type == TL_READ_NO_INSERT && !(thd_in_lock_tables(thd))) lock_type= TL_READ; lock.type=lock_type; } *to++= &lock; return to;}/* create() is called to create a database. The variable name will have the name of the table. When create() is called you do not need to worry about opening the table. Also, the FRM file will have already been created so adjusting create_info will not do you any good. You can overwrite the frm file at this point if you wish to change the table definition, but there are no methods currently provided for doing that. Called from handle.cc by ha_create_table().*/int ha_memcache::create(const char *name, TABLE *table_arg, HA_CREATE_INFO *create_info){ unsigned int count; DBUG_ENTER("ha_memcache::create"); File frm_file; /* File handler for readers */ MY_STAT file_stat; // Stat information for the data file unsigned char *frm_ptr; char name_buff[FN_REFLEN]; fn_format(name_buff, name, "", ".frm", MY_REPLACE_EXT | MY_UNPACK_FILENAME); if (open_connection(create_info->connect_string.str, create_info->connect_string.length)) DBUG_RETURN(ER_CONNECT_TO_FOREIGN_DATA_SOURCE); /* Here is where we open up the frm and pass it to memcache to store */ if ((frm_file= my_open(name_buff, O_RDONLY, MYF(0))) > 0) { if (!my_fstat(frm_file, &file_stat, MYF(MY_WME))) { frm_ptr= (unsigned char *)my_malloc(sizeof(unsigned char) * file_stat.st_size , MYF(0)); if (frm_ptr) { memcached_return rc; my_read(frm_file, frm_ptr, file_stat.st_size, MYF(0)); // Save it for discovery rc= memcached_set(memc, (char *)table_arg->s->table_name.str, table_arg->s->table_name.length, (char *)frm_ptr, file_stat.st_size, 0, 0); my_free((unsigned char *)frm_ptr, MYF(0)); } /* Warning should go here that discovery will not work */ } my_close(frm_file, MYF(0)); } close_connection(); DBUG_RETURN(0);}int memcache_discover(handlerton *hton, THD* thd, const char *db, const char *name, unsigned char** frmblob, size_t* frmlen){ DBUG_ENTER("memcache_discover"); DBUG_PRINT("memcache_discover", ("db: %s, name: %s", db, name)); size_t ret_length= 0; char *frm_ptr; char *ret; pthread_mutex_lock(&memcache_server_mutex); for (uint x= 0; x < memcache_server_hash.records; ++x) { memcached_return rc; memcached_st *dmc; uint16_t flags; memcache_server_st *server= (memcache_server_st *) hash_element(&memcache_server_hash, x); dmc= memcached_create(NULL); rc= memcached_server_add(dmc, server->server_name, 0); ret= memcached_get(dmc, (char *)name, strlen(name), &ret_length, &flags, &rc); if (ret_length) { *frmblob= (unsigned char *)my_strndup((const char *)ret, ret_length, MYF(0)); free(ret); *frmlen= ret_length; break; } memcached_free(dmc); } pthread_mutex_unlock(&memcache_server_mutex); if (ret_length) DBUG_RETURN(0); my_errno= 0; DBUG_RETURN(1);}static int fill_memcache_server_schema(THD *thd, TABLE_LIST *tables, COND *cond){ CHARSET_INFO *scs= system_charset_info; TABLE *table= (TABLE *) tables->table; memcached_st *memc_local; unsigned int x; DBUG_ENTER("fill_memcache_server_schema"); int returnable= 0; pthread_mutex_lock(&memcache_server_mutex); for (x= 0; x < memcache_server_hash.records; ++x) { memcached_return rc; memcached_st *memc_local; memcached_stat_st *stats; memcache_server_st *server= (memcache_server_st *) hash_element(&memcache_server_hash, x); memc_local= memcached_create(NULL); rc= memcached_server_add(memc_local, server->server_name, 0); stats= memcached_stat(memc_local, NULL, &rc); table->field[0]->store(server->server_name, server->server_name_length, scs); table->field[1]->store(server->use_count); table->field[2]->store(stats->curr_items); table->field[3]->store(stats->total_items);// table->field[4]->store(stats->u_chars); table->field[5]->store(stats->curr_connections); table->field[6]->store(stats->total_connections); table->field[7]->store(stats->connection_structures); table->field[8]->store(stats->cmd_get); table->field[9]->store(stats->cmd_set); table->field[10]->store(stats->get_hits); table->field[11]->store(stats->get_misses);// table->field[12]->store(stats->uchars_read);// table->field[13]->store(stats->uchars_written);// table->field[14]->store(stats->limit_maxunsigned chars); memcached_stat_free(memc_local, stats); memcached_free(memc_local); if (schema_table_store_record(thd, table)) { returnable= 1; break; } } pthread_mutex_unlock(&memcache_server_mutex); DBUG_RETURN(returnable);}ST_FIELD_INFO memcache_server_field_info[]={ {"NAME", 120, MYSQL_TYPE_STRING, 0, 0, "Name"}, {"COUNT", 4, MYSQL_TYPE_LONG, 0, 0, "Count"}, {"CURRENT_ITEMS", 4, MYSQL_TYPE_LONG, 0, 0, "Current Items"}, {"TOTAL_ITEMS", 4, MYSQL_TYPE_LONG, 0, 0, "Total Items"}, {"BYTES", 4, MYSQL_TYPE_LONG, 0, 0, "Bytes"}, {"CURRENT_CONNECTIONS", 4, MYSQL_TYPE_LONG, 0, 0, "Current Connections"}, {"TOTAL_CONNECTIONS", 4, MYSQL_TYPE_LONG, 0, 0, "Total Connections"}, {"CONNECTION_STRUCTURES", 4, MYSQL_TYPE_LONG, 0, 0, "Connection Structure"}, {"GETS", 4, MYSQL_TYPE_LONG, 0, 0, "Gets"}, {"SETS", 4, MYSQL_TYPE_LONG, 0, 0, "Sets"}, {"HITS", 4, MYSQL_TYPE_LONG, 0, 0, "Hits"}, {"MISSES", 4, MYSQL_TYPE_LONG, 0, 0, "Misses"}, {"BYTES_READ", 4, MYSQL_TYPE_LONG, 0, 0, "Bytes Read"}, {"BYTES_WRITTEN", 4, MYSQL_TYPE_LONG, 0, 0, "Bytes Written"}, {"LIMIT_MAXBYTES", 4, MYSQL_TYPE_LONG, 0, 0, "Limit Maxunsigned chars"}, {0, 0, MYSQL_TYPE_STRING, 0, 0, 0}};static int is_init(void *p){ DBUG_ENTER("is_init"); ST_SCHEMA_TABLE *schema= (ST_SCHEMA_TABLE *)p; struct st_plugin_int *plugin= (struct st_plugin_int *)p; schema->fields_info= memcache_server_field_info; schema->fill_table= fill_memcache_server_schema; DBUG_RETURN(0);}static int is_deinit(void *p){ DBUG_ENTER("is_deinit"); DBUG_RETURN(0);}static MYSQL_SYSVAR_STR(row_format, row_format, PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY, "Row format to use, native(default) or xml.", NULL, NULL, NULL);static struct st_mysql_sys_var* system_variables[]= { MYSQL_SYSVAR(row_format), NULL};struct st_mysql_storage_engine memcache_storage_engine={ MYSQL_HANDLERTON_INTERFACE_VERSION };struct st_mysql_information_schema memcache_server_is={ MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION };mysql_declare_plugin(memcache){ MYSQL_STORAGE_ENGINE_PLUGIN, &memcache_storage_engine, "MEMCACHE", "Brian Aker, Tangent Org", "Simple Interface for working with memcache as a storage engine", PLUGIN_LICENSE_GPL, init_func, /* Plugin Init */ done_func, /* Plugin Deinit */ 0x0006, NULL, /* status variables */ NULL, /* system variables */ NULL /* config options */},{ MYSQL_INFORMATION_SCHEMA_PLUGIN, &memcache_server_is, "memcache_servers", "Brian Aker", "Active Memached Servers.", PLUGIN_LICENSE_GPL, is_init, /* Plugin Init */ is_deinit, /* Plugin Deinit */ 0x0007, NULL, /* status variables */ system_variables, /* system variables */ NULL /* config options */}mysql_declare_plugin_end;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -