📄 dhash.py
字号:
import bisect
from utils import between
from simulator import event
from chord import chord


class dhash (chord):
    """Provides a parameterizable insertion and repair of blocks.

    Block bookkeeping kept on the instance:

      blocks           -- block -> size of the block
      block_keys       -- sorted list of blocks.keys ()
      available        -- block -> num of copies in look_ahead () of
                          successor, maintained incrementally
      unavailable_time -- block -> time up to which its unavailability
                          has been charged to total_unavailability;
                          entries are added when a block drops below
                          read_pieces () copies and removed when it
                          reaches read_pieces () again.

    The repair implementation does not delete excess fragments from
    nodes.  Nodes may wish to run GC separately.
    """
    # XXX How long to wait until we start repair?  Not instant??
    def __init__ (my, args = None):
        """Initialise block bookkeeping and select the repair mode.

        args is passed through to chord.__init__.  If it contains
        'repair', add_node/fail_node/crash_node are replaced by their
        *_repair variants; 'repair++crash' replaces only add_node and
        crash_node.
        """
        # Avoid a shared mutable default; chord still receives a list.
        if args is None:
            args = []
        chord.__init__ (my, args)
        my.do_repair = 0
        # Sizes of all blocks
        my.blocks = {}
        # Sorted list of blocks.keys ()
        my.block_keys = []
        # Incrementally maintained mapping from blocks->num of copies
        # in look_ahead () of successor.
        my.available = {}
        # block -> time up to which its unavailability has been
        # accounted (refreshed on every event by process ()).
        my.unavailable_time = {}
        # The total number of seconds that blocks are unavailable.
        my.total_unavailability = 0
        for a in args:
            if a == 'repair':
                my.do_repair = 1
                my.add_node = my.add_node_repair
                my.fail_node = my.fail_node_repair
                my.crash_node = my.crash_node_repair
            elif a == 'repair++crash':
                my.do_repair = 1
                my.add_node = my.add_node_repair
                my.crash_node = my.crash_node_repair

    def process (my, ev):
        """Handle one simulator event.

        First charges the time elapsed since the previous accounting
        point to every block currently in unavailable_time.  Then
        "insert" and "copy" events are handled here; every other event
        type is passed on to chord.process.  Returns whatever the
        handler returns.
        """
        now = ev.time
        uat = my.unavailable_time
        for b in uat:
            my.total_unavailability += now - uat[b]
            # Only values change here, so iterating uat stays safe.
            uat[b] = now
        if ev.type == "insert":
            return my.insert_block (ev.id, ev.block, ev.size)
        elif ev.type == "copy":
            return my.copy_block (ev.time, ev.src_id, ev.id, ev.src_time,
                                  ev.block, ev.size, ev.desc)
        return chord.process (my, ev)

    def _extant_counts_check (my):
        """How many copies of each block are there?

        Debugging aid: scans every node in my.nodes, counts how many of
        them store each block, and returns those per-block counts (the
        values of a block -> count mapping).
        """
        blocks = {}
        for n in my.nodes:
            for b in n.blocks:
                blocks[b] = blocks.get (b, 0) + 1
        extant = blocks.values ()
        # XXX Should blocks == my.available (same keys and same values)?
        # Not asserted.
        return extant

    def _available_blocks_check (my):
        """Find how many blocks would be readable using look_ahead reading.

        Debugging aid: recomputes from scratch, for every block, how
        many of its look_ahead () successors store it, counts the blocks
        with at least read_pieces () copies, and asserts that the
        incrementally maintained available_blocks () agrees.  Returns
        that count, or 0 if there are no blocks.
        """
        scannable = my.look_ahead ()
        needed = my.read_pieces ()
        k = my.block_keys
        getsucclist = my.succ
        avail = 0
        try:
            succs = getsucclist (k[0], scannable)
            szeroid = succs[0].id
        except IndexError:
            return 0
        for b in k:
            extant = 0
            # block_keys is sorted, so the successor list only needs to
            # be recomputed once b passes the current first successor.
            if b > szeroid:
                succs = getsucclist (b, scannable)
                szeroid = succs[0].id
            for s in succs:
                if b in s.blocks:
                    extant += 1
            if extant >= needed:
                avail += 1
        # available_blocks () returns (count, counts); compare the count.
        assert my.available_blocks ()[0] == avail
        return avail

    def available_blocks (my):
        """Return (number of readable blocks, per-block copy counts).

        A block is readable when my.available records at least
        read_pieces () copies for it.
        """
        needed = my.read_pieces ()
        counts = my.available.values ()
        extant = [x for x in counts if x >= needed]
        return len (extant), counts

    # Subclass and redefine these methods to produce more interesting
    # storage behavior.
    def insert_block (my, id, block, size):
        """Basic insertion code to store insert_pieces () pieces on
        successors of key, via id.

        Records the block (and its sorted key) the first time it is
        seen, stores a piece of insert_piece_size (size) on each of the
        first insert_pieces () successors of the block, sets its
        available count to the number of pieces stored, and charges the
        RPCs and bytes sent to node id.  Returns None (no new events).
        """
        if block not in my.blocks:
            my.blocks[block] = size
            bisect.insort (my.block_keys, block)
        succs = my.succ (block, my.insert_pieces ())
        isz = my.insert_piece_size (size)
        for s in succs:
            s.store (block, isz)
        my.available[block] = len (succs)
        n = my.allnodes[id]
        sent = isz * len (succs)
        n.nrpc += len (succs)
        n.sent_bytes += sent
        n.sent_bytes_breakdown['insert'] += sent
        return None

    def copy_block (my, t, src, dst, lastsrc, block, size, desc):
        """Complete a copy of block from node src to node dst.

        The copy is dropped if src or dst is dead, or if src's
        last_alive no longer equals lastsrc (presumably src failed and
        came back after the copy was scheduled).  Otherwise src is
        charged one RPC and size bytes under '<desc>_repair_write'.
        dst stores the block only if it does not already have it and is
        among the look_ahead () successors of the block.  In that case
        the available count is incremented, and the block stops being
        tracked as unavailable once it reaches read_pieces () copies.
        Returns None.
        """
        # Typically used for repairs so should already know about this
        assert block in my.blocks
        if src in my.deadnodes or dst in my.deadnodes:
            return None
        s = my.allnodes[src]
        if s.last_alive != lastsrc:
            # Same output as the Python 2 statement
            #   print "# Source failed to copy", block
            print ("# Source failed to copy %s" % (block,))
            return None
        s.nrpc += 1
        s.sent_bytes += size
        s.sent_bytes_breakdown['%s_repair_write' % desc] += size
        d = my.allnodes[dst]
        needed = my.read_pieces ()
        real_succs = my.succ (block, my.look_ahead ())
        if block not in d.blocks and d in real_succs:
            d.store (block, size)
            my.available[block] += 1
            if my.available[block] == needed:
                # pop, not del: insert_block can leave a block with fewer
                # than needed copies without recording it as unavailable.
                my.unavailable_time.pop (block, None)
        return None

    def _repair_join (my, t, an, succs, resp_blocks):
        """Helper to repair () after node an has joined.

        Only acts when an is the last node of succs.  For each block in
        resp_blocks that an stores, a "pmaint" copy is scheduled from an
        to the first node of succs that lacks it.  Returns the list of
        copy events, or None when an is not succs[-1].
        """
        if not (an == succs[-1]):
            return None
        events = []
        # Blocks that someone else is responsible for
        # should go back to that person because the next
        # join might push me out of place.
        # XXX perhaps should only do this when we're actually
        #     out of place, a la pmaint.
        for b in resp_blocks:
            try:
                # If not stored on an, raise KeyError
                sz = an.blocks[b]
            except KeyError:
                continue
            for s in succs:
                if b not in s.blocks:
                    # Round the (non-negative) send time to the nearest
                    # whole unit: int () truncates, so +0.5 rounds.
                    nt = int (an.sendremote (t, sz) + 0.5)
                    # XXX should move fragments, or
                    # possibly reconstruct and make a new one.
                    events.append (event (nt, "copy",
                        ["pmaint", an.id, an.last_alive, s.id, b, sz]))
                    break
        return events

    def _repair_fail (my, t, an, succs, resp_blocks):
        """Helper to repair_fail that does real work

        For each block in resp_blocks, recounts its copies on succs and
        stores the count in my.available.
        - Fewer than read_pieces () copies: the block is marked
          unavailable as of t (if not already).
        - Readable but fewer than min_pieces () copies: one holder (the
          fixer) gets "failure" copy events to nodes of succs lacking the
          block, up to repairto_pieces () - copies of them.  The first
          time a fixer repairs a block, the other holders are charged
          for the pieces it must read to reassemble the block.
        Returns the list of copy events.
        """
        events = []
        read_pieces = my.read_pieces ()
        min_pieces = my.min_pieces ()
        repair_pieces = my.repairto_pieces ()
        insert_piece_size = my.insert_piece_size
        for b in resp_blocks:
            # Check their availability
            haves = []
            donthaves = []
            for s in succs:
                if b in s.blocks:
                    haves.append (s)
                else:
                    donthaves.append (s)
            avail = len (haves)
            my.available[b] = avail
            if avail < read_pieces:
                if b not in my.unavailable_time:
                    my.unavailable_time[b] = t
                # print "# LOST block", b, "after failure of", an, "|", succs
            elif avail < min_pieces:
                # print "# REPAIR block", b, "after failure of", an
                needed = repair_pieces - avail
                isz = insert_piece_size (my.blocks[b])
                fixer = haves.pop (0)
                # XXX should pick the best fixers? min nextsendtime
                # successor directs all repairs?
                for s in donthaves:
                    # Round the (non-negative) send time to the nearest
                    # whole unit: int () truncates, so +0.5 rounds.
                    nt = int (fixer.sendremote (t, isz) + 0.5)
                    events.append (event (nt, "copy",
                        ["failure", fixer.id, fixer.last_alive, s.id, b, isz]))
                    needed -= 1
                    if needed <= 0:
                        break
                if b not in fixer.cached_blocks:
                    # Account for bytes needed to reassemble the block.
                    nread = read_pieces
                    for s in haves:
                        # the fixer has his own copy, so only
                        # read_pieces - 1 other holders send a piece.
                        nread -= 1
                        if nread <= 0:
                            break
                        s.sent_bytes += isz
                        s.sent_bytes_breakdown['failure_repair_read'] += isz
                    fixer.cached_blocks[b] = 1
                    # XXX account for disk used by cache
        return events

    def repair_pmaint (my, t, an):
        """aka partition maintenance

        For each block stored on node an:
        - If an is not among the first look_ahead () + 1 successors of
          the block (capped at the number of nodes), a "pmaint" copy is
          scheduled from an to the first of those successors lacking it.
        - Otherwise an's copy is counted in my.available, and the block
          stops being tracked as unavailable once it reaches
          read_pieces () copies.
        Returns the list of copy events.
        """
        events = []
        runlen = min (my.look_ahead () + 1, len (my.nodes))
        needed = my.read_pieces ()
        getsucclist = my.succ
        for b in an.blocks:
            # Need to call getsucclist for n == an, since an
            # might be really far out of place or really old,
            # but for the other nodes, it will be sort of
            # repeated overlapping so more of a waste of time.
            succs = getsucclist (b, runlen)
            if an not in succs:
                # We're not in the successor list so pmaint
                isz = an.blocks[b]
                for s in succs:
                    if b not in s.blocks:
                        # Round the (non-negative) send time to the
                        # nearest whole unit.
                        nt = int (an.sendremote (t, isz) + 0.5)
                        # XXX should move fragments, or
                        # possibly reconstruct and make a new one.
                        events.append (event (nt, "copy",
                            ["pmaint", an.id, an.last_alive, s.id, b, isz]))
                        break
            else:
                my.available[b] += 1
                if my.available[b] == needed:
                    # pop, not del: the block may never have been
                    # recorded as unavailable.
                    my.unavailable_time.pop (b, None)
        return events

    def repair (my, t, affected_node):
        """Schedule repairs after affected_node joined or failed.

        If the node is alive, repair_pmaint runs on it first and each
        window is handled by _repair_join; otherwise each window is
        handled by _repair_fail.

        span = preds + succs (presumably in ring order around
        affected_node; TODO confirm for my.pred).  For each node p in
        span, the helper receives:
        - the runlen nodes s starting at p's successor s[0], and
        - the block keys in (p.id, s[0].id], wrapping around the ring.
        These are the blocks whose successor is s[0], consistent with
        _available_blocks_check, where a key equal to a node id belongs
        to that node.

        Returns all events produced.
        """
        newevs = []
        runlen = min (my.look_ahead (), len (my.nodes))
        preds = my.pred (affected_node, runlen)
        succs = my.succ (affected_node, runlen)
        if affected_node.alive:
            newevs = my.repair_pmaint (t, affected_node)
            repair_fn = my._repair_join
        else:
            repair_fn = my._repair_fail
        # succ's does not include affected_node if it is dead.
        span = preds + succs
        k = my.block_keys
        # consider the predecessors who should be doing the repair
        for i in range (1, len (span) - runlen + 1):
            p = span[i - 1]
            # let them see further than they would have inserted.
            s = span[i:i + runlen]
            # Keys strictly after p.id up to and including s[0].id.
            lo = bisect.bisect_right (k, p.id)
            hi = bisect.bisect_right (k, s[0].id)
            if p.id < s[0].id:
                r = k[lo:hi]
            else:
                # Interval wraps past zero (or p is s[0], so a single
                # node owns every key).
                r = k[lo:] + k[:hi]
            evs = repair_fn (t, affected_node, s, r)
            if evs is not None:
                newevs += evs
        return newevs

    # Primary implementations that keep track of what blocks are
    # available incrementally.
    def add_node (my, t, id):
        """Add node id via chord.add_node and update block availability.

        For each block the new node stores, if it lands within the
        first look_ahead () successors of the block:
        - the node now at position look_ahead () + 1 (pushed out of the
          window) stops counting, if it held the block;
        - the new node's copy starts counting.
        Unavailability tracking is updated to match.  Returns the events
        from chord.add_node.
        """
        newevs = chord.add_node (my, t, id)
        needed = my.read_pieces ()
        n = my.allnodes[id]
        av = my.available
        uat = my.unavailable_time
        la = my.look_ahead ()
        getsucclist = my.succ
        for b in n.blocks:
            # XXX optimize to only call my.succ when it should change
            # NOTE(review): assumes the ring has more than la nodes, so
            # real_succs[-1] really is outside the window; confirm what
            # my.succ returns for smaller rings.
            real_succs = getsucclist (b, la + 1)
            if n in real_succs[:-1]:
                if b in real_succs[-1].blocks:
                    av[b] -= 1
                    if b not in uat and av[b] < needed:
                        uat[b] = t
                av[b] += 1
                if av[b] == needed:
                    # pop, not del: the block may never have been
                    # recorded as unavailable.
                    uat.pop (b, None)
        return newevs
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -