⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 test_sb_imapfilter.py

📁 用python实现的邮件过滤器
💻 PY
📖 第 1 页 / 共 3 页
字号:
# Test sb_imapfilter script.

import re
import sys
import time
import email
import types
import socket
import thread
import imaplib
import unittest
import asyncore
import StringIO

import sb_test_support
sb_test_support.fix_sys_path()

from spambayes import message
from spambayes import Dibbler
from spambayes.Options import options
from spambayes.classifier import Classifier
from sb_imapfilter import run, BadIMAPResponseError, LoginFailure
from sb_imapfilter import IMAPSession, IMAPMessage, IMAPFolder, IMAPFilter

# Connection details that the fake server below accepts.
IMAP_PORT = 8143
IMAP_USERNAME = "testu"
IMAP_PASSWORD = "testp"
IMAP_FOLDER_LIST = ["INBOX", "unsure", "ham_to_train", "spam",
                    "spam_to_train"]
# Must be different.
SB_ID_1 = "test@spambayes.invalid"
SB_ID_2 = "14102004"
# Key is UID.
# The short messages end each line with an explicit "\r" followed by a
# real newline, so the resulting text uses CRLF line endings.
IMAP_MESSAGES = {
    # 101 should be valid and have a MessageID header, but no
    # X-Spambayes-MessageID header.
    101 : """Subject: Test\r
Message-ID: <%s>\r
\r
Body test.""" % (SB_ID_1,),
    # 102 should be valid and have both a MessageID header and a
    # X-Spambayes-MessageID header.
    102 : """Subject: Test2\r
Message-ID: <%s>\r
%s: %s\r
\r
Another body test.""" % (SB_ID_1, options["Headers", "mailid_header_name"],
                         SB_ID_2),
    # 103 is taken from Anthony's email torture test (the
    # test_zero-length-boundary file).
    # NOTE(review): the line breaks inside this message were restored from
    # the header/MIME structure; the exact number of blank lines around the
    # "--" and "---" markers should be confirmed against the original
    # fixture.  The Subject line ends in two tab characters, written here
    # as "\t\t" so that they survive editors that strip trailing blanks.
    103 : """Received: from noisy-2-82-67-182-141.fbx.proxad.net(82.67.182.141)
 via SMTP by mx1.example.com, id smtpdAAAzMayUR; Tue Apr 27 18:56:48 2004
Return-Path: " Freeman" <XLUPSYGSHLBAPN@runbox.com>
Received: from  rly-xn05.mx.aol.com (rly-xn05.mail.aol.com [172.20.83.138]) by air-xn02.mail.aol.com (v98.10) with ESMTP id MAILINXN22-6504043449c151; Tue, 27 Apr 2004 16:57:46 -0300
Received: from 132.16.224.107 by 82.67.182.141; Tue, 27 Apr 2004 14:54:46 -0500
From: " Gilliam" <.@doramail.com>
To: To: user@example.com
Subject: Your Source For Online Prescriptions....Soma-Watson..VALIUM-Roche    .\t\t
Date: Wed, 28 Apr 2004 00:52:46 +0500
Mime-Version: 1.0
Content-Type: multipart/alternative;
        boundary=""
X-Mailer: AOL 7.0 for Windows US sub 118
X-AOL-IP: 114.204.176.98
X-AOL-SCOLL-SCORE: 1:XXX:XX
X-AOL-SCOLL-URL_COUNT: 2
Message-ID: <@XLUPSYGSHLBAPN@runbox.com>

--
Content-Type: text/html;
        charset="iso-8859-1"
Content-Transfer-Encoding: quoted-printable

<strong><a href=3D"http://www.ibshels454drugs.biz/c39/">ENTER HERE</a> to
ORDER MEDS Online, such as XANAX..VALIUM..SOMA..Much MORE SHIPPED
OVERNIGHT,to US and INTERNATIONAL</strong>

---
""",
    # 104 should be valid and have neither a MessageID header nor a
    # X-Spambayes-MessageID header.
    104 : """Subject: Test2\r
\r
Yet another body test.""",
    }
# Map of ID -> UID
IMAP_UIDS = {1 : 101, 2: 102, 3:103, 4:104}
# Messages that are UNDELETED
UNDELETED_IDS = (1,2)


class TestListener(Dibbler.Listener):
    """Listener for TestIMAP4Server."""
    def __init__(self, socketMap=asyncore.socket_map):
        Dibbler.Listener.__init__(self, IMAP_PORT, TestIMAP4Server,
                                  (socketMap,), socketMap=socketMap)


# If true, the next command will fail, whatever it is.
FAIL_NEXT = False


class TestIMAP4Server(Dibbler.BrighterAsyncChat):
    """Minimal IMAP4 server, for testing purposes.  Accepts a limited
    subset of commands, and also a KILL command, to terminate."""
    def __init__(self, clientSocket, socketMap):
        """Attach to clientSocket, register handlers and send the greeting."""
        # Grumble: asynchat.__init__ doesn't take a 'map' argument,
        # hence the two-stage construction.
        Dibbler.BrighterAsyncChat.__init__(self)
        Dibbler.BrighterAsyncChat.set_socket(self, clientSocket, socketMap)
        self.set_terminator('\r\n')
        # okCommands are just ignored (we pass back a happy this-was-fine
        # answer, and do nothing).
        self.okCommands = ['NOOP', 'LOGOUT', 'CAPABILITY', 'KILL']
        # These commands actually result in something.
        self.handlers = {'LIST' : self.onList,
                         'LOGIN' : self.onLogin,
                         'SELECT' : self.onSelect,
                         'FETCH' : self.onFetch,
                         'SEARCH' : self.onSearch,
                         'UID' : self.onUID,
                         'APPEND' : self.onAppend,
                         'STORE' : self.onStore,
                         }
        self.push("* OK [CAPABILITY IMAP4REV1 AUTH=LOGIN] " \
                  "localhost IMAP4rev1\r\n")
        # Text of the command (or literal) received so far.
        self.request = ''
        # Next candidate key tried by appendLiteral.
        self.next_id = 0
        # (expected size, completion callback, extra callback args).  A
        # size of zero means that no literal is being collected.  Always a
        # 3-tuple so that index 2 is safe to read.
        self.in_literal = (0, None, ())

    def collect_incoming_data(self, data):
        """Asynchat override."""
        if self.in_literal[0] > 0:
            # Also add the line breaks.
            self.request = "%s\r\n%s" % (self.request, data)
        else:
            self.request = self.request + data

    def found_terminator(self):
        """Asynchat override.

        Handles one complete line.  While a literal is being collected the
        line is part of that literal; otherwise it is parsed as
        "<tag> <command> [<args>]" and dispatched.
        """
        global FAIL_NEXT
        if self.in_literal[0] > 0:
            if len(self.request) >= self.in_literal[0]:
                self.push(self.in_literal[1](self.request,
                                             *self.in_literal[2]))
                self.in_literal = (0, None, ())
                self.request = ''
            return

        id, command = self.request.split(None, 1)

        if FAIL_NEXT:
            FAIL_NEXT = False
            self.push("%s NO Was told to fail.\r\n" % (id,))
            # Stop here.  Falling through would also run the command and
            # send a second tagged response for the same tag.
            self.request = ''
            return

        if ' ' in command:
            command, args = command.split(None, 1)
        else:
            args = ''
        command = command.upper()
        if command in self.okCommands:
            self.push("%s OK (we hope)\r\n" % (id,))
            if command == 'LOGOUT':
                self.close_when_done()
            if command == 'KILL':
                self.socket.shutdown(2)
                self.close()
                raise SystemExit()
        else:
            handler = self.handlers.get(command, self.onUnknown)
            # Or push_slowly for testing
            self.push(handler(id, command, args, False))
        self.request = ''

    def push_slowly(self, response):
        """Useful for testing."""
        for c in response:
            self.push(c)
            time.sleep(0.02)

    def onLogin(self, id, command, args, uid=False):
        """Log in to server."""
        username, password = args.split(None, 1)
        username = username.strip('"')
        password = password.strip('"')
        if username == IMAP_USERNAME and password == IMAP_PASSWORD:
            return "%s OK [CAPABILITY IMAP4REV1] User %s " \
                   "authenticated.\r\n" % (id, username)
        return "%s NO LOGIN failed\r\n" % (id,)

    def onList(self, id, command, args, uid=False):
        """Return list of folders."""
        base = '\r\n* LIST (\\NoInferiors \\UnMarked) "/" '
        # base starts with CRLF so that it can be used as the joiner; the
        # first line must not start with it, hence base[2:].
        return "%s%s\r\n%s OK LIST completed\r\n" % \
               (base[2:], base.join(IMAP_FOLDER_LIST), id)

    def onStore(self, id, command, args, uid=False):
        """Acknowledge a STORE command without changing anything."""
        # We ignore flags.
        return "%s OK STORE completed\r\n" % (id,)

    def onSelect(self, id, command, args, uid=False):
        """Return a fixed SELECT response; the folder name is not used."""
        exists = "* %d EXISTS" % (len(IMAP_MESSAGES),)
        recent = "* 0 RECENT"
        uidv = "* OK [UIDVALIDITY 1091599302] UID validity status"
        next_uid = "* OK [UIDNEXT 23] Predicted next UID"
        # Backslashes are escaped explicitly (as in onList); the text sent
        # is the same, without relying on unrecognised escape sequences.
        flags = "* FLAGS (\\Answered \\Flagged \\Deleted \\Draft \\Seen)"
        perm_flags = "* OK [PERMANENTFLAGS (\\* \\Answered \\Flagged " \
                     "\\Deleted \\Draft \\Seen)] Permanent flags"
        complete = "%s OK [READ-WRITE] SELECT completed" % (id,)
        return "%s\r\n" % ("\r\n".join([exists, recent, uidv, next_uid,
                                        flags, perm_flags, complete]),)

    def onAppend(self, id, command, args, uid=False):
        """Store a message, either inline or via a following literal."""
        # Only stores for this session.
        folder, args = args.split(None, 1)
        # We ignore the folder.
        if ')' in args:
            flags, args = args.split(')', 1)
            flags = flags[1:]
            # We ignore the flags.
        unused, date, args = args.split('"', 2)
        # We ignore the date.
        if '{' in args:
            # A literal.
            size = int(args[2:-1])
            self.in_literal = (size, self.appendLiteral, (id,))
            return "+ Ready for argument\r\n"
        # Strip off the space at the front.
        return self.appendLiteral(args[1:], id)

    def appendLiteral(self, message, command_id):
        """Add message to IMAP_MESSAGES under the first unused key.

        Returns the untagged APPEND line followed by the tagged OK line
        for command_id.
        """
        while True:
            new_id = self.next_id
            self.next_id += 1
            if new_id not in IMAP_MESSAGES:
                break
        IMAP_MESSAGES[new_id] = message
        return "* APPEND %s\r\n%s OK APPEND succeeded\r\n" % \
               (new_id, command_id)

    def onSearch(self, id, command, args, uid=False):
        """Answer a SEARCH; only the UNDELETED criterion gives results."""
        args = args.upper()
        results = []
        if "UNDELETED" in args:
            if uid:
                results = [IMAP_UIDS[msg_id] for msg_id in UNDELETED_IDS]
            else:
                results = list(UNDELETED_IDS)
        if uid:
            command_string = "UID " + command
        else:
            command_string = command
        return "%s\r\n%s OK %s completed\r\n" % \
               ("* SEARCH " + ' '.join([str(r) for r in results]), id,
                command_string)

    def onFetch(self, id, command, args, uid=False):
        msg_nums, msg_parts = args.split(None, 1)
        msg_nums = msg_nums.split()
        # Maps each requested message number to its response parts.
        response = {}
        for msg in msg_nums:
            response[msg] = []
        if msg_parts.find("UID") != -1:
            if uid:
                for msg in msg_nums:
                    response[msg].append("FETCH (UID %s)" % (msg,))
            else:
                for msg in msg_nums:
                    response[msg].append("FETCH (UID %s)" %
                                         (IMAP_UIDS[int(msg)]))
        if msg_parts.find("BODY.PEEK[]") != -1:
            for msg in msg_nums:
                if uid:
                    msg_uid = int(msg)
                else:
                    msg_uid = IMAP_UIDS[int(msg)]
            response[msg].append(("FETCH (BODY[] {%s}" %                                     (len(IMAP_MESSAGES[msg_uid])),                                     IMAP_MESSAGES[msg_uid]))        if msg_parts.find("RFC822.HEADER") != -1:            for msg in msg_nums:

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -