⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mysql.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
📖 第 1 页 / 共 5 页
字号:
          value.  Takes precendence to 'ascii' or 'unicode' short-hand.        collation          Optional, a column-level collation for this string value.          Takes precedence to 'binary' short-hand.        ascii          Defaults to False: short-hand for the ``latin1`` character set,          generates ASCII in schema.        unicode          Defaults to False: short-hand for the ``ucs2`` character set,          generates UNICODE in schema.        binary          Defaults to False: short-hand, pick the binary collation type          that matches the column's character set.  Generates BINARY in          schema.  This does not affect the type of data stored, only the          collation of character data.        """        self.__ddl_values = enums        strip_enums = []        for a in enums:            if a[0:1] == '"' or a[0:1] == "'":                # strip enclosing quotes and unquote interior                a = a[1:-1].replace(a[0] * 2, a[0])            strip_enums.append(a)        self.enums = strip_enums        self.strict = kw.pop('strict', False)        length = max([len(v) for v in strip_enums] + [0])        super(MSEnum, self).__init__(length, **kw)    def bind_processor(self, dialect):        super_convert = super(MSEnum, self).bind_processor(dialect)        def process(value):            if self.strict and value is not None and value not in self.enums:                raise exceptions.InvalidRequestError('"%s" not a valid value for '                                                     'this enum' % value)            if super_convert:                return super_convert(value)            else:                return value        return process    def get_col_spec(self):        return self._extend("ENUM(%s)" % ",".join(self.__ddl_values))class MSSet(MSString):    """MySQL SET type."""    def __init__(self, *values, **kw):        """Construct a SET.        Example::          Column('myset', MSSet("'foo'", "'bar'", "'baz'"))        Arguments are:        values          The range of valid values for this SET.  Values will be used          exactly as they appear when generating schemas.  Strings must          be quoted, as in the example above.  Single-quotes are suggested          for ANSI compatability and are required for portability to servers          with ANSI_QUOTES enabled.        charset          Optional, a column-level character set for this string          value.  Takes precendence to 'ascii' or 'unicode' short-hand.        collation          Optional, a column-level collation for this string value.          Takes precedence to 'binary' short-hand.        ascii          Defaults to False: short-hand for the ``latin1`` character set,          generates ASCII in schema.        unicode          Defaults to False: short-hand for the ``ucs2`` character set,          generates UNICODE in schema.        binary          Defaults to False: short-hand, pick the binary collation type          that matches the column's character set.  Generates BINARY in          schema.  This does not affect the type of data stored, only the          collation of character data.        """        self.__ddl_values = values        strip_values = []        for a in values:            if a[0:1] == '"' or a[0:1] == "'":                # strip enclosing quotes and unquote interior                a = a[1:-1].replace(a[0] * 2, a[0])            strip_values.append(a)        self.values = strip_values        length = max([len(v) for v in strip_values] + [0])        super(MSSet, self).__init__(length, **kw)    def result_processor(self, dialect):        def process(value):            # The good news:            #   No ',' quoting issues- commas aren't allowed in SET values            # The bad news:            #   Plenty of driver inconsistencies here.            if isinstance(value, util.set_types):                # ..some versions convert '' to an empty set                if not value:                    value.add('')                # ..some return sets.Set, even for pythons that have __builtin__.set                if not isinstance(value, util.Set):                    value = util.Set(value)                return value            # ...and some versions return strings            if value is not None:                return util.Set(value.split(','))            else:                return value        return process    def bind_processor(self, dialect):        super_convert = super(MSSet, self).bind_processor(dialect)        def process(value):            if value is None or isinstance(value, (int, long, basestring)):                pass            else:                if None in value:                    value = util.Set(value)                    value.remove(None)                    value.add('')                value = ','.join(value)            if super_convert:                return super_convert(value)            else:                return value        return process    def get_col_spec(self):        return self._extend("SET(%s)" % ",".join(self.__ddl_values))class MSBoolean(sqltypes.Boolean):    """MySQL BOOLEAN type."""    def get_col_spec(self):        return "BOOL"    def result_processor(self, dialect):        def process(value):            if value is None:                return None            return value and True or False        return process    def bind_processor(self, dialect):        def process(value):            if value is True:                return 1            elif value is False:                return 0            elif value is None:                return None            else:                return value and True or False        return processcolspecs = {    sqltypes.Integer: MSInteger,    sqltypes.Smallinteger: MSSmallInteger,    sqltypes.Numeric: MSNumeric,    sqltypes.Float: MSFloat,    sqltypes.DateTime: MSDateTime,    sqltypes.Date: MSDate,    sqltypes.Time: MSTime,    sqltypes.String: MSString,    sqltypes.Binary: MSBlob,    sqltypes.Boolean: MSBoolean,    sqltypes.Text: MSText,    sqltypes.CHAR: MSChar,    sqltypes.NCHAR: MSNChar,    sqltypes.TIMESTAMP: MSTimeStamp,    sqltypes.BLOB: MSBlob,    MSDouble: MSDouble,    MSReal: MSReal,    _BinaryType: _BinaryType,}# Everything 3.23 through 5.1 excepting OpenGIS types.ischema_names = {    'bigint': MSBigInteger,    'binary': MSBinary,    'bit': MSBit,    'blob': MSBlob,    'boolean':MSBoolean,    'char': MSChar,    'date': MSDate,    'datetime': MSDateTime,    'decimal': MSDecimal,    'double': MSDouble,    'enum': MSEnum,    'fixed': MSDecimal,    'float': MSFloat,    'int': MSInteger,    'integer': MSInteger,    'longblob': MSLongBlob,    'longtext': MSLongText,    'mediumblob': MSMediumBlob,    'mediumint': MSInteger,    'mediumtext': MSMediumText,    'nchar': MSNChar,    'nvarchar': MSNVarChar,    'numeric': MSNumeric,    'set': MSSet,    'smallint': MSSmallInteger,    'text': MSText,    'time': MSTime,    'timestamp': MSTimeStamp,    'tinyblob': MSTinyBlob,    'tinyint': MSTinyInteger,    'tinytext': MSTinyText,    'varbinary': MSVarBinary,    'varchar': MSString,    'year': MSYear,}def descriptor():    return {'name':'mysql',    'description':'MySQL',    'arguments':[        ('username',"Database Username",None),        ('password',"Database Password",None),        ('database',"Database Name",None),        ('host',"Hostname", None),    ]}class MySQLExecutionContext(default.DefaultExecutionContext):    def post_exec(self):        if self.compiled.isinsert and not self.executemany:            if (not len(self._last_inserted_ids) or                self._last_inserted_ids[0] is None):                self._last_inserted_ids = ([self.cursor.lastrowid] +                                           self._last_inserted_ids[1:])    def returns_rows_text(self, statement):        return SELECT_RE.match(statement)    def should_autocommit_text(self, statement):        return AUTOCOMMIT_RE.match(statement)class MySQLDialect(default.DefaultDialect):    """Details of the MySQL dialect.  Not used directly in application code."""    supports_alter = True    supports_unicode_statements = False    # identifiers are 64, however aliases can be 255...    max_identifier_length = 255    supports_sane_rowcount = True    def __init__(self, use_ansiquotes=None, **kwargs):        self.use_ansiquotes = use_ansiquotes        kwargs.setdefault('default_paramstyle', 'format')        default.DefaultDialect.__init__(self, **kwargs)    def dbapi(cls):        import MySQLdb as mysql        return mysql    dbapi = classmethod(dbapi)    def create_connect_args(self, url):        opts = url.translate_connect_args(database='db', username='user',                                          password='passwd')        opts.update(url.query)        util.coerce_kw_type(opts, 'compress', bool)        util.coerce_kw_type(opts, 'connect_timeout', int)        util.coerce_kw_type(opts, 'client_flag', int)        util.coerce_kw_type(opts, 'local_infile', int)        # Note: using either of the below will cause all strings to be returned        # as Unicode, both in raw SQL operations and with column types like        # String and MSString.        util.coerce_kw_type(opts, 'use_unicode', bool)        util.coerce_kw_type(opts, 'charset', str)        # Rich values 'cursorclass' and 'conv' are not supported via        # query string.        ssl = {}        for key in ['ssl_ca', 'ssl_key', 'ssl_cert', 'ssl_capath', 'ssl_cipher']:            if key in opts:                ssl[key[4:]] = opts[key]                util.coerce_kw_type(ssl, key[4:], str)                del opts[key]        if ssl:            opts['ssl'] = ssl        # FOUND_ROWS must be set in CLIENT_FLAGS to enable        # supports_sane_rowcount.        client_flag = opts.get('client_flag', 0)        if self.dbapi is not None:            try:                import MySQLdb.constants.CLIENT as CLIENT_FLAGS                client_flag |= CLIENT_FLAGS.FOUND_ROWS            except:                pass            opts['client_flag'] = client_flag        return [[], opts]    def create_execution_context(self, connection, **kwargs):        return MySQLExecutionContext(self, connection, **kwargs)    def type_descriptor(self, typeobj):        return sqltypes.adapt_type(typeobj, colspecs)    def do_executemany(self, cursor, statement, parameters, context=None):        rowcount = cursor.executemany(statement, parameters)        if context is not None:            context._rowcount = rowcount    def supports_unicode_statements(self):        return True    def do_execute(self, cursor, statement, parameters, context=None):        cursor.execute(statement, parameters)    def do_commit(self, connection):        """Execute a COMMIT."""        # COMMIT/ROLLBACK were introduced in 3.23.15.        # Yes, we have at least one user who has to talk to these old versions!        #        # Ignore commit/rollback if support isn't present, otherwise even basic        # operations via autocommit fail.        try:            connection.commit()        except:            if self._server_version_info(connection) < (3, 23, 15):                args = sys.exc_info()[1].args                if args and args[0] == 1064:                    return            raise    def do_rollback(self, connection):        """Execute a ROLLBACK."""        try:            connection.rollback()        except:            if self._server_version_info(connection) < (3, 23, 15):                args = sys.exc_info()[1].args                if args and args[0] == 1064:                    return            raise    def do_begin_twophase(self, connection, xid):        connection.execute("XA BEGIN %s", xid)    def do_prepare_twophase(self, connection, xid):        connection.execute("XA END %s", xid)        connection.execute("XA PREPARE %s", xid)    def do_rollback_twophase(self, connection, xid, is_prepared=True,                             recover=False):        if not is_prepared:            connection.execute("XA END %s", xid)        connection.execute("XA ROLLBACK %s", xid)    def do_commit_twophase(self, connection, xid, is_prepared=True,                           recover=False):        if not is_prepared:            self.do_prepare_twophase(connection, xid)        connection.execute("XA COMMIT %s", xid)    def do_recover_twophase(self, connection):

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -