⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 assorted_eager.py

📁 SQLAlchemy：经典的 Python ORM 框架，学习必看。
💻 PY
📖 第 1 页 / 共 3 页
字号:
            'company': relation(Company, lazy=False, )
            }, extension=ctx.mapper_extension)

        # NOTE(review): the start of this test method is outside the visible
        # chunk; only its tail is reproduced here, unchanged.

        # One company with two addresses, plus an invoice pointing at it.
        c1 = Company()
        c1.company_name = 'company 1'
        a1 = Address()
        a1.address = 'a1 address'
        c1.addresses.append(a1)
        a2 = Address()
        a2.address = 'a2 address'
        c1.addresses.append(a2)
        i1 = Invoice()
        i1.date = datetime.datetime.now()
        i1.company = c1

        ctx.current.flush()

        # Remember the primary keys so both objects can be fetched again by id.
        company_id = c1.company_id
        invoice_id = i1.invoice_id

        # Each get() below is preceded by clear(), presumably so that the
        # Company and the Invoice are each loaded from scratch -- confirm
        # against the session API in use.
        ctx.current.clear()

        c = ctx.current.query(Company).get(company_id)

        ctx.current.clear()

        i = ctx.current.query(Invoice).get(invoice_id)

        print repr(c)
        print repr(i.company)
        # The company loaded directly must look the same as the one reached
        # through the invoice's 'company' relation.
        self.assert_(repr(c) == repr(i.company))

    def testtwo(self):
        """Original test case, including various complicating factors.

        Maps Phone, Address, Company, Item and Invoice with ``lazy=False``
        relations and backrefs, stores a company with two addresses (two
        phones each) and later an invoice with three items, then checks that
        the company loaded directly has the same repr() as the company
        reached through the reloaded invoice.
        """
        mapper(Phone, phones_table, extension=ctx.mapper_extension)

        mapper(Address, addresses_table, properties={
            'phones': relation(Phone, lazy=False, backref='address')
            }, extension=ctx.mapper_extension)

        mapper(Company, companies_table, properties={
            'addresses' : relation(Address, lazy=False, backref='company'),
            }, extension=ctx.mapper_extension)

        mapper(Item, items_table, extension=ctx.mapper_extension)

        mapper(Invoice, invoice_table, properties={
            'items': relation(Item, lazy=False, backref='invoice'),
            'company': relation(Company, lazy=False, backref='invoices')
            }, extension=ctx.mapper_extension)

        ctx.current.clear()

        # Company -> 2 addresses -> 2 phones each.
        c1 = Company()
        c1.company_name = 'company 1'

        a1 = Address()
        a1.address = 'a1 address'

        p1 = Phone()
        p1.type = 'home'
        p1.number = '1111'

        a1.phones.append(p1)

        p2 = Phone()
        p2.type = 'work'
        p2.number = '22222'
        a1.phones.append(p2)

        c1.addresses.append(a1)

        a2 = Address()
        a2.address = 'a2 address'

        p3 = Phone()
        p3.type = 'home'
        p3.number = '3333'
        a2.phones.append(p3)

        p4 = Phone()
        p4.type = 'work'
        p4.number = '44444'
        a2.phones.append(p4)

        c1.addresses.append(a2)

        ctx.current.flush()

        company_id = c1.company_id

        ctx.current.clear()

        # Re-fetch the company by primary key; the invoice below is attached
        # to this re-fetched instance rather than to c1.
        a = ctx.current.query(Company).get(company_id)
        print repr(a)

        # set up an invoice
        i1 = Invoice()
        i1.date = datetime.datetime.now()
        i1.company = a

        item1 = Item()
        item1.code = 'aaaa'
        item1.qty = 1
        item1.invoice = i1

        item2 = Item()
        item2.code = 'bbbb'
        item2.qty = 2
        item2.invoice = i1

        item3 = Item()
        item3.code = 'cccc'
        item3.qty = 3
        item3.invoice = i1

        ctx.current.flush()

        invoice_id = i1.invoice_id

        ctx.current.clear()

        c = ctx.current.query(Company).get(company_id)
        print repr(c)

        ctx.current.clear()

        i = ctx.current.query(Invoice).get(invoice_id)

        # Compare via repr(); the failure message shows both representations.
        assert repr(i.company) == repr(c), repr(i.company) +  " does not match " + repr(c)


class EagerTest8(ORMTest):
    """Eager loading on a mapper whose selectable is a join against an
    aggregate subquery (tasks joined to a per-task message count)."""

    def define_tables(self, metadata):
        """Create the prj, task, task_status, task_type, msg and msg_type
        tables on ``metadata`` and publish them as module-level globals."""
        global project_t, task_t, task_status_t, task_type_t, message_t, message_type_t

        project_t = Table('prj', metadata,
                          Column('id',            Integer,      primary_key=True),
                          Column('created',       DateTime ,    ),
                          Column('title',         Unicode(100)),
                          )

        # A task references a status, a type and a project; all three
        # foreign keys are NOT NULL.
        task_t = Table('task', metadata,
                          Column('id',            Integer,      primary_key=True),
                          Column('status_id',     Integer,      ForeignKey('task_status.id'), nullable=False),
                          Column('title',         Unicode(100)),
                          Column('task_type_id',  Integer ,     ForeignKey('task_type.id'), nullable=False),
                          Column('prj_id',        Integer ,     ForeignKey('prj.id'), nullable=False),
                          )

        task_status_t = Table('task_status', metadata,
                                Column('id',                Integer,      primary_key=True),
                                )

        task_type_t = Table('task_type', metadata,
                            Column('id',   Integer,    primary_key=True),
                            )

        message_t  = Table('msg', metadata,
                            Column('id', Integer,  primary_key=True),
                            Column('posted',    DateTime, index=True,),
                            Column('type_id',   Integer, ForeignKey('msg_type.id')),
                            Column('task_id',   Integer, ForeignKey('task.id')),
                            )

        message_type_t = Table('msg_type', metadata,
                                Column('id',                Integer,      primary_key=True),
                                Column('name',              Unicode(20)),
                                Column('display_name',      Unicode(20)),
                                )

    def setUp(self):
        """Insert one project, one task status, one task type and a single
        task that references all three."""
        testing.db.execute(project_t.insert(), {'id':1})
        testing.db.execute(task_status_t.insert(), {'id':1})
        testing.db.execute(task_type_t.insert(), {'id':1})
        testing.db.execute(task_t.insert(), {'title':u'task 1', 'task_type_id':1, 'status_id':1, 'prj_id':1})

    # NOTE(review): presumably marks the test as an expected failure on
    # MaxDB -- confirm against the testing helpers.
    @testing.fails_on('maxdb')
    def test_nested_joins(self):
        """Map a class to ``task JOIN (per-task message count)`` with an
        eager relation to Task_Type, then query it with limit/offset and
        print each row's id, title and props_cnt."""
        # this is testing some subtle column resolution stuff,
        # concerning corresponding_column() being extremely accurate
        # as well as how mapper sets up its column properties

        class Task(object):pass
        class Task_Type(object):pass
        class Message(object):pass
        class Message_Type(object):pass

        # Project-level aggregate: number of tasks per project.
        tsk_cnt_join = outerjoin(project_t, task_t, task_t.c.prj_id==project_t.c.id)

        ss = select([project_t.c.id.label('prj_id'), func.count(task_t.c.id).label('tasks_number')],
                    from_obj=[tsk_cnt_join], group_by=[project_t.c.id]).alias('prj_tsk_cnt_s')
        j = join(project_t, ss, project_t.c.id == ss.c.prj_id)

        mapper(Task_Type, task_type_t)

        mapper( Task, task_t,
                              properties=dict(type=relation(Task_Type, lazy=False),
                                             ))

        mapper(Message_Type, message_type_t)

        mapper(Message, message_t,
                         properties=dict(type=relation(Message_Type, lazy=False, uselist=False),
                                         ))

        # NOTE(review): the next three statements repeat the construction
        # above verbatim, and 'j' is rebound again right after. Neither the
        # project-level 'ss' nor this 'j' feeds the selectable that is
        # finally mapped ('jjj'). Left untouched in case the repetition is
        # part of what the test exercises -- confirm before removing.
        tsk_cnt_join = outerjoin(project_t, task_t, task_t.c.prj_id==project_t.c.id)
        ss = select([project_t.c.id.label('prj_id'), func.count(task_t.c.id).label('tasks_number')],
                    from_obj=[tsk_cnt_join], group_by=[project_t.c.id]).alias('prj_tsk_cnt_s')
        j = join(project_t, ss, project_t.c.id == ss.c.prj_id)

        # Task-level aggregate: number of messages per task, exposed as
        # 'props_cnt', then joined back to the task table.
        j  = outerjoin( task_t, message_t, task_t.c.id==message_t.c.task_id)
        jj = select([ task_t.c.id.label('task_id'),
                      func.count(message_t.c.id).label('props_cnt')],
                      from_obj=[j], group_by=[task_t.c.id]).alias('prop_c_s')
        jjj = join(task_t, jj, task_t.c.id == jj.c.task_id)

        class cls(object):pass

        props =dict(type=relation(Task_Type, lazy=False))
        print [c.key for c in jjj.c]
        cls.mapper = mapper( cls, jjj, properties=props)

        session = create_session()

        # The test makes no explicit assertions; it passes if the query runs
        # and the printed attributes resolve on every returned object.
        for t in session.query(cls.mapper).limit(10).offset(0).all():
            print t.id, t.title, t.props_cnt


class EagerTest9(ORMTest):
    """Test the usage of query options to eagerly load specific paths.

    This relies upon the 'path' construct used by PropertyOption to relate
    LoaderStrategies to specific paths, as well as the path state maintained
    throughout the query setup/mapper instances process.

    """

    def define_tables(self, metadata):
        """Create the accounts, transactions and entries tables; each entry
        row references one account and one transaction."""
        global accounts_table, transactions_table, entries_table

        accounts_table = Table('accounts', metadata,
            Column('account_id', Integer, primary_key=True),
            Column('name', String(40)),
        )
        transactions_table = Table('transactions', metadata,
            Column('transaction_id', Integer, primary_key=True),
            Column('name', String(40)),
        )
        entries_table = Table('entries', metadata,
            Column('entry_id', Integer, primary_key=True),
            Column('name', String(40)),
            Column('account_id', Integer, ForeignKey(accounts_table.c.account_id)),
            Column('transaction_id', Integer, ForeignKey(transactions_table.c.transaction_id)),
        )

    # NOTE(review): presumably marks the test as an expected failure on
    # MaxDB -- confirm against the testing helpers.
    @testing.fails_on('maxdb')
    def test_eagerload_on_path(self):
        """Load one Account with
        ``eagerload_all('entries.transaction.entries.account')`` and check,
        via ``assert_sql_count(testing.db, go, 1)``, that walking that path
        stays within the expected statement count."""
        class Account(fixtures.Base):
            pass

        class Transaction(fixtures.Base):
            pass

        class Entry(fixtures.Base):
            pass

        mapper(Account, accounts_table)
        mapper(Transaction, transactions_table)
        # Account.entries (backref) is configured lazy=True, while
        # Transaction.entries (backref) is configured lazy=False.
        mapper(Entry, entries_table, properties = dict(
            account = relation(Account, uselist=False, backref=backref('entries', lazy=True)),
            transaction = relation(Transaction, uselist=False, backref=backref('entries', lazy=False)),
        ))

        session = create_session()

        # Two transactions, two accounts, and one entry for every
        # (account, transaction) pair.
        tx1 = Transaction(name='tx1')
        tx2 = Transaction(name='tx2')

        acc1 = Account(name='acc1')
        ent11 = Entry(name='ent11', account=acc1, transaction=tx1)
        ent12 = Entry(name='ent12', account=acc1, transaction=tx2)

        acc2 = Account(name='acc2')
        ent21 = Entry(name='ent21', account=acc2, transaction=tx1)
        ent22 = Entry(name='ent22', account=acc2, transaction=tx2)

        # Only acc1 is saved explicitly; the remaining objects are presumably
        # persisted through relation cascades -- confirm.
        session.save(acc1)
        session.flush()
        session.clear()

        def go():
            # load just the first Account.  eager loading will actually load all objects saved thus far,
            # but will not eagerly load the "accounts" off the immediate "entries"; only the
            # "accounts" off the entries->transaction->entries
            acc = session.query(Account).options(eagerload_all('entries.transaction.entries.account')).first()

            # no sql occurs
            assert acc.name == 'acc1'
            assert acc.entries[0].transaction.entries[0].account.name == 'acc1'
            assert acc.entries[0].transaction.entries[1].account.name == 'acc2'

            # lazyload triggers but no sql occurs because many-to-one uses cached query.get()
            for e in acc.entries:
                assert e.account is acc

        self.assert_sql_count(testing.db, go, 1)


if __name__ == "__main__":
    testenv.main()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -