⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 access.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 2 页
字号:
            connectors.append("UID=%s" % user)            connectors.append("PWD=%s" % opts.get("password", ""))        return [[";".join(connectors)], {}]    def create_execution_context(self, *args, **kwargs):        return AccessExecutionContext(self, *args, **kwargs)    def last_inserted_ids(self):        return self.context.last_inserted_ids    def do_execute(self, cursor, statement, params, **kwargs):        if params == {}:            params = ()        super(AccessDialect, self).do_execute(cursor, statement, params, **kwargs)    def _execute(self, c, statement, parameters):        try:            if parameters == {}:                parameters = ()            c.execute(statement, parameters)            self.context.rowcount = c.rowcount        except Exception, e:            raise exceptions.DBAPIError.instance(statement, parameters, e)    def has_table(self, connection, tablename, schema=None):        # This approach seems to be more reliable that using DAO        try:            connection.execute('select top 1 * from [%s]' % tablename)            return True        except Exception, e:            return False    def reflecttable(self, connection, table, include_columns):                # This is defined in the function, as it relies on win32com constants,        # that aren't imported until dbapi method is called        if not hasattr(self, 'ischema_names'):            self.ischema_names = {                const.dbByte:       AcBinary,                const.dbInteger:    AcInteger,                const.dbLong:       AcInteger,                const.dbSingle:     AcFloat,                const.dbDouble:     AcFloat,                const.dbDate:       AcDateTime,                const.dbLongBinary: AcBinary,                const.dbMemo:       AcText,                const.dbBoolean:    AcBoolean,                const.dbText:       AcUnicode, # All Access strings are unicode            }                    # A fresh DAO connection is opened for each reflection        # This is necessary, so we get the latest updates        dtbs = daoEngine.OpenDatabase(connection.engine.url.database)                try:            for tbl in dtbs.TableDefs:                if tbl.Name.lower() == table.name.lower():                    break            else:                raise exceptions.NoSuchTableError(table.name)            for col in tbl.Fields:                coltype = self.ischema_names[col.Type]                if col.Type == const.dbText:                    coltype = coltype(col.Size)                colargs = \                {                    'nullable': not(col.Required or col.Attributes & const.dbAutoIncrField),                }                default = col.DefaultValue                if col.Attributes & const.dbAutoIncrField:                    colargs['default'] = schema.Sequence(col.Name + '_seq')                elif default:                    if col.Type == const.dbBoolean:                        default = default == 'Yes' and '1' or '0'                    colargs['default'] = schema.PassiveDefault(sql.text(default))                table.append_column(schema.Column(col.Name, coltype, **colargs))                # TBD: check constraints            # Find primary key columns first            for idx in tbl.Indexes:                if idx.Primary:                    for col in idx.Fields:                        thecol = table.c[col.Name]                        table.primary_key.add(thecol)                        if isinstance(thecol.type, AcInteger) and \                                not (thecol.default and isinstance(thecol.default.arg, schema.Sequence)):                            thecol.autoincrement = False            # Then add other indexes            for idx in tbl.Indexes:                if not idx.Primary:                    if len(idx.Fields) == 1:                        col = table.c[idx.Fields[0].Name]                        if not col.primary_key:                            col.index = True                            col.unique = idx.Unique                    else:                        pass # TBD: multi-column indexes                            for fk in dtbs.Relations:                if fk.ForeignTable != table.name:                    continue                scols = [c.ForeignName for c in fk.Fields]                rcols = ['%s.%s' % (fk.Table, c.Name) for c in fk.Fields]                table.append_constraint(schema.ForeignKeyConstraint(scols, rcols))        finally:            dtbs.Close()    def table_names(self, connection, schema):        # A fresh DAO connection is opened for each reflection        # This is necessary, so we get the latest updates        dtbs = daoEngine.OpenDatabase(connection.engine.url.database)        names = [t.Name for t in dtbs.TableDefs if t.Name[:4] != "MSys" and t.Name[:4] <> "~TMP"]        dtbs.Close()        return namesclass AccessCompiler(compiler.DefaultCompiler):    def visit_select_precolumns(self, select):        """Access puts TOP, it's version of LIMIT here """        s = select.distinct and "DISTINCT " or ""        if select.limit:            s += "TOP %s " % (select.limit)        if select.offset:            raise exceptions.InvalidRequestError('Access does not support LIMIT with an offset')        return s    def limit_clause(self, select):        """Limit in access is after the select keyword"""        return ""    def binary_operator_string(self, binary):        """Access uses "mod" instead of "%" """        return binary.operator == '%' and 'mod' or binary.operator    def label_select_column(self, select, column, asfrom):        if isinstance(column, expression._Function):            return column.label()        else:            return super(AccessCompiler, self).label_select_column(select, column, asfrom)    function_rewrites =  {'current_date':       'now',                          'current_timestamp':  'now',                          'length':             'len',                          }    def visit_function(self, func):        """Access function names differ from the ANSI SQL names; rewrite common ones"""        func.name = self.function_rewrites.get(func.name, func.name)        super(AccessCompiler, self).visit_function(func)    def for_update_clause(self, select):        """FOR UPDATE is not supported by Access; silently ignore"""        return ''    # Strip schema    def visit_table(self, table, asfrom=False, **kwargs):        if asfrom:            return self.preparer.quote(table, table.name)        else:            return ""class AccessSchemaGenerator(compiler.SchemaGenerator):    def get_column_specification(self, column, **kwargs):        colspec = self.preparer.format_column(column) + " " + column.type.dialect_impl(self.dialect, _for_ddl=column).get_col_spec()        # install a sequence if we have an implicit IDENTITY column        if (not getattr(column.table, 'has_sequence', False)) and column.primary_key and \                column.autoincrement and isinstance(column.type, types.Integer) and not column.foreign_key:            if column.default is None or (isinstance(column.default, schema.Sequence) and column.default.optional):                column.sequence = schema.Sequence(column.name + '_seq')        if not column.nullable:            colspec += " NOT NULL"        if hasattr(column, 'sequence'):            column.table.has_sequence = column            colspec = self.preparer.format_column(column) + " counter"        else:            default = self.get_column_default_string(column)            if default is not None:                colspec += " DEFAULT " + default        return colspecclass AccessSchemaDropper(compiler.SchemaDropper):    def visit_index(self, index):        self.append("\nDROP INDEX [%s].[%s]" % (index.table.name, index.name))        self.execute()class AccessDefaultRunner(base.DefaultRunner):    passclass AccessIdentifierPreparer(compiler.IdentifierPreparer):    reserved_words = compiler.RESERVED_WORDS.copy()    reserved_words.update(['value', 'text'])    def __init__(self, dialect):        super(AccessIdentifierPreparer, self).__init__(dialect, initial_quote='[', final_quote=']')dialect = AccessDialectdialect.poolclass = pool.SingletonThreadPooldialect.statement_compiler = AccessCompilerdialect.schemagenerator = AccessSchemaGeneratordialect.schemadropper = AccessSchemaDropperdialect.preparer = AccessIdentifierPreparerdialect.defaultrunner = AccessDefaultRunner

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -