📄 svn_fs.py
字号:
## Copyright (C) 2005 Edgewall Software# Copyright (C) 2005 Christopher Lenz <cmlenz@gmx.de>## This software is licensed as described in the file# LICENSE_FOR_PYTHON_BINDINGS, which you should have received as part# of this distribution. The terms are also available at# < http://subversion.tigris.org/license-for-python-bindings.html >.# If newer versions of this license are posted there, you may use a# newer version instead, at your option.## Author: Christopher Lenz <cmlenz@gmx.de>from __future__ import generatorsfrom trac.versioncontrol import Changeset, Node, Repositoryimport os.pathimport timeimport weakrefimport posixpathfrom svn import fs, repos, core, delta_kindmap = {core.svn_node_dir: Node.DIRECTORY, core.svn_node_file: Node.FILE}def _get_history(path, authz, fs_ptr, start, end, limit=None): history = [] if hasattr(repos, 'svn_repos_history2'): # For Subversion >= 1.1 def authz_cb(root, path, pool): if limit and len(history) >= limit: return 0 return authz.has_permission(path) and 1 or 0 def history2_cb(path, rev, pool): history.append((path, rev)) repos.svn_repos_history2(fs_ptr, path, history2_cb, authz_cb, start, end, 1) else: # For Subversion 1.0.x def history_cb(path, rev, pool): if authz.has_permission(path): history.append((path, rev)) repos.svn_repos_history(fs_ptr, path, history_cb, start, end, 1) for item in history: yield itemclass SubversionRepository(Repository): """ Repository implementation based on the svn.fs API. """ def __init__(self, path, authz): Repository.__init__(self, authz) if core.SVN_VER_MAJOR < 1: raise TracError, \ "Subversion >= 1.0 required: Found %d.%d.%d" % \ (core.SVN_VER_MAJOR, core.SVN_VER_MINOR, core.SVN_VER_MICRO) self.repos = None self.fs_ptr = None self.path = path # Remove any trailing slash or else subversion might abort if not os.path.split(path)[1]: path = os.path.split(path)[0] self.path = repos.svn_repos_find_root_path(path) if self.path is None: raise TracError, "%s does not appear to be a Subversion repository." % (path, ) if self.path != path: self.scope = path[len(self.path):] if not self.scope[-1] == '/': self.scope += '/' else: self.scope = '/' self.repos = repos.svn_repos_open(self.path) self.fs_ptr = repos.svn_repos_fs(self.repos) self.rev = fs.youngest_rev(self.fs_ptr) self.history = None if self.scope != '/': self.history = [] for path,rev in _get_history(self.scope[1:], self.authz, self.fs_ptr, 0, self.rev): self.history.append(rev) def __del__(self): self.close() def has_node(self, path, rev): rev_root = fs.revision_root(self.fs_ptr, rev) node_type = fs.check_path(rev_root, path) return node_type in _kindmap def normalize_path(self, path): return path == '/' and path or path and path.strip('/') or '' def normalize_rev(self, rev): try: rev = int(rev) except (ValueError, TypeError): rev = None if rev is None: rev = self.youngest_rev elif rev > self.youngest_rev: raise TracError, "Revision %s doesn't exist yet" % rev return rev def close(self): if self.repos: self.repos = None self.fs_ptr = None self.rev = None def get_changeset(self, rev): return SubversionChangeset(int(rev), self.authz, self.scope, self.fs_ptr) def get_node(self, path, rev=None): self.authz.assert_permission(self.scope + path) if path and path[-1] == '/': path = path[:-1] rev = self.normalize_rev(rev) return SubversionNode(path, rev, self.authz, self.scope, self.fs_ptr) def get_oldest_rev(self): rev = 0 if self.scope == '/': return rev return self.history[-1] def get_youngest_rev(self): rev = self.rev if self.scope == '/': return rev return self.history[0] def previous_rev(self, rev): rev = int(rev) if rev == 0: return None if self.scope == '/': return rev - 1 idx = self.history.index(rev) if idx + 1 < len(self.history): return self.history[idx + 1] return None def next_rev(self, rev): rev = int(rev) if rev == self.rev: return None if self.scope == '/': return rev + 1 if rev == 0: return self.oldest_rev idx = self.history.index(rev) if idx > 0: return self.history[idx - 1] return None def rev_older_than(self, rev1, rev2): return self.normalize_rev(rev1) < self.normalize_rev(rev2) def get_path_history(self, path, rev=None, limit=None): path = self.normalize_path(path) rev = self.normalize_rev(rev) expect_deletion = False while rev: if self.has_node(path, rev): if expect_deletion: # it was missing, now it's there again: rev+1 must be a delete yield path, rev+1, Changeset.DELETE newer = None # 'newer' is the previously seen history tuple older = None # 'older' is the currently examined history tuple for p, r in _get_history(path, self.authz, self.fs_ptr, 0, rev, limit): older = (self.normalize_path(p), r, Changeset.ADD) rev = self.previous_rev(r) if newer: if older[0] == path: # still on the path: 'newer' was an edit yield newer[0], newer[1], Changeset.EDIT else: # the path changed: 'newer' was a copy rev = self.previous_rev(newer[1]) # restart before the copy op yield newer[0], newer[1], Changeset.COPY older = (older[0], older[1], 'unknown') break newer = older if older: # either a real ADD or the source of a COPY yield older else: expect_deletion = True rev = self.previous_rev(rev) def get_deltas(self, old_path, old_rev, new_path, new_rev, ignore_ancestry=0): old_node = new_node = None old_rev = self.normalize_rev(old_rev) new_rev = self.normalize_rev(new_rev) if self.has_node(old_path, old_rev): old_node = self.get_node(old_path, old_rev) else: raise TracError, ('The Base for Diff is invalid: path %s' ' doesn\'t exist in revision %s' \ % (old_path, old_rev)) if self.has_node(new_path, new_rev): new_node = self.get_node(new_path, new_rev) else: raise TracError, ('The Target for Diff is invalid: path %s' ' doesn\'t exist in revision %s' \ % (new_path, new_rev)) if new_node.kind != old_node.kind: raise TracError, ('Diff mismatch: Base is a %s (%s in revision %s) ' 'and Target is a %s (%s in revision %s).' \ % (old_node.kind, old_path, old_rev, new_node.kind, new_path, new_rev)) if new_node.isdir: editor = DiffChangeEditor() e_ptr, e_baton = delta.make_editor(editor) old_root = fs.revision_root(self.fs_ptr, old_rev) new_root = fs.revision_root(self.fs_ptr, new_rev) def authz_cb(root, path, pool): return 1 text_deltas = 0 # as this is anyway re-done in Diff.py...
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -