⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 sybase.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
            args = []
            # keep only the length / precision / scale values that are set;
            # they are passed positionally to the type constructor below
            for a in (charlen, numericprec, numericscale):
                if a is not None:
                    args.append(a)
            coltype = self.ischema_names.get(type, None)
            # a SybaseString reported with charlen == -1 is mapped to SybaseText
            if coltype == SybaseString and charlen == -1:
                coltype = SybaseText()
            else:
                if coltype is None:
                    util.warn("Did not recognize type '%s' of column '%s'" %
                              (type, name))
                    coltype = sqltypes.NULLTYPE
                # NOTE(review): for an unrecognized type this calls
                # sqltypes.NULLTYPE(*args) -- confirm NULLTYPE is callable
                coltype = coltype(*args)
            colargs= []
            if default is not None:
                colargs.append(schema.PassiveDefault(sql.text(default)))

            # any sequences ?
            col = schema.Column(name, coltype, nullable=nullable, primary_key=primary_key, *colargs)
            # when max_identity is positive, attach a Sequence named
            # '<column>_identity' that starts at that value and steps by 1
            if int(max_identity) > 0:
                col.sequence = schema.Sequence(name + '_identity')
                col.sequence.start = int(max_identity)
                col.sequence.increment = 1

            # append the column
            table.append_column(col)

        # any foreign key constraint for this table ?
        # note: no multi-column foreign keys are considered
        # NOTE(review): table.name is interpolated directly into the SQL text
        # rather than passed as a bound parameter
        s = "select st1.table_name, sc1.column_name, st2.table_name, sc2.column_name from systable as st1 join sysfkcol on st1.table_id=sysfkcol.foreign_table_id join sysforeignkey join systable as st2 on sysforeignkey.primary_table_id = st2.table_id join syscolumn as sc1 on sysfkcol.foreign_column_id=sc1.column_id and sc1.table_id=st1.table_id join syscolumn as sc2 on sysfkcol.primary_column_id=sc2.column_id and sc2.table_id=st2.table_id where st1.table_name='%(table_name)s';" % { 'table_name' : table.name }
        c = connection.execute(s)
        # referenced table name -> [local column names, 'table.column' targets]
        foreignKeys = {}
        while True:
            row = c.fetchone()
            if row is None:
                break
            (foreign_table, foreign_column, primary_table, primary_column) = (
                row[0], row[1], row[2], row[3],
            )
            if not primary_table in foreignKeys.keys():
                foreignKeys[primary_table] = [['%s'%(foreign_column)], ['%s.%s'%(primary_table,primary_column)]]
            else:
                foreignKeys[primary_table][0].append('%s'%(foreign_column))
                foreignKeys[primary_table][1].append('%s.%s'%(primary_table,primary_column))
        # one ForeignKeyConstraint is appended per referenced table
        for primary_table in foreignKeys.keys():
            #table.append_constraint(schema.ForeignKeyConstraint(['%s.%s'%(foreign_table, foreign_column)], ['%s.%s'%(primary_table,primary_column)]))
            table.append_constraint(schema.ForeignKeyConstraint(foreignKeys[primary_table][0], foreignKeys[primary_table][1]))

        if not found_table:
            raise exceptions.NoSuchTableError(table.name)


class SybaseSQLDialect_mxodbc(SybaseSQLDialect):
    """Sybase dialect variant whose DBAPI module is ``mxODBC``.

    Overrides the date/time entries of ``colspecs`` and ``ischema_names``
    with the ``*_mxodbc`` type classes.
    """

    def __init__(self, **params):
        super(SybaseSQLDialect_mxodbc, self).__init__(**params)
        self.dbapi_type_map = {'getdate' : SybaseDate_mxodbc()}

    def import_dbapi(cls):
        """Import and return the ``mxODBC`` module."""
        #import mx.ODBC.Windows as module
        import mxODBC as module
        return module
    # pre-decorator spelling of @classmethod
    import_dbapi = classmethod(import_dbapi)

    # copies, so the parent dialect's mappings are not modified
    colspecs = SybaseSQLDialect.colspecs.copy()
    colspecs[sqltypes.Time] = SybaseTime_mxodbc
    colspecs[sqltypes.Date] = SybaseDate_mxodbc
    colspecs[sqltypes.DateTime] = SybaseDateTime_mxodbc

    ischema_names = SybaseSQLDialect.ischema_names.copy()
    ischema_names['time'] = SybaseTime_mxodbc
    ischema_names['date'] = SybaseDate_mxodbc
    ischema_names['datetime'] = SybaseDateTime_mxodbc
    ischema_names['smalldatetime'] = SybaseDateTime_mxodbc

    def is_disconnect(self, e):
        """Always return False; no disconnect detection is implemented."""
        # FIXME: optimize
        #return isinstance(e, self.dbapi.Error) and '[08S01]' in str(e)
        #return True
        return False

    def create_execution_context(self, *args, **kwargs):
        """Return a ``SybaseSQLExecutionContext_mxodbc`` bound to this dialect."""
        return SybaseSQLExecutionContext_mxodbc(self, *args, **kwargs)

    def do_execute(self, cursor, statement, parameters, context=None, **kwargs):
        """Delegate to the parent ``do_execute``; its result is not returned."""
        super(SybaseSQLDialect_mxodbc, self).do_execute(cursor, statement, parameters, context=context, **kwargs)

    def create_connect_args(self, url):
        '''Return a tuple of *args,**kwargs

        The value actually returned is the two-element list
        ``[[dsn], {'user': ..., 'password': ...}]``, built from the
        translated URL arguments merged with ``url.query``.  A missing
        'user', 'password' or 'dsn' key raises ``KeyError``.
        '''
        # FIXME: handle mx.odbc.Windows proprietary args
        opts = url.translate_connect_args(username='user')
        opts.update(url.query)
        argsDict = {}
        argsDict['user'] = opts['user']
        argsDict['password'] = opts['password']
        connArgs = [[opts['dsn']], argsDict]
        return connArgs


class SybaseSQLDialect_pyodbc(SybaseSQLDialect):
    """Sybase dialect variant using the ``*_pyodbc`` date/time type classes.

    NOTE(review): ``import_dbapi`` imports a module named ``mypyodbc``, not
    ``pyodbc`` -- confirm that this is intended.
    """

    def __init__(self, **params):
        super(SybaseSQLDialect_pyodbc, self).__init__(**params)
        self.dbapi_type_map = {'getdate' : SybaseDate_pyodbc()}

    def import_dbapi(cls):
        """Import and return the ``mypyodbc`` module."""
        import mypyodbc as module
        return module
    # pre-decorator spelling of @classmethod
    import_dbapi = classmethod(import_dbapi)

    # copies, so the parent dialect's mappings are not modified
    colspecs = SybaseSQLDialect.colspecs.copy()
    colspecs[sqltypes.Time] = SybaseTime_pyodbc
    colspecs[sqltypes.Date] = SybaseDate_pyodbc
    colspecs[sqltypes.DateTime] = SybaseDateTime_pyodbc

    ischema_names = SybaseSQLDialect.ischema_names.copy()
    ischema_names['time'] = SybaseTime_pyodbc
    ischema_names['date'] = SybaseDate_pyodbc
    ischema_names['datetime'] = SybaseDateTime_pyodbc
    ischema_names['smalldatetime'] = SybaseDateTime_pyodbc

    def is_disconnect(self, e):
        """Always return False; no disconnect detection is implemented."""
        # FIXME: optimize
        #return isinstance(e, self.dbapi.Error) and '[08S01]' in str(e)
        #return True
        return False

    def create_execution_context(self, *args, **kwargs):
        """Return a ``SybaseSQLExecutionContext_pyodbc`` bound to this dialect."""
        return SybaseSQLExecutionContext_pyodbc(self, *args, **kwargs)

    def do_execute(self, cursor, statement, parameters, context=None, **kwargs):
        """Delegate to the parent ``do_execute``; its result is not returned."""
        super(SybaseSQLDialect_pyodbc, self).do_execute(cursor, statement, parameters, context=context, **kwargs)

    def create_connect_args(self, url):
        '''Return a tuple of *args,**kwargs

        The value actually returned is the two-element list
        ``[[connection_string], {'autocommit': bool}]``, where the
        connection string joins ``UID=``, ``PWD=`` and ``DSN=`` pairs with
        ';'.  Also sets ``self.autocommit`` as a side effect.  A missing
        'user', 'password' or 'dsn' key raises ``KeyError``.
        '''
        # FIXME: handle pyodbc proprietary args
        opts = url.translate_connect_args(username='user')
        opts.update(url.query)

        # 'autocommit' is read as an integer flag and removed from opts
        self.autocommit = False
        if 'autocommit' in opts:
            self.autocommit = bool(int(opts.pop('autocommit')))

        argsDict = {}
        argsDict['UID'] = opts['user']
        argsDict['PWD'] = opts['password']
        argsDict['DSN'] = opts['dsn']
        # pair order in the connection string follows dict iteration order
        connArgs = [[';'.join(["%s=%s"%(key, argsDict[key]) for key in argsDict])], {'autocommit' : self.autocommit}]
        return connArgs


# Only the mxODBC dialect is registered; the pyodbc entry is commented out.
# presumably keyed by DBAPI module name -- how it is consumed is not visible here
dialect_mapping = {
    'sqlalchemy.databases.mxODBC' : SybaseSQLDialect_mxodbc,
#    'pyodbc' : SybaseSQLDialect_pyodbc,
    }


class SybaseSQLCompiler(compiler.DefaultCompiler):
    """Statement compiler overriding pieces of ``DefaultCompiler`` for Sybase."""

    operators = compiler.DefaultCompiler.operators.copy()
    # modulo is rendered as a MOD(x, y) function call
    operators.update({
        sql_operators.mod: lambda x, y: "MOD(%s, %s)" % (x, y),
    })

    def bindparam_string(self, name):
        """Render a bind parameter, wrapping 'literal*' names in ``STRING()``.

        The prefix check is case-insensitive.
        """
        res = super(SybaseSQLCompiler, self).bindparam_string(name)
        if name.lower().startswith('literal'):
            res = 'STRING(%s)'%res
        return res

    def get_select_precolumns(self, select):
        """Return the text placed between SELECT and the column list.

        Made up of an optional ``DISTINCT ``, an optional ``TOP <limit> ``
        and an optional ``START AT <offset + 1> ``.
        """
        s = select._distinct and "DISTINCT " or ""
        if select._limit:
            #if select._limit == 1:
                #s += "FIRST "
            #else:
                #s += "TOP %s " % (select._limit,)
            s += "TOP %s " % (select._limit,)
        if select._offset:
            if not select._limit:
                # FIXME: sybase doesn't allow an offset without a limit
                # so use a huge value for TOP here
                s += "TOP 1000000 "
            # the offset is emitted as offset + 1
            s += "START AT %s " % (select._offset+1,)
        return s

    def limit_clause(self, select):
        """Return an empty string; the limit is emitted by get_select_precolumns."""
        # Limit in sybase is after the select keyword
        return ""

    def visit_binary(self, binary):
        """Move bind parameters to the right-hand side of an operator, where possible."""
        # only done for equality, where swapping the operands is safe
        if isinstance(binary.left, expression._BindParamClause) and binary.operator == operator.eq:
            return self.process(expression._BinaryExpression(binary.right, binary.left, binary.operator))
        else:
            return super(SybaseSQLCompiler, self).visit_binary(binary)

    def label_select_column(self, select, column, asfrom):
        """Label function columns via ``column.label(None)``; defer otherwise."""
        if isinstance(column, expression._Function):
            # presumably label(None) produces an anonymous label -- TODO confirm
            return column.label(None)
        else:
            return super(SybaseSQLCompiler, self).label_select_column(select, column, asfrom)

    # function names replaced before rendering
    function_rewrites =  {'current_date': 'getdate',
                         }
    def visit_function(self, func):
        """Render a function call, renaming it via ``function_rewrites``.

        ``func.name`` is overwritten in place.  A function named 'getdate'
        (case-insensitive) is additionally wrapped in ``CAST(... AS ...)``
        using the ``SybaseDate_mxodbc`` type.
        """
        func.name = self.function_rewrites.get(func.name, func.name)
        res = super(SybaseSQLCompiler, self).visit_function(func)
        if func.name.lower() == 'getdate':
            # apply CAST operator
            # FIXME: what about _pyodbc ?
            cast = expression._Cast(func, SybaseDate_mxodbc)
            # infinite recursion
            # res = self.visit_cast(cast)
            res = "CAST(%s AS %s)" % (res, self.process(cast.typeclause))
        return res

    def for_update_clause(self, select):
        """Return an empty string; no FOR UPDATE clause is emitted."""
        # "FOR UPDATE" is only allowed on "DECLARE CURSOR" which SQLAlchemy doesn't use
        return ''

    def order_by_clause(self, select):
        """Return the ORDER BY clause, or "" when it must be omitted.

        The clause is dropped when there is nothing to order by, or when
        the select is a subquery without a limit.
        """
        order_by = self.process(select._order_by_clause)

        # SybaseSQL only allows ORDER BY in subqueries if there is a LIMIT
        if order_by and (not self.is_subquery(select) or select._limit):
            return " ORDER BY " + order_by
        else:
            return ""


class SybaseSQLSchemaGenerator(compiler.SchemaGenerator):
    """Schema generator that builds the column DDL text."""

    def get_column_specification(self, column, **kwargs):
        """Return the DDL fragment for ``column``.

        The fragment is the formatted column name, then either
        `` Integer IDENTITY`` or the dialect type's column spec, then an
        optional `` NOT NULL`` and an optional `` DEFAULT <value>``.

        Side effects: may set ``column.sequence`` and
        ``column.table.has_sequence``.
        """

        colspec = self.preparer.format_column(column)

        # the first autoincrementing Integer primary key of a table with no
        # sequence yet gets an implicit '<name>_seq' Sequence, provided it has
        # no default or only an optional Sequence default
        if (not getattr(column.table, 'has_sequence', False)) and column.primary_key and \
                column.autoincrement and isinstance(column.type, sqltypes.Integer):
            if column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional):
                column.sequence = schema.Sequence(column.name + '_seq')

        if hasattr(column, 'sequence'):
            # record the column on the table; the getattr check above then
            # skips the implicit sequence for the table's other columns
            column.table.has_sequence = column
            #colspec += " numeric(30,0) IDENTITY"
            colspec += " Integer IDENTITY"
        else:
            colspec += " " + column.type.dialect_impl(self.dialect, _for_ddl=column).get_col_spec()

        if not column.nullable:
            colspec += " NOT NULL"

        default = self.get_column_default_string(column)
        if default is not None:
            colspec += " DEFAULT " + default

        return colspec


class SybaseSQLSchemaDropper(compiler.SchemaDropper):
    """Schema dropper with a custom DROP INDEX statement."""

    def visit_index(self, index):
        """Append ``DROP INDEX <table>.<index>`` and execute it."""
        self.append("\nDROP INDEX %s.%s" % (
            self.preparer.quote_identifier(index.table.name),
            self.preparer.quote_identifier(index.name)
            ))
        self.execute()


class SybaseSQLDefaultRunner(base.DefaultRunner):
    """Default runner; adds nothing to ``base.DefaultRunner``."""
    pass


class SybaseSQLIdentifierPreparer(compiler.IdentifierPreparer):
    """Identifier preparer using this module's ``RESERVED_WORDS``."""

    reserved_words = RESERVED_WORDS

    def __init__(self, dialect):
        super(SybaseSQLIdentifierPreparer, self).__init__(dialect)

    def _escape_identifier(self, value):
        """Return ``value`` unchanged; no escaping is applied."""
        #TODO: determine SybaseSQL's escaping rules
        return value

    def _fold_identifier_case(self, value):
        """Return ``value`` unchanged; no case folding is applied."""
        #TODO: determine SybaseSQL's case folding rules
        return value

# Attach the classes defined above to the base Sybase dialect.
dialect = SybaseSQLDialect
dialect.statement_compiler = SybaseSQLCompiler
dialect.schemagenerator = SybaseSQLSchemaGenerator
dialect.schemadropper = SybaseSQLSchemaDropper
dialect.preparer = SybaseSQLIdentifierPreparer
dialect.defaultrunner = SybaseSQLDefaultRunner

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -