storage_pgsql.c

来自「这是一个完全开放的」· C语言 代码 · 共 639 行 · 第 1/2 页

C
639
字号
        }        PQclear(res);    }    if(_st_pgsql_put_guts(drv, type, owner, os) != st_SUCCESS) {        if(data->txn)            PQclear(PQexec(data->conn, "ROLLBACK;"));        return st_FAILED;    }    if(data->txn) {        res = PQexec(data->conn, "COMMIT;");        if(PQresultStatus(res) != PGRES_COMMAND_OK && PQstatus(data->conn) != CONNECTION_OK) {            log_write(drv->st->sm->log, LOG_ERR, "pgsql: lost connection to database, attempting reconnect");            PQclear(res);            PQreset(data->conn);            res = PQexec(data->conn, "COMMIT;");        }        if(PQresultStatus(res) != PGRES_COMMAND_OK) {            log_write(drv->st->sm->log, LOG_ERR, "pgsql: sql transaction commit failed: %s", PQresultErrorMessage(res));            PQclear(res);            PQclear(PQexec(data->conn, "ROLLBACK;"));            return st_FAILED;        }        PQclear(res);    }    return st_SUCCESS;}static st_ret_t _st_pgsql_get(st_driver_t drv, const char *type, const char *owner, const char *filter, os_t *os) {    drvdata_t data = (drvdata_t) drv->private;    char *cond, *buf = NULL;    int buflen = 0;    PGresult *res;    int ntuples, nfields, i, j;    os_object_t o;    char *fname, *val;    os_type_t ot;    int ival;    nad_t nad;    char tbuf[128];    if(data->prefix != NULL) {        snprintf(tbuf, sizeof(tbuf), "%s%s", data->prefix, type);        type = tbuf;    }    cond = _st_pgsql_convert_filter(drv, owner, filter);    log_debug(ZONE, "generated filter: %s", cond);    PGSQL_SAFE(buf, strlen(type) + strlen(cond) + 51, buflen);    sprintf(buf, "SELECT * FROM \"%s\" WHERE %s ORDER BY \"object-sequence\";", type, cond);    free(cond);    log_debug(ZONE, "prepared sql: %s", buf);    res = PQexec(data->conn, buf);    if(PQresultStatus(res) != PGRES_TUPLES_OK && PQstatus(data->conn) != CONNECTION_OK) {        log_write(drv->st->sm->log, LOG_ERR, "pgsql: lost connection to database, attempting reconnect");        PQclear(res);        PQreset(data->conn);        res = PQexec(data->conn, buf);    }    free(buf);    if(PQresultStatus(res) != PGRES_TUPLES_OK) {        log_write(drv->st->sm->log, LOG_ERR, "pgsql: sql select failed: %s", PQresultErrorMessage(res));        PQclear(res);        return st_FAILED;    }    ntuples = PQntuples(res);    if(ntuples == 0) {        PQclear(res);        return st_NOTFOUND;    }    log_debug(ZONE, "%d tuples returned", ntuples);    nfields = PQnfields(res);    if(nfields == 0) {        log_debug(ZONE, "weird, tuples were returned but no fields *shrug*");        PQclear(res);        return st_NOTFOUND;    }    *os = os_new();    for(i = 0; i < ntuples; i++) {        o = os_object_new(*os);        for(j = 0; j < nfields; j++) {            fname = PQfname(res, j);            if(strcmp(fname, "collection-owner") == 0 || strcmp(fname, "object-sequence") == 0)                continue;            switch(PQftype(res, j)) {                case 16:    /* boolean */                    ot = os_type_BOOLEAN;                    break;                case 23:    /* integer */                    ot = os_type_INTEGER;                    break;                case 25:    /* text */                    ot = os_type_STRING;                    break;                default:                    log_debug(ZONE, "unknown oid %d, ignoring it", PQfname(res, j));                    continue;            }            if(PQgetisnull(res, i, j))                continue;            val = PQgetvalue(res, i, j);            switch(ot) {                case os_type_BOOLEAN:                    ival = (val[0] == 't') ? 1 : 0;                    os_object_put(o, fname, &ival, ot);                    break;                case os_type_INTEGER:                    ival = atoi(val);                    os_object_put(o, fname, &ival, ot);                    break;                case os_type_STRING:                    os_object_put(o, fname, val, os_type_STRING);                    break;            }        }    }    PQclear(res);    return st_SUCCESS;}static st_ret_t _st_pgsql_delete(st_driver_t drv, const char *type, const char *owner, const char *filter) {    drvdata_t data = (drvdata_t) drv->private;    char *cond, *buf = NULL;    int buflen = 0;    PGresult *res;    char tbuf[128];    if(data->prefix != NULL) {        snprintf(tbuf, sizeof(tbuf), "%s%s", data->prefix, type);        type = tbuf;    }    cond = _st_pgsql_convert_filter(drv, owner, filter);    log_debug(ZONE, "generated filter: %s", cond);    PGSQL_SAFE(buf, strlen(type) + strlen(cond) + 23, buflen);    sprintf(buf, "DELETE FROM \"%s\" WHERE %s;", type, cond);    free(cond);    log_debug(ZONE, "prepared sql: %s", buf);    res = PQexec(data->conn, buf);    if(PQresultStatus(res) != PGRES_COMMAND_OK && PQstatus(data->conn) != CONNECTION_OK) {        log_write(drv->st->sm->log, LOG_ERR, "pgsql: lost connection to database, attempting reconnect");        PQclear(res);        PQreset(data->conn);        res = PQexec(data->conn, buf);    }    free(buf);    if(PQresultStatus(res) != PGRES_COMMAND_OK) {        log_write(drv->st->sm->log, LOG_ERR, "pgsql: sql delete failed: %s", PQresultErrorMessage(res));        PQclear(res);        return st_FAILED;    }    PQclear(res);    return st_SUCCESS;}static st_ret_t _st_pgsql_replace(st_driver_t drv, const char *type, const char *owner, const char *filter, os_t os) {    drvdata_t data = (drvdata_t) drv->private;    PGresult *res;    if(data->txn) {        res = PQexec(data->conn, "BEGIN;");        if(PQresultStatus(res) != PGRES_COMMAND_OK && PQstatus(data->conn) != CONNECTION_OK) {            log_write(drv->st->sm->log, LOG_ERR, "pgsql: lost connection to database, attempting reconnect");            PQclear(res);            PQreset(data->conn);            res = PQexec(data->conn, "BEGIN;");        }        if(PQresultStatus(res) != PGRES_COMMAND_OK) {            log_write(drv->st->sm->log, LOG_ERR, "pgsql: sql transaction begin failed: %s", PQresultErrorMessage(res));            PQclear(res);            return st_FAILED;        }        PQclear(res);        res = PQexec(data->conn, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;");        if(PQresultStatus(res) != PGRES_COMMAND_OK && PQstatus(data->conn) != CONNECTION_OK) {            log_write(drv->st->sm->log, LOG_ERR, "pgsql: lost connection to database, attempting reconnect");            PQclear(res);            PQreset(data->conn);            res = PQexec(data->conn, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;");        }        if(PQresultStatus(res) != PGRES_COMMAND_OK) {            log_write(drv->st->sm->log, LOG_ERR, "pgsql: sql transaction setup failed: %s", PQresultErrorMessage(res));            PQclear(res);            PQclear(PQexec(data->conn, "ROLLBACK;"));            return st_FAILED;        }        PQclear(res);    }    if(_st_pgsql_delete(drv, type, owner, filter) == st_FAILED) {        if(data->txn)            PQclear(PQexec(data->conn, "ROLLBACK;"));        return st_FAILED;    }    if(_st_pgsql_put_guts(drv, type, owner, os) == st_FAILED) {        if(data->txn)            PQclear(PQexec(data->conn, "ROLLBACK;"));        return st_FAILED;    }    if(data->txn) {        res = PQexec(data->conn, "COMMIT;");        if(PQresultStatus(res) != PGRES_COMMAND_OK && PQstatus(data->conn) != CONNECTION_OK) {            log_write(drv->st->sm->log, LOG_ERR, "pgsql: lost connection to database, attempting reconnect");            PQclear(res);            PQreset(data->conn);            res = PQexec(data->conn, "COMMIT;");        }        if(PQresultStatus(res) != PGRES_COMMAND_OK) {            log_write(drv->st->sm->log, LOG_ERR, "pgsql: sql transaction commit failed: %s", PQresultErrorMessage(res));            PQclear(res);            PQclear(PQexec(data->conn, "ROLLBACK;"));            return st_FAILED;        }        PQclear(res);    }    return st_SUCCESS;}static void _st_pgsql_free(st_driver_t drv) {    drvdata_t data = (drvdata_t) drv->private;    PQfinish(data->conn);    xhash_free(data->filters);    free(data);}st_ret_t st_pgsql_init(st_driver_t drv) {    char *host, *port, *dbname, *user, *pass;    PGconn *conn;    drvdata_t data;    host = config_get_one(drv->st->sm->config, "storage.pgsql.host", 0);    port = config_get_one(drv->st->sm->config, "storage.pgsql.port", 0);    dbname = config_get_one(drv->st->sm->config, "storage.pgsql.dbname", 0);    user = config_get_one(drv->st->sm->config, "storage.pgsql.user", 0);    pass = config_get_one(drv->st->sm->config, "storage.pgsql.pass", 0);    if(host == NULL || port == NULL || dbname == NULL || user == NULL || pass == NULL) {        log_write(drv->st->sm->log, LOG_ERR, "pgsql: invalid driver config");        return st_FAILED;    }    conn = PQsetdbLogin(host, port, NULL, NULL, dbname, user, pass);    if(conn == NULL) {        log_write(drv->st->sm->log, LOG_ERR, "pgsql: unable to allocate database connection state");        return st_FAILED;    }    if(PQstatus(conn) != CONNECTION_OK)        log_write(drv->st->sm->log, LOG_ERR, "pgsql: connection to database failed: %s", PQerrorMessage(conn));    data = (drvdata_t) malloc(sizeof(struct drvdata_st));    memset(data, 0, sizeof(struct drvdata_st));    data->conn = conn;    data->filters = xhash_new(17);    if(config_get_one(drv->st->sm->config, "storage.pgsql.transactions", 0) != NULL)        data->txn = 1;    else        log_write(drv->st->sm->log, LOG_WARNING, "pgsql: transactions disabled");    data->prefix = config_get_one(drv->st->sm->config, "storage.pgsql.prefix", 0);    drv->private = (void *) data;    drv->add_type = _st_pgsql_add_type;    drv->put = _st_pgsql_put;    drv->get = _st_pgsql_get;    drv->delete = _st_pgsql_delete;    drv->replace = _st_pgsql_replace;    drv->free = _st_pgsql_free;    return st_SUCCESS;}#endif

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?