📄 postgres.py
字号:
if self.__is_server_side: # use server-side cursors: # http://lists.initd.org/pipermail/psycopg/2007-January/005251.html ident = "c" + hex(random.randint(0, 65535))[2:] return self._connection.connection.cursor(ident) else: return self._connection.connection.cursor() def get_result_proxy(self): if self.__is_server_side: return base.BufferedRowResultProxy(self) else: return base.ResultProxy(self) def post_exec(self): if self.compiled.isinsert and self.last_inserted_ids is None: if not self.dialect.use_oids: pass # will raise invalid error when they go to get them else: table = self.compiled.statement.table if self.cursor.lastrowid is not None and table is not None and len(table.primary_key): s = sql.select(table.primary_key, table.oid_column == self.cursor.lastrowid) row = self.connection.execute(s).fetchone() self._last_inserted_ids = [v for v in row] super(PGExecutionContext, self).post_exec()class PGDialect(default.DefaultDialect): supports_alter = True supports_unicode_statements = False max_identifier_length = 63 supports_sane_rowcount = True supports_sane_multi_rowcount = False preexecute_pk_sequences = True supports_pk_autoincrement = False def __init__(self, use_oids=False, server_side_cursors=False, **kwargs): default.DefaultDialect.__init__(self, default_paramstyle='pyformat', **kwargs) self.use_oids = use_oids self.server_side_cursors = server_side_cursors self.paramstyle = 'pyformat' def dbapi(cls): import psycopg2 as psycopg return psycopg dbapi = classmethod(dbapi) def create_connect_args(self, url): opts = url.translate_connect_args(username='user') if 'port' in opts: opts['port'] = int(opts['port']) opts.update(url.query) return ([], opts) def create_execution_context(self, *args, **kwargs): return PGExecutionContext(self, *args, **kwargs) def type_descriptor(self, typeobj): return sqltypes.adapt_type(typeobj, colspecs) def do_begin_twophase(self, connection, xid): self.do_begin(connection.connection) def do_prepare_twophase(self, connection, xid): connection.execute(sql.text("PREPARE TRANSACTION :tid", bindparams=[sql.bindparam('tid', xid)])) def do_rollback_twophase(self, connection, xid, is_prepared=True, recover=False): if is_prepared: if recover: #FIXME: ugly hack to get out of transaction context when commiting recoverable transactions # Must find out a way how to make the dbapi not open a transaction. connection.execute(sql.text("ROLLBACK")) connection.execute(sql.text("ROLLBACK PREPARED :tid", bindparams=[sql.bindparam('tid', xid)])) connection.execute(sql.text("BEGIN")) self.do_rollback(connection.connection) else: self.do_rollback(connection.connection) def do_commit_twophase(self, connection, xid, is_prepared=True, recover=False): if is_prepared: if recover: connection.execute(sql.text("ROLLBACK")) connection.execute(sql.text("COMMIT PREPARED :tid", bindparams=[sql.bindparam('tid', xid)])) connection.execute(sql.text("BEGIN")) self.do_rollback(connection.connection) else: self.do_commit(connection.connection) def do_recover_twophase(self, connection): resultset = connection.execute(sql.text("SELECT gid FROM pg_prepared_xacts")) return [row[0] for row in resultset] def get_default_schema_name(self, connection): if not hasattr(self, '_default_schema_name'): self._default_schema_name = connection.scalar("select current_schema()", None) return self._default_schema_name def last_inserted_ids(self): if self.context.last_inserted_ids is None: raise exceptions.InvalidRequestError("no INSERT executed, or can't use cursor.lastrowid without Postgres OIDs enabled") else: return self.context.last_inserted_ids def oid_column_name(self, column): if self.use_oids: return "oid" else: return None def has_table(self, connection, table_name, schema=None): # seems like case gets folded in pg_class... if schema is None: cursor = connection.execute("""select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where n.nspname=current_schema() and lower(relname)=%(name)s""", {'name':table_name.lower().encode(self.encoding)}); else: cursor = connection.execute("""select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where n.nspname=%(schema)s and lower(relname)=%(name)s""", {'name':table_name.lower().encode(self.encoding), 'schema':schema}); return bool( not not cursor.rowcount ) def has_sequence(self, connection, sequence_name): cursor = connection.execute('''SELECT relname FROM pg_class WHERE relkind = 'S' AND relnamespace IN ( SELECT oid FROM pg_namespace WHERE nspname NOT LIKE 'pg_%%' AND nspname != 'information_schema' AND relname = %(seqname)s);''', {'seqname': sequence_name.encode(self.encoding)}) return bool(not not cursor.rowcount) def is_disconnect(self, e): if isinstance(e, self.dbapi.OperationalError): return 'closed the connection' in str(e) or 'connection not open' in str(e) elif isinstance(e, self.dbapi.InterfaceError): return 'connection already closed' in str(e) elif isinstance(e, self.dbapi.ProgrammingError): # yes, it really says "losed", not "closed" return "losed the connection unexpectedly" in str(e) else: return False def table_names(self, connection, schema): s = """ SELECT relname FROM pg_class c WHERE relkind = 'r' AND '%(schema)s' = (select nspname from pg_namespace n where n.oid = c.relnamespace) """ % locals() return [row[0].decode(self.encoding) for row in connection.execute(s)] def server_version_info(self, connection): v = connection.execute("select version()").scalar() m = re.match('PostgreSQL (\d+)\.(\d+)\.(\d+)', v) if not m: raise exceptions.AssertionError("Could not determine version from string '%s'" % v) return tuple([int(x) for x in m.group(1, 2, 3)]) def reflecttable(self, connection, table, include_columns): preparer = self.identifier_preparer if table.schema is not None: schema_where_clause = "n.nspname = :schema" schemaname = table.schema if isinstance(schemaname, str): schemaname = schemaname.decode(self.encoding) else: schema_where_clause = "pg_catalog.pg_table_is_visible(c.oid)" schemaname = None SQL_COLS = """ SELECT a.attname, pg_catalog.format_type(a.atttypid, a.atttypmod), (SELECT substring(d.adsrc for 128) FROM pg_catalog.pg_attrdef d WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum AND a.atthasdef) AS DEFAULT, a.attnotnull, a.attnum, a.attrelid as table_oid FROM pg_catalog.pg_attribute a WHERE a.attrelid = ( SELECT c.oid FROM pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace WHERE (%s) AND c.relname = :table_name AND c.relkind in ('r','v') ) AND a.attnum > 0 AND NOT a.attisdropped ORDER BY a.attnum """ % schema_where_clause s = sql.text(SQL_COLS, bindparams=[sql.bindparam('table_name', type_=sqltypes.Unicode), sql.bindparam('schema', type_=sqltypes.Unicode)], typemap={'attname':sqltypes.Unicode, 'default':sqltypes.Unicode}) tablename = table.name if isinstance(tablename, str): tablename = tablename.decode(self.encoding) c = connection.execute(s, table_name=tablename, schema=schemaname) rows = c.fetchall() if not rows: raise exceptions.NoSuchTableError(table.name) domains = self._load_domains(connection) for name, format_type, default, notnull, attnum, table_oid in rows: if include_columns and name not in include_columns: continue ## strip (30) from character varying(30) attype = re.search('([^\([]+)', format_type).group(1) nullable = not notnull is_array = format_type.endswith('[]') try: charlen = re.search('\(([\d,]+)\)', format_type).group(1) except: charlen = False numericprec = False numericscale = False if attype == 'numeric': if charlen is False: numericprec, numericscale = (None, None) else: numericprec, numericscale = charlen.split(',') charlen = False if attype == 'double precision': numericprec, numericscale = (53, False) charlen = False if attype == 'integer': numericprec, numericscale = (32, 0) charlen = False args = [] for a in (charlen, numericprec, numericscale): if a is None: args.append(None) elif a is not False: args.append(int(a)) kwargs = {} if attype == 'timestamp with time zone': kwargs['timezone'] = True elif attype == 'timestamp without time zone': kwargs['timezone'] = False if attype in ischema_names: coltype = ischema_names[attype] else: if attype in domains: domain = domains[attype] if domain['attype'] in ischema_names: # A table can't override whether the domain is nullable. nullable = domain['nullable'] if domain['default'] and not default: # It can, however, override the default value, but can't set it to null. default = domain['default'] coltype = ischema_names[domain['attype']] else: coltype=None if coltype: coltype = coltype(*args, **kwargs) if is_array: coltype = PGArray(coltype) else: util.warn("Did not recognize type '%s' of column '%s'" % (attype, name)) coltype = sqltypes.NULLTYPE colargs= [] if default is not None: match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -