📄 test_sb_imapfilter.py
字号:
# Test sb_imapfilter script.import reimport sysimport timeimport emailimport typesimport socketimport threadimport imaplibimport unittestimport asyncoreimport StringIOimport sb_test_supportsb_test_support.fix_sys_path()from spambayes import messagefrom spambayes import Dibblerfrom spambayes.Options import optionsfrom spambayes.classifier import Classifierfrom sb_imapfilter import run, BadIMAPResponseError, LoginFailurefrom sb_imapfilter import IMAPSession, IMAPMessage, IMAPFolder, IMAPFilterIMAP_PORT = 8143IMAP_USERNAME = "testu"IMAP_PASSWORD = "testp"IMAP_FOLDER_LIST = ["INBOX", "unsure", "ham_to_train", "spam", "spam_to_train"]# Must be different.SB_ID_1 = "test@spambayes.invalid"SB_ID_2 = "14102004"# Key is UID.IMAP_MESSAGES = { # 101 should be valid and have a MessageID header, but no # X-Spambayes-MessageID header. 101 : """Subject: Test\rMessage-ID: <%s>\r\rBody test.""" % (SB_ID_1,), # 102 should be valid and have both a MessageID header and a # X-Spambayes-MessageID header. 102 : """Subject: Test2\rMessage-ID: <%s>\r%s: %s\r\rAnother body test.""" % (SB_ID_1, options["Headers", "mailid_header_name"], SB_ID_2), # 103 is taken from Anthony's email torture test (the # test_zero-length-boundary file). 103 : """Received: from noisy-2-82-67-182-141.fbx.proxad.net(82.67.182.141) via SMTP by mx1.example.com, id smtpdAAAzMayUR; Tue Apr 27 18:56:48 2004Return-Path: " Freeman" <XLUPSYGSHLBAPN@runbox.com>Received: from rly-xn05.mx.aol.com (rly-xn05.mail.aol.com [172.20.83.138]) by air-xn02.mail.aol.com (v98.10) with ESMTP id MAILINXN22-6504043449c151; Tue, 27 Apr 2004 16:57:46 -0300Received: from 132.16.224.107 by 82.67.182.141; Tue, 27 Apr 2004 14:54:46 -0500From: " Gilliam" <.@doramail.com>To: To: user@example.comSubject: Your Source For Online Prescriptions....Soma-Watson..VALIUM-Roche . Date: Wed, 28 Apr 2004 00:52:46 +0500Mime-Version: 1.0Content-Type: multipart/alternative; boundary=""X-Mailer: AOL 7.0 for Windows US sub 118X-AOL-IP: 114.204.176.98X-AOL-SCOLL-SCORE: 1:XXX:XXX-AOL-SCOLL-URL_COUNT: 2Message-ID: <@XLUPSYGSHLBAPN@runbox.com>--Content-Type: text/html; charset="iso-8859-1"Content-Transfer-Encoding: quoted-printable<strong><a href=3D"http://www.ibshels454drugs.biz/c39/">ENTER HERE</a> toORDER MEDS Online, such as XANAX..VALIUM..SOMA..Much MORE SHIPPEDOVERNIGHT,to US and INTERNATIONAL</strong>---""", # 104 should be valid and have neither a MessageID header nor a # X-Spambayes-MessageID header. 104 : """Subject: Test2\r\rYet another body test.""", }# Map of ID -> UIDIMAP_UIDS = {1 : 101, 2: 102, 3:103, 4:104}# Messages that are UNDELETEDUNDELETED_IDS = (1,2)class TestListener(Dibbler.Listener): """Listener for TestIMAP4Server.""" def __init__(self, socketMap=asyncore.socket_map): Dibbler.Listener.__init__(self, IMAP_PORT, TestIMAP4Server, (socketMap,), socketMap=socketMap)# If true, the next command will fail, whatever it is.FAIL_NEXT = Falseclass TestIMAP4Server(Dibbler.BrighterAsyncChat): """Minimal IMAP4 server, for testing purposes. Accepts a limited subset of commands, and also a KILL command, to terminate.""" def __init__(self, clientSocket, socketMap): # Grumble: asynchat.__init__ doesn't take a 'map' argument, # hence the two-stage construction. Dibbler.BrighterAsyncChat.__init__(self) Dibbler.BrighterAsyncChat.set_socket(self, clientSocket, socketMap) self.set_terminator('\r\n') # okCommands are just ignored (we pass back a happy this-was-fine # answer, and do nothing. self.okCommands = ['NOOP', 'LOGOUT', 'CAPABILITY', 'KILL'] # These commands actually result in something. self.handlers = {'LIST' : self.onList, 'LOGIN' : self.onLogin, 'SELECT' : self.onSelect, 'FETCH' : self.onFetch, 'SEARCH' : self.onSearch, 'UID' : self.onUID, 'APPEND' : self.onAppend, 'STORE' : self.onStore, } self.push("* OK [CAPABILITY IMAP4REV1 AUTH=LOGIN] " \ "localhost IMAP4rev1\r\n") self.request = '' self.next_id = 0 self.in_literal = (0, None) def collect_incoming_data(self, data): """Asynchat override.""" if self.in_literal[0] > 0: # Also add the line breaks. self.request = "%s\r\n%s" % (self.request, data) else: self.request = self.request + data def found_terminator(self): """Asynchat override.""" global FAIL_NEXT if self.in_literal[0] > 0: if len(self.request) >= self.in_literal[0]: self.push(self.in_literal[1](self.request, *self.in_literal[2])) self.in_literal = (0, None) self.request = '' return id, command = self.request.split(None, 1) if FAIL_NEXT: FAIL_NEXT = False self.push("%s NO Was told to fail.\r\n" % (id,)) if ' ' in command: command, args = command.split(None, 1) else: args = '' command = command.upper() if command in self.okCommands: self.push("%s OK (we hope)\r\n" % (id,)) if command == 'LOGOUT': self.close_when_done() if command == 'KILL': self.socket.shutdown(2) self.close() raise SystemExit() else: handler = self.handlers.get(command, self.onUnknown) self.push(handler(id, command, args, False)) # Or push_slowly for testing self.request = '' def push_slowly(self, response): """Useful for testing.""" for c in response: self.push(c) time.sleep(0.02) def onLogin(self, id, command, args, uid=False): """Log in to server.""" username, password = args.split(None, 1) username = username.strip('"') password = password.strip('"') if username == IMAP_USERNAME and password == IMAP_PASSWORD: return "%s OK [CAPABILITY IMAP4REV1] User %s " \ "authenticated.\r\n" % (id, username) return "%s NO LOGIN failed\r\n" % (id,) def onList(self, id, command, args, uid=False): """Return list of folders.""" base = '\r\n* LIST (\\NoInferiors \\UnMarked) "/" ' return "%s%s\r\n%s OK LIST completed\r\n" % \ (base[2:], base.join(IMAP_FOLDER_LIST), id) def onStore(self, id, command, args, uid=False): # We ignore flags. return "%s OK STORE completed\r\n" % (id,) def onSelect(self, id, command, args, uid=False): exists = "* %d EXISTS" % (len(IMAP_MESSAGES),) recent = "* 0 RECENT" uidv = "* OK [UIDVALIDITY 1091599302] UID validity status" next_uid = "* OK [UIDNEXT 23] Predicted next UID" flags = "* FLAGS (\Answered \Flagged \Deleted \Draft \Seen)" perm_flags = "* OK [PERMANENTFLAGS (\* \Answered \Flagged " \ "\Deleted \Draft \Seen)] Permanent flags" complete = "%s OK [READ-WRITE] SELECT completed" % (id,) return "%s\r\n" % ("\r\n".join([exists, recent, uidv, next_uid, flags, perm_flags, complete]),) def onAppend(self, id, command, args, uid=False): # Only stores for this session. folder, args = args.split(None, 1) # We ignore the folder. if ')' in args: flags, args = args.split(')', 1) flags = flags[1:] # We ignore the flags. unused, date, args = args.split('"', 2) # We ignore the date. if '{' in args: # A literal. size = int(args[2:-1]) self.in_literal = (size, self.appendLiteral, (id,)) return "+ Ready for argument\r\n" # Strip off the space at the front. return self.appendLiteral(args[1:], id) def appendLiteral(self, message, command_id): while True: id = self.next_id self.next_id += 1 if id not in IMAP_MESSAGES: break IMAP_MESSAGES[id] = message return "* APPEND %s\r\n%s OK APPEND succeeded\r\n" % \ (id, command_id) def onSearch(self, id, command, args, uid=False): args = args.upper() results = () if args.find("UNDELETED") != -1: for msg_id in UNDELETED_IDS: if uid: results += (IMAP_UIDS[msg_id],) else: results += (msg_id,) if uid: command_string = "UID " + command else: command_string = command return "%s\r\n%s OK %s completed\r\n" % \ ("* SEARCH " + ' '.join([str(r) for r in results]), id, command_string) def onFetch(self, id, command, args, uid=False): msg_nums, msg_parts = args.split(None, 1) msg_nums = msg_nums.split() response = {} for msg in msg_nums: response[msg] = [] if msg_parts.find("UID") != -1: if uid: for msg in msg_nums: response[msg].append("FETCH (UID %s)" % (msg,)) else: for msg in msg_nums: response[msg].append("FETCH (UID %s)" % (IMAP_UIDS[int(msg)])) if msg_parts.find("BODY.PEEK[]") != -1: for msg in msg_nums: if uid: msg_uid = int(msg) else: msg_uid = IMAP_UIDS[int(msg)] response[msg].append(("FETCH (BODY[] {%s}" % (len(IMAP_MESSAGES[msg_uid])), IMAP_MESSAGES[msg_uid])) if msg_parts.find("RFC822.HEADER") != -1: for msg in msg_nums:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -