📄 generators_prototype.py
字号:
return False
return True
def _string_multiset(s):
    """Count how often each string occurs in s.

    s is first normalised with _sequence_of_strings, so a lone string
    is treated as a one-element sequence.  The result is a plain dict
    mapping every distinct string to its number of occurrences.
    """
    counts = {}
    for item in _sequence_of_strings(s):
        if item in counts:
            counts[item] += 1
        else:
            counts[item] = 1
    return counts
def parent_sets(chosen, signature, all_groups, generator):
    """Given an already-chosen tuple of TargetTypeGroups and a signature
    of the groups left to choose, generates all mutually-compatible
    combinations of groups starting with chosen

    chosen     -- tuple of the groups selected so far
    signature  -- sequence of (target_type, min_size, max_size) entries
                  still to be satisfied; an entry whose min_size is 0
                  is optional and may be skipped
    all_groups -- dict mapping a target type to the list of known groups
                  of that type; a type with no entry simply offers no
                  candidates
    generator  -- the generator being matched; groups whose .generators
                  already contain it are never selected

    Each yielded value is a tuple that starts with chosen.

    >>> TargetTypeGroup.instances = 0
    >>> groups = {
    ... 'x': [ TargetTypeGroup('x', 1) ],
    ... 'y': [ TargetTypeGroup('y', 1), TargetTypeGroup('y',2) ],
    ... 'z': [ TargetTypeGroup('z', 1) ]
    ... }
    >>> signature = (('y',0,'*'),('z',1,'1'))
    >>> chosen = (groups['x'][0],)
    >>> [ x for x in parent_sets(chosen, signature, groups, Generator('x',('y*', 'z'))) ]
    [(1.x(#0$0), 1.z(#3$0)), (1.x(#0$0), 1.y(#1$0), 1.z(#3$0)), (1.x(#0$0), 2.y(#2$0), 1.z(#3$0))]
    """
    if not signature:
        # The entire signature was satisfied; we can just yield the
        # one result
        yield chosen
        return

    # find all ways to satisfy the next element of the signature
    # which are compatible with the already-chosen groups.  If
    # there are no ways, we will fall off the end here and the
    # ultimate result will be empty.
    target_type, lowest, highest = signature[0]
    remaining = signature[1:]

    if lowest == 0:
        # An optional entry: one family of solutions leaves it out.
        for s in parent_sets(chosen, remaining, all_groups, generator):
            yield s

    # .get() rather than indexing: a target type that has not produced
    # any group yet means "no candidates", not an error.
    for g in all_groups.get(target_type, ()):
        # can only use a generator once in any path
        if generator in g.generators:
            continue
        if lowest <= g.size <= highest and g.all_compatible(chosen):
            for s in parent_sets(
                    chosen + (g,), remaining, all_groups, generator):
                yield s
# Module-wide tracing switch.  While it is true, _examine_edge() and
# optimal_graphs() print diagnostics about each step they take; run()
# sets it to 1 when '--debug' appears in sys.argv.
debug = None
def _sort_tuple(t):
"""copies the given sequence into a new tuple in sorted order"""
l = list(t)
l.sort()
return tuple(l)
def _sequence_of_strings(s_or_l):
"""if the argument is a string, wraps a tuple around it.
Otherwise, returns the argument untouched
"""
if isinstance(s_or_l, type('')):
return (s_or_l,)
else:
return s_or_l
def _examine_edge(states, queue, g):
    """Handle a possible new state in the search.

    states -- dict of every State seen so far, keyed by itself
    queue  -- list of States still waiting to be expanded
    g      -- the freshly built group to consider

    If no equal State is recorded yet, g's State is registered and
    queued and False is returned.  Otherwise True is returned after
    comparing g against the group held by the recorded State: a
    cheaper g replaces it, a more expensive g is dropped, an
    equal-cost regrouping of the same atoms made without a generator
    is dropped as redundant, and any other equal-cost g is recorded as
    an ambiguity on the held group.
    """
    candidate = State(g)
    known = states.setdefault(candidate, candidate)
    incumbent = known.group

    if incumbent is g:
        # Nothing equivalent was known: schedule it for expansion.
        queue.append(candidate)
        return False

    if incumbent.cost > g.cost:
        if debug:
            print('reducing cost of state(%s) via %s' % (incumbent, g))
        incumbent.ambiguous = None
        known.group = g
    elif incumbent.cost < g.cost:
        if debug:
            print('discarding %s due to lower cost state(%s)' % (g, incumbent))
    elif (not (g.generator or incumbent.generator)
          and _sort_tuple(g.atoms()) == _sort_tuple(incumbent.atoms())):
        # These are two different ways of combining the same groups of
        # a given type to produce a larger group, without using a generator
        if debug:
            print('discarding %s as a redundant formulation of %s' % (g, incumbent))
    else:
        if debug:
            print('%s is an ambiguous path due to %s' % (incumbent, g))
        # Remember the group which caused the ambiguity
        incumbent.ambiguous = g

    return True
class State(object):
    """A wrapper around a TargetTypeGroup which makes it hashable on
    the part of its data which determines its ability to contribute to
    producing the goal target type.

    Two States compare equal when their groups agree on target_type,
    size, extra_targets and consumed_sources.  The hash is built from
    the same fields (with extra_targets and the consumed_sources keys
    put in sorted order), so equal States always hash alike.
    """

    def __init__(self, group):
        # The wrapped group; _examine_edge() may swap in a cheaper one.
        self.group = group

    def __hash__(self):
        g = self.group
        # sorted() works whether keys() returns a list or a view.
        return hash((
            g.target_type,
            g.size,
            tuple(sorted(g.extra_targets)),
            tuple(sorted(g.consumed_sources.keys())),
        ))

    def __eq__(self, other):
        mine = self.group
        theirs = other.group
        return (
            mine.target_type == theirs.target_type
            and mine.size == theirs.size
            and mine.extra_targets == theirs.extra_targets
            and mine.consumed_sources == theirs.consumed_sources)

    def __ne__(self, other):
        # Python 2 does not derive != from ==; keep the two consistent.
        return not self.__eq__(other)
# Running count of groups that optimal_graphs() has removed from its
# queue.  search() resets it to 0 before each search and prints it when
# the search is over.
queue_moves = 0
def optimal_graphs(target_type, source_groups, generators):
    """A 'simple generator' that produces the sequence of least-cost
    solutions for producing the target type from a subset of the
    source_groups, using the given generators.

    target_type   -- the type a yielded group must have
    source_groups -- the initial groups that seed the search
    generators    -- indexable by target type, giving the generators to
                     try on a group of that type

    Groups are taken from the queue in order of increasing cost.  Every
    dequeued group whose target_type equals target_type is yielded; the
    search ends as soon as such a group consumes all source_groups, or
    when the queue runs dry.  The module-level counter queue_moves is
    incremented once per dequeued group, and diagnostics are printed
    while the module-level debug flag is true.
    """
    global queue_moves

    # An index from target type to lists of groups with that type.
    all_groups = {}

    # Prime the priority Queue
    q = [State(g) for g in source_groups]

    # Keep a record of all known states in the search
    states = dict([(s, s) for s in q])

    while q:
        # remove a group from the queue
        g = q.pop(0).group
        queue_moves += 1

        if debug:
            print('-------')
            print(graph(g))

        if g.target_type == target_type:  # and g.consumes(source_groups):
            yield g
            if g.consumes(source_groups):  # Nothing left to find
                return

        # combine with all like groups which are compatible
        for g2 in all_groups.get(g.target_type, ()):
            if g2.is_compatible_with(g):
                _examine_edge(
                    states, q,
                    TargetTypeGroup(g.target_type, g2.size + g.size, (g, g2)))

        # expand with all generators which can be triggered as a
        # result of adding this group
        for generator in generators[g.target_type]:
            match = generator.match(g)
            if match is None:
                continue

            if debug:
                print('%s %s %s' % (generator, ' matched with ', match))

            # for all sets of parents which match the generator and
            # include g
            for s in parent_sets((g,), match, all_groups, generator):
                # Create the products of running this generator with
                # the given parent set
                siblings = ()
                for t, n in generator.targets_.items():
                    # Unary generators run as many times as necessary
                    # to consume the group
                    if generator.unary:
                        n *= g.size
                    siblings += (TargetTypeGroup(t, n, s, generator),)

                # Make sure groups know about their siblings
                if len(siblings) > 1:
                    for product in siblings:
                        product.set_siblings(siblings)

                if debug:
                    print('%s %s %s' % (siblings, '<-', list(s)))

                # Add new search states to the queue
                for sib in siblings:
                    _examine_edge(states, q, sib)

        # Add to the set of all groups so that we can combine it with
        # others in future iterations
        all_groups.setdefault(g.target_type, []).append(g)

        # Sort the queue; in 'real life' use a priority queue.  The
        # sort is stable, so equal-cost states keep their queue order.
        q.sort(key=lambda state: state.group.cost)
def graph(group, indent = 0, visited = None):
    """Produce a string representation of the search graph
    that produced the given group.

    group   -- the group to describe
    indent  -- number of leading spaces for this group's line
    visited -- dict of groups already described; created on first call

    Each group appears on its own line, with its parents listed below
    it one level deeper.  A group that has already been described is
    shown with a trailing '...' and not expanded again.
    """
    if visited is None:
        visited = {}

    head = indent * ' ' + repr(group)
    if group in visited:
        return head + '...\n'

    visited[group] = True
    pieces = [head, '[%s]' % group.generator]

    cause = group.ambiguous
    if cause:
        pieces.append(' *ambiguous* ')
        # An integer is only a flag; anything else names the culprit.
        if type(cause) is not int:
            pieces.append('due to %s' % cause)

    if group.siblings:
        others = [sib for sib in group.siblings if sib != group]
        pieces.append(' siblings ' + str(others))

    pieces.append('\n')
    pieces.append('\n'.join(
        [graph(parent, indent + 1, visited) for parent in group.parents]))
    return ''.join(pieces)
def ambiguities(group):
    """Returns a list of groups that caused ambiguities with this one
    or its constituents.

    The parents are searched recursively first, so their entries come
    before the group's own.  An 'ambiguous' value that is false or a
    plain integer is not reported.
    """
    found = []
    for parent in group.parents:
        found += ambiguities(parent)

    cause = group.ambiguous
    if cause and type(cause) is not int:
        found.append(cause)
    return found
def search(generators, targets, sources):
    """Run one search and print a report of the outcome.

    generators -- passed straight through to optimal_graphs()
    targets    -- the target type to look for
    sources    -- a source type name, or a sequence of them; equal
                  names are merged into one group with a count

    Resets TargetTypeGroup.instances and queue_moves, then collects the
    solutions yielded by optimal_graphs(), keeping the first solution
    seen for each number of consumed sources.  A later solution with
    the same count and the same cost marks the kept one as ambiguous.
    The kept solution that consumes the most sources is printed,
    followed by any ambiguities and the number of queue moves.
    """
    import sys
    global queue_moves

    TargetTypeGroup.instances = 0
    queue_moves = 0

    # Remember what we started with
    source_groups = tuple([
        TargetTypeGroup(name, count)
        for name, count in _string_multiset(sources).items()])

    solutions = {}
    max_consumed = 0
    for g in optimal_graphs(targets, source_groups, generators):
        consumed = len(g.consumed_sources)
        if consumed > max_consumed:
            max_consumed = consumed

        first = solutions.setdefault(consumed, g)
        if first is not g and first.cost == g.cost:
            first.ambiguous = g

    # NOTE(review): the report is assumed to be produced once, after
    # the search loop has finished -- confirm against the original
    # layout of this function.
    if max_consumed:
        best = solutions[max_consumed]
        print(80 * '=')
        print(graph(best))

        if best.ambiguous:
            print(40 * '-')
            print('ambiguities:')
            for a in ambiguities(best):
                print(graph(a))
                print('')

        print('%s %s' % (queue_moves, 'queue_moves'))
        print(80 * '=')
        sys.stdout.flush()

    print('%s %s' % (queue_moves, 'queue moves'))
    print('\n\n*****\n\n')
def test():
    """Runs Volodya's example, but doesn't get the result he'd like.

    Registers the example's generators in a GeneratorSet and then runs
    three searches with it, each of which prints its own report.
    """
    # The transformations of the example, written "products <- sources":
    #
    # EST_EXE <- OBJ*
    # OBJ <- CPP
    # {CPP,STATIC_DATA} <- NM_ASM
    # {CPP,CPP} <- ECPP (only first CPP must be further converted into NM_ASM)
    # NM_ASM <- CPP
    # {CPP,STATIC_DATA} <- STATIC_DATA*
    # STATIC_DATA <- NM_ASM
    # NM_OBJ <- NM_ASM
    # NM_EXE <- NM_OBJ*
    #
    # NOTE(review): "STATIC_DATA <- NM_ASM" is listed above but has no
    # matching entry in the table below -- confirm whether that is
    # intentional.
    rules = (
        ('OBJ*', 'EST_EXE'),
        ('CPP', 'OBJ'),
        ('NM_ASM', ('CPP', 'STATIC_DATA')),
        ('ECPP', ('CPP','CPP')),
        ('CPP', 'NM_ASM'),
        ('STATIC_DATA*', ('CPP', 'STATIC_DATA')),
        ('NM_ASM', 'NM_OBJ'),
        ('NM_OBJ*', 'NM_EXE'),
    )

    generators = GeneratorSet()
    for sources, targets in rules:
        generators += Generator(sources, targets)

    search(generators, 'EST_EXE', ('CPP', 'NM_ASM', 'ECPP'))

    # Try the same search with a source type that can't be consumed.
    # This will exhaust all transformations before stopping.
    search(generators, 'EST_EXE', ('CPP', 'NM_ASM', 'ECPP', 'FOO'))

    search(generators, 'NM_EXE', ('CPP'))
def run(args = None):
    """Run the module's doctests and, if they pass, the example in test().

    args -- optional replacement for sys.argv

    The doctests are executed with doctest.testmod().  Only when its
    result reports nothing in its first element does the function go
    on: debug is switched on if '--debug' is in sys.argv, and test()
    is called.  The doctest result is returned in every case.
    """
    import doctest
    import sys
    global debug

    if args is not None:
        sys.argv = args

    error = doctest.testmod(sys.modules.get(__name__))
    if error[0]:
        return error

    if '--debug' in sys.argv:
        debug = 1
    test()
    return error
# Script entry point: run the doctests and the example searches, and
# use the first element of run()'s result as the process exit status.
if __name__ == '__main__':
    import sys
    sys.exit(run()[0])
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -