📄 svnmerge.py
字号:
of the range of our log. Once self.revs and self.values are filled, we can find the value of the property at any arbitrary revision using a binary search on self.revs. Once we find the last revision during which the property was changed, we can lookup the associated value in self.values. (If the associated value is None, the associated value was not cached and we have to do a full propget.) An example: We know that the 'svnmerge' property was added in r10, and changed in r21. We gathered log info up until r40. revs = [0, 10, 21, 40] values = [None, "val1", "val2", None] What these values say: - From r0 to r9, we know nothing about the property. - In r10, the property was set to "val1". This property stayed the same until r21, when it was changed to "val2". - We don't know what happened after r40. """ def __init__(self, url, name): """View the history of a versioned property at URL with name""" self.url = url self.name = name # We know nothing about the value of the property. Setup revs # and values to indicate as such. self.revs = [0] self.values = [None] # We don't have any revisions cached self._initial_value = None self._changed_revs = [] self._changed_values = [] def load(self, log): """ Load the history of property changes from the specified RevisionLog object. """ # Get the property value before the range of the log if log.begin > 1: self.revs.append(log.begin-1) try: self._initial_value = self.raw_get(log.begin-1) except LaunchError: # The specified URL might not exist before the # range of the log. If so, we can safely assume # that the property was empty at that time. self._initial_value = { } self.values.append(self._initial_value) else: self._initial_value = { } self.values[0] = self._initial_value # Cache the property values in the log range old_value = self._initial_value for rev in log.propchange_revs: new_value = self.raw_get(rev) if new_value != old_value: self._changed_revs.append(rev) self._changed_values.append(new_value) self.revs.append(rev) self.values.append(new_value) new_value = old_value # Indicate that we know nothing about the value of the property # after the range of the log. if log.revs: self.revs.append(log.end+1) self.values.append(None) def raw_get(self, rev=None): """ Get the property at revision 'rev'. If rev is not specified, get the property at 'head'. """ return get_revlist_prop(self.url, self.name, rev) def get(self, rev=None): """ Get the property at revision 'rev'. If rev is not specified, get the property at 'head'. """ if rev is not None: # Find the index using a binary search i = bisect(self.revs, rev) - 1 # Return the value of the property, if it was cached if self.values[i] is not None: return self.values[i] # Get the current value of the property return self.raw_get(rev) def changed_revs(self, key=None): """ Get a list of the revisions in which the specified dictionary key was changed in this property. If key is not specified, return a list of revisions in which any key was changed. """ if key is None: return self._changed_revs else: changed_revs = [] old_val = self._initial_value for rev, val in zip(self._changed_revs, self._changed_values): if val.get(key) != old_val.get(key): changed_revs.append(rev) old_val = val return changed_revsclass RevisionSet: """ A set of revisions, held in dictionary form for easy manipulation. If we were to rewrite this script for Python 2.3+, we would subclass this from set (or UserSet). As this class does not include branch information, it's assumed that one instance will be used per branch. """ def __init__(self, parm): """Constructs a RevisionSet from a string in property form, or from a dictionary whose keys are the revisions. Raises ValueError if the input string is invalid.""" self._revs = {} revision_range_split_re = re.compile('[-:]') if isinstance(parm, types.DictType): self._revs = parm.copy() elif isinstance(parm, types.ListType): for R in parm: self._revs[int(R)] = 1 else: parm = parm.strip() if parm: for R in parm.split(","): rev_or_revs = re.split(revision_range_split_re, R) if len(rev_or_revs) == 1: self._revs[int(rev_or_revs[0])] = 1 elif len(rev_or_revs) == 2: for rev in range(int(rev_or_revs[0]), int(rev_or_revs[1])+1): self._revs[rev] = 1 else: raise ValueError, 'Ill formatted revision range: ' + R def sorted(self): revnums = self._revs.keys() revnums.sort() return revnums def normalized(self): """Returns a normalized version of the revision set, which is an ordered list of couples (start,end), with the minimum number of intervals.""" revnums = self.sorted() revnums.reverse() ret = [] while revnums: s = e = revnums.pop() while revnums and revnums[-1] in (e, e+1): e = revnums.pop() ret.append((s, e)) return ret def __str__(self): """Convert the revision set to a string, using its normalized form.""" L = [] for s,e in self.normalized(): if s == e: L.append(str(s)) else: L.append(str(s) + "-" + str(e)) return ",".join(L) def __contains__(self, rev): return self._revs.has_key(rev) def __sub__(self, rs): """Compute subtraction as in sets.""" revs = {} for r in self._revs.keys(): if r not in rs: revs[r] = 1 return RevisionSet(revs) def __and__(self, rs): """Compute intersections as in sets.""" revs = {} for r in self._revs.keys(): if r in rs: revs[r] = 1 return RevisionSet(revs) def __nonzero__(self): return len(self._revs) != 0 def __len__(self): """Return the number of revisions in the set.""" return len(self._revs) def __iter__(self): return iter(self.sorted()) def __or__(self, rs): """Compute set union.""" revs = self._revs.copy() revs.update(rs._revs) return RevisionSet(revs)def merge_props_to_revision_set(merge_props, path): """A converter which returns a RevisionSet instance containing the revisions from PATH as known to BRANCH_PROPS. BRANCH_PROPS is a dictionary of path -> revision set branch integration information (as returned by get_merge_props()).""" if not merge_props.has_key(path): error('no integration info available for repository path "%s"' % path) return RevisionSet(merge_props[path])def dict_from_revlist_prop(propvalue): """Given a property value as a string containing per-head revision lists, return a dictionary whose key is a relative path to a head (in the repository), and whose value is the revisions for that head.""" prop = {} # Multiple heads are separated by any whitespace. for L in propvalue.split(): # We use rsplit to play safe and allow colons in paths. head, revs = rsplit(L.strip(), ":", 1) prop[head] = revs return propdef get_revlist_prop(url_or_dir, propname, rev=None): """Given a repository URL or working copy path and a property name, extract the values of the property which store per-head revision lists and return a dictionary whose key is a relative path to a head (in the repository), and whose value is the revisions for that head.""" # Note that propget does not return an error if the property does # not exist, it simply does not output anything. So we do not need # to check for LaunchError here. args = '--strict "%s" "%s"' % (propname, url_or_dir) if rev: args = '-r %s %s' % (rev, args) out = launchsvn('propget %s' % args, split_lines=False) return dict_from_revlist_prop(out)def get_merge_props(dir): """Extract the merged revisions.""" return get_revlist_prop(dir, opts["prop"])def get_block_props(dir): """Extract the blocked revisions.""" return get_revlist_prop(dir, opts["block-prop"])def get_blocked_revs(dir, head_path): p = get_block_props(dir) if p.has_key(head_path): return RevisionSet(p[head_path]) return RevisionSet("")def format_merge_props(props, sep=" "): """Formats the hash PROPS as a string suitable for use as a Subversion property value.""" assert sep in ["\t", "\n", " "] # must be a whitespace props = props.items() props.sort() L = [] for h, r in props: L.append(h + ":" + r) return sep.join(L)def _run_propset(dir, prop, value): """Set the property 'prop' of directory 'dir' to value 'value'. We go through a temporary file to not run into command line length limits.""" try: fd, fname = tempfile.mkstemp() f = os.fdopen(fd, "wb") except AttributeError: # Fallback for Python <= 2.3 which does not have mkstemp (mktemp # suffers from race conditions. Not that we care...) fname = tempfile.mktemp() f = open(fname, "wb") try: f.write(value) f.close() report("property data written to temp file: %s" % value) svn_command('propset "%s" -F "%s" "%s"' % (prop, fname, dir)) finally: os.remove(fname)def set_props(dir, name, props): props = format_merge_props(props) if props: _run_propset(dir, name, props) else: svn_command('propdel "%s" "%s"' % (name, dir))def set_merge_props(dir, props): set_props(dir, opts["prop"], props)def set_block_props(dir, props): set_props(dir, opts["block-prop"], props)def set_blocked_revs(dir, head_path, revs): props = get_block_props(dir) if revs: props[head_path] = str(revs) elif props.has_key(head_path): del props[head_path] set_block_props(dir, props)def is_url(url): """Check if url is a valid url.""" return re.search(r"^[a-zA-Z][-+\.\w]*://", url) is not Nonedef is_wc(dir): """Check if a directory is a working copy.""" return os.path.isdir(os.path.join(dir, ".svn")) or \ os.path.isdir(os.path.join(dir, "_svn"))_cache_svninfo = {}def get_svninfo(path): """Extract the subversion information for a path (through 'svn info'). This function uses an internal cache to let clients query information many times.""" global _cache_svninfo if _cache_svninfo.has_key(path): return _cache_svninfo[path] info = {} for L in launchsvn('info "%s"' % path): L = L.strip() if not L: continue key, value = L.split(": ", 1) info[key] = value.strip() _cache_svninfo[path] = info return infodef target_to_url(dir): """Convert working copy path or repos URL to a repos URL.""" if is_wc(dir): info = get_svninfo(dir) return info["URL"] return dirdef get_repo_root(dir): """Compute the root repos URL given a working-copy path, or a URL."""
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -