⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 model.py

📁 在线编辑器
💻 PY
📖 第 1 页 / 共 2 页
字号:
        except NoResultFound:            if not create:                raise FileNotFound("Project %s not found" % project_name)            log.debug("Creating new project %s", project_name)            project = Project(name=project_name, owner=user)            s.add(project)        return project            def save_file(self, user, project, path, contents=None, last_edit=None):        """Saves the contents to the file path provided, creating        directories as needed in between. If last_edit is not provided,        the file must not be opened for editing. Otherwise, the        last_edit parameter should include the last edit ID received by        the user."""        saved_size = len(contents) if contents is not None else 0        if not user.check_save(saved_size):            raise OverQuota()                s = self.session        segments = path.split("/")        fn = segments[-1]                # temporary step to replace tabs with 4 spaces.        file_type = os.path.splitext(path)[1]        if file_type in _text_types:            contents = contents.replace("\t", "    ")                last_d = None        for i in range(0, len(segments)):            if i == 0:                segment = ""            else:                segment = "/".join(segments[0:i]) + "/"            try:                d = s.query(Directory).filter_by(name=segment) \                        .filter_by(project=project).one()            except NoResultFound:                d = Directory(name=segment, project=project)                s.add(d)                if last_d:                    last_d.subdirs.append(d)            last_d = d        if not last_d:            raise FSException("Unable to get to path %s from the root" % path)                    # we're actually just creating a directory        if path.endswith('/') or not path:            return                    subdir_names = [item.name for item in last_d.subdirs]        if (path + "/") in subdir_names:            raise FileConflict("Cannot save a file at %s because there is a directory there." % path)                try:            file = s.query(File).filter_by(name=path) \                    .filter_by(project=project).one()            file.data = contents            size_change = saved_size - file.saved_size            user.amount_used += size_change            file.saved_size = saved_size        except NoResultFound:            file = File(name=path, dir=last_d, data=contents,                        saved_size=saved_size, project=project)            user.amount_used += saved_size            s.add(file)                    self.reset_edits(user, project, path)        return file            def list_open(self, user):        """list open files for the current user. a dictionary of { project: { filename: mode } } will be returned. For example, if subdir1/subdir2/test.py is open read/write, openfiles will return { "subdir1": { "somedir2/test.py": {"mode" : "rw"} } }"""        output = {}        current_files = None        last_proj = None        for fs in sorted(user.files, cmp=_cmp_files_in_project):            file = fs.file            path = file.name            project = file.project.name            if project != last_proj:                last_proj = project                current_files = {}                output[project] = current_files            mode = "rw" if not fs.read_only else "r"            current_files[path] = dict(mode=mode)                    return output            def close(self, user, project, path):        """Close the file for the given user"""        s = self.session        try:            file_obj = s.query(File).filter_by(name=path) \                .filter_by(project=project).one()        except NoResultFound:            return                    try:            fs = s.query(FileStatus).filter_by(user_id=user.id) \                    .filter_by(file_id=file_obj.id).one()        except NoResultFound:            return                self.reset_edits(user, project, path)        def delete(self, user, project, path=""):        """Deletes a file, as long as it is not opened. If the file is        open, a FileConflict is raised. If the path is a directory,        the directory and everything underneath it will be deleted.        If the path is empty, the project will be deleted."""        s = self.session        if not path or path.endswith("/"):            try:                dir_obj = s.query(Directory).filter_by(name=path) \                            .filter_by(project=project).one()            except NoResultFound:                raise FileNotFound("Directory %s not found in project %s" %                                    (path, project.name))                            file_space = s.query(func.sum(File.saved_size)) \                            .filter(File.name.like(path + "%")) \                            .filter_by(project=project).one()[0]            q = s.query(Directory).filter(Directory.name.like(path + "%")) \                    .filter_by(project=project)            q.delete()                        if file_space is not None:                user.amount_used -= file_space                            s.query(File).filter(File.name.like(path + "%")) \                .filter_by(project=project).delete()            if not path:                s.delete(project)        else:            try:                file_obj = s.query(File).filter_by(name=path) \                    .filter_by(project=project).one()            except NoResultFound:                raise FileNotFound("File %s not found in project %s" %                                    (path, project.name))            open_users = set(s.user for s in file_obj.users)            open_users.difference_update(set([user]))            if open_users:                raise FileConflict(                    "File %s in project %s is in use by another user"                    % (path, project.name))            # make sure we delete the open status if the current            # user has it open            for fs in file_obj.users:                s.delete(fs)            user.amount_used -= file_obj.saved_size            s.delete(file_obj)            def save_edit(self, user, project, path, edit):        s = self.session        try:            file_obj = s.query(File).filter_by(name=path) \                .filter_by(project=project).one()        except NoResultFound:            file_obj = self.save_file(user, project, path, None)                if file_obj.edits is None:            file_obj.edits = []                file_obj.edits.append(edit)                self._save_status(file_obj, user, "rw")            def list_edits(self, user, project, path, start_at=0):        try:            file_obj = self.session.query(File).filter_by(name=path) \                .filter_by(project=project).one()        except NoResultFound:            raise FileNotFound("File %s in project %s does not exist"                    % (path, project.name))        edits = file_obj.edits        if start_at:            if start_at >= len(edits):                raise FSException("%s only has %s edits (after %s requested)"                    % (path, len(edits), start_at))            edits = edits[start_at:]        return edits            def reset_edits(self, user, project=None, path=None):        if not project or not path:            for fs in user.files[:]:                file_obj = fs.file                self.reset_edits(user, file_obj.project, file_obj.name)            return        s = self.session        try:            file_obj = s.query(File).filter_by(name=path) \                        .filter_by(project=project).one()        except NoResultFound:            return        file_obj.edits = []        for fs in file_obj.users:            if fs.user == user:                s.delete(fs)                file_obj.users.remove(fs)                if fs in user.files:                    user.files.remove(fs)                break                def install_template(self, user, project, template="template"):        """Installs a set of template files into a new project."""        log.debug("Installing template %s for user %s as project %s",                template, user, project)        source_dir = pkg_resources.resource_filename("bespin", template)        common_path_len = len(source_dir) + 1        for dirpath, dirnames, filenames in os.walk(source_dir):            destdir = dirpath[common_path_len:]            if '.svn' in destdir:                continue            for f in filenames:                if destdir:                    destpath = "%s/%s" % (destdir, f)                else:                    destpath = f                contents = open(os.path.join(dirpath, f)).read()                self.save_file(user, project, destpath, contents)                    def authorize_user(self, user, project, auth_user):        """Allow auth_user to access project which is owned by user."""        project.authorize_user(user, auth_user)            def unauthorize_user(self, user, project, auth_user):        """Disallow auth_user from accessing project_name which is owned        by user."""        project.unauthorize_user(user, auth_user)            def import_tarball(self, user, project, filename, file_obj):        """Imports the tarball in the file_obj into the project        project owned by user. If the project already exists,        IT WILL BE WIPED OUT AND REPLACED."""        pfile = tarfile.open(filename, fileobj=file_obj)        max_import_file_size = config.c.max_import_file_size        info = list(pfile)                base = _find_common_base(member.name for member in info)        base_len = len(base)                for member in info:            # save the files, directories are created automatically            # note that this does not currently support empty directories.            if member.isreg():                if member.size > max_import_file_size:                    raise FSException("File %s too large (max is %s bytes)"                         % (member.name, max_import_file_size))                self.save_file(user, project, member.name[base_len:],                     pfile.extractfile(member).read())            def import_zipfile(self, user, project, filename, file_obj):        """Imports the zip file in the file_obj into the project        project owned by user. If the project already exists,        IT WILL BE WIPED OUT AND REPLACED."""        max_import_file_size = config.c.max_import_file_size                pfile = zipfile.ZipFile(file_obj)        info = pfile.infolist()                base = _find_common_base(member.filename for member in info)        base_len = len(base)                for member in pfile.infolist():            if member.filename.endswith("/"):                continue            if member.file_size > max_import_file_size:                raise FSException("File %s too large (max is %s bytes)"                     % (member.filename, max_import_file_size))            self.save_file(user, project, member.filename[base_len:],                pfile.read(member.filename))            def export_tarball(self, user, project):        """Exports the project as a tarball, returning a         NamedTemporaryFile object. You can either use that        open file handle or use the .name property to get        at the file."""        temporaryfile = tempfile.NamedTemporaryFile()        s = self.session                mtime = time.time()        tfile = tarfile.open(temporaryfile.name, "w:gz")                dirs = s.query(Directory) \                        .filter_by(project=project).order_by(Directory.name).all()                                for dir in dirs:            tarinfo = tarfile.TarInfo(str(project.name + "/" + dir.name))            tarinfo.type = tarfile.DIRTYPE            # we don't know the original permissions.            # we'll default to read/execute for all, write only by user            tarinfo.mode = 493            tarinfo.mtime = mtime            tfile.addfile(tarinfo)            for file in dir.files:                tarinfo = tarfile.TarInfo(str(project.name + "/" + file.name))                tarinfo.mtime = mtime                # we don't know the original permissions.                # we'll default to read for all, write only by user                tarinfo.mode = 420                data = str(file.data)                tarinfo.size = len(data)                fileobj = StringIO(data)                tfile.addfile(tarinfo, fileobj)                                # ditch the file, because these objects can get big                s.expunge(file)                            s.expunge(dir)                tfile.close()        temporaryfile.seek(0)        return temporaryfile            def export_zipfile(self, user, project):        """Exports the project as a zip file, returning a         NamedTemporaryFile object. You can either use that        open file handle or use the .name property to get        at the file."""        temporaryfile = tempfile.NamedTemporaryFile()        s = self.session                zfile = zipfile.ZipFile(temporaryfile, "w", zipfile.ZIP_DEFLATED)        ztime = time.gmtime()[:6]                files = s.query(File) \                    .filter_by(project=project).order_by(File.name).all()        for file in files:            zipinfo = zipfile.ZipInfo(str(project.name + "/" + file.name))            # we don't know the original permissions.            # we'll default to read for all, write only by user            zipinfo.external_attr = 420 << 16L            zipinfo.date_time = ztime            zipinfo.compress_type = zipfile.ZIP_DEFLATED            zfile.writestr(zipinfo, str(file.data))            s.expunge(file)                    zfile.close()        temporaryfile.seek(0)        return temporaryfile            def recompute_used(self, user):        """Recomputes how much space the user has used."""        s = self.session                total = 0        for project in user.projects:            total += s.query(func.sum(File.saved_size)) \                            .filter_by(project=project).one()[0]        user.amount_used = total            def rename(self, user, project, path, new_name):        """Right now, path is ignored and this just renames the project        to new_name."""        segments = new_name.split('/')        other_project = self.session.query(Project).filter_by(name=new_name) \            .filter_by(owner=user).first()        if other_project:            raise ConflictError("You already have a project with the name %s" %                                new_name)        project.name = segments[0]                def _find_common_base(member_names):    base = None    base_len = None    for name in member_names:        if base is None:            slash = name.find("/")            base = name[:slash+1]            base_len = len(base)            continue        if name[:base_len] != base:            base = ""            break    return base    

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -