mhlib.py
来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 1,004 行 · 第 1/3 页
PY
1,004 行
try: return max(seqs['cur']) except (ValueError, KeyError): raise Error, "no cur message" def setcurrent(self, n): """Set the current message.""" updateline(self.getsequencesfilename(), 'cur', str(n), 0) def parsesequence(self, seq): """Parse an MH sequence specification into a message list. Attempt to mimic mh-sequence(5) as close as possible. Also attempt to mimic observed behavior regarding which conditions cause which error messages.""" # XXX Still not complete (see mh-format(5)). # Missing are: # - 'prev', 'next' as count # - Sequence-Negation option all = self.listmessages() # Observed behavior: test for empty folder is done first if not all: raise Error, "no messages in %s" % self.name # Common case first: all is frequently the default if seq == 'all': return all # Test for X:Y before X-Y because 'seq:-n' matches both i = seq.find(':') if i >= 0: head, dir, tail = seq[:i], '', seq[i+1:] if tail[:1] in '-+': dir, tail = tail[:1], tail[1:] if not isnumeric(tail): raise Error, "bad message list %s" % seq try: count = int(tail) except (ValueError, OverflowError): # Can't use sys.maxint because of i+count below count = len(all) try: anchor = self._parseindex(head, all) except Error, msg: seqs = self.getsequences() if not seqs.has_key(head): if not msg: msg = "bad message list %s" % seq raise Error, msg, sys.exc_info()[2] msgs = seqs[head] if not msgs: raise Error, "sequence %s empty" % head if dir == '-': return msgs[-count:] else: return msgs[:count] else: if not dir: if head in ('prev', 'last'): dir = '-' if dir == '-': i = bisect(all, anchor) return all[max(0, i-count):i] else: i = bisect(all, anchor-1) return all[i:i+count] # Test for X-Y next i = seq.find('-') if i >= 0: begin = self._parseindex(seq[:i], all) end = self._parseindex(seq[i+1:], all) i = bisect(all, begin-1) j = bisect(all, end) r = all[i:j] if not r: raise Error, "bad message list %s" % seq return r # Neither X:Y nor X-Y; must be a number or a (pseudo-)sequence try: n = self._parseindex(seq, all) except Error, msg: seqs = self.getsequences() if not seqs.has_key(seq): if not msg: msg = "bad message list %s" % seq raise Error, msg return seqs[seq] else: if n not in all: if isnumeric(seq): raise Error, "message %d doesn't exist" % n else: raise Error, "no %s message" % seq else: return [n] def _parseindex(self, seq, all): """Internal: parse a message number (or cur, first, etc.).""" if isnumeric(seq): try: return int(seq) except (OverflowError, ValueError): return sys.maxint if seq in ('cur', '.'): return self.getcurrent() if seq == 'first': return all[0] if seq == 'last': return all[-1] if seq == 'next': n = self.getcurrent() i = bisect(all, n) try: return all[i] except IndexError: raise Error, "no next message" if seq == 'prev': n = self.getcurrent() i = bisect(all, n-1) if i == 0: raise Error, "no prev message" try: return all[i-1] except IndexError: raise Error, "no prev message" raise Error, None def openmessage(self, n): """Open a message -- returns a Message object.""" return Message(self, n) def removemessages(self, list): """Remove one or more messages -- may raise os.error.""" errors = [] deleted = [] for n in list: path = self.getmessagefilename(n) commapath = self.getmessagefilename(',' + str(n)) try: os.unlink(commapath) except os.error: pass try: os.rename(path, commapath) except os.error, msg: errors.append(msg) else: deleted.append(n) if deleted: self.removefromallsequences(deleted) if errors: if len(errors) == 1: raise os.error, errors[0] else: raise os.error, ('multiple errors:', errors) def refilemessages(self, list, tofolder, keepsequences=0): """Refile one or more messages -- may raise os.error. 'tofolder' is an open folder object.""" errors = [] refiled = {} for n in list: ton = tofolder.getlast() + 1 path = self.getmessagefilename(n) topath = tofolder.getmessagefilename(ton) try: os.rename(path, topath) except os.error: # Try copying try: shutil.copy2(path, topath) os.unlink(path) except (IOError, os.error), msg: errors.append(msg) try: os.unlink(topath) except os.error: pass continue tofolder.setlast(ton) refiled[n] = ton if refiled: if keepsequences: tofolder._copysequences(self, refiled.items()) self.removefromallsequences(refiled.keys()) if errors: if len(errors) == 1: raise os.error, errors[0] else: raise os.error, ('multiple errors:', errors) def _copysequences(self, fromfolder, refileditems): """Helper for refilemessages() to copy sequences.""" fromsequences = fromfolder.getsequences() tosequences = self.getsequences() changed = 0 for name, seq in fromsequences.items(): try: toseq = tosequences[name] new = 0 except KeyError: toseq = [] new = 1 for fromn, ton in refileditems: if fromn in seq: toseq.append(ton) changed = 1 if new and toseq: tosequences[name] = toseq if changed: self.putsequences(tosequences) def movemessage(self, n, tofolder, ton): """Move one message over a specific destination message, which may or may not already exist.""" path = self.getmessagefilename(n) # Open it to check that it exists f = open(path) f.close() del f topath = tofolder.getmessagefilename(ton) backuptopath = tofolder.getmessagefilename(',%d' % ton) try: os.rename(topath, backuptopath) except os.error: pass try: os.rename(path, topath) except os.error: # Try copying ok = 0 try: tofolder.setlast(None) shutil.copy2(path, topath) ok = 1 finally: if not ok: try: os.unlink(topath) except os.error: pass os.unlink(path) self.removefromallsequences([n]) def copymessage(self, n, tofolder, ton): """Copy one message over a specific destination message, which may or may not already exist.""" path = self.getmessagefilename(n) # Open it to check that it exists f = open(path) f.close() del f topath = tofolder.getmessagefilename(ton) backuptopath = tofolder.getmessagefilename(',%d' % ton) try: os.rename(topath, backuptopath) except os.error: pass ok = 0 try: tofolder.setlast(None) shutil.copy2(path, topath) ok = 1 finally: if not ok: try: os.unlink(topath) except os.error: pass def createmessage(self, n, txt): """Create a message, with text from the open file txt.""" path = self.getmessagefilename(n) backuppath = self.getmessagefilename(',%d' % n) try: os.rename(path, backuppath) except os.error: pass ok = 0 BUFSIZE = 16*1024 try: f = open(path, "w") while 1: buf = txt.read(BUFSIZE) if not buf: break f.write(buf) f.close() ok = 1 finally: if not ok: try: os.unlink(path) except os.error: pass def removefromallsequences(self, list): """Remove one or more messages from all sequences (including last) -- but not from 'cur'!!!""" if hasattr(self, 'last') and self.last in list: del self.last sequences = self.getsequences() changed = 0 for name, seq in sequences.items(): if name == 'cur': continue for n in list: if n in seq: seq.remove(n) changed = 1 if not seq: del sequences[name] if changed: self.putsequences(sequences) def getlast(self): """Return the last message number.""" if not hasattr(self, 'last'): self.listmessages() # Set self.last return self.last def setlast(self, last): """Set the last message number.""" if last is None: if hasattr(self, 'last'): del self.last else: self.last = lastclass Message(mimetools.Message): def __init__(self, f, n, fp = None): """Constructor.""" self.folder = f self.number = n if not fp: path = f.getmessagefilename(n) fp = open(path, 'r')
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?