📄 model.py
字号:
# ***** BEGIN LICENSE BLOCK *****# Version: MPL 1.1# # The contents of this file are subject to the Mozilla Public License # Version# 1.1 (the "License"); you may not use this file except in compliance # with# the License. You may obtain a copy of the License at# http://www.mozilla.org/MPL/# # Software distributed under the License is distributed on an "AS IS" # basis,# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the # License# for the specific language governing rights and limitations under the# License.# # The Original Code is Bespin.# # The Initial Developer of the Original Code is Mozilla.# Portions created by the Initial Developer are Copyright (C) 2009# the Initial Developer. All Rights Reserved.# # Contributor(s):# # ***** END LICENSE BLOCK *****# """Data classes for working with files/projects/users."""import osimport timefrom cStringIO import StringIOimport hashlibimport tarfileimport tempfileimport mimetypesimport zipfilefrom datetime import datetimeimport loggingimport reimport pkg_resourcesfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import (Column, PickleType, String, Integer, Boolean, Binary, Table, ForeignKey, DateTime, func, UniqueConstraint)from sqlalchemy.orm import relation, deferred, mapper, backreffrom sqlalchemy.exc import DBAPIErrorfrom sqlalchemy.orm.exc import NoResultFoundfrom bespin import configlog = logging.getLogger("bespin.model")Base = declarative_base()# quotas are expressed in 1 million byte incrementsQUOTA_UNITS = 1000000class ConflictError(Exception): pass class FSException(Exception): pass class FileNotFound(FSException): passclass FileConflict(FSException): passclass NotAuthorized(FSException): pass class OverQuota(FSException): passclass BadValue(FSException): pass class DB(object): def __init__(self, user_manager, file_manager): self.user_manager = user_manager self.file_manager = file_manager user_manager.db = self file_manager.db = self class User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True) username = Column(String(128), unique=True) email = Column(String(128)) password = Column(String(20)) settings = Column(PickleType()) projects = relation('Project', backref='owner') quota = Column(Integer, default=10) amount_used = Column(Integer, default=0) def __init__(self, username, password, email): self.username = username self.email = email self.password = password self.settings = {} self.quota = config.c.default_quota def __str__(self): return "%s (%s-%s)" % (self.username, self.id, id(self)) def check_save(self, amount): """Confirms that the user can save this amount. Returns True if the user has enough available in their quota, False otherwise. """ return (self.quota * QUOTA_UNITS - self.amount_used - amount) > 0 def quota_info(self): """Returns the tuple of quota and amount_used""" return (self.quota * QUOTA_UNITS, self.amount_used) bad_characters = "<>| '\""invalid_chars = re.compile(r'[%s]' % bad_characters)class UserManager(object): def __init__(self, session): self.session = session def create_user(self, username, password, email): """Adds a new user with the given username and password. This raises a ConflictError is the user already exists.""" if invalid_chars.search(username): raise BadValue("Usernames cannot contain any of: %s" % bad_characters) log.debug("Creating user %s", username) user = User(username, password, email) self.session.add(user) # flush to ensure that the user is unique try: self.session.flush() except DBAPIError: raise ConflictError("Username %s is already in use" % username) file_manager = self.db.file_manager project = file_manager.get_project(user, user, "SampleProject", create=True) file_manager.install_template(user, project) return user def get_user(self, username): """Looks up a user by username. Returns None if the user is not found.""" return self.session.query(User).filter_by(username=username).first() class FileStatus(Base): __tablename__ = "filestatus" user_id = Column(Integer, ForeignKey('users.id', ondelete='cascade'), primary_key=True) file_id = Column(Integer, ForeignKey('files.id', ondelete='cascade'), primary_key=True) read_only = Column(Boolean) user = relation(User, backref="files") class File(Base): __tablename__ = "files" id = Column(Integer, primary_key=True) project_id = Column(Integer, ForeignKey('projects.id', ondelete='cascade'), nullable=False) project = relation('Project') name = Column(String(700), nullable=False) created = Column(DateTime, default=datetime.now) modified = Column(DateTime, onupdate=datetime.now) saved_size = Column(Integer) data = deferred(Column(Binary)) edits = deferred(Column(PickleType)) dir_id = Column(Integer, ForeignKey('directories.id', ondelete='cascade')) dir = relation('Directory', backref="files") users = relation(FileStatus, backref="file") __table_args__ = (UniqueConstraint("project_id", "name"), {}) @property def short_name(self): elems = self.name.rsplit("/", 1) if len(elems) == 1: return self.name else: return elems[1] @property def mimetype(self): """Returns the mimetype of the file, or application/octet-stream if it cannot be guessed.""" t = mimetypes.guess_type(self.name) if t: return t[0] return "application/octet-stream" def __repr__(self): return "File: %s" % (self.name) project_members = Table('members', Base.metadata, Column('project_id', Integer, ForeignKey('projects.id', ondelete='cascade')), Column('user_id', Integer, ForeignKey('users.id', ondelete='cascade'))) class Directory(Base): __tablename__ = "directories" id = Column(Integer, primary_key=True) project_id = Column(Integer, ForeignKey('projects.id', ondelete='cascade'), nullable=False) project = relation('Project', backref="directories") name = Column(String(700), nullable=False) parent_id = Column(Integer, ForeignKey('directories.id', ondelete='cascade')) subdirs = relation('Directory', backref=backref("parent", remote_side=[id])) __table_args__ = (UniqueConstraint("project_id", "name"), {}) @property def short_name(self): return self.name.rsplit("/", 2)[-2] + "/" def __str__(self): return "Dir: %s" % (self.name) __repr__ = __str__ class Project(Base): __tablename__ = "projects" id = Column(Integer, primary_key=True) name = Column(String(60), nullable=False) members = relation("User", secondary=project_members, lazy=False) user_id = Column(Integer, ForeignKey('users.id', ondelete='cascade'), nullable=False) __table_args__ = (UniqueConstraint("user_id", "name"), {}) def authorize(self, user): log.debug("Checking user %s access to project %s owned by %s with members %s", user, self.name, self.owner, self.members) if user != self.owner and user not in self.members: raise NotAuthorized("You are not authorized to access that project.") def authorize_user(self, user, auth_user): """user is requesting to allow auth_user to access this project. user must be this project's owner.""" if user != self.owner: raise NotAuthorized("Only the project owner can authorize users.") if auth_user not in self.members: self.members.append(auth_user) def unauthorize_user(self, user, auth_user): """user wants auth_user to no longer be able to access this project. user must be the project owner.""" if user != self.owner: raise NotAuthorized("Only the project owner can unauthorize users.") try: self.members.remove(auth_user) except KeyError: pass @property def short_name(self): return self.name + "/" def __repr__(self): return "Project(name=%s)" % (self.name)_text_types = set(['.txt', '.html', '.htm', '.css', '.js', '.py', '.pl'])def _cmp_files_in_project(fs1, fs2): file1 = fs1.file file2 = fs2.file proj_diff = cmp(file1.project.name, file2.project.name) if not proj_diff: return cmp(file1.name, file2.name) return proj_diffclass FileManager(object): def __init__(self, session): self.session = session def get_file(self, user, project, path, mode="rw"): """Gets the contents of the file as a string. Raises FileNotFound if the file does not exist. The file is marked as open after this call.""" file_obj = self._check_and_get_file(user, project, path) self._save_status(file_obj, user, mode) contents = str(file_obj.data) return contents def _check_and_get_file(self, user, project, path): """Returns the project, user object, file object.""" s = self.session try: file_obj = s.query(File).filter_by(name=path) \ .filter_by(project=project).one() except NoResultFound: raise FileNotFound("File %s in project %s does not exist" % (path, project.name)) if file_obj.data == None: raise FileNotFound("File %s in project %s does not exist" % (path, project.name)) return file_obj def get_file_object(self, user, project, path): """Retrieves the File instance from the project at the path provided.""" file_obj = self._check_and_get_file(user, project, path) return file_obj def _save_status(self, file_obj, user_obj, mode="rw"): s = self.session readonly = mode != "rw" try: status_obj = s.query(FileStatus).filter_by(file_id=file_obj.id) \ .filter_by(user_id=user_obj.id).one() if status_obj.read_only != readonly: status_obj.read_only = readonly except NoResultFound: status_obj = FileStatus(user_id = user_obj.id, file=file_obj, read_only=readonly) s.add(status_obj) if status_obj not in user_obj.files: user_obj.files.append(status_obj) if status_obj not in file_obj.users: file_obj.users.append(status_obj) def list_files(self, user, project=None, path=""): """Retrieve a list of files at the path. Directories will have '/' at the end of the name. If project is None, this will return the projects owned by the user.""" if not project: return sorted(user.projects, key=lambda proj: proj.name) try: dir = self.session.query(Directory).filter_by(name=path) \ .filter_by(project=project).one() except NoResultFound: raise FileNotFound(path) result = set(dir.subdirs) result.update(set(dir.files)) return sorted(result, key=lambda item: item.name) def get_project(self, user, owner, project_name, create=False, clean=False): """Retrieves the project object, optionally creating it if it doesn't exist. Additionally, this will verify that the user is authorized for the project. If the project_name is actually a project object, that object is simply returned.""" s = self.session # a request for a clean project also implies that creating it # is okay if clean: create = True try: project = s.query(Project).filter_by(name=project_name)\ .filter_by(owner=owner).one() project.authorize(user) if clean: # a clean project has been requested, so we will delete its # contents self.delete(user, project)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -