📄 informix.py
字号:
s = "select tabname from systables" return [row[0] for row in connection.execute(s)] def has_table(self, connection, table_name,schema=None): cursor = connection.execute("""select tabname from systables where tabname=?""", table_name.lower() ) return bool( cursor.fetchone() is not None ) def reflecttable(self, connection, table, include_columns): c = connection.execute ("select distinct OWNER from systables where tabname=?", table.name.lower() ) rows = c.fetchall() if not rows : raise exceptions.NoSuchTableError(table.name) else: if table.owner is not None: if table.owner.lower() in [r[0] for r in rows]: owner = table.owner.lower() else: raise exceptions.AssertionError("Specified owner %s does not own table %s"%(table.owner, table.name)) else: if len(rows)==1: owner = rows[0][0] else: raise exceptions.AssertionError("There are multiple tables with name %s in the schema, you must specifie owner"%table.name) c = connection.execute ("""select colname , coltype , collength , t3.default , t1.colno from syscolumns as t1 , systables as t2 , OUTER sysdefaults as t3 where t1.tabid = t2.tabid and t2.tabname=? and t2.owner=? and t3.tabid = t2.tabid and t3.colno = t1.colno order by t1.colno""", table.name.lower(), owner ) rows = c.fetchall() if not rows: raise exceptions.NoSuchTableError(table.name) for name , colattr , collength , default , colno in rows: name = name.lower() if include_columns and name not in include_columns: continue # in 7.31, coltype = 0x000 # ^^-- column type # ^-- 1 not null , 0 null nullable , coltype = divmod( colattr , 256 ) if coltype not in ( 0 , 13 ) and default: default = default.split()[-1] if coltype == 0 or coltype == 13: # char , varchar coltype = ischema_names.get(coltype, InfoString)(collength) if default: default = "'%s'" % default elif coltype == 5: # decimal precision , scale = ( collength & 0xFF00 ) >> 8 , collength & 0xFF if scale == 255: scale = 0 coltype = InfoNumeric(precision, scale) else: try: coltype = ischema_names[coltype] except KeyError: util.warn("Did not recognize type '%s' of column '%s'" % (coltype, name)) coltype = sqltypes.NULLTYPE colargs = [] if default is not None: colargs.append(schema.PassiveDefault(sql.text(default))) table.append_column(schema.Column(name, coltype, nullable = (nullable == 0), *colargs)) # FK c = connection.execute("""select t1.constrname as cons_name , t1.constrtype as cons_type , t4.colname as local_column , t7.tabname as remote_table , t6.colname as remote_column from sysconstraints as t1 , systables as t2 , sysindexes as t3 , syscolumns as t4 , sysreferences as t5 , syscolumns as t6 , systables as t7 , sysconstraints as t8 , sysindexes as t9 where t1.tabid = t2.tabid and t2.tabname=? and t2.owner=? and t1.constrtype = 'R' and t3.tabid = t2.tabid and t3.idxname = t1.idxname and t4.tabid = t2.tabid and t4.colno = t3.part1 and t5.constrid = t1.constrid and t8.constrid = t5.primary and t6.tabid = t5.ptabid and t6.colno = t9.part1 and t9.idxname = t8.idxname and t7.tabid = t5.ptabid""", table.name.lower(), owner ) rows = c.fetchall() fks = {} for cons_name, cons_type, local_column, remote_table, remote_column in rows: try: fk = fks[cons_name] except KeyError: fk = ([], []) fks[cons_name] = fk refspec = ".".join([remote_table, remote_column]) schema.Table(remote_table, table.metadata, autoload=True, autoload_with=connection) if local_column not in fk[0]: fk[0].append(local_column) if refspec not in fk[1]: fk[1].append(refspec) for name, value in fks.iteritems(): table.append_constraint(schema.ForeignKeyConstraint(value[0], value[1] , None )) # PK c = connection.execute("""select t1.constrname as cons_name , t1.constrtype as cons_type , t4.colname as local_column from sysconstraints as t1 , systables as t2 , sysindexes as t3 , syscolumns as t4 where t1.tabid = t2.tabid and t2.tabname=? and t2.owner=? and t1.constrtype = 'P' and t3.tabid = t2.tabid and t3.idxname = t1.idxname and t4.tabid = t2.tabid and t4.colno = t3.part1""", table.name.lower(), owner ) rows = c.fetchall() for cons_name, cons_type, local_column in rows: table.primary_key.add( table.c[local_column] )class InfoCompiler(compiler.DefaultCompiler): """Info compiler modifies the lexical structure of Select statements to work under non-ANSI configured Oracle databases, if the use_ansi flag is False.""" def __init__(self, dialect, statement, parameters=None, **kwargs): self.limit = 0 self.offset = 0 compiler.DefaultCompiler.__init__( self , dialect , statement , parameters , **kwargs ) def default_from(self): return " from systables where tabname = 'systables' " def get_select_precolumns( self , select ): s = select._distinct and "DISTINCT " or "" # only has limit if select._limit: off = select._offset or 0 s += " FIRST %s " % ( select._limit + off ) else: s += "" return s def visit_select(self, select): if select._offset: self.offset = select._offset self.limit = select._limit or 0 # the column in order by clause must in select too def __label( c ): try: return c._label.lower() except: return '' # TODO: dont modify the original select, generate a new one a = [ __label(c) for c in select._raw_columns ] for c in select.order_by_clause.clauses: if ( __label(c) not in a ) and getattr( c , 'name' , '' ) != 'oid': select.append_column( c ) return compiler.DefaultCompiler.visit_select(self, select) def limit_clause(self, select): return "" def visit_function( self , func ): if func.name.lower() == 'current_date': return "today" elif func.name.lower() == 'current_time': return "CURRENT HOUR TO SECOND" elif func.name.lower() in ( 'current_timestamp' , 'now' ): return "CURRENT YEAR TO SECOND" else: return compiler.DefaultCompiler.visit_function( self , func ) def visit_clauselist(self, list): try: li = [ c for c in list.clauses if c.name != 'oid' ] except: li = [ c for c in list.clauses ] return ', '.join([s for s in [self.process(c) for c in li] if s is not None])class InfoSchemaGenerator(compiler.SchemaGenerator): def get_column_specification(self, column, first_pk=False): colspec = self.preparer.format_column(column) if column.primary_key and len(column.foreign_keys)==0 and column.autoincrement and \ isinstance(column.type, sqltypes.Integer) and not getattr( self , 'has_serial' , False ) and first_pk: colspec += " SERIAL" self.has_serial = True else: colspec += " " + column.type.dialect_impl(self.dialect, _for_ddl=column).get_col_spec() default = self.get_column_default_string(column) if default is not None: colspec += " DEFAULT " + default if not column.nullable: colspec += " NOT NULL" return colspec def post_create_table(self, table): if hasattr( self , 'has_serial' ): del self.has_serial return '' def visit_primary_key_constraint(self, constraint): # for informix 7.31 not support constraint name name = constraint.name constraint.name = None super(InfoSchemaGenerator, self).visit_primary_key_constraint(constraint) constraint.name = name def visit_unique_constraint(self, constraint): # for informix 7.31 not support constraint name name = constraint.name constraint.name = None super(InfoSchemaGenerator, self).visit_unique_constraint(constraint) constraint.name = name def visit_foreign_key_constraint( self , constraint ): if constraint.name is not None: constraint.use_alter = True else: super( InfoSchemaGenerator , self ).visit_foreign_key_constraint( constraint ) def define_foreign_key(self, constraint): # for informix 7.31 not support constraint name if constraint.use_alter: name = constraint.name constraint.name = None self.append( "CONSTRAINT " ) super(InfoSchemaGenerator, self).define_foreign_key(constraint) constraint.name = name if name is not None: self.append( " CONSTRAINT " + name ) else: super(InfoSchemaGenerator, self).define_foreign_key(constraint) def visit_index(self, index): if len( index.columns ) == 1 and index.columns[0].foreign_key: return super(InfoSchemaGenerator, self).visit_index(index)class InfoIdentifierPreparer(compiler.IdentifierPreparer): def __init__(self, dialect): super(InfoIdentifierPreparer, self).__init__(dialect, initial_quote="'") def _requires_quotes(self, value): return Falseclass InfoSchemaDropper(compiler.SchemaDropper): def drop_foreignkey(self, constraint): if constraint.name is not None: super( InfoSchemaDropper , self ).drop_foreignkey( constraint )dialect = InfoDialectpoolclass = pool.SingletonThreadPooldialect.statement_compiler = InfoCompilerdialect.schemagenerator = InfoSchemaGeneratordialect.schemadropper = InfoSchemaDropperdialect.preparer = InfoIdentifierPreparer
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -