📄 pgmodule.c
字号:
{ Py_INCREF(self->pgcnx); return (PyObject *) (self->pgcnx); } Py_INCREF(Py_None); return Py_None; } /* large object oid */ if (!strcmp(name, "oid")) { if (check_lo_obj(self, 0)) return PyInt_FromLong(self->lo_oid); Py_INCREF(Py_None); return Py_None; } /* error (status) message */ if (!strcmp(name, "error")) return PyString_FromString(PQerrorMessage(self->pgcnx->cnx)); /* attributes list */ if (!strcmp(name, "__members__")) { PyObject *list = PyList_New(3); if (list) { PyList_SetItem(list, 0, PyString_FromString("oid")); PyList_SetItem(list, 1, PyString_FromString("pgcnx")); PyList_SetItem(list, 2, PyString_FromString("error")); } return list; } /* module name */ if (!strcmp(name, "__module__")) return PyString_FromString(MODULE_NAME); /* class name */ if (!strcmp(name, "__class__")) return PyString_FromString("pglarge"); /* seeks name in methods (fallback) */ return Py_FindMethod(pglarge_methods, (PyObject *) self, name);}/* prints query object in human readable format */static intpglarge_print(pglargeobject * self, FILE *fp, int flags){ char print_buffer[128]; if (self->lo_fd >= 0) { snprintf(print_buffer, sizeof(print_buffer), "Opened large object, oid %ld", (long) self->lo_oid); fputs(print_buffer, fp); } else { snprintf(print_buffer, sizeof(print_buffer), "Closed large object, oid %ld", (long) self->lo_oid); fputs(print_buffer, fp); } return 0;};/* object type definition */staticforward PyTypeObject PglargeType = { PyObject_HEAD_INIT(NULL) 0, /* ob_size */ "pglarge", /* tp_name */ sizeof(pglargeobject), /* tp_basicsize */ 0, /* tp_itemsize */ /* methods */ (destructor) pglarge_dealloc, /* tp_dealloc */ (printfunc) pglarge_print, /* tp_print */ (getattrfunc) pglarge_getattr, /* tp_getattr */ 0, /* tp_setattr */ 0, /* tp_compare */ 0, /* tp_repr */ 0, /* tp_as_number */ 0, /* tp_as_sequence */ 0, /* tp_as_mapping */ 0, /* tp_hash */};#endif /* LARGE_OBJECTS *//* --------------------------------------------------------------------- *//* PG QUERY OBJECT IMPLEMENTATION *//* connects to a database */static char connect__doc__[] ="connect(dbname, host, port, opt, tty) -- connect to a PostgreSQL database ""using specified parameters (optionals, keywords aware).";static PyObject *pgconnect(pgobject * self, PyObject * args, PyObject * dict){ static const char *kwlist[] = {"dbname", "host", "port", "opt", "tty", "user", "passwd", NULL}; char *pghost, *pgopt, *pgtty, *pgdbname, *pguser, *pgpasswd; int pgport; char port_buffer[20]; pgobject *npgobj; pghost = pgopt = pgtty = pgdbname = pguser = pgpasswd = NULL; pgport = -1; /* * parses standard arguments With the right compiler warnings, this * will issue a diagnostic. There is really no way around it. If I * don't declare kwlist as const char *kwlist[] then it complains when * I try to assign all those constant strings to it. */ if (!PyArg_ParseTupleAndKeywords(args, dict, "|zzlzzzz", kwlist, &pgdbname, &pghost, &pgport, &pgopt, &pgtty, &pguser, &pgpasswd)) return NULL;#ifdef DEFAULT_VARS /* handles defaults variables (for unintialised vars) */ if ((!pghost) && (pg_default_host != Py_None)) pghost = PyString_AsString(pg_default_host); if ((pgport == -1) && (pg_default_port != Py_None)) pgport = PyInt_AsLong(pg_default_port); if ((!pgopt) && (pg_default_opt != Py_None)) pgopt = PyString_AsString(pg_default_opt); if ((!pgtty) && (pg_default_tty != Py_None)) pgtty = PyString_AsString(pg_default_tty); if ((!pgdbname) && (pg_default_base != Py_None)) pgdbname = PyString_AsString(pg_default_base); if ((!pguser) && (pg_default_user != Py_None)) pguser = PyString_AsString(pg_default_user); if ((!pgpasswd) && (pg_default_passwd != Py_None)) pgpasswd = PyString_AsString(pg_default_passwd);#endif /* DEFAULT_VARS */ if ((npgobj = PyObject_NEW(pgobject, &PgType)) == NULL) return NULL; if (pgport != -1) { bzero(port_buffer, sizeof(port_buffer)); sprintf(port_buffer, "%d", pgport); npgobj->cnx = PQsetdbLogin(pghost, port_buffer, pgopt, pgtty, pgdbname, pguser, pgpasswd); } else npgobj->cnx = PQsetdbLogin(pghost, NULL, pgopt, pgtty, pgdbname, pguser, pgpasswd); if (PQstatus(npgobj->cnx) == CONNECTION_BAD) { PyErr_SetString(PGError, PQerrorMessage(npgobj->cnx)); Py_XDECREF(npgobj); return NULL; } return (PyObject *) npgobj;}/* pgobject methods *//* destructor */static voidpg_dealloc(pgobject * self){ if (self->cnx) PQfinish(self->cnx); PyMem_DEL(self);}/* close without deleting */static char pg_close__doc__[] ="close() -- close connection. All instances of the connection object and ""derived objects (queries and large objects) can no longer be used after ""this call.";static PyObject *pg_close(pgobject * self, PyObject * args){ /* gets args */ if (!PyArg_ParseTuple(args, "")) { PyErr_SetString(PyExc_TypeError, "close()."); return NULL; } if (self->cnx) PQfinish(self->cnx); self->cnx = NULL; Py_INCREF(Py_None); return Py_None;}static voidpgquery_dealloc(pgqueryobject * self){ if (self->last_result) PQclear(self->last_result); PyMem_DEL(self);}/* resets connection */static char pg_reset__doc__[] ="reset() -- reset connection with current parameters. All derived queries ""and large objects derived from this connection will not be usable after ""this call.";static PyObject *pg_reset(pgobject * self, PyObject * args){ if (!self->cnx) { PyErr_SetString(PyExc_TypeError, "Connection is not valid"); return NULL; } /* checks args */ if (!PyArg_ParseTuple(args, "")) { PyErr_SetString(PyExc_SyntaxError, "method reset() takes no parameters."); return NULL; } /* resets the connection */ PQreset(self->cnx); Py_INCREF(Py_None); return Py_None;}/* get connection socket */static char pg_fileno__doc__[] ="fileno() -- return database connection socket file handle.";static PyObject *pg_fileno(pgobject * self, PyObject * args){ if (!self->cnx) { PyErr_SetString(PyExc_TypeError, "Connection is not valid"); return NULL; } /* checks args */ if (!PyArg_ParseTuple(args, "")) { PyErr_SetString(PyExc_SyntaxError, "method fileno() takes no parameters."); return NULL; }#ifdef NO_PQSOCKET return PyInt_FromLong((long) self->cnx->sock);#else return PyInt_FromLong((long) PQsocket(self->cnx));#endif}/* get number of rows */static char pgquery_ntuples__doc__[] ="ntuples() -- returns number of tuples returned by query.";static PyObject *pgquery_ntuples(pgqueryobject * self, PyObject * args){ /* checks args */ if (!PyArg_ParseTuple(args, "")) { PyErr_SetString(PyExc_SyntaxError, "method ntuples() takes no parameters."); return NULL; } return PyInt_FromLong((long) PQntuples(self->last_result));}/* list fields names from query result */static char pgquery_listfields__doc__[] ="listfields() -- Lists field names from result.";static PyObject *pgquery_listfields(pgqueryobject * self, PyObject * args){ int i, n; char *name; PyObject *fieldstuple, *str; /* checks args */ if (!PyArg_ParseTuple(args, "")) { PyErr_SetString(PyExc_SyntaxError, "method listfields() takes no parameters."); return NULL; } /* builds tuple */ n = PQnfields(self->last_result); fieldstuple = PyTuple_New(n); for (i = 0; i < n; i++) { name = PQfname(self->last_result, i); str = PyString_FromString(name); PyTuple_SetItem(fieldstuple, i, str); } return fieldstuple;}/* get field name from last result */static char pgquery_fieldname__doc__[] ="fieldname() -- returns name of field from result from its position.";static PyObject *pgquery_fieldname(pgqueryobject * self, PyObject * args){ int i; char *name; /* gets args */ if (!PyArg_ParseTuple(args, "i", &i)) { PyErr_SetString(PyExc_TypeError, "fieldname(number), with number(integer)."); return NULL; } /* checks number validity */ if (i >= PQnfields(self->last_result)) { PyErr_SetString(PyExc_ValueError, "invalid field number."); return NULL; } /* gets fields name and builds object */ name = PQfname(self->last_result, i); return PyString_FromString(name);}/* gets fields number from name in last result */static char pgquery_fieldnum__doc__[] ="fieldnum() -- returns position in query for field from its name.";static PyObject *pgquery_fieldnum(pgqueryobject * self, PyObject * args){ char *name; int num; /* gets args */ if (!PyArg_ParseTuple(args, "s", &name)) { PyErr_SetString(PyExc_TypeError, "fieldnum(name), with name (string)."); return NULL; } /* gets field number */ if ((num = PQfnumber(self->last_result, name)) == -1) { PyErr_SetString(PyExc_ValueError, "Unknown field."); return NULL; } return PyInt_FromLong(num);}/* retrieves last result */static char pgquery_getresult__doc__[] ="getresult() -- Gets the result of a query. The result is returned ""as a list of rows, each one a list of fields in the order returned ""by the server.";static PyObject *pgquery_getresult(pgqueryobject * self, PyObject * args){ PyObject *rowtuple, *reslist, *val; int i, j, m, n, *typ; /* checks args (args == NULL for an internal call) */ if ((args != NULL) && (!PyArg_ParseTuple(args, ""))) { PyErr_SetString(PyExc_SyntaxError, "method getresult() takes no parameters."); return NULL; } /* stores result in tuple */ reslist = PyList_New(0); m = PQntuples(self->last_result); n = PQnfields(self->last_result); if ((typ = malloc(sizeof(int) * n)) == NULL) { PyErr_SetString(PyExc_SyntaxError, "memory error in getresult()."); return NULL; } for (j = 0; j < n; j++) { switch (PQftype(self->last_result, j)) { case INT2OID: case INT4OID: case OIDOID: typ[j] = 1; break; case FLOAT4OID: case FLOAT8OID: typ[j] = 2; break; case CASHOID: typ[j] = 3; break; default: typ[j] = 4; break; } } for (i = 0; i < m; i++) { rowtuple = PyTuple_New(n); for (j = 0; j < n; j++) { int k; char *s = PQgetvalue(self->last_result, i, j); char cashbuf[64]; switch (typ[j]) { case 1: val = PyInt_FromLong(strtol(s, NULL, 10)); break; case 2: val = PyFloat_FromDouble(strtod(s, NULL)); break; case 3: /* get rid of the '$' and commas */ if (*s == '$') /* there's talk of getting rid of * it */ s++; if ((s[0] == '-' || s[0] == '(') && s[1] == '$') *(++s) = '-'; for (k = 0; *s; s++) if (*s != ',') cashbuf[k++] = *s; cashbuf[k] = 0; val = PyFloat_FromDouble(strtod(cashbuf, NULL)); break; default: val = PyString_FromString(s); break; } PyTuple_SetItem(rowtuple, j, val); } PyList_Append(reslist, rowtuple); Py_XDECREF(rowtuple); } free(typ); /* returns list */ return reslist;}/* retrieves last result as a list of dictionaries*/static char pgquery_dictresult__doc__[] ="dictresult() -- Gets the result of a query. The result is returned ""as a list of rows, each one a dictionary with the field names used ""as the labels.";static PyObject *pgquery_dictresult(pgqueryobject * self, PyObject * args){ PyObject *dict, *reslist, *val; int i, j, m, n, *typ; /* checks args (args == NULL for an internal call) */ if ((args != NULL) && (!PyArg_ParseTuple(args, ""))) { PyErr_SetString(PyExc_SyntaxError, "method getresult() takes no parameters."); return NULL; } /* stores result in list */ reslist = PyList_New(0); m = PQntuples(self->last_result); n = PQnfields(self->last_result); if ((typ = malloc(sizeof(int) * n)) == NULL) { PyErr_SetString(PyExc_SyntaxError, "memory error in dictresult()."); return NULL; } for (j = 0; j < n; j++) { switch (PQftype(self->last_result, j)) { case INT2OID: case INT4OID: case OIDOID: typ[j] = 1; break; case FLOAT4OID: case FLOAT8OID: typ[j] = 2; break; case CASHOID: typ[j] = 3; break; default: typ[j] = 4; break; } } for (i = 0; i < m; i++) { dict = PyDict_New(); for (j = 0; j < n; j++) { int k;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -