📄 sqlsoup.py
字号:
>>> db.nopk Traceback (most recent call last): ... PKNotFoundError: table 'nopk' does not have a primary key defined [columns: i] >>> db.nosuchtable Traceback (most recent call last): ... NoSuchTableError: nosuchtable >>> years_with_count.insert(published_year='2007', n=1) Traceback (most recent call last): ... InvalidRequestError: SQLSoup can only modify mapped Tables (found: Alias) [tests clear()] >>> db.loans.count() 1 >>> _ = db.loans.insert(book_id=1, user_name='Bhargan Basepair') >>> db.clear() >>> db.flush() >>> db.loans.count() 1"""from sqlalchemy import *from sqlalchemy import schema, sqlfrom sqlalchemy.orm import *from sqlalchemy.ext.sessioncontext import SessionContextfrom sqlalchemy.exceptions import *from sqlalchemy.sql import expression_testsql = """CREATE TABLE books ( id integer PRIMARY KEY, -- auto-increments in sqlite title text NOT NULL, published_year char(4) NOT NULL, authors text NOT NULL);CREATE TABLE users ( name varchar(32) PRIMARY KEY, email varchar(128) NOT NULL, password varchar(128) NOT NULL, classname text, admin int NOT NULL -- 0 = false);CREATE TABLE loans ( book_id int PRIMARY KEY REFERENCES books(id), user_name varchar(32) references users(name) ON DELETE SET NULL ON UPDATE CASCADE, loan_date datetime DEFAULT current_timestamp);insert into users(name, email, password, admin)values('Bhargan Basepair', 'basepair@example.edu', 'basepair', 1);insert into users(name, email, password, admin)values('Joe Student', 'student@example.edu', 'student', 0);insert into books(title, published_year, authors)values('Mustards I Have Known', '1989', 'Jones');insert into books(title, published_year, authors)values('Regional Variation in Moss', '1971', 'Flim and Flam');insert into loans(book_id, user_name, loan_date)values ( (select min(id) from books), (select name from users where name like 'Joe%'), '2006-07-12 0:0:0');CREATE TABLE nopk ( i int);""".split(';')__all__ = ['PKNotFoundError', 'SqlSoup']## thread local SessionContext#class Objectstore(SessionContext): def __getattr__(self, key): return getattr(self.current, key) def get_session(self): return self.currentobjectstore = Objectstore(create_session)class PKNotFoundError(SQLAlchemyError): pass# metaclass is necessary to expose class methods with getattr, e.g.# we want to pass db.users.select through to users._mapper.selectdef _ddl_error(cls): msg = 'SQLSoup can only modify mapped Tables (found: %s)' \ % cls._table.__class__.__name__ raise InvalidRequestError(msg)class SelectableClassType(type): def insert(cls, **kwargs): _ddl_error(cls) def delete(cls, *args, **kwargs): _ddl_error(cls) def update(cls, whereclause=None, values=None, **kwargs): _ddl_error(cls) def __selectable__(cls): return cls._table def __getattr__(cls, attr): if attr == '_query': # called during mapper init raise AttributeError() return getattr(cls._query, attr)class TableClassType(SelectableClassType): def insert(cls, **kwargs): o = cls() o.__dict__.update(kwargs) return o def delete(cls, *args, **kwargs): cls._table.delete(*args, **kwargs).execute() def update(cls, whereclause=None, values=None, **kwargs): cls._table.update(whereclause, values).execute(**kwargs)def _is_outer_join(selectable): if not isinstance(selectable, sql.Join): return False if selectable.isouter: return True return _is_outer_join(selectable.left) or _is_outer_join(selectable.right)def _selectable_name(selectable): if isinstance(selectable, sql.Alias): return _selectable_name(selectable.selectable) elif isinstance(selectable, sql.Select): return ''.join([_selectable_name(s) for s in selectable.froms]) elif isinstance(selectable, schema.Table): return selectable.name.capitalize() else: x = selectable.__class__.__name__ if x[0] == '_': x = x[1:] return xdef class_for_table(selectable, **mapper_kwargs): selectable = expression._selectable(selectable) mapname = 'Mapped' + _selectable_name(selectable) if isinstance(selectable, Table): klass = TableClassType(mapname, (object,), {}) else: klass = SelectableClassType(mapname, (object,), {}) def __cmp__(self, o): L = self.__class__.c.keys() L.sort() t1 = [getattr(self, k) for k in L] try: t2 = [getattr(o, k) for k in L] except AttributeError: raise TypeError('unable to compare with %s' % o.__class__) return cmp(t1, t2) def __repr__(self): import locale encoding = locale.getdefaultlocale()[1] or 'ascii' L = [] for k in self.__class__.c.keys(): value = getattr(self, k, '') if isinstance(value, unicode): value = value.encode(encoding) L.append("%s=%r" % (k, value)) return '%s(%s)' % (self.__class__.__name__, ','.join(L)) for m in ['__cmp__', '__repr__']: setattr(klass, m, eval(m)) klass._table = selectable mappr = mapper(klass, selectable, extension=objectstore.mapper_extension, allow_null_pks=_is_outer_join(selectable), **mapper_kwargs) klass._query = Query(mappr) return klassclass SqlSoup: def __init__(self, *args, **kwargs): """Initialize a new ``SqlSoup``. `args` may either be an ``SQLEngine`` or a set of arguments suitable for passing to ``create_engine``. """ # meh, sometimes having method overloading instead of kwargs would be easier if isinstance(args[0], MetaData): args = list(args) metadata = args.pop(0) if args or kwargs: raise ArgumentError('Extra arguments not allowed when metadata is given') else: metadata = MetaData(*args, **kwargs) self._metadata = metadata self._cache = {} self.schema = None def engine(self): return self._metadata.bind engine = property(engine) bind = engine def delete(self, *args, **kwargs): objectstore.delete(*args, **kwargs) def flush(self): objectstore.get_session().flush() def clear(self): objectstore.clear() def map(self, selectable, **kwargs): try: t = self._cache[selectable] except KeyError: t = class_for_table(selectable, **kwargs) self._cache[selectable] = t return t def with_labels(self, item): # TODO give meaningful aliases return self.map(expression._selectable(item).select(use_labels=True).alias('foo')) def join(self, *args, **kwargs): j = join(*args, **kwargs) return self.map(j) def __getattr__(self, attr): try: t = self._cache[attr] except KeyError: table = Table(attr, self._metadata, autoload=True, schema=self.schema) if not table.primary_key.columns: raise PKNotFoundError('table %r does not have a primary key defined [columns: %s]' % (attr, ','.join(table.c.keys()))) if table.columns: t = class_for_table(table) else: t = None self._cache[attr] = t return t def __repr__(self): return 'SqlSoup(%r)' % self._metadataif __name__ == '__main__': import logging logging.basicConfig() import doctest doctest.testmod()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -