⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ormsession.py

📁 SQLAlchemy. 经典的Python ORM框架。学习必看。
💻 PY
字号:
import testenv; testenv.configure_for_tests()import timefrom datetime import datetimefrom sqlalchemy import *from sqlalchemy.orm import *from testlib import *from testlib.profiling import profiledclass Item(object):    def __repr__(self):        return 'Item<#%s "%s">' % (self.id, self.name)class SubItem(object):    def __repr__(self):        return 'SubItem<#%s "%s">' % (self.id, self.name)class Customer(object):    def __repr__(self):        return 'Customer<#%s "%s">' % (self.id, self.name)class Purchase(object):    def __repr__(self):        return 'Purchase<#%s "%s">' % (self.id, self.purchase_date)items, subitems, customers, purchases, purchaseitems = \    None, None, None, None, Nonemetadata = MetaData()@profiled('table')def define_tables():    global items, subitems, customers, purchases, purchaseitems    items = Table('items', metadata,                  Column('id', Integer, primary_key=True),                  Column('name', String(100)),                  test_needs_acid=True)    subitems = Table('subitems', metadata,                     Column('id', Integer, primary_key=True),                     Column('item_id', Integer, ForeignKey('items.id'),                            nullable=False),                     Column('name', String(100), PassiveDefault('no name')),                     test_needs_acid=True)    customers = Table('customers', metadata,                      Column('id', Integer, primary_key=True),                      Column('name', String(100)),                      *[Column("col_%s" % chr(i), String(64), default=str(i))                        for i in range(97,117)],                      **dict(test_needs_acid=True))    purchases = Table('purchases', metadata,                      Column('id', Integer, primary_key=True),                      Column('customer_id', Integer,                             ForeignKey('customers.id'), nullable=False),                      Column('purchase_date', DateTime,                             default=datetime.now),                      test_needs_acid=True)    purchaseitems = Table('purchaseitems', metadata,                      Column('purchase_id', Integer,                             ForeignKey('purchases.id'),                             nullable=False, primary_key=True),                      Column('item_id', Integer, ForeignKey('items.id'),                             nullable=False, primary_key=True),                      test_needs_acid=True)@profiled('mapper')def setup_mappers():    mapper(Item, items, properties={            'subitems': relation(SubItem, backref='item', lazy=True)            })    mapper(SubItem, subitems)    mapper(Customer, customers, properties={            'purchases': relation(Purchase, lazy=True, backref='customer')            })    mapper(Purchase, purchases, properties={            'items': relation(Item, lazy=True, secondary=purchaseitems)            })@profiled('inserts')def insert_data():    q_items = 1000    q_sub_per_item = 10    q_customers = 1000    con = testing.db.connect()    transaction = con.begin()    data, subdata = [], []    for item_id in xrange(1, q_items + 1):        data.append({'name': "item number %s" % item_id})        for subitem_id in xrange(1, (item_id % q_sub_per_item) + 1):            subdata.append({'item_id': item_id,                         'name': "subitem number %s" % subitem_id})        if item_id % 100 == 0:            items.insert().execute(*data)            subitems.insert().execute(*subdata)            del data[:]            del subdata[:]    if data:        items.insert().execute(*data)    if subdata:        subitems.insert().execute(*subdata)    transaction.commit()    transaction = con.begin()    data = []    for customer_id in xrange(1, q_customers):        data.append({'name': "customer number %s" % customer_id})        if customer_id % 100 == 0:            customers.insert().execute(*data)            del data[:]    if data:        customers.insert().execute(*data)    transaction.commit()    transaction = con.begin()    data, subdata = [], []    order_t = int(time.time()) - (5000 * 5 * 60)    current = xrange(1, q_customers)    step, purchase_id = 1, 0    while current:        next = []        for customer_id in current:            order_t += 300            data.append({'customer_id': customer_id,                         'purchase_date': datetime.fromtimestamp(order_t)})            purchase_id += 1            for item_id in range(customer_id % 200, customer_id + 1, 200):                if item_id != 0:                    subdata.append({'purchase_id': purchase_id,                                    'item_id': item_id})            if customer_id % 10 > step:                next.append(customer_id)            if len(data) >= 100:                purchases.insert().execute(*data)                if subdata:                    purchaseitems.insert().execute(*subdata)                del data[:]                del subdata[:]        step, current = step + 1, next    if data:        purchases.insert().execute(*data)    if subdata:        purchaseitems.insert().execute(*subdata)    transaction.commit()@profiled('queries')def run_queries():    session = create_session()    # no explicit transaction here.    # build a report of summarizing the last 50 purchases and    # the top 20 items from all purchases    q = session.query(Purchase). \        limit(50).order_by(desc(Purchase.purchase_date)). \        options(eagerload('items'), eagerload('items.subitems'),                eagerload('customer'))    report = []    # "write" the report.  pretend it's going to a web template or something,    # the point is to actually pull data through attributes and collections.    for purchase in q:        report.append(purchase.customer.name)        report.append(purchase.customer.col_a)        report.append(purchase.purchase_date)        for item in purchase.items:            report.append(item.name)            report.extend([s.name for s in item.subitems])    # mix a little low-level with orm    # pull a report of the top 20 items of all time    _item_id = purchaseitems.c.item_id    top_20_q = select([func.distinct(_item_id).label('id')],                      group_by=[purchaseitems.c.purchase_id, _item_id],                      order_by=[desc(func.count(_item_id)), _item_id],                      limit=20)    ids = [r.id for r in top_20_q.execute().fetchall()]    q2 = session.query(Item).filter(Item.id.in_(ids))    for num, item in enumerate(q2):        report.append("number %s: %s" % (num + 1, item.name))@profiled('creating')def create_purchase():    # commit a purchase    customer_id = 100    item_ids = (10,22,34,46,58)    session = create_session()    session.begin()    customer = session.query(Customer).get(customer_id)    items = session.query(Item).filter(Item.id.in_(item_ids))    purchase = Purchase()    purchase.customer = customer    purchase.items.extend(items)    session.flush()    session.commit()    session.expire(customer)def setup_db():    metadata.drop_all()    metadata.create_all()def cleanup_db():    metadata.drop_all()@profiled('default')def default():    run_queries()    create_purchase()@profiled('all')def main():    metadata.bind = testing.db    try:        define_tables()        setup_mappers()        setup_db()        insert_data()        default()    finally:        cleanup_db()main()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -