📄 postgres.py
字号:
# postgres.py# Copyright (C) 2005, 2006, 2007, 2008 Michael Bayer mike_mp@zzzcomputing.com## This module is part of SQLAlchemy and is released under# the MIT License: http://www.opensource.org/licenses/mit-license.php"""Support for the PostgreSQL database.PostgreSQL supports partial indexes. To create them pass a posgres_whereoption to the Index constructor:: Index('my_index', my_table.c.id, postgres_where=tbl.c.value > 10)PostgreSQL 8.2+ supports returning a result set from inserts and updates.To use this pass the column/expression list to the postgres_returningparameter when creating the queries:: raises = tbl.update(empl.c.sales > 100, values=dict(salary=empl.c.salary * 1.1), postgres_returning=[empl.c.id, empl.c.salary]).execute().fetchall()"""import random, re, stringfrom sqlalchemy import sql, schema, exceptions, utilfrom sqlalchemy.engine import base, defaultfrom sqlalchemy.sql import compiler, expressionfrom sqlalchemy.sql import operators as sql_operatorsfrom sqlalchemy import types as sqltypesclass PGInet(sqltypes.TypeEngine): def get_col_spec(self): return "INET"class PGMacAddr(sqltypes.TypeEngine): def get_col_spec(self): return "MACADDR"class PGNumeric(sqltypes.Numeric): def get_col_spec(self): if not self.precision: return "NUMERIC" else: return "NUMERIC(%(precision)s, %(length)s)" % {'precision': self.precision, 'length' : self.length} def bind_processor(self, dialect): return None def result_processor(self, dialect): if self.asdecimal: return None else: def process(value): if isinstance(value, util.decimal_type): return float(value) else: return value return processclass PGFloat(sqltypes.Float): def get_col_spec(self): if not self.precision: return "FLOAT" else: return "FLOAT(%(precision)s)" % {'precision': self.precision}class PGInteger(sqltypes.Integer): def get_col_spec(self): return "INTEGER"class PGSmallInteger(sqltypes.Smallinteger): def get_col_spec(self): return "SMALLINT"class PGBigInteger(PGInteger): def get_col_spec(self): return "BIGINT"class PGDateTime(sqltypes.DateTime): def get_col_spec(self): return "TIMESTAMP " + (self.timezone and "WITH" or "WITHOUT") + " TIME ZONE"class PGDate(sqltypes.Date): def get_col_spec(self): return "DATE"class PGTime(sqltypes.Time): def get_col_spec(self): return "TIME " + (self.timezone and "WITH" or "WITHOUT") + " TIME ZONE"class PGInterval(sqltypes.TypeEngine): def get_col_spec(self): return "INTERVAL"class PGText(sqltypes.Text): def get_col_spec(self): return "TEXT"class PGString(sqltypes.String): def get_col_spec(self): return "VARCHAR(%(length)s)" % {'length' : self.length}class PGChar(sqltypes.CHAR): def get_col_spec(self): return "CHAR(%(length)s)" % {'length' : self.length}class PGBinary(sqltypes.Binary): def get_col_spec(self): return "BYTEA"class PGBoolean(sqltypes.Boolean): def get_col_spec(self): return "BOOLEAN"class PGArray(sqltypes.Concatenable, sqltypes.TypeEngine): def __init__(self, item_type): if isinstance(item_type, type): item_type = item_type() self.item_type = item_type def dialect_impl(self, dialect, **kwargs): impl = self.__class__.__new__(self.__class__) impl.__dict__.update(self.__dict__) impl.item_type = self.item_type.dialect_impl(dialect) return impl def bind_processor(self, dialect): item_proc = self.item_type.bind_processor(dialect) def process(value): if value is None: return value def convert_item(item): if isinstance(item, (list,tuple)): return [convert_item(child) for child in item] else: if item_proc: return item_proc(item) else: return item return [convert_item(item) for item in value] return process def result_processor(self, dialect): item_proc = self.item_type.result_processor(dialect) def process(value): if value is None: return value def convert_item(item): if isinstance(item, list): return [convert_item(child) for child in item] else: if item_proc: return item_proc(item) else: return item return [convert_item(item) for item in value] return process def get_col_spec(self): return self.item_type.get_col_spec() + '[]'colspecs = { sqltypes.Integer : PGInteger, sqltypes.Smallinteger : PGSmallInteger, sqltypes.Numeric : PGNumeric, sqltypes.Float : PGFloat, sqltypes.DateTime : PGDateTime, sqltypes.Date : PGDate, sqltypes.Time : PGTime, sqltypes.String : PGString, sqltypes.Binary : PGBinary, sqltypes.Boolean : PGBoolean, sqltypes.Text : PGText, sqltypes.CHAR: PGChar,}ischema_names = { 'integer' : PGInteger, 'bigint' : PGBigInteger, 'smallint' : PGSmallInteger, 'character varying' : PGString, 'character' : PGChar, 'text' : PGText, 'numeric' : PGNumeric, 'float' : PGFloat, 'real' : PGFloat, 'inet': PGInet, 'macaddr': PGMacAddr, 'double precision' : PGFloat, 'timestamp' : PGDateTime, 'timestamp with time zone' : PGDateTime, 'timestamp without time zone' : PGDateTime, 'time with time zone' : PGTime, 'time without time zone' : PGTime, 'date' : PGDate, 'time': PGTime, 'bytea' : PGBinary, 'boolean' : PGBoolean, 'interval':PGInterval,}def descriptor(): return {'name':'postgres', 'description':'PostGres', 'arguments':[ ('username',"Database Username",None), ('password',"Database Password",None), ('database',"Database Name",None), ('host',"Hostname", None), ]}SELECT_RE = re.compile( r'\s*(?:SELECT|FETCH|(UPDATE|INSERT))', re.I | re.UNICODE)RETURNING_RE = re.compile( 'RETURNING', re.I | re.UNICODE)# This finds if the RETURNING is not inside a quoted/commented values. Handles string literals,# quoted identifiers, dollar quotes, SQL comments and C style multiline comments. This does not# handle correctly nested C style quotes, lets hope no one does the following:# UPDATE tbl SET x=y /* foo /* bar */ RETURNING */RETURNING_QUOTED_RE = re.compile( """\s*(?:UPDATE|INSERT)\s (?: # handle quoted and commented tokens separately [^'"$/-] # non quote/comment character | -(?!-) # a dash that does not begin a comment | /(?!\*) # a slash that does not begin a comment | "(?:[^"]|"")*" # quoted literal | '(?:[^']|'')*' # quoted string | \$(?P<dquote>[^$]*)\$.*?\$(?P=dquote)\$ # dollar quotes | --[^\\n]*(?=\\n) # SQL comment, leave out line ending as that counts as whitespace # for the returning token | /\*([^*]|\*(?!/))*\*/ # C style comment, doesn't handle nesting )* \sRETURNING\s""", re.I | re.UNICODE | re.VERBOSE)class PGExecutionContext(default.DefaultExecutionContext): def returns_rows_text(self, statement): m = SELECT_RE.match(statement) return m and (not m.group(1) or (RETURNING_RE.search(statement) and RETURNING_QUOTED_RE.match(statement))) def returns_rows_compiled(self, compiled): return isinstance(compiled.statement, expression.Selectable) or \ ( (compiled.isupdate or compiled.isinsert) and "postgres_returning" in compiled.statement.kwargs ) def create_cursor(self): # executing a default or Sequence standalone creates an execution context without a statement. # so slightly hacky "if no statement assume we're server side" logic # TODO: dont use regexp if Compiled is used ? self.__is_server_side = \ self.dialect.server_side_cursors and \ (self.statement is None or \ (SELECT_RE.match(self.statement) and not re.search(r'FOR UPDATE(?: NOWAIT)?\s*$', self.statement, re.I)) )
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -