📄 test_sb_server.py
字号:
#! /usr/bin/env python"""Test the POP3 proxy is working correctly.Given no command line options, carries out a test that thePOP3 proxy can be connected to, that incoming mail is classified,that pipelining is removed from the CAPA[bility] query, and that theweb ui is present.The -t option runs a fake POP3 server on port 8110. This is thesame server that the -z option uses, and may be separately run forother testing purposes.Usage: test_sb-server.py [options] options: -t : Runs a fake POP3 server on port 8110 (for testing). -h : Displays this help message."""# This module is part of the spambayes project, which is Copyright 2002-5# The Python Software Foundation and is covered by the Python Software# Foundation license.__author__ = "Richie Hindle <richie@entrian.com>"__credits__ = "All the Spambayes folk."try: True, Falseexcept NameError: # Maintain compatibility with Python 2.2 True, False = 1, 0# This code originally formed a part of pop3proxy.py. If you are examining# the history of this file, you may need to go back to there.todo = """Web training interface: o Functional tests."""# One example of spam and one of ham - both are used to train, and are# then classified. Not a good test of the classifier, but a perfectly# good test of the POP3 proxy. The bodies of these came from the# spambayes project, and Richie added the headers because the# originals had no headers.spam1 = """From: friend@public.comSubject: Make money fastHello tim_chandler , Want to save money ?Now is a good time to consider refinancing. Rates are low so you can cutyour current payments and save money.http://64.251.22.101/interest/index%38%30%300%2E%68t%6DTake off list on site [s5]"""good1 = """From: chris@example.comSubject: ZPT and DTMLJean Jordaan wrote:> 'Fraid so ;> It contains a vintage dtml-calendar tag.> http://www.zope.org/Members/teyc/CalendarTag>> Hmm I think I see what you mean: one needn't manually pass on the> namespace to a ZPT?Yeah, Page Templates are a bit more clever, sadly, DTML methods aren't :-(Chris"""# An example of a particularly nasty malformed message - where there is# no body, and no separator, which would at one point slip through# SpamBayes. This is an example that Tony made up.malformed1 = """From: ta-meyer@ihug.co.nzSubject: No body, and no separator"""import asyncoreimport socketimport operatorimport reimport timeimport getoptimport sys, osimport sb_test_supportsb_test_support.fix_sys_path()from spambayes import Dibblerfrom spambayes import tokenizerfrom spambayes.UserInterface import UserInterfaceServerfrom spambayes.ProxyUI import ProxyUserInterfacefrom sb_server import BayesProxyListenerfrom sb_server import state, _recreateStatefrom spambayes.Options import options# HEADER_EXAMPLE is the longest possible header - the length of this one# is added to the size of each message.HEADER_EXAMPLE = '%s: xxxxxxxxxxxxxxxxxxxx\r\n' % \ options["Headers", "classification_header_name"]class TestListener(Dibbler.Listener): """Listener for TestPOP3Server. Works on port 8110, to co-exist with real POP3 servers.""" def __init__(self, socketMap=asyncore.socket_map): Dibbler.Listener.__init__(self, 8110, TestPOP3Server, (socketMap,), socketMap=socketMap)class TestPOP3Server(Dibbler.BrighterAsyncChat): """Minimal POP3 server, for testing purposes. Doesn't support UIDL. USER, PASS, APOP, DELE and RSET simply return "+OK" without doing anything. Also understands the 'KILL' command, to kill it, and a 'SLOW' command, to change to really slow retrieval. The mail content is the example messages above. """ def __init__(self, clientSocket, socketMap): # Grumble: asynchat.__init__ doesn't take a 'map' argument, # hence the two-stage construction. Dibbler.BrighterAsyncChat.__init__(self, map=socketMap) Dibbler.BrighterAsyncChat.set_socket(self, clientSocket, socketMap) self.maildrop = [spam1, good1, malformed1] self.set_terminator('\r\n') self.okCommands = ['USER', 'PASS', 'APOP', 'NOOP', 'SLOW', 'DELE', 'RSET', 'QUIT', 'KILL'] self.handlers = {'CAPA': self.onCapa, 'STAT': self.onStat, 'LIST': self.onList, 'RETR': self.onRetr, 'TOP': self.onTop} self.push("+OK ready\r\n") self.request = '' self.push_delay = 0.0 # 0.02 is a useful value for testing. def collect_incoming_data(self, data): """Asynchat override.""" self.request = self.request + data def found_terminator(self): """Asynchat override.""" if ' ' in self.request: command, args = self.request.split(None, 1) else: command, args = self.request, '' command = command.upper() if command in self.okCommands: self.push("+OK (we hope)\r\n") if command == 'QUIT': self.close_when_done() elif command == 'KILL': self.socket.shutdown(2) self.close() raise SystemExit elif command == 'SLOW': self.push_delay = 1.0 else: handler = self.handlers.get(command, self.onUnknown) self.push_slowly(handler(command, args)) self.request = '' def push_slowly(self, response): """Sometimes we push out the response slowly to try and generate timeouts. If the delay is 0, this just does a regular push.""" if self.push_delay: for c in response.split('\n'): if c and c[-1] == '\r': self.push(c + '\n') else: # We want to trigger onServerLine, so need the '\r', # so modify the message just a wee bit. self.push(c + '\r\n') time.sleep(self.push_delay * len(c)) else: self.push(response) def onCapa(self, command, args): """POP3 CAPA command. This lies about supporting pipelining for test purposes - the POP3 proxy *doesn't* support pipelining, and we test that it correctly filters out that capability from the proxied capability list. Ditto for STLS.""" lines = ["+OK Capability list follows", "PIPELINING", "STLS", "TOP", ".", ""] return '\r\n'.join(lines) def onStat(self, command, args): """POP3 STAT command.""" maildropSize = reduce(operator.add, map(len, self.maildrop)) maildropSize += len(self.maildrop) * len(HEADER_EXAMPLE) return "+OK %d %d\r\n" % (len(self.maildrop), maildropSize) def onList(self, command, args): """POP3 LIST command, with optional message number argument.""" if args: try: number = int(args) except ValueError: number = -1 if 0 < number <= len(self.maildrop): return "+OK %d\r\n" % len(self.maildrop[number-1]) else: return "-ERR no such message\r\n" else: returnLines = ["+OK"] for messageIndex in range(len(self.maildrop)): size = len(self.maildrop[messageIndex]) returnLines.append("%d %d" % (messageIndex + 1, size)) returnLines.append(".") return '\r\n'.join(returnLines) + '\r\n' def _getMessage(self, number, maxLines): """Implements the POP3 RETR and TOP commands.""" if 0 < number <= len(self.maildrop): message = self.maildrop[number-1] try: headers, body = message.split('\n\n', 1) except ValueError: return "+OK %d octets\r\n%s\r\n.\r\n" % (len(message), message) bodyLines = body.split('\n')[:maxLines] message = headers + '\r\n\r\n' + '\n'.join(bodyLines) return "+OK\r\n%s\r\n.\r\n" % message else: return "-ERR no such message\r\n" def onRetr(self, command, args): """POP3 RETR command.""" try: number = int(args) except ValueError: number = -1 return self._getMessage(number, 12345) def onTop(self, command, args): """POP3 RETR command.""" try: number, lines = map(int, args.split()) except ValueError: number, lines = -1, -1 return self._getMessage(number, lines) def onUnknown(self, command, args): """Unknown POP3 command.""" return "-ERR Unknown command: %s\r\n" % repr(command)def test(): """Runs a self-test using TestPOP3Server, a minimal POP3 server that serves the example emails above. """ # Run a proxy and a test server in separate threads with separate # asyncore environments. import threading state.isTest = True testServerReady = threading.Event() def runTestServer(): testSocketMap = {} TestListener(socketMap=testSocketMap) testServerReady.set() asyncore.loop(map=testSocketMap) proxyReady = threading.Event() def runUIAndProxy(): httpServer = UserInterfaceServer(8881) proxyUI = ProxyUserInterface(state, _recreateState) httpServer.register(proxyUI) BayesProxyListener('localhost', 8110, ('', 8111)) state.bayes.learn(tokenizer.tokenize(spam1), True) state.bayes.learn(tokenizer.tokenize(good1), False) proxyReady.set() Dibbler.run() threading.Thread(target=runTestServer).start() testServerReady.wait() threading.Thread(target=runUIAndProxy).start() proxyReady.wait() # Connect to the proxy and the test server. proxy = socket.socket(socket.AF_INET, socket.SOCK_STREAM) proxy.connect(('localhost', 8111)) response = proxy.recv(100) assert response == "+OK ready\r\n" pop3Server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) pop3Server.connect(('localhost', 8110)) response = pop3Server.recv(100) assert response == "+OK ready\r\n" # Verify that the test server claims to support pipelining. pop3Server.send("capa\r\n") response = pop3Server.recv(1000) assert response.find("PIPELINING") >= 0 # Ask for the capabilities via the proxy, and verify that the proxy # is filtering out the PIPELINING capability. proxy.send("capa\r\n") response = proxy.recv(1000) assert response.find("PIPELINING") == -1 # Verify that the test server claims to support STLS. pop3Server.send("capa\r\n") response = pop3Server.recv(1000) assert response.find("STLS") >= 0 # Ask for the capabilities via the proxy, and verify that the proxy # is filtering out the STLS capability. proxy.send("capa\r\n") response = proxy.recv(1000) assert response.find("STLS") == -1 # Stat the mailbox to get the number of messages. proxy.send("stat\r\n") response = proxy.recv(100) count, totalSize = map(int, response.split()[1:3]) assert count == 3 # Loop through the messages ensuring that they have judgement # headers. for i in range(1, count+1): response = "" proxy.send("retr %d\r\n" % i) while response.find('\n.\r\n') == -1: response = response + proxy.recv(1000) assert response.find(options["Headers", "classification_header_name"]) >= 0 # Check that the proxy times out when it should. options["pop3proxy", "retrieval_timeout"] = 30 options["Headers", "include_evidence"] = False assert spam1.find('\n\n') > options["pop3proxy", "retrieval_timeout"] print "This test is rather slow..." proxy.send("slow\r\n") response = proxy.recv(100) assert response.find("OK") != -1 proxy.send("retr 1\r\n") response = proxy.recv(1000) assert len(response) < len(spam1) print "Slow test done. Thanks for waiting!" # Smoke-test the HTML UI. httpServer = socket.socket(socket.AF_INET, socket.SOCK_STREAM) httpServer.connect(('localhost', 8881)) httpServer.sendall("get / HTTP/1.0\r\n\r\n") response = '' while 1: packet = httpServer.recv(1000) if not packet: break response += packet assert re.search(r"(?s)<html>.*SpamBayes proxy.*</html>", response) # Kill the proxy and the test server. proxy.sendall("kill\r\n") proxy.recv(100) pop3Server.sendall("kill\r\n") pop3Server.recv(100)def run(): # Read the arguments. try: opts, args = getopt.getopt(sys.argv[1:], 'ht') except getopt.error, msg: print >>sys.stderr, str(msg) + '\n\n' + __doc__ sys.exit() state.isTest = True runSelfTest = True for opt, arg in opts: if opt == '-h': print >>sys.stderr, __doc__ sys.exit() elif opt == '-t': state.isTest = True state.runTestServer = True runSelfTest = False state.createWorkers() if runSelfTest: print "\nRunning self-test...\n" state.buildServerStrings() test() print "Self-test passed." # ...else it would have asserted. elif state.runTestServer: print "Running a test POP3 server on port 8110..." TestListener() asyncore.loop()if __name__ == '__main__': run()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -