⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 postgres.py

📁 Python的一个ORM,现在很火
💻 PY
字号:
#
# Copyright (c) 2006, 2007 Canonical
#
# Written by Gustavo Niemeyer <gustavo@niemeyer.net>
#
# This file is part of Storm Object Relational Mapper.
#
# Storm is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation; either version 2.1 of
# the License, or (at your option) any later version.
#
# Storm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
from datetime import datetime, date, time
from time import strptime

from storm.databases import dummy

try:
    import psycopg2
    import psycopg2.extensions
except ImportError:
    # Only a missing module should fall back to the dummy placeholder; any
    # other failure while importing psycopg2 is a real error and must surface.
    psycopg2 = dummy

from storm.expr import (
    Undef, SetExpr, Select, Alias, And, Eq, FuncExpr, SQLRaw, COLUMN_NAME,
    compile, compile_select, compile_set_expr)
from storm.variables import Variable, ListVariable
from storm.database import *
from storm.exceptions import install_exceptions, DatabaseModuleError


install_exceptions(psycopg2)

# Fork the imported compiler so that the handlers registered below are
# attached to this module's own `compile` object.
compile = compile.fork()


class currval(FuncExpr):
    """Expression rendered as a ``currval(...)`` call for a column."""

    name = "currval"

    def __init__(self, column):
        self.column = column


@compile.when(currval)
def compile_currval(compile, state, expr):
    """Render `expr` as ``currval('<table>_<column>_seq')``.

    The table part is produced by compiling ``expr.column.table``; the
    column part is ``expr.column.name`` taken verbatim.
    """
    return "currval('%s_%s_seq')" % (compile(state, expr.column.table),
                                     expr.column.name)


@compile.when(ListVariable)
def compile_list_variable(compile, state, list_variable):
    """Render a list variable as an ``ARRAY[...]`` expression.

    Returns ``"NULL"`` when the variable's database value is None, the
    literal ``'{}'`` when it is empty, and otherwise ``ARRAY[a,b,...]``
    where each element is the compiled form of the contained item.
    """
    variables = list_variable.get(to_db=True)
    if variables is None:
        return "NULL"
    if not variables:
        return "'{}'"
    elements = [compile(state, variable) for variable in variables]
    return "ARRAY[%s]" % ",".join(elements)


@compile.when(SetExpr)
def compile_set_expr_postgres(compile, state, expr):
    """Compile a set expression, wrapping it in a SELECT when ordered.

    Without an ``order_by`` the expression is handed straight to
    `compile_set_expr`.  With one, the set expression is compiled as a
    parenthesised subselect and the ordering, limit and offset are applied
    by an outer ``SELECT *`` (see the inline comments for the reason).
    """
    if expr.order_by is not Undef:
        # The following statement breaks in postgres:
        #     SELECT 1 AS id UNION SELECT 1 ORDER BY id+1
        # With the error:
        #     ORDER BY on a UNION/INTERSECT/EXCEPT result must
        #     be on one of the result columns
        # So we transform it into something close to:
        #     SELECT * FROM (SELECT 1 AS id UNION SELECT 1) AS a ORDER BY id+1

        # Build new set expression without arguments (order_by, etc).
        new_expr = expr.__class__()
        new_expr.exprs = expr.exprs
        new_expr.all = expr.all

        # Make sure that state.aliases isn't None, since we want them to
        # compile our order_by statement below.
        no_aliases = state.aliases is None
        if no_aliases:
            state.push("aliases", {})

        # Build set expression, collecting aliases.
        set_stmt = SQLRaw("(%s)" % compile_set_expr(compile, state, new_expr))

        # Build order_by statement, using aliases.
        state.push("context", COLUMN_NAME)
        order_by_stmt = SQLRaw(compile(state, expr.order_by))
        state.pop()

        # Discard aliases, if they were not being collected previously.
        if no_aliases:
            state.pop()

        # Build wrapping select statement.
        select = Select(SQLRaw("*"), tables=Alias(set_stmt), limit=expr.limit,
                        offset=expr.offset, order_by=order_by_stmt)
        return compile_select(compile, state, select)
    else:
        return compile_set_expr(compile, state, expr)


class PostgresResult(Result):
    """Result subclass that locates a just-inserted row via ``currval``."""

    def get_insert_identity(self, primary_key, primary_variables):
        """Return an ``And`` of equalities identifying the inserted row.

        Each primary key column is compared with its variable; a variable
        that is not defined is replaced by ``currval(column)``.
        """
        equals = []
        for column, variable in zip(primary_key, primary_variables):
            if not variable.is_defined():
                variable = currval(column)
            equals.append(Eq(column, variable))
        return And(*equals)


class PostgresConnection(Connection):
    """Connection subclass using this module's compiler and result class."""

    _result_factory = PostgresResult
    _param_mark = "%s"
    _compile = compile

    def _raw_execute(self, statement, params):
        """Execute `statement`, encoding unicode statements as UTF-8 first."""
        # isinstance (rather than an exact type check) so that subclasses of
        # unicode are encoded as well.
        if isinstance(statement, unicode):
            # psycopg breaks with unicode statements.
            statement = statement.encode("UTF-8")
        return Connection._raw_execute(self, statement, params)

    def _to_database(self, params):
        """Yield each parameter converted to the form passed to psycopg2.

        Variables are unwrapped with ``get(to_db=True)``.  After that,
        datetime/date/time values are yielded as ``str(value)``, unicode
        values as UTF-8 encoded strings, ``str`` values wrapped in
        ``psycopg2.Binary``, and anything else unchanged.
        """
        for param in params:
            if isinstance(param, Variable):
                param = param.get(to_db=True)
            if isinstance(param, (datetime, date, time)):
                yield str(param)
            elif isinstance(param, unicode):
                yield param.encode("UTF-8")
            elif isinstance(param, str):
                yield psycopg2.Binary(param)
            else:
                yield param


class Postgres(Database):
    """Database implementation that connects through psycopg2."""

    _connection_factory = PostgresConnection

    def __init__(self, uri):
        """Store the DSN built from `uri`.

        Raises DatabaseModuleError when psycopg2 could not be imported.
        """
        if psycopg2 is dummy:
            raise DatabaseModuleError("'psycopg2' module not found")
        self._dsn = make_dsn(uri)

    def connect(self):
        """Open a psycopg2 connection with UTF8 client encoding and wrap it."""
        raw_connection = psycopg2.connect(self._dsn)
        raw_connection.set_client_encoding("UTF8")
        return self._connection_factory(self, raw_connection)


create_from_uri = Postgres


if psycopg2 is not dummy:
    psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
    # NOTE(review): UNICODEARRAY is taken from the private _psycopg module;
    # confirm whether the installed psycopg2 exposes it publicly before
    # changing this.
    psycopg2.extensions.register_type(psycopg2._psycopg.UNICODEARRAY)


def _quote_dsn_value(value):
    """Return `value` formatted as a libpq connection-string value.

    Plain values are returned unchanged.  Empty values and values that
    contain whitespace, a single quote or a backslash are wrapped in single
    quotes, with embedded backslashes and single quotes backslash-escaped.
    """
    text = "%s" % (value,)
    # split() returns [text] only for a non-empty string with no whitespace.
    is_plain = (text.split() == [text] and
                "'" not in text and "\\" not in text)
    if is_plain:
        return text
    escaped = text.replace("\\", "\\\\").replace("'", "\\'")
    return "'%s'" % escaped


def make_dsn(uri):
    """Convert a URI object to a PostgreSQL DSN string.

    ``uri.database`` is always emitted; ``host``, ``port``, ``username``
    and ``password`` are added only when they are not None.  String values
    are quoted when needed so that spaces or quote characters (for example
    in a password) do not corrupt the DSN.
    """
    dsn = "dbname=%s" % _quote_dsn_value(uri.database)
    if uri.host is not None:
        dsn += " host=%s" % _quote_dsn_value(uri.host)
    if uri.port is not None:
        dsn += " port=%d" % uri.port
    if uri.username is not None:
        dsn += " user=%s" % _quote_dsn_value(uri.username)
    if uri.password is not None:
        dsn += " password=%s" % _quote_dsn_value(uri.password)
    return dsn

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -