mhlib.py

来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 1,004 行 · 第 1/3 页

PY
1,004
字号
"""MH interface -- purely object-oriented (well, almost)Executive summary:import mhlibmh = mhlib.MH()         # use default mailbox directory and profilemh = mhlib.MH(mailbox)  # override mailbox location (default from profile)mh = mhlib.MH(mailbox, profile) # override mailbox and profilemh.error(format, ...)   # print error message -- can be overriddens = mh.getprofile(key)  # profile entry (None if not set)path = mh.getpath()     # mailbox pathnamename = mh.getcontext()  # name of current foldermh.setcontext(name)     # set name of current folderlist = mh.listfolders() # names of top-level folderslist = mh.listallfolders() # names of all folders, including subfolderslist = mh.listsubfolders(name) # direct subfolders of given folderlist = mh.listallsubfolders(name) # all subfolders of given foldermh.makefolder(name)     # create new foldermh.deletefolder(name)   # delete folder -- must have no subfoldersf = mh.openfolder(name) # new open folder objectf.error(format, ...)    # same as mh.error(format, ...)path = f.getfullname()  # folder's full pathnamepath = f.getsequencesfilename() # full pathname of folder's sequences filepath = f.getmessagefilename(n)  # full pathname of message n in folderlist = f.listmessages() # list of messages in folder (as numbers)n = f.getcurrent()      # get current messagef.setcurrent(n)         # set current messagelist = f.parsesequence(seq)     # parse msgs syntax into list of messagesn = f.getlast()         # get last message (0 if no messagse)f.setlast(n)            # set last message (internal use only)dict = f.getsequences() # dictionary of sequences in folder {name: list}f.putsequences(dict)    # write sequences back to folderf.createmessage(n, fp)  # add message from file f as number nf.removemessages(list)  # remove messages in list from folderf.refilemessages(list, tofolder) # move messages in list to other folderf.movemessage(n, tofolder, ton)  # move one message to a given destinationf.copymessage(n, tofolder, ton)  # copy one message to a given destinationm = f.openmessage(n)    # new open message object (costs a file descriptor)m is a derived class of mimetools.Message(rfc822.Message), with:s = m.getheadertext()   # text of message's headerss = m.getheadertext(pred) # text of message's headers, filtered by preds = m.getbodytext()     # text of message's body, decodeds = m.getbodytext(0)    # text of message's body, not decoded"""# XXX To do, functionality:# - annotate messages# - send messages## XXX To do, organization:# - move IntSet to separate file# - move most Message functionality to module mimetools# Customizable defaultsMH_PROFILE = '~/.mh_profile'PATH = '~/Mail'MH_SEQUENCES = '.mh_sequences'FOLDER_PROTECT = 0700# Imported modulesimport osimport sysfrom stat import ST_NLINKimport reimport mimetoolsimport multifileimport shutilfrom bisect import bisect__all__ = ["MH","Error","Folder","Message"]# Exported constantsclass Error(Exception):    passclass MH:    """Class representing a particular collection of folders.    Optional constructor arguments are the pathname for the directory    containing the collection, and the MH profile to use.    If either is omitted or empty a default is used; the default    directory is taken from the MH profile if it is specified there."""    def __init__(self, path = None, profile = None):        """Constructor."""        if not profile: profile = MH_PROFILE        self.profile = os.path.expanduser(profile)        if not path: path = self.getprofile('Path')        if not path: path = PATH        if not os.path.isabs(path) and path[0] != '~':            path = os.path.join('~', path)        path = os.path.expanduser(path)        if not os.path.isdir(path): raise Error, 'MH() path not found'        self.path = path    def __repr__(self):        """String representation."""        return 'MH(%s, %s)' % (`self.path`, `self.profile`)    def error(self, msg, *args):        """Routine to print an error.  May be overridden by a derived class."""        sys.stderr.write('MH error: %s\n' % (msg % args))    def getprofile(self, key):        """Return a profile entry, None if not found."""        return pickline(self.profile, key)    def getpath(self):        """Return the path (the name of the collection's directory)."""        return self.path    def getcontext(self):        """Return the name of the current folder."""        context = pickline(os.path.join(self.getpath(), 'context'),                  'Current-Folder')        if not context: context = 'inbox'        return context    def setcontext(self, context):        """Set the name of the current folder."""        fn = os.path.join(self.getpath(), 'context')        f = open(fn, "w")        f.write("Current-Folder: %s\n" % context)        f.close()    def listfolders(self):        """Return the names of the top-level folders."""        folders = []        path = self.getpath()        for name in os.listdir(path):            fullname = os.path.join(path, name)            if os.path.isdir(fullname):                folders.append(name)        folders.sort()        return folders    def listsubfolders(self, name):        """Return the names of the subfolders in a given folder        (prefixed with the given folder name)."""        fullname = os.path.join(self.path, name)        # Get the link count so we can avoid listing folders        # that have no subfolders.        st = os.stat(fullname)        nlinks = st[ST_NLINK]        if nlinks <= 2:            return []        subfolders = []        subnames = os.listdir(fullname)        for subname in subnames:            fullsubname = os.path.join(fullname, subname)            if os.path.isdir(fullsubname):                name_subname = os.path.join(name, subname)                subfolders.append(name_subname)                # Stop looking for subfolders when                # we've seen them all                nlinks = nlinks - 1                if nlinks <= 2:                    break        subfolders.sort()        return subfolders    def listallfolders(self):        """Return the names of all folders and subfolders, recursively."""        return self.listallsubfolders('')    def listallsubfolders(self, name):        """Return the names of subfolders in a given folder, recursively."""        fullname = os.path.join(self.path, name)        # Get the link count so we can avoid listing folders        # that have no subfolders.        st = os.stat(fullname)        nlinks = st[ST_NLINK]        if nlinks <= 2:            return []        subfolders = []        subnames = os.listdir(fullname)        for subname in subnames:            if subname[0] == ',' or isnumeric(subname): continue            fullsubname = os.path.join(fullname, subname)            if os.path.isdir(fullsubname):                name_subname = os.path.join(name, subname)                subfolders.append(name_subname)                if not os.path.islink(fullsubname):                    subsubfolders = self.listallsubfolders(                              name_subname)                    subfolders = subfolders + subsubfolders                # Stop looking for subfolders when                # we've seen them all                nlinks = nlinks - 1                if nlinks <= 2:                    break        subfolders.sort()        return subfolders    def openfolder(self, name):        """Return a new Folder object for the named folder."""        return Folder(self, name)    def makefolder(self, name):        """Create a new folder (or raise os.error if it cannot be created)."""        protect = pickline(self.profile, 'Folder-Protect')        if protect and isnumeric(protect):            mode = int(protect, 8)        else:            mode = FOLDER_PROTECT        os.mkdir(os.path.join(self.getpath(), name), mode)    def deletefolder(self, name):        """Delete a folder.  This removes files in the folder but not        subdirectories.  Raise os.error if deleting the folder itself fails."""        fullname = os.path.join(self.getpath(), name)        for subname in os.listdir(fullname):            fullsubname = os.path.join(fullname, subname)            try:                os.unlink(fullsubname)            except os.error:                self.error('%s not deleted, continuing...' %                          fullsubname)        os.rmdir(fullname)numericprog = re.compile('^[1-9][0-9]*$')def isnumeric(str):    return numericprog.match(str) is not Noneclass Folder:    """Class representing a particular folder."""    def __init__(self, mh, name):        """Constructor."""        self.mh = mh        self.name = name        if not os.path.isdir(self.getfullname()):            raise Error, 'no folder %s' % name    def __repr__(self):        """String representation."""        return 'Folder(%s, %s)' % (`self.mh`, `self.name`)    def error(self, *args):        """Error message handler."""        apply(self.mh.error, args)    def getfullname(self):        """Return the full pathname of the folder."""        return os.path.join(self.mh.path, self.name)    def getsequencesfilename(self):        """Return the full pathname of the folder's sequences file."""        return os.path.join(self.getfullname(), MH_SEQUENCES)    def getmessagefilename(self, n):        """Return the full pathname of a message in the folder."""        return os.path.join(self.getfullname(), str(n))    def listsubfolders(self):        """Return list of direct subfolders."""        return self.mh.listsubfolders(self.name)    def listallsubfolders(self):        """Return list of all subfolders."""        return self.mh.listallsubfolders(self.name)    def listmessages(self):        """Return the list of messages currently present in the folder.        As a side effect, set self.last to the last message (or 0)."""        messages = []        match = numericprog.match        append = messages.append        for name in os.listdir(self.getfullname()):            if match(name):                append(name)        messages = map(int, messages)        messages.sort()        if messages:            self.last = messages[-1]        else:            self.last = 0        return messages    def getsequences(self):        """Return the set of sequences for the folder."""        sequences = {}        fullname = self.getsequencesfilename()        try:            f = open(fullname, 'r')        except IOError:            return sequences        while 1:            line = f.readline()            if not line: break            fields = line.split(':')            if len(fields) != 2:                self.error('bad sequence in %s: %s' %                          (fullname, line.strip()))            key = fields[0].strip()            value = IntSet(fields[1].strip(), ' ').tolist()            sequences[key] = value        return sequences    def putsequences(self, sequences):        """Write the set of sequences back to the folder."""        fullname = self.getsequencesfilename()        f = None        for key in sequences.keys():            s = IntSet('', ' ')            s.fromlist(sequences[key])            if not f: f = open(fullname, 'w')            f.write('%s: %s\n' % (key, s.tostring()))        if not f:            try:                os.unlink(fullname)            except os.error:                pass        else:            f.close()    def getcurrent(self):        """Return the current message.  Raise Error when there is none."""        seqs = self.getsequences()

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?