⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 oe_mailbox.py

📁 用python实现的邮件过滤器
💻 PY
📖 第 1 页 / 共 2 页
字号:
    # label of each indexed info    INDEX_LABEL = \    [ "message index"                , "flags"                          ,      "time message created/send"    , "body lines"                     ,      "message address"              , "original subject"               ,      "time message saved"           , "message id"                     ,      "subject"                      , "sender eMail address and name"  ,      "answered to message id"       , "server/newsgroup/message number",      "server"                       , "sender name"                    ,      "sender eMail address"         , "id 0f"                          ,      "message priority"             , "message text length"            ,      "time message created/received", "receiver name"                  ,      "receiver eMail address"       , "id 15"                          ,      "id 16"                        , "id 17"                          ,      "id 18"                        , "id 19"                          ,      "OE account name"              , "OE account registry key"        ,      "message text structure"       , "id 1d"                          ,      "id 1e"                        , "id 1f"                           ]    DT_NONE      = 0                    # index is none    DT_INT4      = 1                    # index is a long integer (32 bits)    DT_STRING    = 2                    # index is a string    DT_DATE_TIME = 3                    # index is date/time    DT_DATA      = 4                    # index is data    # the data type of each index    INDEX_DATA_TYPE = \    [ DT_INT4  , DT_INT4  , DT_DATE_TIME, DT_INT4  , DT_INT4  , DT_STRING, DT_DATE_TIME, DT_STRING,      DT_STRING, DT_STRING, DT_STRING   , DT_STRING, DT_STRING, DT_STRING, DT_STRING   , DT_NONE  ,      DT_INT4  , DT_INT4  , DT_DATE_TIME, DT_STRING, DT_STRING, DT_NONE  , DT_INT4     , DT_NONE  ,      DT_INT4  , DT_INT4  , DT_STRING   , DT_STRING, DT_DATA  , DT_NONE  , DT_NONE     , DT_NONE   ]    def getIndexText(self, dbxIndex):        return dbxMessageInfo.INDEX_LABEL[dbxIndex]    def getIndexDataType(self, dbxIndex):        return dbxMessageInfo.INDEX_DATA_TYPE[dbxIndex]############################################################################# DBX MESSAGE###########################################################################class dbxMessage:    def __init__(self, dbxStream, dbxAddress):        self.dbxAddress = dbxAddress        self.dbxText   = ""        self.dbxLength = 0L        self.__readMessageText(dbxStream)    def __getEntry(self, dbxBuffer, dbxEntry):        if len(dbxBuffer) < (dbxEntry * 4) + 4:            return None        return struct.unpack("L", dbxBuffer[dbxEntry * 4:(dbxEntry * 4) + 4])[0]    def __readMessageText(self, dbxStream):        address = self.dbxAddress        header = ""        while (address):            dbxStream.seek(address)            header = dbxStream.read(16)            self.dbxLength += self.__getEntry(header, 2)            address          = self.__getEntry(header, 3)        pos = ""        address = self.dbxAddress        while (address):            dbxStream.seek(address)            header  = dbxStream.read(16)            pos    += dbxStream.read(self.__getEntry(header, 2))            address  = self.__getEntry(header, 3)        self.dbxText = pos    def getText(self):        return self.dbxText# This started its SpamBayes life as a private method of the UserInterface# class, but is really a general purpose (Outlook Express) function.def convertToMbox(content):    """Check if the given buffer is in a non-mbox format, and convert it    into mbox format if so.  If it's already an mbox, return it unchanged.    """    dbxStream = StringIO.StringIO(content)    header = dbxFileHeader(dbxStream)    if header.isValid() and header.isMessages():        file_info_len = dbxFileHeader.FH_FILE_INFO_LENGTH        fh_entries = dbxFileHeader.FH_ENTRIES        fh_ptr = dbxFileHeader.FH_TREE_ROOT_NODE_PTR        info = dbxFileInfo(dbxStream, header.getEntry(file_info_len))        entries = header.getEntry(fh_entries)        address = header.getEntry(fh_ptr)        if address and entries:            tree = dbxTree(dbxStream, address, entries)            dbxBuffer = []            for i in range(entries):                address = tree.getValue(i)                messageInfo = dbxMessageInfo(dbxStream, address)                if messageInfo.isIndexed(dbxMessageInfo.MI_MESSAGE_ADDRESS):                    address = dbxMessageInfo.MI_MESSAGE_ADDRESS                    messageAddress = messageInfo.getValueAsLong(address)                    message = dbxMessage(dbxStream, messageAddress)                    # This fakes up a from header to conform to mbox                    # standards.  It would be better to extract this                    # data from the message itself, as this will                    # result in incorrect tokens.                    dbxBuffer.append("From spambayes@spambayes.org %s\n%s" \                                     % (strftime("%a %b %d %H:%M:%S MET %Y",                                                 gmtime()),                                        message.getText()))            content = "".join(dbxBuffer)    dbxStream.close()    return contentdef OEIdentityKeys():    """Return the OE identity keys.    Tested with Outlook Express 6.0 with Windows XP."""    if win32api is None:        # Delayed import error from top.        raise ImportError("pywin32 not installed")    reg = win32api.RegOpenKeyEx(win32con.HKEY_USERS, "")    user_index = 0    while True:        # Loop through all the users        try:            user_name = "%s\\Identities" % \                        (win32api.RegEnumKey(reg, user_index),)        except win32api.error:            break        user_index += 1        try:            user_key = win32api.RegOpenKeyEx(win32con.HKEY_USERS, user_name)        except win32api.error:            # Not this one            continue        identity_index = 0        while True:            # Loop through all the identities            try:                identity_name = win32api.RegEnumKey(user_key,                                                    identity_index)            except win32api.error:                break            identity_index += 1            subkey_name = "%s\\%s\\%s" % (user_name, identity_name,                                          "Software\\Microsoft\\Outlook " \                                          "Express\\5.0")            try:                subkey = win32api.RegOpenKeyEx(win32con.HKEY_USERS,                                               subkey_name, 0,                                               win32con.KEY_READ)            except win32api.error:                # Not this user                continue            yield subkeydef OECurrentUserKey():    """Returns the root registry key for current user Outlook    Express settings."""    if win32api is None:        # Delayed import error from top.        raise ImportError("pywin32 not installed")    key    = "Identities"    reg    = win32api.RegOpenKeyEx(win32con.HKEY_CURRENT_USER, key)    id     = win32api.RegQueryValueEx(reg, "Default User ID")[0]    subKey = "%s\\%s\\Software\\Microsoft\\Outlook Express\\5.0" % (key, id)    return subKeydef OEStoreRoot():    """Return the path to the Outlook Express Store Root.    Tested with Outlook Express 6.0 with Windows XP."""    subKey = OECurrentUserKey()    reg    = win32api.RegOpenKeyEx(win32con.HKEY_CURRENT_USER, subKey)    path   = win32api.RegQueryValueEx(reg, "Store Root")[0]    # I can't find a shellcon to that is the same as %UserProfile%,    # so extract it from CSIDL_LOCAL_APPDATA    UserDirectory = shell.SHGetFolderPath \                    (0, shellcon.CSIDL_LOCAL_APPDATA, 0, 0)    parts = UserDirectory.split(os.sep)    UserProfile = os.sep.join(parts[:-2])    return path.replace("%UserProfile%", UserProfile)def OEDBXFilesList():    """Returns a list of DBX files for current user."""    path = OEStoreRoot()    dbx_re = re.compile('.+\.dbx')    dbxs = [f for f in os.listdir(path) if dbx_re.search(f) != None]    return dbxsdef OEAccountKeys(permission = None):    """Return registry keys for each of the OE mail accounts, along    with information about what type of mail account it is."""    if permission is None:        # Can't do this in the parameter, because then it requires        # win32con to be available for the module to be imported.        permission = win32con.KEY_READ | win32con.KEY_SET_VALUE    possible_root_keys = []    # This appears to be the place for OE6 and WinXP    # (So I'm guessing also for NT4)    if sys.getwindowsversion()[0] >= 4:        possible_root_keys = ["Software\\Microsoft\\" \                             "Internet Account Manager\\Accounts"]    else:        # This appears to be the place for OE6 and Win98        # (So I'm guessing also for Win95)        possible_root_keys = oe_mailbox.OEIdentityKeys()    for key in possible_root_keys:        reg = win32api.RegOpenKeyEx(win32con.HKEY_CURRENT_USER, key)        account_index = 0        while True:            # Loop through all the accounts            account = {}            try:                subkey_name = "%s\\%s" % \                              (key, win32api.RegEnumKey(reg, account_index))            except win32api.error:                break            account_index += 1            index = 0            subkey = win32api.RegOpenKeyEx(win32con.HKEY_CURRENT_USER,                                           subkey_name, 0, permission)            while True:                # Loop through all the keys so that we can determine                # what type of account this is.                try:                    name, value, typ = win32api.RegEnumValue(subkey, index)                except win32api.error:                    break                account[name] = (value, typ)                index += 1            # Yield, as appropriate.            if account.has_key("POP3 Server"):                yield("POP3", subkey, account)            elif account.has_key("IMAP Server"):                yield("IMAP4", subkey, account)def OEIsInstalled():    """Return True if Outlook Express appears to be installed,    and in use (I think if sys.platform == "win32" would say if    it was installed at all)."""    # Our heuristic is that there is at least one mail account setup.    if len(list(OEAccountKeys)) > 0:        return True    return False## For use by the test tools.class OEMsg(msgs.Msg):    def __init__(self, guts, id):        self.tag = id        self.guts = guts# The iterator yields a stream of Msg objects, taken from a list of# dbx files.class OEMsgStream(msgs.MsgStream):    def __init__(self, tag, dbxes, keep=None):        msgs.MsgStream.__init__(self, tag, dbxes, keep)    def produce(self):        if self.keep is None:            for dbx in self.directories:                folder = convertToMbox(file(dbx))                all = folder.split("\nFrom ") # XXX Is this right?                count = 0                for msg in all:                    id = "%s::%s" % (dbx, count)                    count += 1                    yield OEMsg(msg, id)            return        # We only want part of the msgs.  Shuffle each directory list, but        # in such a way that we'll get the same result each time this is        # called on the same directory list.        for directory in self.directories:            folder = convertToMbox(file(dbx))            all = folder.split("\nFrom ") # XXX Is this right?            random.seed(hash(max(all)) ^ SEED) # reproducible across calls            random.shuffle(all)            del all[self.keep:]            all.sort()  # for consistency with MsgStream            count = 0            for msg in all:                id = "%s::%s" % (dbx, count)                count += 1                yield OEMsg(msg, id)class OEHamStream(msgs.HamStream):    def __init__(self, tag, dbxes, train=0):        msgs.HamStream.__init__(self, tag, dbxes, train)class OESpamStream(msgs.SpamStream):    def __init__(self, tag, dbxes, train=0):        msgs.SpamStream.__init__(self, tag, dbxes, train)############################################################################# TEST DRIVER###########################################################################def test():    import sys    import getopt    try:        opts, args = getopt.getopt(sys.argv[1:], 'hp')    except getopt.error, msg:        print >>sys.stderr, str(msg) + '\n\n' + __doc__        sys.exit()    print_message = False    for opt, arg in opts:        if opt == '-h':            print >>sys.stderr, __doc__            sys.exit()        elif opt == '-p':            print_message = True    MAILBOX_DIR = OEStoreRoot()    files = [os.path.join(MAILBOX_DIR, f) for f in OEDBXFilesList()]    for file in files:        try:            print            print file            dbx = open(file, "rb", 0)            header = dbxFileHeader(dbx)            print "IS VALID DBX  :", header.isValid()            if header.isMessages():                info = dbxFileInfo(dbx, header.getEntry(dbxFileHeader.FH_FILE_INFO_LENGTH))                print "MAILBOX NAME  :", info.getFolderName()                print "CREATION TIME :", info.getCreationTime()                entries = header.getEntry(dbxFileHeader.FH_ENTRIES)                address  = header.getEntry(dbxFileHeader.FH_TREE_ROOT_NODE_PTR)                if address and entries:                    tree = dbxTree(dbx, address, entries)                for i in range(entries):                    address = tree.getValue(i)                    messageInfo = dbxMessageInfo(dbx, address)                    if messageInfo.isIndexed(dbxMessageInfo.MI_MESSAGE_ADDRESS):                        messageAddress = messageInfo.getValueAsLong(dbxMessageInfo.MI_MESSAGE_ADDRESS)                        message        = dbxMessage(dbx, messageAddress)                        if print_message:                            print                            print "Message :", messageInfo.getString(dbxMessageInfo.MI_SUBJECT)                            print "=" * (len(messageInfo.getString(dbxMessageInfo.MI_SUBJECT)) + 9)                            print                            print message.getText()            dbx.close()        except Exception, (strerror):            print strerrorif __name__ == '__main__':    test()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -