⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 postgres.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
        if self.__is_server_side:            # use server-side cursors:            # http://lists.initd.org/pipermail/psycopg/2007-January/005251.html            ident = "c" + hex(random.randint(0, 65535))[2:]            return self._connection.connection.cursor(ident)        else:            return self._connection.connection.cursor()    def get_result_proxy(self):        if self.__is_server_side:            return base.BufferedRowResultProxy(self)        else:            return base.ResultProxy(self)    def post_exec(self):        if self.compiled.isinsert and self.last_inserted_ids is None:            if not self.dialect.use_oids:                pass                # will raise invalid error when they go to get them            else:                table = self.compiled.statement.table                if self.cursor.lastrowid is not None and table is not None and len(table.primary_key):                    s = sql.select(table.primary_key, table.oid_column == self.cursor.lastrowid)                    row = self.connection.execute(s).fetchone()                self._last_inserted_ids = [v for v in row]        super(PGExecutionContext, self).post_exec()class PGDialect(default.DefaultDialect):    supports_alter = True    supports_unicode_statements = False    max_identifier_length = 63    supports_sane_rowcount = True    supports_sane_multi_rowcount = False    preexecute_pk_sequences = True    supports_pk_autoincrement = False    def __init__(self, use_oids=False, server_side_cursors=False, **kwargs):        default.DefaultDialect.__init__(self, default_paramstyle='pyformat', **kwargs)        self.use_oids = use_oids        self.server_side_cursors = server_side_cursors        self.paramstyle = 'pyformat'    def dbapi(cls):        import psycopg2 as psycopg        return psycopg    dbapi = classmethod(dbapi)    def create_connect_args(self, url):        opts = url.translate_connect_args(username='user')        if 'port' in opts:            opts['port'] = int(opts['port'])        opts.update(url.query)        return ([], opts)    def create_execution_context(self, *args, **kwargs):        return PGExecutionContext(self, *args, **kwargs)    def type_descriptor(self, typeobj):        return sqltypes.adapt_type(typeobj, colspecs)    def do_begin_twophase(self, connection, xid):        self.do_begin(connection.connection)    def do_prepare_twophase(self, connection, xid):        connection.execute(sql.text("PREPARE TRANSACTION :tid", bindparams=[sql.bindparam('tid', xid)]))    def do_rollback_twophase(self, connection, xid, is_prepared=True, recover=False):        if is_prepared:            if recover:                #FIXME: ugly hack to get out of transaction context when commiting recoverable transactions                # Must find out a way how to make the dbapi not open a transaction.                connection.execute(sql.text("ROLLBACK"))            connection.execute(sql.text("ROLLBACK PREPARED :tid", bindparams=[sql.bindparam('tid', xid)]))            connection.execute(sql.text("BEGIN"))            self.do_rollback(connection.connection)        else:            self.do_rollback(connection.connection)    def do_commit_twophase(self, connection, xid, is_prepared=True, recover=False):        if is_prepared:            if recover:                connection.execute(sql.text("ROLLBACK"))            connection.execute(sql.text("COMMIT PREPARED :tid", bindparams=[sql.bindparam('tid', xid)]))            connection.execute(sql.text("BEGIN"))            self.do_rollback(connection.connection)        else:            self.do_commit(connection.connection)    def do_recover_twophase(self, connection):        resultset = connection.execute(sql.text("SELECT gid FROM pg_prepared_xacts"))        return [row[0] for row in resultset]    def get_default_schema_name(self, connection):        if not hasattr(self, '_default_schema_name'):            self._default_schema_name = connection.scalar("select current_schema()", None)        return self._default_schema_name    def last_inserted_ids(self):        if self.context.last_inserted_ids is None:            raise exceptions.InvalidRequestError("no INSERT executed, or can't use cursor.lastrowid without Postgres OIDs enabled")        else:            return self.context.last_inserted_ids    def oid_column_name(self, column):        if self.use_oids:            return "oid"        else:            return None    def has_table(self, connection, table_name, schema=None):        # seems like case gets folded in pg_class...        if schema is None:            cursor = connection.execute("""select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where n.nspname=current_schema() and lower(relname)=%(name)s""", {'name':table_name.lower().encode(self.encoding)});        else:            cursor = connection.execute("""select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where n.nspname=%(schema)s and lower(relname)=%(name)s""", {'name':table_name.lower().encode(self.encoding), 'schema':schema});        return bool( not not cursor.rowcount )    def has_sequence(self, connection, sequence_name):        cursor = connection.execute('''SELECT relname FROM pg_class WHERE relkind = 'S' AND relnamespace IN ( SELECT oid FROM pg_namespace WHERE nspname NOT LIKE 'pg_%%' AND nspname != 'information_schema' AND relname = %(seqname)s);''', {'seqname': sequence_name.encode(self.encoding)})        return bool(not not cursor.rowcount)    def is_disconnect(self, e):        if isinstance(e, self.dbapi.OperationalError):            return 'closed the connection' in str(e) or 'connection not open' in str(e)        elif isinstance(e, self.dbapi.InterfaceError):            return 'connection already closed' in str(e)        elif isinstance(e, self.dbapi.ProgrammingError):            # yes, it really says "losed", not "closed"            return "losed the connection unexpectedly" in str(e)        else:            return False    def table_names(self, connection, schema):        s = """        SELECT relname        FROM pg_class c        WHERE relkind = 'r'          AND '%(schema)s' = (select nspname from pg_namespace n where n.oid = c.relnamespace)        """ % locals()        return [row[0].decode(self.encoding) for row in connection.execute(s)]    def server_version_info(self, connection):        v = connection.execute("select version()").scalar()        m = re.match('PostgreSQL (\d+)\.(\d+)\.(\d+)', v)        if not m:            raise exceptions.AssertionError("Could not determine version from string '%s'" % v)        return tuple([int(x) for x in m.group(1, 2, 3)])    def reflecttable(self, connection, table, include_columns):        preparer = self.identifier_preparer        if table.schema is not None:            schema_where_clause = "n.nspname = :schema"            schemaname = table.schema            if isinstance(schemaname, str):                schemaname = schemaname.decode(self.encoding)        else:            schema_where_clause = "pg_catalog.pg_table_is_visible(c.oid)"            schemaname = None        SQL_COLS = """            SELECT a.attname,              pg_catalog.format_type(a.atttypid, a.atttypmod),              (SELECT substring(d.adsrc for 128) FROM pg_catalog.pg_attrdef d               WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum AND a.atthasdef)              AS DEFAULT,              a.attnotnull, a.attnum, a.attrelid as table_oid            FROM pg_catalog.pg_attribute a            WHERE a.attrelid = (                SELECT c.oid                FROM pg_catalog.pg_class c                     LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace                     WHERE (%s)                     AND c.relname = :table_name AND c.relkind in ('r','v')            ) AND a.attnum > 0 AND NOT a.attisdropped            ORDER BY a.attnum        """ % schema_where_clause        s = sql.text(SQL_COLS, bindparams=[sql.bindparam('table_name', type_=sqltypes.Unicode), sql.bindparam('schema', type_=sqltypes.Unicode)], typemap={'attname':sqltypes.Unicode, 'default':sqltypes.Unicode})        tablename = table.name        if isinstance(tablename, str):            tablename = tablename.decode(self.encoding)        c = connection.execute(s, table_name=tablename, schema=schemaname)        rows = c.fetchall()        if not rows:            raise exceptions.NoSuchTableError(table.name)        domains = self._load_domains(connection)        for name, format_type, default, notnull, attnum, table_oid in rows:            if include_columns and name not in include_columns:                continue            ## strip (30) from character varying(30)            attype = re.search('([^\([]+)', format_type).group(1)            nullable = not notnull            is_array = format_type.endswith('[]')            try:                charlen = re.search('\(([\d,]+)\)', format_type).group(1)            except:                charlen = False            numericprec = False            numericscale = False            if attype == 'numeric':                if charlen is False:                    numericprec, numericscale = (None, None)                else:                    numericprec, numericscale = charlen.split(',')                charlen = False            if attype == 'double precision':                numericprec, numericscale = (53, False)                charlen = False            if attype == 'integer':                numericprec, numericscale = (32, 0)                charlen = False            args = []            for a in (charlen, numericprec, numericscale):                if a is None:                    args.append(None)                elif a is not False:                    args.append(int(a))            kwargs = {}            if attype == 'timestamp with time zone':                kwargs['timezone'] = True            elif attype == 'timestamp without time zone':                kwargs['timezone'] = False            if attype in ischema_names:                coltype = ischema_names[attype]            else:                if attype in domains:                    domain = domains[attype]                    if domain['attype'] in ischema_names:                        # A table can't override whether the domain is nullable.                        nullable = domain['nullable']                        if domain['default'] and not default:                            # It can, however, override the default value, but can't set it to null.                            default = domain['default']                        coltype = ischema_names[domain['attype']]                else:                    coltype=None            if coltype:                coltype = coltype(*args, **kwargs)                if is_array:                    coltype = PGArray(coltype)            else:                util.warn("Did not recognize type '%s' of column '%s'" %                          (attype, name))                coltype = sqltypes.NULLTYPE            colargs= []            if default is not None:                match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -