📄 dstore_mysql.c
字号:
rbind[5].is_unsigned = 1;
rbind[5].buffer = &size;
if (mysql_stmt_bind_param (update_value, rbind))
{
GNUNET_GE_LOG (coreAPI->ectx,
GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
_("`%s' failed at %s:%d with error: %s\n"),
"mysql_stmt_bind_param",
__FILE__, __LINE__, mysql_stmt_error (update_value));
iclose ();
mysql_thread_end ();
GNUNET_mutex_unlock (lock);
return GNUNET_SYSERR;
}
if (mysql_stmt_execute (update_value))
{
mysql_stmt_reset (update_value);
mysql_thread_end ();
GNUNET_mutex_unlock (lock);
return GNUNET_OK;
}
mysql_stmt_reset (update_value);
/* now try INSERT */
h_length = sizeof (GNUNET_HashCode);
k_length = sizeof (GNUNET_HashCode);
v_length = size;
memset (rbind, 0, sizeof (rbind));
rbind[0].buffer_type = MYSQL_TYPE_LONG;
rbind[0].is_unsigned = 1;
rbind[0].buffer = &size;
rbind[1].buffer_type = MYSQL_TYPE_LONG;
rbind[1].is_unsigned = 1;
rbind[1].buffer = &type;
rbind[2].buffer_type = MYSQL_TYPE_LONGLONG;
rbind[2].is_unsigned = 1;
rbind[2].buffer = &now;
rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
rbind[3].is_unsigned = 1;
rbind[3].buffer = &discard_time;
rbind[4].buffer_type = MYSQL_TYPE_BLOB;
rbind[4].buffer_length = sizeof (GNUNET_HashCode);
rbind[4].length = &k_length;
rbind[4].buffer = (void *) key;
rbind[5].buffer_type = MYSQL_TYPE_BLOB;
rbind[5].buffer_length = sizeof (GNUNET_HashCode);
rbind[5].length = &h_length;
rbind[5].buffer = &vhash;
rbind[6].buffer_type = MYSQL_TYPE_BLOB;
rbind[6].buffer_length = size;
rbind[6].length = &v_length;
rbind[6].buffer = (void *) data;
if (mysql_stmt_bind_param (insert_value, rbind))
{
GNUNET_GE_LOG (coreAPI->ectx,
GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
_("`%s' failed at %s:%d with error: %s\n"),
"mysql_stmt_bind_param",
__FILE__, __LINE__, mysql_stmt_error (insert_value));
iclose ();
mysql_thread_end ();
GNUNET_mutex_unlock (lock);
return GNUNET_SYSERR;
}
if (mysql_stmt_execute (insert_value))
{
GNUNET_GE_LOG (coreAPI->ectx,
GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
_("`%s' failed at %s:%d with error: %s\n"),
"mysql_stmt_execute",
__FILE__, __LINE__, mysql_stmt_error (insert_value));
iclose ();
mysql_thread_end ();
GNUNET_mutex_unlock (lock);
return GNUNET_SYSERR;
}
mysql_stmt_reset (insert_value);
mysql_thread_end ();
if (bloom != NULL)
GNUNET_bloomfilter_add (bloom, key);
payload += size + OVERHEAD;
GNUNET_mutex_unlock (lock);
checkQuota ();
if (stats != NULL)
stats->set (stat_dstore_size, payload);
return GNUNET_OK;
}
/**
 * Iterate over the results for a particular key
 * in the datastore.
 *
 * The module lock is held for the whole call, including while
 * the handler is being invoked.
 *
 * @param key key of the entries to look up
 * @param type entries of which type are relevant?
 * @param handler function called for each result,
 *        maybe NULL (to just count)
 * @param closure argument passed unchanged to handler
 * @return 0 if the bloomfilter rules out the key; the number of
 *         matching entries if handler is NULL; otherwise the number
 *         of results fetched (iteration stops early once handler
 *         returns something other than GNUNET_OK);
 *         GNUNET_SYSERR if a MySQL operation failed
 */
static int
d_get (const GNUNET_HashCode * key,
       unsigned int type, GNUNET_ResultProcessor handler, void *closure)
{
  MYSQL_BIND qbind[4];
  MYSQL_BIND rbind[2];
  unsigned int v_size;
  unsigned long h_length;
  unsigned long v_length;
  GNUNET_CronTime now;
  unsigned int cnt;
  unsigned long long total;
  unsigned int off;
  void *result_buf;

  GNUNET_mutex_lock (lock);
  if ((bloom != NULL) && (GNUNET_NO == GNUNET_bloomfilter_test (bloom, key)))
    {
      GNUNET_mutex_unlock (lock);
      return 0;
    }
#if DEBUG_DSTORE
  GNUNET_GE_LOG (coreAPI->ectx,
                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_DEVELOPER,
                 "dstore processes get\n");
#endif
  now = GNUNET_get_time ();
  h_length = sizeof (GNUNET_HashCode);
  v_length = GNUNET_MAX_BUFFER_SIZE;
  /* query parameters: key, type, current time (and, for the
     select statement only, the row offset in qbind[3]) */
  memset (qbind, 0, sizeof (qbind));
  qbind[0].buffer_type = MYSQL_TYPE_BLOB;
  qbind[0].buffer_length = sizeof (GNUNET_HashCode);
  qbind[0].length = &h_length;
  qbind[0].buffer = (void *) key;
  qbind[1].buffer_type = MYSQL_TYPE_LONG;
  qbind[1].is_unsigned = 1;
  qbind[1].buffer = &type;
  qbind[2].buffer_type = MYSQL_TYPE_LONGLONG;
  qbind[2].is_unsigned = 1;
  qbind[2].buffer = &now;
  /* sentinel: stays at "-1" if the fetch does not store a count */
  total = (unsigned long long) -1;
  memset (rbind, 0, sizeof (rbind));
  rbind[0].buffer_type = MYSQL_TYPE_LONGLONG;
  rbind[0].buffer = &total;
  rbind[0].is_unsigned = GNUNET_YES;
  mysql_thread_init ();
  iopen ();
  if (mysql_stmt_bind_param (count_value, qbind))
    {
      GNUNET_GE_LOG (coreAPI->ectx,
                     GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
                     _("`%s' failed at %s:%d with error: %s\n"),
                     "mysql_stmt_bind_param",
                     __FILE__, __LINE__, mysql_stmt_error (count_value));
      iclose ();
      mysql_thread_end ();
      GNUNET_mutex_unlock (lock);
      return GNUNET_SYSERR;
    }
  if (mysql_stmt_execute (count_value))
    {
      GNUNET_GE_LOG (coreAPI->ectx,
                     GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
                     _("`%s' failed at %s:%d with error: %s\n"),
                     "mysql_stmt_execute",
                     __FILE__, __LINE__, mysql_stmt_error (count_value));
      iclose ();
      GNUNET_mutex_unlock (lock);
      mysql_thread_end ();
      return GNUNET_SYSERR;
    }
  if (mysql_stmt_bind_result (count_value, rbind))
    {
      GNUNET_GE_LOG (coreAPI->ectx,
                     GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
                     _("`%s' failed at %s:%d with error: %s\n"),
                     "mysql_stmt_bind_result",
                     __FILE__, __LINE__, mysql_stmt_error (count_value));
      iclose ();
      mysql_thread_end ();
      GNUNET_mutex_unlock (lock);
      return GNUNET_SYSERR;
    }
  if (0 != mysql_stmt_fetch (count_value))
    {
      GNUNET_GE_LOG (coreAPI->ectx,
                     GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
                     _("`%s' failed at %s:%d with error: %s\n"),
                     "mysql_stmt_fetch",
                     __FILE__, __LINE__, mysql_stmt_error (count_value));
      mysql_stmt_reset (count_value);
      iclose ();
      GNUNET_mutex_unlock (lock);
      mysql_thread_end ();
      return GNUNET_SYSERR;
    }
  if (total == (unsigned long long) -1)
    {
      GNUNET_GE_LOG (coreAPI->ectx,
                     GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
                     _("`%s' failed at %s:%d with error: %s\n"),
                     "mysql_stmt_fetch",
                     __FILE__, __LINE__, mysql_stmt_error (count_value));
      iclose ();
      GNUNET_mutex_unlock (lock);
      mysql_thread_end ();
      return GNUNET_SYSERR;
    }
  mysql_stmt_reset (count_value);
  if ((handler == NULL) || (total == 0))
    {
      GNUNET_mutex_unlock (lock);
      mysql_thread_end ();
      return (int) total;
    }
  /* start at a random offset and walk all rows, wrapping around */
  off = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, total);
  qbind[3].buffer_type = MYSQL_TYPE_LONG;
  qbind[3].is_unsigned = 1;
  qbind[3].buffer = &off;
  /* One value buffer is shared by all iterations.  Allocating it
     inside the loop leaked every buffer but the last one, since the
     result bindings are cleared at the top of each iteration. */
  result_buf = GNUNET_malloc (GNUNET_MAX_BUFFER_SIZE);
  cnt = 0;
  while (cnt < total)
    {
      off = (off + 1) % total;
      if (mysql_stmt_bind_param (select_value, qbind))
        {
          GNUNET_GE_LOG (coreAPI->ectx,
                         GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
                         _("`%s' failed at %s:%d with error: %s\n"),
                         "mysql_stmt_bind_param",
                         __FILE__, __LINE__,
                         mysql_stmt_error (select_value));
          iclose ();
          mysql_thread_end ();
          GNUNET_mutex_unlock (lock);
          GNUNET_free (result_buf);
          return GNUNET_SYSERR;
        }
      if (mysql_stmt_execute (select_value))
        {
          GNUNET_GE_LOG (coreAPI->ectx,
                         GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
                         _("`%s' failed at %s:%d with error: %s\n"),
                         "mysql_stmt_execute",
                         __FILE__, __LINE__,
                         mysql_stmt_error (select_value));
          iclose ();
          GNUNET_mutex_unlock (lock);
          mysql_thread_end ();
          GNUNET_free (result_buf);
          return GNUNET_SYSERR;
        }
      /* result columns: stored size and the value itself */
      memset (rbind, 0, sizeof (rbind));
      rbind[0].buffer_type = MYSQL_TYPE_LONG;
      rbind[0].is_unsigned = 1;
      rbind[0].buffer = &v_size;
      rbind[1].buffer_type = MYSQL_TYPE_BLOB;
      rbind[1].buffer_length = GNUNET_MAX_BUFFER_SIZE;
      rbind[1].length = &v_length;
      rbind[1].buffer = result_buf;
      if (mysql_stmt_bind_result (select_value, rbind))
        {
          GNUNET_GE_LOG (coreAPI->ectx,
                         GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
                         _("`%s' failed at %s:%d with error: %s\n"),
                         "mysql_stmt_bind_result",
                         __FILE__, __LINE__,
                         mysql_stmt_error (select_value));
          iclose ();
          mysql_thread_end ();
          GNUNET_mutex_unlock (lock);
          GNUNET_free (result_buf);
          return GNUNET_SYSERR;
        }
      if (0 != mysql_stmt_fetch (select_value))
        {
          GNUNET_GE_BREAK (NULL, 0);
          break;
        }
      /* the stored size column must agree with the actual blob length */
      if (v_length != v_size)
        {
          GNUNET_GE_BREAK (NULL, 0);
          iclose ();
          mysql_thread_end ();
          GNUNET_mutex_unlock (lock);
          GNUNET_free (result_buf);
          return cnt;
        }
      cnt++;
      if (GNUNET_OK != handler (key, type, v_size, result_buf, closure))
        break;
    }
  mysql_stmt_reset (select_value);
  mysql_thread_end ();
  GNUNET_mutex_unlock (lock);
  GNUNET_free (result_buf);
  return cnt;
}
/**
 * Initialize the MySQL dstore module.
 *
 * Determines the MySQL configuration file (".my.cnf" in the home
 * directory unless the CONFIG option in section MYSQL names another
 * file), opens the database via iopen, creates the module lock,
 * reads the QUOTA option from section DSTORE, sets up a bloomfilter
 * backed by a temporary file and, if the "stats" service is
 * available, registers the dstore statistics.
 *
 * @param capi core API for plugins; stored in coreAPI
 * @return the service API (get and put), NULL if iopen failed
 */
GNUNET_Dstore_ServiceAPI *
provide_module_dstore_mysql (GNUNET_CoreAPIForPlugins * capi)
{
  static GNUNET_Dstore_ServiceAPI api;
  int fd;
#ifndef WINDOWS
  struct passwd *pw;
#endif
  size_t nX;
  char *home_dir;
  char *configured;

  coreAPI = capi;
  /* NOTE(review): guarded by DEBUG_SQLITE although d_get in this
     file uses DEBUG_DSTORE -- confirm which macro is intended */
#if DEBUG_SQLITE
  GNUNET_GE_LOG (capi->ectx,
                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
                 "MySQL Dstore: initializing database\n");
#endif
  /* verify that .my.cnf can be found */
#ifndef WINDOWS
  pw = getpwuid (getuid ());
  if (!pw)
    GNUNET_GE_DIE_STRERROR (coreAPI->ectx,
                            GNUNET_GE_FATAL | GNUNET_GE_ADMIN |
                            GNUNET_GE_IMMEDIATE, "getpwuid");
  home_dir = GNUNET_strdup (pw->pw_dir);
#else
  home_dir = GNUNET_malloc (_MAX_PATH + 1);
  plibc_conv_to_win_path ("~/", home_dir);
#endif
  /* room for home_dir, "/.my.cnf" and the terminating NUL */
  nX = strlen (home_dir) + 10;
  cnffile = GNUNET_malloc (nX);
  GNUNET_snprintf (cnffile, nX, "%s/.my.cnf", home_dir);
  GNUNET_free (home_dir);
  /* the path built above only serves as the default value here */
  configured = NULL;
  GNUNET_GC_get_configuration_value_filename (capi->cfg,
                                              "MYSQL", "CONFIG", cnffile,
                                              &configured);
  GNUNET_free (cnffile);
  cnffile = configured;
  GNUNET_GE_ASSERT (NULL, cnffile != NULL);
  GNUNET_GE_LOG (coreAPI->ectx,
                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
                 _("Trying to use file `%s' for MySQL configuration.\n"),
                 cnffile);
  if (iopen () != GNUNET_OK)
    {
      GNUNET_GE_LOG (coreAPI->ectx,
                     GNUNET_GE_ERROR | GNUNET_GE_IMMEDIATE | GNUNET_GE_USER,
                     _
                     ("Failed to initialize MySQL database connection for dstore.\n"));
      GNUNET_free (cnffile);
      cnffile = NULL;
      return NULL;
    }
  lock = GNUNET_mutex_create (GNUNET_NO);
  api.get = &d_get;
  api.put = &d_put;
  GNUNET_GC_get_configuration_value_number (coreAPI->cfg,
                                            "DSTORE", "QUOTA", 1, 1024, 1,
                                            &quota);
  if (quota == 0)               /* error */
    quota = 1;
  /* presumably QUOTA is configured in megabytes -- TODO confirm */
  quota *= 1024 * 1024;
  bloom_name = GNUNET_strdup ("/tmp/dbloomXXXXXX");
  fd = mkstemp (bloom_name);
  /* if mkstemp fails no bloomfilter is loaded here */
  if (fd != -1)
    {
      bloom = GNUNET_bloomfilter_load (coreAPI->ectx, bloom_name, quota / (OVERHEAD + 1024),    /* 8 bit per entry in DB, expect 1k entries */
                                       5);
      CLOSE (fd);
    }
  stats = capi->service_request ("stats");
  if (stats != NULL)
    {
      stat_dstore_size = stats->create (gettext_noop ("# bytes in dstore"));
      stat_dstore_quota =
        stats->create (gettext_noop ("# max bytes allowed in dstore"));
      stats->set (stat_dstore_quota, quota);
    }
  return &api;
}
/**
 * Shutdown the module.
 *
 * Frees the bloomfilter (if any), unlinks and frees the bloomfilter
 * file name, releases the stats service (if it was obtained),
 * destroys the lock and frees the MySQL configuration file name.
 * All released module-level pointers except the lock are reset to NULL.
 */
void
release_module_dstore_mysql ()
{
  /* bloomfilter and the temporary file it was loaded from */
  if (NULL != bloom)
    {
      GNUNET_bloomfilter_free (bloom);
      bloom = NULL;
    }
  UNLINK (bloom_name);
  GNUNET_free (bloom_name);
  bloom_name = NULL;

  /* statistics service is optional */
  if (NULL != stats)
    {
      coreAPI->service_release (stats);
      stats = NULL;
    }

#if DEBUG_SQLITE
  GNUNET_GE_LOG (coreAPI->ectx,
                 GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
                 "MySQL Dstore: database shutdown\n");
#endif

  /* remaining module-level state */
  GNUNET_mutex_destroy (lock);
  coreAPI = NULL;
  GNUNET_free (cnffile);
  cnffile = NULL;
}
/* end of dstore_mysql.c */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -