📄 model.py
字号:
except NoResultFound: if not create: raise FileNotFound("Project %s not found" % project_name) log.debug("Creating new project %s", project_name) project = Project(name=project_name, owner=user) s.add(project) return project def save_file(self, user, project, path, contents=None, last_edit=None): """Saves the contents to the file path provided, creating directories as needed in between. If last_edit is not provided, the file must not be opened for editing. Otherwise, the last_edit parameter should include the last edit ID received by the user.""" saved_size = len(contents) if contents is not None else 0 if not user.check_save(saved_size): raise OverQuota() s = self.session segments = path.split("/") fn = segments[-1] # temporary step to replace tabs with 4 spaces. file_type = os.path.splitext(path)[1] if file_type in _text_types: contents = contents.replace("\t", " ") last_d = None for i in range(0, len(segments)): if i == 0: segment = "" else: segment = "/".join(segments[0:i]) + "/" try: d = s.query(Directory).filter_by(name=segment) \ .filter_by(project=project).one() except NoResultFound: d = Directory(name=segment, project=project) s.add(d) if last_d: last_d.subdirs.append(d) last_d = d if not last_d: raise FSException("Unable to get to path %s from the root" % path) # we're actually just creating a directory if path.endswith('/') or not path: return subdir_names = [item.name for item in last_d.subdirs] if (path + "/") in subdir_names: raise FileConflict("Cannot save a file at %s because there is a directory there." % path) try: file = s.query(File).filter_by(name=path) \ .filter_by(project=project).one() file.data = contents size_change = saved_size - file.saved_size user.amount_used += size_change file.saved_size = saved_size except NoResultFound: file = File(name=path, dir=last_d, data=contents, saved_size=saved_size, project=project) user.amount_used += saved_size s.add(file) self.reset_edits(user, project, path) return file def list_open(self, user): """list open files for the current user. a dictionary of { project: { filename: mode } } will be returned. For example, if subdir1/subdir2/test.py is open read/write, openfiles will return { "subdir1": { "somedir2/test.py": {"mode" : "rw"} } }""" output = {} current_files = None last_proj = None for fs in sorted(user.files, cmp=_cmp_files_in_project): file = fs.file path = file.name project = file.project.name if project != last_proj: last_proj = project current_files = {} output[project] = current_files mode = "rw" if not fs.read_only else "r" current_files[path] = dict(mode=mode) return output def close(self, user, project, path): """Close the file for the given user""" s = self.session try: file_obj = s.query(File).filter_by(name=path) \ .filter_by(project=project).one() except NoResultFound: return try: fs = s.query(FileStatus).filter_by(user_id=user.id) \ .filter_by(file_id=file_obj.id).one() except NoResultFound: return self.reset_edits(user, project, path) def delete(self, user, project, path=""): """Deletes a file, as long as it is not opened. If the file is open, a FileConflict is raised. If the path is a directory, the directory and everything underneath it will be deleted. If the path is empty, the project will be deleted.""" s = self.session if not path or path.endswith("/"): try: dir_obj = s.query(Directory).filter_by(name=path) \ .filter_by(project=project).one() except NoResultFound: raise FileNotFound("Directory %s not found in project %s" % (path, project.name)) file_space = s.query(func.sum(File.saved_size)) \ .filter(File.name.like(path + "%")) \ .filter_by(project=project).one()[0] q = s.query(Directory).filter(Directory.name.like(path + "%")) \ .filter_by(project=project) q.delete() if file_space is not None: user.amount_used -= file_space s.query(File).filter(File.name.like(path + "%")) \ .filter_by(project=project).delete() if not path: s.delete(project) else: try: file_obj = s.query(File).filter_by(name=path) \ .filter_by(project=project).one() except NoResultFound: raise FileNotFound("File %s not found in project %s" % (path, project.name)) open_users = set(s.user for s in file_obj.users) open_users.difference_update(set([user])) if open_users: raise FileConflict( "File %s in project %s is in use by another user" % (path, project.name)) # make sure we delete the open status if the current # user has it open for fs in file_obj.users: s.delete(fs) user.amount_used -= file_obj.saved_size s.delete(file_obj) def save_edit(self, user, project, path, edit): s = self.session try: file_obj = s.query(File).filter_by(name=path) \ .filter_by(project=project).one() except NoResultFound: file_obj = self.save_file(user, project, path, None) if file_obj.edits is None: file_obj.edits = [] file_obj.edits.append(edit) self._save_status(file_obj, user, "rw") def list_edits(self, user, project, path, start_at=0): try: file_obj = self.session.query(File).filter_by(name=path) \ .filter_by(project=project).one() except NoResultFound: raise FileNotFound("File %s in project %s does not exist" % (path, project.name)) edits = file_obj.edits if start_at: if start_at >= len(edits): raise FSException("%s only has %s edits (after %s requested)" % (path, len(edits), start_at)) edits = edits[start_at:] return edits def reset_edits(self, user, project=None, path=None): if not project or not path: for fs in user.files[:]: file_obj = fs.file self.reset_edits(user, file_obj.project, file_obj.name) return s = self.session try: file_obj = s.query(File).filter_by(name=path) \ .filter_by(project=project).one() except NoResultFound: return file_obj.edits = [] for fs in file_obj.users: if fs.user == user: s.delete(fs) file_obj.users.remove(fs) if fs in user.files: user.files.remove(fs) break def install_template(self, user, project, template="template"): """Installs a set of template files into a new project.""" log.debug("Installing template %s for user %s as project %s", template, user, project) source_dir = pkg_resources.resource_filename("bespin", template) common_path_len = len(source_dir) + 1 for dirpath, dirnames, filenames in os.walk(source_dir): destdir = dirpath[common_path_len:] if '.svn' in destdir: continue for f in filenames: if destdir: destpath = "%s/%s" % (destdir, f) else: destpath = f contents = open(os.path.join(dirpath, f)).read() self.save_file(user, project, destpath, contents) def authorize_user(self, user, project, auth_user): """Allow auth_user to access project which is owned by user.""" project.authorize_user(user, auth_user) def unauthorize_user(self, user, project, auth_user): """Disallow auth_user from accessing project_name which is owned by user.""" project.unauthorize_user(user, auth_user) def import_tarball(self, user, project, filename, file_obj): """Imports the tarball in the file_obj into the project project owned by user. If the project already exists, IT WILL BE WIPED OUT AND REPLACED.""" pfile = tarfile.open(filename, fileobj=file_obj) max_import_file_size = config.c.max_import_file_size info = list(pfile) base = _find_common_base(member.name for member in info) base_len = len(base) for member in info: # save the files, directories are created automatically # note that this does not currently support empty directories. if member.isreg(): if member.size > max_import_file_size: raise FSException("File %s too large (max is %s bytes)" % (member.name, max_import_file_size)) self.save_file(user, project, member.name[base_len:], pfile.extractfile(member).read()) def import_zipfile(self, user, project, filename, file_obj): """Imports the zip file in the file_obj into the project project owned by user. If the project already exists, IT WILL BE WIPED OUT AND REPLACED.""" max_import_file_size = config.c.max_import_file_size pfile = zipfile.ZipFile(file_obj) info = pfile.infolist() base = _find_common_base(member.filename for member in info) base_len = len(base) for member in pfile.infolist(): if member.filename.endswith("/"): continue if member.file_size > max_import_file_size: raise FSException("File %s too large (max is %s bytes)" % (member.filename, max_import_file_size)) self.save_file(user, project, member.filename[base_len:], pfile.read(member.filename)) def export_tarball(self, user, project): """Exports the project as a tarball, returning a NamedTemporaryFile object. You can either use that open file handle or use the .name property to get at the file.""" temporaryfile = tempfile.NamedTemporaryFile() s = self.session mtime = time.time() tfile = tarfile.open(temporaryfile.name, "w:gz") dirs = s.query(Directory) \ .filter_by(project=project).order_by(Directory.name).all() for dir in dirs: tarinfo = tarfile.TarInfo(str(project.name + "/" + dir.name)) tarinfo.type = tarfile.DIRTYPE # we don't know the original permissions. # we'll default to read/execute for all, write only by user tarinfo.mode = 493 tarinfo.mtime = mtime tfile.addfile(tarinfo) for file in dir.files: tarinfo = tarfile.TarInfo(str(project.name + "/" + file.name)) tarinfo.mtime = mtime # we don't know the original permissions. # we'll default to read for all, write only by user tarinfo.mode = 420 data = str(file.data) tarinfo.size = len(data) fileobj = StringIO(data) tfile.addfile(tarinfo, fileobj) # ditch the file, because these objects can get big s.expunge(file) s.expunge(dir) tfile.close() temporaryfile.seek(0) return temporaryfile def export_zipfile(self, user, project): """Exports the project as a zip file, returning a NamedTemporaryFile object. You can either use that open file handle or use the .name property to get at the file.""" temporaryfile = tempfile.NamedTemporaryFile() s = self.session zfile = zipfile.ZipFile(temporaryfile, "w", zipfile.ZIP_DEFLATED) ztime = time.gmtime()[:6] files = s.query(File) \ .filter_by(project=project).order_by(File.name).all() for file in files: zipinfo = zipfile.ZipInfo(str(project.name + "/" + file.name)) # we don't know the original permissions. # we'll default to read for all, write only by user zipinfo.external_attr = 420 << 16L zipinfo.date_time = ztime zipinfo.compress_type = zipfile.ZIP_DEFLATED zfile.writestr(zipinfo, str(file.data)) s.expunge(file) zfile.close() temporaryfile.seek(0) return temporaryfile def recompute_used(self, user): """Recomputes how much space the user has used.""" s = self.session total = 0 for project in user.projects: total += s.query(func.sum(File.saved_size)) \ .filter_by(project=project).one()[0] user.amount_used = total def rename(self, user, project, path, new_name): """Right now, path is ignored and this just renames the project to new_name.""" segments = new_name.split('/') other_project = self.session.query(Project).filter_by(name=new_name) \ .filter_by(owner=user).first() if other_project: raise ConflictError("You already have a project with the name %s" % new_name) project.name = segments[0] def _find_common_base(member_names): base = None base_len = None for name in member_names: if base is None: slash = name.find("/") base = name[:slash+1] base_len = len(base) continue if name[:base_len] != base: base = "" break return base
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -