mhlib.py

来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 1,004 行 · 第 1/3 页

PY
1,004
字号
        try:            return max(seqs['cur'])        except (ValueError, KeyError):            raise Error, "no cur message"    def setcurrent(self, n):        """Set the current message."""        updateline(self.getsequencesfilename(), 'cur', str(n), 0)    def parsesequence(self, seq):        """Parse an MH sequence specification into a message list.        Attempt to mimic mh-sequence(5) as close as possible.        Also attempt to mimic observed behavior regarding which        conditions cause which error messages."""        # XXX Still not complete (see mh-format(5)).        # Missing are:        # - 'prev', 'next' as count        # - Sequence-Negation option        all = self.listmessages()        # Observed behavior: test for empty folder is done first        if not all:            raise Error, "no messages in %s" % self.name        # Common case first: all is frequently the default        if seq == 'all':            return all        # Test for X:Y before X-Y because 'seq:-n' matches both        i = seq.find(':')        if i >= 0:            head, dir, tail = seq[:i], '', seq[i+1:]            if tail[:1] in '-+':                dir, tail = tail[:1], tail[1:]            if not isnumeric(tail):                raise Error, "bad message list %s" % seq            try:                count = int(tail)            except (ValueError, OverflowError):                # Can't use sys.maxint because of i+count below                count = len(all)            try:                anchor = self._parseindex(head, all)            except Error, msg:                seqs = self.getsequences()                if not seqs.has_key(head):                    if not msg:                        msg = "bad message list %s" % seq                    raise Error, msg, sys.exc_info()[2]                msgs = seqs[head]                if not msgs:                    raise Error, "sequence %s empty" % head                if dir == '-':                    return msgs[-count:]                else:                    return msgs[:count]            else:                if not dir:                    if head in ('prev', 'last'):                        dir = '-'                if dir == '-':                    i = bisect(all, anchor)                    return all[max(0, i-count):i]                else:                    i = bisect(all, anchor-1)                    return all[i:i+count]        # Test for X-Y next        i = seq.find('-')        if i >= 0:            begin = self._parseindex(seq[:i], all)            end = self._parseindex(seq[i+1:], all)            i = bisect(all, begin-1)            j = bisect(all, end)            r = all[i:j]            if not r:                raise Error, "bad message list %s" % seq            return r        # Neither X:Y nor X-Y; must be a number or a (pseudo-)sequence        try:            n = self._parseindex(seq, all)        except Error, msg:            seqs = self.getsequences()            if not seqs.has_key(seq):                if not msg:                    msg = "bad message list %s" % seq                raise Error, msg            return seqs[seq]        else:            if n not in all:                if isnumeric(seq):                    raise Error, "message %d doesn't exist" % n                else:                    raise Error, "no %s message" % seq            else:                return [n]    def _parseindex(self, seq, all):        """Internal: parse a message number (or cur, first, etc.)."""        if isnumeric(seq):            try:                return int(seq)            except (OverflowError, ValueError):                return sys.maxint        if seq in ('cur', '.'):            return self.getcurrent()        if seq == 'first':            return all[0]        if seq == 'last':            return all[-1]        if seq == 'next':            n = self.getcurrent()            i = bisect(all, n)            try:                return all[i]            except IndexError:                raise Error, "no next message"        if seq == 'prev':            n = self.getcurrent()            i = bisect(all, n-1)            if i == 0:                raise Error, "no prev message"            try:                return all[i-1]            except IndexError:                raise Error, "no prev message"        raise Error, None    def openmessage(self, n):        """Open a message -- returns a Message object."""        return Message(self, n)    def removemessages(self, list):        """Remove one or more messages -- may raise os.error."""        errors = []        deleted = []        for n in list:            path = self.getmessagefilename(n)            commapath = self.getmessagefilename(',' + str(n))            try:                os.unlink(commapath)            except os.error:                pass            try:                os.rename(path, commapath)            except os.error, msg:                errors.append(msg)            else:                deleted.append(n)        if deleted:            self.removefromallsequences(deleted)        if errors:            if len(errors) == 1:                raise os.error, errors[0]            else:                raise os.error, ('multiple errors:', errors)    def refilemessages(self, list, tofolder, keepsequences=0):        """Refile one or more messages -- may raise os.error.        'tofolder' is an open folder object."""        errors = []        refiled = {}        for n in list:            ton = tofolder.getlast() + 1            path = self.getmessagefilename(n)            topath = tofolder.getmessagefilename(ton)            try:                os.rename(path, topath)            except os.error:                # Try copying                try:                    shutil.copy2(path, topath)                    os.unlink(path)                except (IOError, os.error), msg:                    errors.append(msg)                    try:                        os.unlink(topath)                    except os.error:                        pass                    continue            tofolder.setlast(ton)            refiled[n] = ton        if refiled:            if keepsequences:                tofolder._copysequences(self, refiled.items())            self.removefromallsequences(refiled.keys())        if errors:            if len(errors) == 1:                raise os.error, errors[0]            else:                raise os.error, ('multiple errors:', errors)    def _copysequences(self, fromfolder, refileditems):        """Helper for refilemessages() to copy sequences."""        fromsequences = fromfolder.getsequences()        tosequences = self.getsequences()        changed = 0        for name, seq in fromsequences.items():            try:                toseq = tosequences[name]                new = 0            except KeyError:                toseq = []                new = 1            for fromn, ton in refileditems:                if fromn in seq:                    toseq.append(ton)                    changed = 1            if new and toseq:                tosequences[name] = toseq        if changed:            self.putsequences(tosequences)    def movemessage(self, n, tofolder, ton):        """Move one message over a specific destination message,        which may or may not already exist."""        path = self.getmessagefilename(n)        # Open it to check that it exists        f = open(path)        f.close()        del f        topath = tofolder.getmessagefilename(ton)        backuptopath = tofolder.getmessagefilename(',%d' % ton)        try:            os.rename(topath, backuptopath)        except os.error:            pass        try:            os.rename(path, topath)        except os.error:            # Try copying            ok = 0            try:                tofolder.setlast(None)                shutil.copy2(path, topath)                ok = 1            finally:                if not ok:                    try:                        os.unlink(topath)                    except os.error:                        pass            os.unlink(path)        self.removefromallsequences([n])    def copymessage(self, n, tofolder, ton):        """Copy one message over a specific destination message,        which may or may not already exist."""        path = self.getmessagefilename(n)        # Open it to check that it exists        f = open(path)        f.close()        del f        topath = tofolder.getmessagefilename(ton)        backuptopath = tofolder.getmessagefilename(',%d' % ton)        try:            os.rename(topath, backuptopath)        except os.error:            pass        ok = 0        try:            tofolder.setlast(None)            shutil.copy2(path, topath)            ok = 1        finally:            if not ok:                try:                    os.unlink(topath)                except os.error:                    pass    def createmessage(self, n, txt):        """Create a message, with text from the open file txt."""        path = self.getmessagefilename(n)        backuppath = self.getmessagefilename(',%d' % n)        try:            os.rename(path, backuppath)        except os.error:            pass        ok = 0        BUFSIZE = 16*1024        try:            f = open(path, "w")            while 1:                buf = txt.read(BUFSIZE)                if not buf:                    break                f.write(buf)            f.close()            ok = 1        finally:            if not ok:                try:                    os.unlink(path)                except os.error:                    pass    def removefromallsequences(self, list):        """Remove one or more messages from all sequences (including last)        -- but not from 'cur'!!!"""        if hasattr(self, 'last') and self.last in list:            del self.last        sequences = self.getsequences()        changed = 0        for name, seq in sequences.items():            if name == 'cur':                continue            for n in list:                if n in seq:                    seq.remove(n)                    changed = 1                    if not seq:                        del sequences[name]        if changed:            self.putsequences(sequences)    def getlast(self):        """Return the last message number."""        if not hasattr(self, 'last'):            self.listmessages() # Set self.last        return self.last    def setlast(self, last):        """Set the last message number."""        if last is None:            if hasattr(self, 'last'):                del self.last        else:            self.last = lastclass Message(mimetools.Message):    def __init__(self, f, n, fp = None):        """Constructor."""        self.folder = f        self.number = n        if not fp:            path = f.getmessagefilename(n)            fp = open(path, 'r')

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?