📄 dhash.py
字号:
def fail_node (my, t, id): try: n = my.allnodes[id] av = my.available uat = my.unavailable_time la = my.look_ahead () needed = my.read_pieces () getsucclist = my.succ # XXX some bug here for b in n.blocks: real_succs = getsucclist (b, la) if n in real_succs: av[b] -= 1 if b not in uat and av[b] < needed: uat[b] = t return chord.fail_node (my, t, id) except: pass return None def crash_node (my, t, id): try: n = my.allnodes[id] av = my.available uat = my.unavailable_time la = my.look_ahead () needed = my.read_pieces () getsucclist = my.succ for b in n.blocks: real_succs = getsucclist (b, la) if n in real_succs: av[b] -= 1 if b not in uat and av[b] < needed: uat[b] = t return chord.crash_node (my, t, id) except: pass return None # Alternate implementations that are called if do_repair == 1 # Block availability is tracked via repair instead. def add_node_repair (my, t, id): newevs = chord.add_node (my, t, id) evs = my.repair (t, my.allnodes[id]) if newevs is not None: newevs += evs else: newevs = evs return newevs def fail_node_repair (my, t, id): newevs = chord.fail_node (my, t, id) try: evs = my.repair (t, my.allnodes[id]) if newevs is not None: newevs += evs else: newevs = evs except KeyError: pass return newevs def crash_node_repair (my, t, id): newevs = chord.crash_node (my, t, id) try: evs = my.repair (t, my.allnodes[id]) if newevs is not None: newevs += evs else: newevs = evs except KeyError: pass return newevs # Subclass and redefine these methods to explore basic changes. def min_pieces (my): """The minimum number of pieces we need before repairing. Probably should be at least as big as read_pieces.""" return 0 def repairto_pieces (my): """The number of pieces to reach for when repairing.""" return 0 def read_pieces (my): """The number of pieces on different nodes needed to for successful read.""" return 0 def look_ahead (my): """Number of nodes ahead to check to see if there are already fragments.""" return 0 def insert_pieces (my): """The number of pieces to write into the system initially.""" return 0 def insert_piece_size (my, whole_size): """How big is each piece that gets inserted?""" return 0class dhash_replica (dhash): """Simple replication First argument should be number of replicas (defaulting to three). If repair desired, add additional 'repair' argument""" def __init__ (my, args = []): dhash.__init__ (my, args) try: my.replicas = int (args[0]) except: my.replicas = 3 def min_pieces (my): return my.replicas def repairto_pieces (my): return my.replicas def read_pieces (my): return 1 def insert_pieces (my): return my.replicas def insert_piece_size (my, size): return size def look_ahead (my): return 16class total_recall_replica (dhash): """Total recall's repair algorithm with dhash's placement""" def __init__ (my, args = []): # short and long term redundancy factors try: my.shortt = int (args.pop (0)) except: my.shortt = 2 try: my.longt = int (args.pop (0)) except: my.longt = 4 dhash.__init__ (my, args) def min_pieces (my): return my.shortt def repairto_pieces (my): return my.longt def read_pieces (my): return 1 def insert_pieces (my): return my.longt def insert_piece_size (my, size): return size def look_ahead (my): return my.longtclass dhash_fragments (dhash): def __init__ (my, args = []): dhash.__init__ (my, args) try: my.dfrags = int (args[0]) except: my.dfrags = 3 try: my.efrags = int (args[1]) except: my.efrags = 2 * my.dfrags def min_pieces (my): # Should this by dfrags? return my.efrags def repairto_pieces (my): # Same as insert_pieces and min_pieces?? return my.efrags def read_pieces (my): return my.dfrags def insert_pieces (my): return my.efrags def look_ahead (my): return 3 * my.efrags def insert_piece_size (my, size): # A vague estimate of overhead from encoding... 2%-ish return int (1.02 * (size / my.dfrags))class dhash_cates (dhash): def min_pieces (my): return 14 def repairto_pieces (my): return 14 def read_pieces (my): return 7 def insert_pieces (my): return 14 def look_ahead (my): return 16 def insert_piece_size (my, size): # A vague estimate of overhead from encoding... 2%-ish return int (1.02 * (size / 7)) def add_node (my, t, id): newevs = dhash.add_node (my, t, id) if len (my.nodes) >= 16: evs = my._pmaint_join (t, my.allnodes[id]) if newevs is not None: newevs += evs else: newevs = evs return newevs def _repair_join_check (my, t, n, b, succs): if n not in succs: # We're not in the successor list so pmaint for s in succs: if b not in s.blocks: sz = n.blocks[b] nt = int (n.sendremote (t, sz) + 0.5) # XXX should move fragments, or # possibly reconstruct and make a new one. return event (nt, "copy", ["pmaint", n.id, n.last_alive, s.id, b, sz]) def _pmaint_join_XXX (my, t, an): """aka partition maintenance""" events = [] runlen = min (my.look_ahead () + 1, len (my.nodes)) getsucclist = my.succ # Fix blocks that we're no longer in the succlist for. for b in an.blocks: # Need to call getsucclist for n == an, since an # might be really far out of place or really old, succs = getsucclist (b, runlen) e = my._repair_join_check (t, an, b, succs) if e: events.append (e) ansucclist = getsucclist (an, runlen) assert ansucclist[0] == an anpredlist = my.pred (an, runlen - 1) neighborhood = anpredlist + ansucclist assert runlen == len (anpredlist) # Index of an's successor # i = len (anpredlist) + 1 # Each of an's successors has just been bumped off # a successor list, so they'd better do some moving. for i in range (runlen + 1, len(neighborhood)): pid = neighborhood[i - runlen - 1].id sid = neighborhood[i - 1].id for b in neighborhood[i].blocks: if not between (b, pid, sid): isz = an.blocks[b] # We're not in the successor list so pmaint for s in neighborhood[0]: # XXX BOGUS!!! if b not in s.blocks: nt = int (an.sendremote (t, isz) + 0.5) # XXX should move fragments, or # possibly reconstruct and make a new one. events.append (event (nt, "copy", ["pmaint", an.id, an.last_alive, s.id, b, isz])) break return events def _pmaint_join (my, t, n): events = [] preds = my.pred (n, 16) nid = n.id # Perhaps this new guy has returned with some new blocks # that he now should give away to someone else... pid = preds[0].id lostblocks = [b for b in n.blocks if not between (b, pid, nid)] # if len(lostblocks): print "# LOST", len(lostblocks), "blocks" for b in lostblocks: lsuccs = my.succ (b, 14) for s in lsuccs: if b not in s.blocks: assert n != s isz = my.insert_piece_size (my.blocks[b]) # Round event time up to the next whole unit nt = int (n.sendremote (t, isz) + 0.5) events.append (event (nt, "copy", ["pmaint", nid, n.last_alive, s.id, b, isz])) break # Either someone wanted it and they took it, or no one wanted it. # Safe to delete either way n.unstore (b) # Next, fix blocks that are in the wrong place now because # this guy appeared; give them to this new guy. succs = my.succ (n, 18) for s in succs[1:]: lostblocks = [b for b in s.blocks if between (b, pid, nid)] if len(lostblocks): print "# LoST", len(lostblocks), "blocks" for b in lostblocks: if b not in n.blocks: print "# MOVING", b, "from", s, "to", n isz = my.insert_piece_size (my.blocks[b]) n.store (b, isz) s.sent_bytes += isz s.sent_bytes_breakdown['pmaint_repair_write'] += isz s.unstore (b) return events
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -