📄 svnmerge.py
字号:
# Try using "svn info WCDIR". This works only on SVN clients >= 1.3 if not is_url(dir): try: info = get_svninfo(dir) return info["Repository Root"] except KeyError: pass url = target_to_url(dir) assert url[-1] != '/' else: url = dir # Try using "svn info URL". This works only on SVN clients >= 1.2 try: info = get_svninfo(url) return info["Repository Root"] except LaunchError: pass # Constrained to older svn clients, we are stuck with this ugly # trial-and-error implementation. It could be made faster with a # binary search. while url: temp = os.path.dirname(url) try: launchsvn('proplist "%s"' % temp) except LaunchError: return url url = temp assert False, "svn repos root not found"def target_to_repos_relative_path(target): """Convert a target (either a working copy path or an URL) into a repository-relative path.""" root = get_repo_root(target) url = target_to_url(target) assert root[-1] != "/" assert url[:len(root)] == root, "url=%r, root=%r" % (url, root) return url[len(root):]def get_copyfrom(dir): """Get copyfrom info for a given target (it represents the directory from where it was branched). NOTE: repos root has no copyfrom info. In this case None is returned.""" repos_path = target_to_repos_relative_path(dir) out = launchsvn('log -v --xml --stop-on-copy "%s"' % dir, split_lines=False) out = out.replace("\n", " ") try: m = re.search(r'(<path\s[^>]*action="A"[^>]*>%s</path>)' % re.escape(repos_path), out) head = re.search(r'copyfrom-path="([^"]*)"', m.group(1)).group(1) rev = re.search(r'copyfrom-rev="([^"]*)"', m.group(1)).group(1) return head,rev except AttributeError: return None,Nonedef get_latest_rev(url): """Get the latest revision of the repository of which URL is part.""" try: return get_svninfo(url)["Revision"] except LaunchError: # Alternative method for latest revision checking (for svn < 1.2) report('checking latest revision of "%s"' % url) L = launchsvn('proplist --revprop -r HEAD "%s"' % opts["head-url"])[0] rev = re.search("revision (\d+)", L).group(1) report('latest revision of "%s" is %s' % (url, rev)) return revdef get_created_rev(url): """Lookup the revision at which the path identified by the provided URL was first created.""" oldest_rev = -1 report('determining oldest revision for URL "%s"' % url) ### TODO: Refactor this to use a modified RevisionLog class. lines = None cmd = "log -r1:HEAD --stop-on-copy -q " + url try: lines = launchsvn(cmd + " --limit=1") except LaunchError: # Assume that --limit isn't supported by the installed 'svn'. lines = launchsvn(cmd) if lines and len(lines) > 1: i = lines[1].find(" ") if i != -1: oldest_rev = int(lines[1][1:i]) if oldest_rev == -1: error('unable to determine oldest revision for URL "%s"' % url) return oldest_revdef get_commit_log(url, revnum): """Return the log message for a specific integer revision number.""" out = launchsvn("log --incremental -r%d %s" % (revnum, url)) return "".join(out[1:])def construct_merged_log_message(url, revnums): """Return a commit log message containing all the commit messages in the specified revisions at the given URL. The separator used in this log message is determined by searching for the longest svnmerge separator existing in the commit log messages and extending it by one more separator. This results in a new commit log message that is clearer in describing merges that contain other merges. Trailing newlines are removed from the embedded log messages.""" messages = [''] longest_sep = '' for r in revnums.sorted(): message = get_commit_log(url, r) if message: message = rstrip(message, "\n") + "\n" messages.append(prefix_lines(LOG_LINE_PREFIX, message)) for match in LOG_SEPARATOR_RE.findall(message): sep = match[1] if len(sep) > len(longest_sep): longest_sep = sep longest_sep += LOG_SEPARATOR + "\n" messages.append('') return longest_sep.join(messages)def get_default_head(branch_dir, branch_props): """Return the default head for branch_dir (given its branch_props). Error out if there is ambiguity.""" if not branch_props: error("no integration info available") props = branch_props.copy() directory = target_to_repos_relative_path(branch_dir) # To make bidirectional merges easier, find the target's # repository local path so it can be removed from the list of # possible integration sources. if props.has_key(directory): del props[directory] if len(props) > 1: err_msg = "multiple heads found. " err_msg += "Explicit head argument (-S/--head) required.\n" err_msg += "The head values available are:" for prop in props: err_msg += "\n " + prop error(err_msg) return props.keys()[0]def check_old_prop_version(branch_dir, props): """Check if props (of branch_dir) are svnmerge properties in old format, and emit an error if so.""" # Previous svnmerge versions allowed trailing /'s in the repository # local path. Newer versions of svnmerge will trim trailing /'s # appearing in the command line, so if there are any properties with # trailing /'s, they will not be properly matched later on, so require # the user to change them now. fixed = {} changed = False for head, revs in props.items(): h = rstrip(head, "/") fixed[h] = revs if h != head: changed = True if changed: err_msg = "old property values detected; an upgrade is required.\n\n" err_msg += "Please execute and commit these changes to upgrade:\n\n" err_msg += 'svn propset "%s" "%s" "%s"' % \ (opts["prop"], format_merge_props(fixed), branch_dir) error(err_msg)def analyze_revs(target_dir, url, begin=1, end=None, find_reflected=False): """For the source of the merges in the head url being merged into target_dir, analyze the revisions in the interval begin-end (which defaults to 1-HEAD), to find out which revisions are changes in the url, which are changes elsewhere (so-called 'phantom' revisions), and optionally which are reflected changes (to avoid conflicts that can occur when doing bidirectional merging between branches). Return a tuple of three RevisionSet's: (real_revs, phantom_revs, reflected_revs). NOTE: To maximize speed, if "end" is not provided, the function is not able to find phantom revisions following the last real revision in the URL. """ begin = str(begin) if end is None: end = "HEAD" else: end = str(end) if long(begin) > long(end): return RevisionSet(""), RevisionSet(""), RevisionSet("") logs[url] = RevisionLog(url, begin, end, find_reflected) revs = RevisionSet(logs[url].revs) if end == "HEAD": # If end is not provided, we do not know which is the latest revision # in the repository. So return the phantom revision set only up to # the latest known revision. end = str(list(revs)[-1]) phantom_revs = RevisionSet("%s-%s" % (begin, end)) - revs if find_reflected: reflected_revs = logs[url].merge_metadata().changed_revs(target_dir) else: reflected_revs = [] reflected_revs = RevisionSet(reflected_revs) return revs, phantom_revs, reflected_revsdef analyze_head_revs(branch_dir, head_url, **kwargs): """For the given branch and head, extract the real and phantom head revisions.""" branch_url = target_to_url(branch_dir) target_dir = target_to_repos_relative_path(branch_dir) # Extract the latest repository revision from the URL of the branch # directory (which is already cached at this point). end_rev = get_latest_rev(branch_url) # Calculate the base of analysis. If there is a "1-XX" interval in the # merged_revs, we do not need to check those. base = 1 r = opts["merged-revs"].normalized() if r and r[0][0] == 1: base = r[0][1] + 1 # See if the user filtered the revision set. If so, we are not # interested in something outside that range. if opts["revision"]: revs = RevisionSet(opts["revision"]).sorted() if base < revs[0]: base = revs[0] if end_rev > revs[-1]: end_rev = revs[-1] return analyze_revs(target_dir, head_url, base, end_rev, **kwargs)def minimal_merge_intervals(revs, phantom_revs): """Produce the smallest number of intervals suitable for merging. revs is the RevisionSet which we want to merge, and phantom_revs are phantom revisions which can be used to concatenate intervals, thus minimizing the number of operations.""" revnums = revs.normalized() ret = [] cur = revnums.pop() while revnums: next = revnums.pop() assert next[1] < cur[0] # otherwise it is not ordered assert cur[0] - next[1] > 1 # otherwise it is not normalized for i in range(next[1]+1, cur[0]): if i not in phantom_revs: ret.append(cur) cur = next break else: cur = (next[0], cur[1]) ret.append(cur) ret.reverse() return retdef display_revisions(revs, display_style, revisions_msg, head_url): """Show REVS as dictated by DISPLAY_STYLE, either numerically, in log format, or as diffs. When displaying revisions numerically, prefix output with REVISIONS_MSG when in verbose mode. Otherwise, request logs or diffs using HEAD_URL.""" if display_style == "revisions": if revs: report(revisions_msg) print revs elif display_style == "logs": for start,end in revs.normalized(): svn_command('log --incremental -v -r %d:%d %s' % \ (start, end, head_url)) elif display_style == "diffs": for start, end in revs.normalized(): print if start == end: print "%s: changes in revision %d follow" % (NAME, start) else: print "%s: changes in revisions %d-%d follow" % (NAME, start, end) print # Note: the starting revision number to 'svn diff' is # NOT inclusive so we have to subtract one from ${START}. svn_command("diff -r %d:%d %s" % (start - 1, end, head_url)) else: assert False, "unhandled display style: %s" % display_styledef action_init(branch_dir, branch_props): """Initialize a branch for merges.""" # Check branch directory is ready for being modified check_dir_clean(branch_dir) # Get the initial revision set if not explicitly specified. revs = opts["revision"] or "1-" + get_latest_rev(opts["head-url"]) revs = RevisionSet(revs) report('marking "%s" as already containing revisions "%s" of "%s"' % (branch_dir, revs, opts["head-url"])) revs = str(revs) # If the head-path already has an entry in the svnmerge-integrated # property, simply error out. if not opts["force"] and branch_props.has_key(opts["head-path"]): error('%s has already been initialized at %s\n' 'Use --force to re-initialize' % (opts["head-path"], branch_dir)) branch_props[opts["head-path"]] = revs # Set property set_merge_props(branch_dir, branch_props) # Write out commit message if desired if opts["commit-file"]: f = open(opts["commit-file"], "w") print >>f, 'Initialized merge tracking via "%s" with revisions "%s" from ' \ % (NAME, revs) print >>f, '%s' % opts["head-url"] f.close() report('wrote commit message to "%s"' % opts["commit-file"])def action_avail(branch_dir, branch_props): """Show commits available for merges.""" head_revs, phantom_revs, reflected_revs = \ analyze_head_revs(branch_dir, opts["head-url"], find_reflected=opts["bidirectional"]) report('skipping phantom revisions: %s' % phantom_revs) if reflected_revs: report('skipping reflected revisions: %s' % reflected_revs) blocked_revs = get_blocked_revs(branch_dir, opts["head-path"]) avail_revs = head_revs - opts["merged-revs"] - blocked_revs - reflected_revs # Compose the set of revisions to show revs = RevisionSet("") report_msg = "revisions available to be merged are:" if "avail" in opts["avail-showwhat"]: revs |= avail_revs if "blocked" in opts["avail-showwhat"]: revs |= blocked_revs report_msg = "revisions blocked are:" # Limit to revisions specified by -r (if any) if opts["revision"]: revs = revs & RevisionSet(opts["revision"]) display_revisions(revs, opts["avail-display"], report_msg, opts["head-url"])def action_integrated(branch_dir, branch_props):
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -