📄 oe_mailbox.py
字号:
# label of each indexed info INDEX_LABEL = \ [ "message index" , "flags" , "time message created/send" , "body lines" , "message address" , "original subject" , "time message saved" , "message id" , "subject" , "sender eMail address and name" , "answered to message id" , "server/newsgroup/message number", "server" , "sender name" , "sender eMail address" , "id 0f" , "message priority" , "message text length" , "time message created/received", "receiver name" , "receiver eMail address" , "id 15" , "id 16" , "id 17" , "id 18" , "id 19" , "OE account name" , "OE account registry key" , "message text structure" , "id 1d" , "id 1e" , "id 1f" ] DT_NONE = 0 # index is none DT_INT4 = 1 # index is a long integer (32 bits) DT_STRING = 2 # index is a string DT_DATE_TIME = 3 # index is date/time DT_DATA = 4 # index is data # the data type of each index INDEX_DATA_TYPE = \ [ DT_INT4 , DT_INT4 , DT_DATE_TIME, DT_INT4 , DT_INT4 , DT_STRING, DT_DATE_TIME, DT_STRING, DT_STRING, DT_STRING, DT_STRING , DT_STRING, DT_STRING, DT_STRING, DT_STRING , DT_NONE , DT_INT4 , DT_INT4 , DT_DATE_TIME, DT_STRING, DT_STRING, DT_NONE , DT_INT4 , DT_NONE , DT_INT4 , DT_INT4 , DT_STRING , DT_STRING, DT_DATA , DT_NONE , DT_NONE , DT_NONE ] def getIndexText(self, dbxIndex): return dbxMessageInfo.INDEX_LABEL[dbxIndex] def getIndexDataType(self, dbxIndex): return dbxMessageInfo.INDEX_DATA_TYPE[dbxIndex]############################################################################# DBX MESSAGE###########################################################################class dbxMessage: def __init__(self, dbxStream, dbxAddress): self.dbxAddress = dbxAddress self.dbxText = "" self.dbxLength = 0L self.__readMessageText(dbxStream) def __getEntry(self, dbxBuffer, dbxEntry): if len(dbxBuffer) < (dbxEntry * 4) + 4: return None return struct.unpack("L", dbxBuffer[dbxEntry * 4:(dbxEntry * 4) + 4])[0] def __readMessageText(self, dbxStream): address = self.dbxAddress header = "" while (address): dbxStream.seek(address) header = dbxStream.read(16) self.dbxLength += self.__getEntry(header, 2) address = self.__getEntry(header, 3) pos = "" address = self.dbxAddress while (address): dbxStream.seek(address) header = dbxStream.read(16) pos += dbxStream.read(self.__getEntry(header, 2)) address = self.__getEntry(header, 3) self.dbxText = pos def getText(self): return self.dbxText# This started its SpamBayes life as a private method of the UserInterface# class, but is really a general purpose (Outlook Express) function.def convertToMbox(content): """Check if the given buffer is in a non-mbox format, and convert it into mbox format if so. If it's already an mbox, return it unchanged. """ dbxStream = StringIO.StringIO(content) header = dbxFileHeader(dbxStream) if header.isValid() and header.isMessages(): file_info_len = dbxFileHeader.FH_FILE_INFO_LENGTH fh_entries = dbxFileHeader.FH_ENTRIES fh_ptr = dbxFileHeader.FH_TREE_ROOT_NODE_PTR info = dbxFileInfo(dbxStream, header.getEntry(file_info_len)) entries = header.getEntry(fh_entries) address = header.getEntry(fh_ptr) if address and entries: tree = dbxTree(dbxStream, address, entries) dbxBuffer = [] for i in range(entries): address = tree.getValue(i) messageInfo = dbxMessageInfo(dbxStream, address) if messageInfo.isIndexed(dbxMessageInfo.MI_MESSAGE_ADDRESS): address = dbxMessageInfo.MI_MESSAGE_ADDRESS messageAddress = messageInfo.getValueAsLong(address) message = dbxMessage(dbxStream, messageAddress) # This fakes up a from header to conform to mbox # standards. It would be better to extract this # data from the message itself, as this will # result in incorrect tokens. dbxBuffer.append("From spambayes@spambayes.org %s\n%s" \ % (strftime("%a %b %d %H:%M:%S MET %Y", gmtime()), message.getText())) content = "".join(dbxBuffer) dbxStream.close() return contentdef OEIdentityKeys(): """Return the OE identity keys. Tested with Outlook Express 6.0 with Windows XP.""" if win32api is None: # Delayed import error from top. raise ImportError("pywin32 not installed") reg = win32api.RegOpenKeyEx(win32con.HKEY_USERS, "") user_index = 0 while True: # Loop through all the users try: user_name = "%s\\Identities" % \ (win32api.RegEnumKey(reg, user_index),) except win32api.error: break user_index += 1 try: user_key = win32api.RegOpenKeyEx(win32con.HKEY_USERS, user_name) except win32api.error: # Not this one continue identity_index = 0 while True: # Loop through all the identities try: identity_name = win32api.RegEnumKey(user_key, identity_index) except win32api.error: break identity_index += 1 subkey_name = "%s\\%s\\%s" % (user_name, identity_name, "Software\\Microsoft\\Outlook " \ "Express\\5.0") try: subkey = win32api.RegOpenKeyEx(win32con.HKEY_USERS, subkey_name, 0, win32con.KEY_READ) except win32api.error: # Not this user continue yield subkeydef OECurrentUserKey(): """Returns the root registry key for current user Outlook Express settings.""" if win32api is None: # Delayed import error from top. raise ImportError("pywin32 not installed") key = "Identities" reg = win32api.RegOpenKeyEx(win32con.HKEY_CURRENT_USER, key) id = win32api.RegQueryValueEx(reg, "Default User ID")[0] subKey = "%s\\%s\\Software\\Microsoft\\Outlook Express\\5.0" % (key, id) return subKeydef OEStoreRoot(): """Return the path to the Outlook Express Store Root. Tested with Outlook Express 6.0 with Windows XP.""" subKey = OECurrentUserKey() reg = win32api.RegOpenKeyEx(win32con.HKEY_CURRENT_USER, subKey) path = win32api.RegQueryValueEx(reg, "Store Root")[0] # I can't find a shellcon to that is the same as %UserProfile%, # so extract it from CSIDL_LOCAL_APPDATA UserDirectory = shell.SHGetFolderPath \ (0, shellcon.CSIDL_LOCAL_APPDATA, 0, 0) parts = UserDirectory.split(os.sep) UserProfile = os.sep.join(parts[:-2]) return path.replace("%UserProfile%", UserProfile)def OEDBXFilesList(): """Returns a list of DBX files for current user.""" path = OEStoreRoot() dbx_re = re.compile('.+\.dbx') dbxs = [f for f in os.listdir(path) if dbx_re.search(f) != None] return dbxsdef OEAccountKeys(permission = None): """Return registry keys for each of the OE mail accounts, along with information about what type of mail account it is.""" if permission is None: # Can't do this in the parameter, because then it requires # win32con to be available for the module to be imported. permission = win32con.KEY_READ | win32con.KEY_SET_VALUE possible_root_keys = [] # This appears to be the place for OE6 and WinXP # (So I'm guessing also for NT4) if sys.getwindowsversion()[0] >= 4: possible_root_keys = ["Software\\Microsoft\\" \ "Internet Account Manager\\Accounts"] else: # This appears to be the place for OE6 and Win98 # (So I'm guessing also for Win95) possible_root_keys = oe_mailbox.OEIdentityKeys() for key in possible_root_keys: reg = win32api.RegOpenKeyEx(win32con.HKEY_CURRENT_USER, key) account_index = 0 while True: # Loop through all the accounts account = {} try: subkey_name = "%s\\%s" % \ (key, win32api.RegEnumKey(reg, account_index)) except win32api.error: break account_index += 1 index = 0 subkey = win32api.RegOpenKeyEx(win32con.HKEY_CURRENT_USER, subkey_name, 0, permission) while True: # Loop through all the keys so that we can determine # what type of account this is. try: name, value, typ = win32api.RegEnumValue(subkey, index) except win32api.error: break account[name] = (value, typ) index += 1 # Yield, as appropriate. if account.has_key("POP3 Server"): yield("POP3", subkey, account) elif account.has_key("IMAP Server"): yield("IMAP4", subkey, account)def OEIsInstalled(): """Return True if Outlook Express appears to be installed, and in use (I think if sys.platform == "win32" would say if it was installed at all).""" # Our heuristic is that there is at least one mail account setup. if len(list(OEAccountKeys)) > 0: return True return False## For use by the test tools.class OEMsg(msgs.Msg): def __init__(self, guts, id): self.tag = id self.guts = guts# The iterator yields a stream of Msg objects, taken from a list of# dbx files.class OEMsgStream(msgs.MsgStream): def __init__(self, tag, dbxes, keep=None): msgs.MsgStream.__init__(self, tag, dbxes, keep) def produce(self): if self.keep is None: for dbx in self.directories: folder = convertToMbox(file(dbx)) all = folder.split("\nFrom ") # XXX Is this right? count = 0 for msg in all: id = "%s::%s" % (dbx, count) count += 1 yield OEMsg(msg, id) return # We only want part of the msgs. Shuffle each directory list, but # in such a way that we'll get the same result each time this is # called on the same directory list. for directory in self.directories: folder = convertToMbox(file(dbx)) all = folder.split("\nFrom ") # XXX Is this right? random.seed(hash(max(all)) ^ SEED) # reproducible across calls random.shuffle(all) del all[self.keep:] all.sort() # for consistency with MsgStream count = 0 for msg in all: id = "%s::%s" % (dbx, count) count += 1 yield OEMsg(msg, id)class OEHamStream(msgs.HamStream): def __init__(self, tag, dbxes, train=0): msgs.HamStream.__init__(self, tag, dbxes, train)class OESpamStream(msgs.SpamStream): def __init__(self, tag, dbxes, train=0): msgs.SpamStream.__init__(self, tag, dbxes, train)############################################################################# TEST DRIVER###########################################################################def test(): import sys import getopt try: opts, args = getopt.getopt(sys.argv[1:], 'hp') except getopt.error, msg: print >>sys.stderr, str(msg) + '\n\n' + __doc__ sys.exit() print_message = False for opt, arg in opts: if opt == '-h': print >>sys.stderr, __doc__ sys.exit() elif opt == '-p': print_message = True MAILBOX_DIR = OEStoreRoot() files = [os.path.join(MAILBOX_DIR, f) for f in OEDBXFilesList()] for file in files: try: print print file dbx = open(file, "rb", 0) header = dbxFileHeader(dbx) print "IS VALID DBX :", header.isValid() if header.isMessages(): info = dbxFileInfo(dbx, header.getEntry(dbxFileHeader.FH_FILE_INFO_LENGTH)) print "MAILBOX NAME :", info.getFolderName() print "CREATION TIME :", info.getCreationTime() entries = header.getEntry(dbxFileHeader.FH_ENTRIES) address = header.getEntry(dbxFileHeader.FH_TREE_ROOT_NODE_PTR) if address and entries: tree = dbxTree(dbx, address, entries) for i in range(entries): address = tree.getValue(i) messageInfo = dbxMessageInfo(dbx, address) if messageInfo.isIndexed(dbxMessageInfo.MI_MESSAGE_ADDRESS): messageAddress = messageInfo.getValueAsLong(dbxMessageInfo.MI_MESSAGE_ADDRESS) message = dbxMessage(dbx, messageAddress) if print_message: print print "Message :", messageInfo.getString(dbxMessageInfo.MI_SUBJECT) print "=" * (len(messageInfo.getString(dbxMessageInfo.MI_SUBJECT)) + 9) print print message.getText() dbx.close() except Exception, (strerror): print strerrorif __name__ == '__main__': test()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -