⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dhash.py

📁 基于chord算法的p2p文件系统。A p2p file system based on chord.
💻 PY
📖 第 1 页 / 共 2 页
字号:
import bisect
from utils import between
from simulator import event
from chord import chord


class dhash(chord):
    """Provides a parameterizable insertion and repair of blocks.

    The repair implementation does not delete excess fragments from nodes.
    Nodes may wish to run GC separately.

    Bookkeeping kept on the instance:

    * ``blocks``               -- block -> size of the block
    * ``block_keys``           -- sorted list of the keys of ``blocks``
    * ``available``            -- block -> number of copies counted among
                                  the successors scanned for that block
    * ``unavailable_time``     -- block -> time unavailability was last
                                  accounted for
    * ``total_unavailability`` -- accumulated unavailable block-time

    The policy hooks used below (``look_ahead``, ``read_pieces``,
    ``insert_pieces``, ``insert_piece_size``, ``min_pieces``,
    ``repairto_pieces``) are not defined in this part of the file.
    """
    # XXX How long to wait until we start repair? Not instant??

    def __init__(my, args=None):
        """Initialise bookkeeping and select the repair behaviour.

        ``args`` is a list of option strings that is also forwarded to
        ``chord.__init__``.  ``'repair'`` rebinds ``add_node``,
        ``fail_node`` and ``crash_node`` to their ``*_repair`` variants;
        ``'repair++crash'`` rebinds only ``add_node`` and ``crash_node``.
        The ``*_repair`` methods are defined outside this part of the file.
        """
        # A None sentinel avoids sharing one mutable default list between
        # instances (and with chord.__init__).
        if args is None:
            args = []
        chord.__init__(my, args)
        my.do_repair = 0
        # Sizes of all blocks
        my.blocks = {}
        # Sorted list of blocks.keys ()
        my.block_keys = []
        # Incrementally maintained mapping from blocks->num of copies
        # in look_ahead () of successor.
        my.available = {}
        # block -> (time of last unavailability) mapping
        my.unavailable_time = {}
        # The total number of seconds that blocks are unavailable.
        my.total_unavailability = 0
        for a in args:
            if a == 'repair':
                my.do_repair = 1
                my.add_node = my.add_node_repair
                my.fail_node = my.fail_node_repair
                my.crash_node = my.crash_node_repair
            elif a == 'repair++crash':
                my.do_repair = 1
                my.add_node = my.add_node_repair
                my.crash_node = my.crash_node_repair

    def process(my, ev):
        """Account for unavailability up to ``ev.time``, then dispatch ``ev``.

        "insert" and "copy" events are handled here; every other event
        type is delegated to ``chord.process``.
        """
        now = ev.time
        uat = my.unavailable_time
        # Charge each currently-unavailable block for the time elapsed
        # since it was last charged, then move its timestamp forward.
        # Only values are rewritten, so iterating the dict here is safe.
        for b in uat:
            my.total_unavailability += now - uat[b]
            uat[b] = now
        if ev.type == "insert":
            return my.insert_block(ev.id, ev.block, ev.size)
        elif ev.type == "copy":
            return my.copy_block(ev.time, ev.src_id, ev.id, ev.src_time,
                                 ev.block, ev.size, ev.desc)
        return chord.process(my, ev)

    def _extant_counts_check(my):
        """How many copies of each block are there?

        Recounts from scratch over every node in ``my.nodes`` and returns
        the list of per-block copy counts.
        """
        blocks = {}
        for n in my.nodes:
            for b in n.blocks:
                blocks[b] = blocks.get(b, 0) + 1
        # list() gives a real list on both Python 2 and Python 3.
        extant = list(blocks.values())
        # assert blocks == my.available # same keys and same values???
        return extant

    def _available_blocks_check(my):
        """Find how many blocks would be using look_ahead reading.

        Recomputes, from the nodes' actual contents, the number of blocks
        that have at least ``read_pieces()`` copies among the
        ``look_ahead()`` successors of the block key, and asserts that the
        incrementally maintained count agrees.  Returns that number
        (0 when there are no blocks or no successors).
        """
        scannable = my.look_ahead()
        needed = my.read_pieces()
        k = my.block_keys
        getsucclist = my.succ
        avail = 0
        try:
            succs = getsucclist(k[0], scannable)
            szeroid = succs[0].id
        except IndexError:
            return 0
        for b in k:
            extant = 0
            # Keys are visited in sorted order; the successor list is only
            # refetched once the key passes the first successor's id.
            if b > szeroid:
                succs = getsucclist(b, scannable)
                szeroid = succs[0].id
            for s in succs:
                if b in s.blocks:
                    extant += 1
            if extant >= needed:
                avail += 1
        # available_blocks () returns (count, counts); compare the count
        # only.  Comparing the whole tuple to an int could never succeed.
        assert my.available_blocks()[0] == avail
        return avail

    def available_blocks(my):
        """Number of readable blocks.

        Returns a pair ``(n_readable, counts)`` where ``counts`` is the
        list of values of ``my.available`` and ``n_readable`` is how many
        of them are at least ``read_pieces()``.
        """
        needed = my.read_pieces()
        # Snapshot as a list so callers get the same type on Python 2 and 3.
        counts = list(my.available.values())
        extant = [x for x in counts if x >= needed]
        return len(extant), counts

    # Subclass and redefine these methods to produce more interesting
    # storage behavior.
    def insert_block(my, id, block, size):
        """Basic insertion code to store insert_pieces() pieces on
        successors of key, via id.

        Registers the block if it is new, stores a piece of size
        ``insert_piece_size(size)`` on each of the ``insert_pieces()``
        successors of ``block``, resets ``my.available[block]`` to the
        number of successors written, and charges the RPCs and bytes to
        the inserting node ``my.allnodes[id]``.  Always returns None.
        """
        if block not in my.blocks:
            my.blocks[block] = size
            bisect.insort(my.block_keys, block)
        succs = my.succ(block, my.insert_pieces())
        isz = my.insert_piece_size(size)
        for s in succs:
            s.store(block, isz)
        my.available[block] = len(succs)
        n = my.allnodes[id]
        n.nrpc += len(succs)
        n.sent_bytes += isz * len(succs)
        n.sent_bytes_breakdown['insert'] += isz * len(succs)
        return None

    def copy_block(my, t, src, dst, lastsrc, block, size, desc):
        """Copy ``block`` from node id ``src`` to node id ``dst``.

        The copy is dropped if either endpoint is in ``my.deadnodes`` or
        if the source's ``last_alive`` differs from ``lastsrc`` (the value
        recorded when the copy was scheduled).  Otherwise the source is
        charged one RPC and ``size`` bytes under the
        ``'<desc>_repair_write'`` breakdown key.  The piece is stored, and
        availability updated, only when the destination lacks the block
        and is among the ``look_ahead()`` successors of the block.
        Always returns None.  ``t`` is accepted but not used here.
        """
        # Typically used for repairs so should already know about this
        assert block in my.blocks
        if src in my.deadnodes or dst in my.deadnodes:
            return None
        s = my.allnodes[src]
        if s.last_alive != lastsrc:
            # Parenthesised %-format prints the same text on Python 2 and 3.
            print("# Source failed to copy %s" % (block,))
            return None
        s.nrpc += 1
        s.sent_bytes += size
        s.sent_bytes_breakdown['%s_repair_write' % desc] += size

        d = my.allnodes[dst]
        needed = my.read_pieces()
        real_succs = my.succ(block, my.look_ahead())
        if block not in d.blocks and d in real_succs:
            d.store(block, size)
            my.available[block] += 1
            if my.available[block] == needed:
                # The block just became readable again.
                # NOTE(review): raises KeyError if the block was never
                # recorded in unavailable_time -- confirm that invariant.
                del my.unavailable_time[block]
        return None

    def _repair_join(my, t, an, succs, resp_blocks):
        """Repair helper used when the affected node ``an`` is alive.

        Acts only when ``an`` is the last node of ``succs``; in that case
        it returns a list of "copy" events, at most one per block of
        ``resp_blocks`` that ``an`` stores, each sending the block to the
        first node of ``succs`` that lacks it.  In every other case it
        returns None, which ``repair`` treats as "no events".
        """
        if an != succs[-1]:
            return None
        events = []
        # Blocks that someone else is responsible for
        # should go back to that person because the next
        # join might push me out of place.
        # XXX perhaps should only do this when we're actually
        #     out of place, a la pmaint.
        for b in resp_blocks:
            try:
                # If not stored on an, raise KeyError
                sz = an.blocks[b]
            except KeyError:
                continue
            for s in succs:
                if b not in s.blocks:
                    # Event time rounded to the nearest whole unit.
                    nt = int(an.sendremote(t, sz) + 0.5)
                    # XXX should move fragments, or
                    # possibly reconstruct and make a new one.
                    events.append(event(nt, "copy",
                        ["pmaint", an.id, an.last_alive, s.id, b, sz]))
                    break
        return events

    def _repair_fail(my, t, an, succs, resp_blocks):
        """Helper to repair_fail that does real work.

        For every block in ``resp_blocks`` this recounts its copies among
        ``succs`` and stores the result in ``my.available``.  A block with
        fewer than ``read_pieces()`` copies is recorded in
        ``my.unavailable_time`` (if not already there).  A block that is
        readable but has fewer than ``min_pieces()`` copies gets "copy"
        events from the first holder (the "fixer") to nodes lacking it,
        until ``repairto_pieces()`` copies would exist.  Returns the list
        of generated events.  ``an`` is not used by this helper.
        """
        events = []
        read_pieces = my.read_pieces()
        min_pieces = my.min_pieces()
        repair_pieces = my.repairto_pieces()
        insert_piece_size = my.insert_piece_size
        for b in resp_blocks:
            # Check their availability
            haves = []
            donthaves = []
            for s in succs:
                if b in s.blocks:
                    haves.append(s)
                else:
                    donthaves.append(s)
            avail = len(haves)
            my.available[b] = avail
            if avail < read_pieces:
                if b not in my.unavailable_time:
                    my.unavailable_time[b] = t
                    # print "# LOST block", b, "after failure of", an, "|", succs
            elif avail < min_pieces:
                # print "# REPAIR block", b, "after failure of", an
                needed = repair_pieces - avail
                isz = insert_piece_size(my.blocks[b])
                fixer = haves.pop(0)
                # XXX should pick the best fixers? min nextsendtime
                #     successor directs all repairs?
                for s in donthaves:
                    # Round event time to the nearest whole unit
                    nt = int(fixer.sendremote(t, isz) + 0.5)
                    events.append(event(nt, "copy",
                        ["failure", fixer.id, fixer.last_alive, s.id, b, isz]))
                    needed -= 1
                    if needed <= 0:
                        break
                if b not in fixer.cached_blocks:
                    # Account for bytes needed to reassemble the block.
                    nread = read_pieces
                    for s in haves:
                        # the fixer has his own copy
                        nread -= 1
                        if nread <= 0:
                            break
                        s.sent_bytes += isz
                        s.sent_bytes_breakdown['failure_repair_read'] += isz
                    fixer.cached_blocks[b] = 1
                    # XXX account for disk used by cache
        return events

    def repair_pmaint(my, t, an):
        """aka partition maintenance.

        For each block stored on ``an``: if ``an`` is not among the first
        ``look_ahead() + 1`` successors of the block (capped at the number
        of nodes), schedule one "copy" event to the first such successor
        lacking the block; otherwise count ``an``'s copy in
        ``my.available`` and clear the block's unavailability when the
        count reaches ``read_pieces()``.  Returns the list of events.
        """
        events = []
        runlen = min(my.look_ahead() + 1, len(my.nodes))
        needed = my.read_pieces()
        getsucclist = my.succ
        for b in an.blocks:
            # Need to call getsucclist for n == an, since an
            # might be really far out of place or really old,
            # but for the other nodes, it will be sort of
            # repeated overlapping so more of a waste of time.
            succs = getsucclist(b, runlen)
            if an not in succs:
                # We're not in the successor list so pmaint
                for s in succs:
                    if b not in s.blocks:
                        isz = an.blocks[b]
                        nt = int(an.sendremote(t, isz) + 0.5)
                        # XXX should move fragments, or
                        # possibly reconstruct and make a new one.
                        events.append(event(nt, "copy",
                            ["pmaint", an.id, an.last_alive, s.id, b, isz]))
                        break
            else:
                my.available[b] += 1
                if my.available[b] == needed:
                    del my.unavailable_time[b]
        return events

    def repair(my, t, affected_node):
        """Generate repair events after ``affected_node`` joined or failed.

        A live node first runs ``repair_pmaint`` and then uses
        ``_repair_join``; a dead node uses ``_repair_fail``.  The chosen
        helper is called once per window of ``runlen`` consecutive nodes
        in ``preds + succs``, together with the block keys lying between
        the window's predecessor and the window's first node.  Returns
        the combined list of events.
        """
        newevs = []
        runlen = min(my.look_ahead(), len(my.nodes))
        preds = my.pred(affected_node, runlen)
        succs = my.succ(affected_node, runlen)
        if affected_node.alive:
            newevs = my.repair_pmaint(t, affected_node)
            handler = my._repair_join
        else:
            handler = my._repair_fail
        # succ's does not include affected_node if it is dead.
        span = preds + succs
        k = my.block_keys
        # consider the predecessors who should be doing the repair
        for i in range(1, len(span) - runlen + 1):
            p = span[i - 1]
            # let them see further than they would have inserted.
            s = span[i:i + runlen]
            if p.id <= s[0].id:
                start = bisect.bisect_left(k, p.id)
                stop = bisect.bisect_right(k, s[0].id)
                r = k[start:stop]
            else:
                # The key range wraps past the end of the sorted key list.
                start = bisect.bisect_right(k, p.id)
                stop = bisect.bisect_left(k, s[0].id)
                r = k[start:] + k[:stop]
            evs = handler(t, affected_node, s, r)
            # _repair_join returns None when it has nothing to do.
            if evs is not None:
                newevs += evs
        return newevs

    # Primary implementations that keep track of what blocks are
    # available incrementally.
    def add_node(my, t, id):
        """Add node ``id`` via ``chord.add_node`` and update availability.

        For each block already held by the new node, the
        ``look_ahead() + 1`` successors of the block are fetched.  If the
        new node is among the first ``look_ahead()`` of them, the copy on
        the last node of that list (if it has one) is discounted and the
        new node's copy is counted; ``my.unavailable_time`` is updated
        when the count drops below or reaches ``read_pieces()``.
        Returns whatever ``chord.add_node`` returned.
        """
        newevs = chord.add_node(my, t, id)
        needed = my.read_pieces()
        n = my.allnodes[id]
        av = my.available
        uat = my.unavailable_time
        la = my.look_ahead()
        getsucclist = my.succ
        for b in n.blocks:
            # XXX optimize to only call my.succ when it should change
            real_succs = getsucclist(b, la + 1)
            if n in real_succs[:-1]:
                if b in real_succs[-1].blocks:
                    av[b] -= 1
                    if b not in uat and av[b] < needed:
                        uat[b] = t
                av[b] += 1
                if av[b] == needed:
                    del uat[b]
        return newevs

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -