📄 dstore_mysql.c
字号:
/*
This file is part of GNUnet.
(C) 2006, 2007, 2008 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
by the Free Software Foundation; either version 2, or (at your
option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with GNUnet; see the file COPYING. If not, write to the
Free Software Foundation, Inc., 59 Temple Place - Suite 330,
Boston, MA 02111-1307, USA.
*/
/**
* @file applications/dstore_mysql/dstore_mysql.c
* @brief MySQL based implementation of the dstore service
* @author Christian Grothoff
*
* Database: MySQL
*/
#include "platform.h"
#include "gnunet_util.h"
#include "gnunet_dstore_service.h"
#include "gnunet_stats_service.h"
#include <mysql/mysql.h>
#define DEBUG_DSTORE GNUNET_NO
/**
* Maximum size for an individual item.
*/
#define MAX_CONTENT_SIZE 65536
/**
* Bytes used
*/
static unsigned long long payload;
/**
* Maximum bytes available
*/
static unsigned long long quota;
static GNUNET_CoreAPIForPlugins *coreAPI;
static struct GNUNET_Mutex *lock;
/**
* Statistics service.
*/
static GNUNET_Stats_ServiceAPI *stats;
static unsigned int stat_dstore_size;
static unsigned int stat_dstore_quota;
/**
* Estimate of the per-entry overhead (including indices).
*/
#define OVERHEAD ((4*2+4*2+8*2+8*2+sizeof(GNUNET_HashCode)*5+8))
struct GNUNET_BloomFilter *bloom;
static char *bloom_name;
/**
* Path to MySQL configuration file.
*/
static char *cnffile;
/**
* Handle for the MySQL database.
*/
static MYSQL *dbf;
#define SELECT_VALUE_STMT "SELECT size, value FROM gn080dstore FORCE INDEX (hashidx) WHERE hash=? AND type=? AND expire >= ? LIMIT 1 OFFSET ?"
static MYSQL_STMT *select_value;
#define COUNT_VALUE_STMT "SELECT count(*) FROM gn080dstore FORCE INDEX (hashidx) WHERE hash=? AND type=? AND expire >= ?"
static MYSQL_STMT *count_value;
#define SELECT_OLD_VALUE_STMT "SELECT hash, vhash, type, size, value FROM gn080dstore FORCE INDEX (expireidx) ORDER BY puttime ASC LIMIT 1"
static MYSQL_STMT *select_old_value;
#define DELETE_VALUE_STMT "DELETE FROM gn080dstore WHERE hash = ? AND vhash = ? AND type = ? AND "\
"size = ? AND value = ?"
static MYSQL_STMT *delete_value;
#define INSERT_VALUE_STMT "INSERT INTO gn080dstore (size, type, puttime, expire, hash, vhash, value) "\
"VALUES (?, ?, ?, ?, ?, ?, ?)"
static MYSQL_STMT *insert_value;
#define UPDATE_VALUE_STMT "UPDATE gn080dstore FORCE INDEX (allidx) SET puttime=?, expire=? "\
"WHERE hash=? AND vhash=? AND type=? AND size=?"
static MYSQL_STMT *update_value;
/**
* Die with an error message that indicates
* a failure of the command 'cmd' with the message given
* by strerror(errno).
*/
#define DIE_MYSQL(cmd, dbh) do { GNUNET_GE_LOG(coreAPI->ectx, GNUNET_GE_FATAL | GNUNET_GE_ADMIN | GNUNET_GE_IMMEDIATE, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)); abort(); } while(0);
/**
* Log an error message at log-level 'level' that indicates
* a failure of the command 'cmd' on file 'filename'
* with the message given by strerror(errno).
*/
#define LOG_MYSQL(level, cmd, dbh) do { GNUNET_GE_LOG(coreAPI->ectx, level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh))); } while(0);
/**
* Close the database connection.
*/
static int
iclose ()
{
#define PEND(h) if (h != NULL) { mysql_stmt_close(h); h = NULL; } else {}
if (dbf == NULL)
return GNUNET_SYSERR;
PEND (select_value);
PEND (count_value);
PEND (select_old_value);
PEND (delete_value);
PEND (insert_value);
PEND (update_value);
#undef PEND
mysql_close (dbf);
payload = 0;
dbf = NULL;
return GNUNET_OK;
}
/**
* Initiate the database connection.
*
* @return GNUNET_OK on success
*/
static int
iopen ()
{
char *dbname;
my_bool reconnect = 0;
unsigned int timeout = 60; /* in seconds */
if (dbf != NULL)
return GNUNET_OK;
if (cnffile == NULL)
{
GNUNET_GE_BREAK (NULL, 0);
return GNUNET_SYSERR;
}
dbf = mysql_init (NULL);
if (dbf == NULL)
{
GNUNET_GE_BREAK (NULL, 0);
return GNUNET_SYSERR;
}
mysql_options (dbf, MYSQL_READ_DEFAULT_FILE, cnffile);
mysql_options (dbf, MYSQL_READ_DEFAULT_GROUP, "client");
mysql_options (dbf, MYSQL_OPT_RECONNECT, &reconnect);
mysql_options (dbf, MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout);
mysql_options (dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
mysql_options (dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
dbname = NULL;
GNUNET_GC_get_configuration_value_string (coreAPI->cfg,
"MYSQL", "DATABASE", "gnunet",
&dbname);
mysql_real_connect (dbf, NULL, NULL, NULL, dbname, 0, NULL, 0);
GNUNET_free (dbname);
if (mysql_error (dbf)[0])
{
LOG_MYSQL (GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_BULK,
"mysql_real_connect", dbf);
iclose ();
return GNUNET_SYSERR;
}
mysql_query (dbf,
"SET SESSION net_read_timeout=60, SESSION net_write_timeout=60");
if (mysql_error (dbf)[0])
{
LOG_MYSQL (GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_BULK,
"mysql_query", dbf);
iclose ();
return GNUNET_SYSERR;
}
mysql_query (dbf, "DROP TABLE gn080dstore");
mysql_query (dbf,
"CREATE TEMPORARY TABLE gn080dstore ("
" size INT(11) UNSIGNED NOT NULL DEFAULT 0,"
" type INT(11) UNSIGNED NOT NULL DEFAULT 0,"
" puttime BIGINT UNSIGNED NOT NULL DEFAULT 0,"
" expire BIGINT UNSIGNED NOT NULL DEFAULT 0,"
" hash BINARY(64) NOT NULL DEFAULT '',"
" vhash BINARY(64) NOT NULL DEFAULT '',"
" value BLOB NOT NULL DEFAULT '',"
" INDEX hashidx (hash(64),type,expire),"
" INDEX allidx (hash(64),vhash(64),type,size),"
" INDEX expireidx (puttime)" ") ENGINE=InnoDB");
if (mysql_error (dbf)[0])
{
LOG_MYSQL (GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_BULK,
"mysql_query", dbf);
iclose ();
return GNUNET_SYSERR;
}
mysql_query (dbf, "SET AUTOCOMMIT = 1");
if (mysql_error (dbf)[0])
{
LOG_MYSQL (GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_BULK,
"mysql_query", dbf);
iclose ();
return GNUNET_SYSERR;
}
#define PINIT(a,b) a = mysql_stmt_init(dbf); if (a == NULL) { iclose(); return GNUNET_SYSERR; } else { \
if (mysql_stmt_prepare (a, b, strlen(b))) { \
GNUNET_GE_LOG (coreAPI->ectx, GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER, \
_("`%s' failed at %s:%d with error: %s"), "mysql_stmt_prepare", __FILE__, __LINE__, \
mysql_stmt_error (a)); iclose(); return GNUNET_SYSERR; } }
PINIT (select_value, SELECT_VALUE_STMT);
PINIT (count_value, COUNT_VALUE_STMT);
PINIT (select_old_value, SELECT_OLD_VALUE_STMT);
PINIT (delete_value, DELETE_VALUE_STMT);
PINIT (insert_value, INSERT_VALUE_STMT);
PINIT (update_value, UPDATE_VALUE_STMT);
#undef PINIT
return GNUNET_OK;
}
/**
* Check that we are within quota.
* @return GNUNET_OK if we are, GNUNET_NO if not, GNUNET_SYSERR if
* there was an internal error
*/
static int
checkQuota ()
{
MYSQL_BIND rbind[5];
unsigned int v_size;
unsigned int v_type;
GNUNET_HashCode v_key;
GNUNET_HashCode vhash;
unsigned long k_length;
unsigned long h_length;
unsigned long v_length;
if (payload * 10 <= quota * 9)
return GNUNET_OK; /* we seem to be about 10% off */
#if DEBUG_DSTORE
GNUNET_GE_LOG (coreAPI->ectx,
GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_DEVELOPER,
"DStore above qutoa (have %llu, allowed %llu), will delete some data.\n",
payload, quota);
#endif
k_length = sizeof (GNUNET_HashCode);
h_length = sizeof (GNUNET_HashCode);
v_length = GNUNET_MAX_BUFFER_SIZE;
memset (rbind, 0, sizeof (rbind));
rbind[0].buffer_type = MYSQL_TYPE_BLOB;
rbind[0].buffer_length = sizeof (GNUNET_HashCode);
rbind[0].length = &k_length;
rbind[0].buffer = &v_key;
rbind[1].buffer_type = MYSQL_TYPE_BLOB;
rbind[1].buffer_length = sizeof (GNUNET_HashCode);
rbind[1].length = &h_length;
rbind[1].buffer = &vhash;
rbind[2].buffer_type = MYSQL_TYPE_LONG;
rbind[2].is_unsigned = 1;
rbind[2].buffer = &v_type;
rbind[3].buffer_type = MYSQL_TYPE_LONG;
rbind[3].is_unsigned = 1;
rbind[3].buffer = &v_size;
rbind[4].buffer_type = MYSQL_TYPE_BLOB;
rbind[4].buffer_length = GNUNET_MAX_BUFFER_SIZE;
rbind[4].length = &v_length;
rbind[4].buffer = GNUNET_malloc (GNUNET_MAX_BUFFER_SIZE);
GNUNET_mutex_lock (lock);
mysql_thread_init ();
if (mysql_stmt_execute (select_old_value))
{
GNUNET_GE_LOG (coreAPI->ectx,
GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
_("`%s' failed at %s:%d with error: %s\n"),
"mysql_stmt_execute",
__FILE__, __LINE__, mysql_stmt_error (select_old_value));
GNUNET_free (rbind[4].buffer);
iclose ();
mysql_thread_end ();
GNUNET_mutex_unlock (lock);
return GNUNET_SYSERR;
}
GNUNET_GE_ASSERT (coreAPI->ectx,
mysql_stmt_field_count (select_old_value) == 5);
if (mysql_stmt_bind_result (select_old_value, rbind))
{
GNUNET_GE_LOG (coreAPI->ectx,
GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
_("`%s' failed at %s:%d with error: %s\n"),
"mysql_stmt_bind_result",
__FILE__, __LINE__, mysql_stmt_error (select_old_value));
GNUNET_free (rbind[4].buffer);
iclose ();
mysql_thread_end ();
GNUNET_mutex_unlock (lock);
return GNUNET_SYSERR;
}
if (mysql_stmt_fetch (select_old_value))
{
/* odd -- over quota but cannot select old!? */
GNUNET_free (rbind[4].buffer);
mysql_stmt_reset (select_old_value);
mysql_thread_end ();
GNUNET_mutex_unlock (lock);
return GNUNET_SYSERR;
}
mysql_stmt_reset (select_old_value);
if (mysql_stmt_bind_param (delete_value, rbind))
{
GNUNET_GE_LOG (coreAPI->ectx,
GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
_("`%s' failed at %s:%d with error: %s\n"),
"mysql_stmt_bind_param",
__FILE__, __LINE__, mysql_stmt_error (delete_value));
GNUNET_free (rbind[4].buffer);
iclose ();
mysql_thread_end ();
GNUNET_mutex_unlock (lock);
return GNUNET_SYSERR;
}
GNUNET_GE_BREAK (NULL, h_length == sizeof (GNUNET_HashCode));
if (mysql_stmt_execute (delete_value))
{
GNUNET_GE_LOG (coreAPI->ectx,
GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
_("`%s' failed at %s:%d with error: %s\n"),
"mysql_stmt_execute",
__FILE__, __LINE__, mysql_stmt_error (delete_value));
GNUNET_free (rbind[4].buffer);
iclose ();
mysql_thread_end ();
GNUNET_mutex_unlock (lock);
return GNUNET_SYSERR;
}
GNUNET_free (rbind[4].buffer);
payload -= v_length + OVERHEAD;
mysql_stmt_reset (delete_value);
mysql_thread_end ();
GNUNET_mutex_unlock (lock);
if (bloom != NULL)
GNUNET_bloomfilter_remove (bloom, &v_key);
if (payload * 10 > quota * 9)
return GNUNET_NO;
return GNUNET_OK;
}
/**
* Store an item in the datastore.
*
* @return GNUNET_OK on success, GNUNET_SYSERR on error
*/
static int
d_put (const GNUNET_HashCode * key,
unsigned int type,
GNUNET_CronTime discard_time, unsigned int size, const char *data)
{
MYSQL_BIND rbind[7];
GNUNET_CronTime now;
unsigned long k_length;
unsigned long h_length;
unsigned long v_length;
GNUNET_HashCode vhash;
if (size > MAX_CONTENT_SIZE)
return GNUNET_SYSERR;
GNUNET_hash (data, size, &vhash);
GNUNET_mutex_lock (lock);
mysql_thread_init ();
iopen ();
now = GNUNET_get_time ();
/* first try UPDATE */
h_length = sizeof (GNUNET_HashCode);
k_length = sizeof (GNUNET_HashCode);
v_length = size;
memset (rbind, 0, sizeof (rbind));
rbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
rbind[0].is_unsigned = 1;
rbind[0].buffer = &now;
rbind[1].buffer_type = MYSQL_TYPE_LONGLONG;
rbind[1].is_unsigned = 1;
rbind[1].buffer = &discard_time;
rbind[2].buffer_type = MYSQL_TYPE_BLOB;
rbind[2].buffer_length = sizeof (GNUNET_HashCode);
rbind[2].length = &k_length;
rbind[2].buffer = (void *) key;
rbind[3].buffer_type = MYSQL_TYPE_BLOB;
rbind[3].buffer_length = sizeof (GNUNET_HashCode);
rbind[3].length = &h_length;
rbind[3].buffer = &vhash;
rbind[4].buffer_type = MYSQL_TYPE_LONG;
rbind[4].is_unsigned = 1;
rbind[4].buffer = &type;
rbind[5].buffer_type = MYSQL_TYPE_LONG;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -