⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 model.py

📁 在线编辑器
💻 PY
📖 第 1 页 / 共 2 页
字号:
#  ***** BEGIN LICENSE BLOCK *****# Version: MPL 1.1# # The contents of this file are subject to the Mozilla Public License  # Version# 1.1 (the "License"); you may not use this file except in compliance  # with# the License. You may obtain a copy of the License at# http://www.mozilla.org/MPL/# # Software distributed under the License is distributed on an "AS IS"  # basis,# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the  # License# for the specific language governing rights and limitations under the# License.# # The Original Code is Bespin.# # The Initial Developer of the Original Code is Mozilla.# Portions created by the Initial Developer are Copyright (C) 2009# the Initial Developer. All Rights Reserved.# # Contributor(s):# # ***** END LICENSE BLOCK *****# """Data classes for working with files/projects/users."""import osimport timefrom cStringIO import StringIOimport hashlibimport tarfileimport tempfileimport mimetypesimport zipfilefrom datetime import datetimeimport loggingimport reimport pkg_resourcesfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import (Column, PickleType, String, Integer,                    Boolean, Binary, Table, ForeignKey,                    DateTime, func, UniqueConstraint)from sqlalchemy.orm import relation, deferred, mapper, backreffrom sqlalchemy.exc import DBAPIErrorfrom sqlalchemy.orm.exc import NoResultFoundfrom bespin import configlog = logging.getLogger("bespin.model")Base = declarative_base()# quotas are expressed in 1 million byte incrementsQUOTA_UNITS = 1000000class ConflictError(Exception):    pass    class FSException(Exception):    pass    class FileNotFound(FSException):    passclass FileConflict(FSException):    passclass NotAuthorized(FSException):    pass    class OverQuota(FSException):    passclass BadValue(FSException):    pass    class DB(object):    def __init__(self, user_manager, file_manager):        self.user_manager = user_manager        self.file_manager = file_manager                user_manager.db = self        file_manager.db = self    class User(Base):    __tablename__ = "users"        id = Column(Integer, primary_key=True)    username = Column(String(128), unique=True)    email = Column(String(128))    password = Column(String(20))    settings = Column(PickleType())    projects = relation('Project', backref='owner')    quota = Column(Integer, default=10)    amount_used = Column(Integer, default=0)        def __init__(self, username, password, email):        self.username = username        self.email = email        self.password = password        self.settings = {}        self.quota = config.c.default_quota            def __str__(self):        return "%s (%s-%s)" % (self.username, self.id, id(self))            def check_save(self, amount):        """Confirms that the user can save this amount. Returns True        if the user has enough available in their quota, False otherwise.        """        return (self.quota * QUOTA_UNITS - self.amount_used - amount) > 0            def quota_info(self):        """Returns the tuple of quota and amount_used"""        return (self.quota * QUOTA_UNITS, self.amount_used)    bad_characters = "<>| '\""invalid_chars = re.compile(r'[%s]' % bad_characters)class UserManager(object):    def __init__(self, session):        self.session = session            def create_user(self, username, password, email):        """Adds a new user with the given username and password.        This raises a ConflictError is the user already        exists."""        if invalid_chars.search(username):            raise BadValue("Usernames cannot contain any of: %s"                % bad_characters)        log.debug("Creating user %s", username)        user = User(username, password, email)        self.session.add(user)        # flush to ensure that the user is unique        try:            self.session.flush()        except DBAPIError:            raise ConflictError("Username %s is already in use" % username)                file_manager = self.db.file_manager        project = file_manager.get_project(user, user,                     "SampleProject", create=True)        file_manager.install_template(user, project)        return user            def get_user(self, username):        """Looks up a user by username. Returns None if the user is not        found."""        return self.session.query(User).filter_by(username=username).first()            class FileStatus(Base):    __tablename__ = "filestatus"        user_id = Column(Integer, ForeignKey('users.id', ondelete='cascade'), primary_key=True)    file_id = Column(Integer, ForeignKey('files.id', ondelete='cascade'), primary_key=True)    read_only = Column(Boolean)    user = relation(User, backref="files")    class File(Base):    __tablename__ = "files"        id = Column(Integer, primary_key=True)    project_id = Column(Integer, ForeignKey('projects.id', ondelete='cascade'), nullable=False)    project = relation('Project')    name = Column(String(700), nullable=False)    created = Column(DateTime, default=datetime.now)    modified = Column(DateTime, onupdate=datetime.now)    saved_size = Column(Integer)    data = deferred(Column(Binary))    edits = deferred(Column(PickleType))    dir_id = Column(Integer, ForeignKey('directories.id', ondelete='cascade'))    dir = relation('Directory', backref="files")        users = relation(FileStatus,                     backref="file")                         __table_args__ = (UniqueConstraint("project_id", "name"), {})        @property    def short_name(self):        elems = self.name.rsplit("/", 1)        if len(elems) == 1:            return self.name        else:            return elems[1]            @property    def mimetype(self):        """Returns the mimetype of the file, or application/octet-stream         if it cannot be guessed."""        t = mimetypes.guess_type(self.name)        if t:            return t[0]        return "application/octet-stream"                         def __repr__(self):        return "File: %s" % (self.name)        project_members = Table('members', Base.metadata,                        Column('project_id', Integer, ForeignKey('projects.id', ondelete='cascade')),                        Column('user_id', Integer, ForeignKey('users.id', ondelete='cascade')))    class Directory(Base):    __tablename__ = "directories"        id = Column(Integer, primary_key=True)    project_id = Column(Integer, ForeignKey('projects.id', ondelete='cascade'), nullable=False)    project = relation('Project', backref="directories")    name = Column(String(700), nullable=False)    parent_id = Column(Integer, ForeignKey('directories.id', ondelete='cascade'))    subdirs = relation('Directory', backref=backref("parent",                                         remote_side=[id]))        __table_args__ = (UniqueConstraint("project_id", "name"), {})            @property    def short_name(self):        return self.name.rsplit("/", 2)[-2] + "/"            def __str__(self):        return "Dir: %s" % (self.name)        __repr__ = __str__    class Project(Base):    __tablename__ = "projects"        id = Column(Integer, primary_key=True)    name = Column(String(60), nullable=False)    members = relation("User", secondary=project_members, lazy=False)    user_id = Column(Integer, ForeignKey('users.id', ondelete='cascade'), nullable=False)        __table_args__ = (UniqueConstraint("user_id", "name"), {})        def authorize(self, user):        log.debug("Checking user %s access to project %s owned by %s with members %s",            user, self.name, self.owner, self.members)        if user != self.owner and user not in self.members:            raise NotAuthorized("You are not authorized to access that project.")                def authorize_user(self, user, auth_user):        """user is requesting to allow auth_user to access this        project. user must be this project's owner."""        if user != self.owner:            raise NotAuthorized("Only the project owner can authorize users.")        if auth_user not in self.members:            self.members.append(auth_user)            def unauthorize_user(self, user, auth_user):        """user wants auth_user to no longer be able to access this        project. user must be the project owner."""        if user != self.owner:            raise NotAuthorized("Only the project owner can unauthorize users.")        try:            self.members.remove(auth_user)        except KeyError:            pass        @property    def short_name(self):        return self.name + "/"                def __repr__(self):        return "Project(name=%s)" % (self.name)_text_types = set(['.txt', '.html', '.htm', '.css', '.js', '.py', '.pl'])def _cmp_files_in_project(fs1, fs2):    file1 = fs1.file    file2 = fs2.file    proj_diff = cmp(file1.project.name, file2.project.name)    if not proj_diff:        return cmp(file1.name, file2.name)    return proj_diffclass FileManager(object):    def __init__(self, session):        self.session = session            def get_file(self, user, project, path, mode="rw"):        """Gets the contents of the file as a string. Raises        FileNotFound if the file does not exist. The file is         marked as open after this call."""                file_obj = self._check_and_get_file(user, project, path)        self._save_status(file_obj, user, mode)                contents = str(file_obj.data)        return contents            def _check_and_get_file(self, user, project, path):        """Returns the project, user object, file object."""        s = self.session                try:            file_obj = s.query(File).filter_by(name=path) \                        .filter_by(project=project).one()        except NoResultFound:            raise FileNotFound("File %s in project %s does not exist"                                 % (path, project.name))                if file_obj.data == None:            raise FileNotFound("File %s in project %s does not exist"                                 % (path, project.name))                return file_obj            def get_file_object(self, user, project, path):        """Retrieves the File instance from the project at the        path provided."""        file_obj = self._check_and_get_file(user, project, path)        return file_obj        def _save_status(self, file_obj, user_obj, mode="rw"):        s = self.session                readonly = mode != "rw"        try:            status_obj = s.query(FileStatus).filter_by(file_id=file_obj.id) \                            .filter_by(user_id=user_obj.id).one()            if status_obj.read_only != readonly:                status_obj.read_only = readonly        except NoResultFound:            status_obj = FileStatus(user_id = user_obj.id, file=file_obj,                                    read_only=readonly)            s.add(status_obj)            if status_obj not in user_obj.files:                user_obj.files.append(status_obj)            if status_obj not in file_obj.users:                file_obj.users.append(status_obj)                    def list_files(self, user, project=None, path=""):        """Retrieve a list of files at the path. Directories will have        '/' at the end of the name.                If project is None, this will return the projects        owned by the user."""        if not project:            return sorted(user.projects, key=lambda proj: proj.name)        try:            dir = self.session.query(Directory).filter_by(name=path) \                    .filter_by(project=project).one()        except NoResultFound:            raise FileNotFound(path)                result = set(dir.subdirs)        result.update(set(dir.files))        return sorted(result, key=lambda item: item.name)            def get_project(self, user, owner, project_name, create=False,                     clean=False):        """Retrieves the project object, optionally creating it if it        doesn't exist. Additionally, this will verify that the        user is authorized for the project. If the project_name is        actually a project object, that object is simply returned."""                s = self.session                # a request for a clean project also implies that creating it        # is okay        if clean:            create = True                try:            project = s.query(Project).filter_by(name=project_name)\                        .filter_by(owner=owner).one()            project.authorize(user)            if clean:                # a clean project has been requested, so we will delete its                # contents                self.delete(user, project)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -