⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 test_smtpproxy.py

📁 用python实现的邮件过滤器
💻 PY
字号:
#! /usr/bin/env python"""Test that the SMTP proxy is working correctly.The -t option runs a fake SMTP server on port 8025.  This is thesame server that the testing option uses, and may be separately run forother testing purposes.Usage:    test_smtpproxy.py [options]        options:            -t      : Runs a fake SMTP server on port 8025 (for testing).            -h      : Displays this help message.Any other options runs this in the standard Python unittest form."""# This module is part of the spambayes project, which is Copyright 2002-4# The Python Software Foundation and is covered by the Python Software# Foundation license.__author__ = "Tony Meyer <ta-meyer@ihug.co.nz>"__credits__ = "Richie Hindle, Mark Hammond, all the SpamBayes folk."try:    True, Falseexcept NameError:    # Maintain compatibility with Python 2.2    True, False = 1, 0# One example of spam and one of ham - both are used to train, and are# then classified.  Not a good test of the classifier, but a perfectly# good test of the SMTP proxy.  These are the same messages as in the# POP3 proxy test (test_sb-server.py).spam1 = """From: friend@public.comSubject: Make money fastHello tim_chandler , Want to save money ?Now is a good time to consider refinancing. Rates are low so you can cutyour current payments and save money.http://64.251.22.101/interest/index%38%30%300%2E%68t%6DTake off list on site [s5]"""good1 = """From: chris@example.comSubject: ZPT and DTMLJean Jordaan wrote:> 'Fraid so ;>  It contains a vintage dtml-calendar tag.>   http://www.zope.org/Members/teyc/CalendarTag>> Hmm I think I see what you mean: one needn't manually pass on the> namespace to a ZPT?Yeah, Page Templates are a bit more clever, sadly, DTML methods aren't :-(Chris"""import reimport sysimport socketimport getoptimport asyncoreimport operatorimport unittestimport threadimport smtplibimport sb_test_supportsb_test_support.fix_sys_path()from spambayes import Dibblerfrom spambayes import tokenizerfrom spambayes.Options import optionsfrom sb_server import state, _recreateStatefrom spambayes.smtpproxy import BayesSMTPProxyListener, SMTPTrainerfrom spambayes.ProxyUI import ProxyUserInterfacefrom spambayes.UserInterface import UserInterfaceServerfrom spambayes.classifier import Classifierclass TestListener(Dibbler.Listener):    """Listener for TestSMTPServer."""    def __init__(self, socketMap=asyncore.socket_map):        Dibbler.Listener.__init__(self, 8025, TestSMTPServer,                              (socketMap,), socketMap=socketMap)class TestSMTPServer(Dibbler.BrighterAsyncChat):    """Minimal SMTP server, for testing purposes.  Understands    "MAIL FROM", "RCPT TO", "DATA" and "QUIT".  All mail is    simply discarded. Also understands the 'KILL' command, to    kill it."""    def __init__(self, clientSocket, socketMap):        # Grumble: asynchat.__init__ doesn't take a 'map' argument,        # hence the two-stage construction.        Dibbler.BrighterAsyncChat.__init__(self, map=socketMap)        Dibbler.BrighterAsyncChat.set_socket(self, clientSocket, socketMap)        self.set_terminator('\r\n')        self.okCommands = ['MAIL FROM:', 'RCPT TO:', 'DATA', 'QUIT', 'KILL',]        self.handlers = {'MAIL FROM:': self.onFrom,                         'RCPT TO:': self.onTo,                         'DATA': self.onData,                         'QUIT': self.onQuit,                         'KILL': self.onKill,                         }        self.push("220 SpamBayes test SMTP server ready\r\n")        self.request = ''        self.inData = False    def collect_incoming_data(self, data):        """Asynchat override."""        self.request = self.request + data    def push(self, data):        Dibbler.BrighterAsyncChat.push(self, data)    def recv(self, buffer_size):        """Asynchat override."""        try:            return Dibbler.BrighterAsyncChat.recv(self, buffer_size)        except socket.error, e:            if e[0] == 10035:                return ''            raise    def found_terminator(self):        """Asynchat override."""        if self.inData:            # Just throw the data away, unless it is the terminator.            if self.request.strip() == '.':                self.inData = False                self.push("250 Message accepted for delivery\r\n")        else:            self.request = self.request.upper()            foundCmd = False            for cmd in self.okCommands:                if self.request.startswith(cmd):                    handler = self.handlers[cmd]                    cooked = handler(self.request[len(cmd):])                    if cooked is not None:                        self.push(cooked)                    foundCmd = True                    break            if not foundCmd:                # Something we don't know about.  Assume that it is ok!                self.push("250 Unknown command %s ok.\r\n" %                          (self.request,))        self.request = ''    def onKill(self, args):        self.push("221 Goodbye\n") # Why not be polite <wink>        self.socket.shutdown(2)        self.close()        raise SystemExit    def onQuit(self, args):        self.push("221 Goodbye\r\n")        self.close_when_done()    def onFrom(self, args):        # We don't care who it is from.        return "250 %s... Sender ok\r\n" % (args.lower(),)    def onTo(self, args):        if args == options["smtpproxy", "ham_address"].upper():            return "504 This command should not have got to the server\r\n"        elif args == options["smtpproxy", "spam_address"].upper():            return "504 This command should not have got to the server\r\n"        return "250 %s... Recipient ok\r\n" % (args.lower(),)    def onData(self, args):        self.inData = True        return '354 Enter mail, end with "." on a line by itself\r\n'class SMTPProxyTest(unittest.TestCase):    """Runs a self-test using TestSMTPServer, a minimal SMTP server    that receives mail and discards it."""    def setUp(self):        pass    def tearDown(self):        pass    def test_direct_connection(self):        # Connect to the test server.        smtpServer = socket.socket(socket.AF_INET, socket.SOCK_STREAM)        smtpServer.connect(('localhost', 8025))        try:            response = smtpServer.recv(100)        except socket.error, e:            if e[0] == 10035:                # non-blocking socket so that the recognition                # can proceed, so this doesn't mean much                pass            else:                raise        self.assertEqual(response, "220 SpamBayes test SMTP server ready\r\n",                         "Couldn't connect to test SMTP server")        smtpServer.send('quit\r\n')    def test_proxy_connection(self):        # Connect to the proxy server.        proxy = socket.socket(socket.AF_INET, socket.SOCK_STREAM)        proxy.connect(('localhost', 8026))        try:            response = proxy.recv(100)        except socket.error, e:            if e[0] == 10035:                # non-blocking socket so that the recognition                # can proceed, so this doesn't mean much                pass            else:                raise        self.assertEqual(response, "220 SpamBayes test SMTP server ready\r\n",                         "Couldn't connect to proxy server")        proxy.send('quit\r\n')    def test_disconnection(self):        proxy = socket.socket(socket.AF_INET, socket.SOCK_STREAM)        proxy.connect(('localhost', 8025))        try:            response = proxy.recv(100)        except socket.error, e:            if e[0] == 10053:                # Socket is dead, which is what we want.                pass            else:                raise        proxy.send("quit\r\n")        try:            response = proxy.recv(100)        except socket.error, e:            if e[0] == 10053:                # Socket is dead, which is what we want.                pass            else:                raise        self.assertEqual(response, "221 Goodbye\r\n",                         "Couldn't disconnect from SMTP server")    def test_sendmessage(self):        s = smtplib.SMTP('localhost', 8026)        s.sendmail("ta-meyer@ihug.co.nz", "ta-meyer@ihug.co.nz", good1)        s.quit()    def test_ham_intercept(self):        pre_ham_trained = bayes.nham        s = smtplib.SMTP('localhost', 8026)        s.sendmail("ta-meyer@ihug.co.nz",                   options["smtpproxy", "ham_address"], good1)        s.quit()        post_ham_trained = bayes.nham        self.assertEqual(pre_ham_trained+1, post_ham_trained)def suite():    suite = unittest.TestSuite()    suite.addTest(unittest.makeSuite(SMTPProxyTest))    return suitedef run():    # Read the arguments.    try:        opts, args = getopt.getopt(sys.argv[1:], 'ht')    except getopt.error, msg:        print >>sys.stderr, str(msg) + '\n\n' + __doc__        sys.exit()    for opt, arg in opts:        if opt == '-h':            print >>sys.stderr, __doc__            sys.exit()        elif opt == '-t':            state.isTest = True            state.runTestServer = True    state.createWorkers()    if state.runTestServer:        print "Running a test SMTP server on port 8025..."        TestListener()        asyncore.loop()    else:        state.isTest = True        state.buildServerStrings()        testSocketMap = {}        def runTestServer():            TestListener(socketMap=testSocketMap)            asyncore.loop(map=testSocketMap)        def runProxy():            global bayes            bayes = Classifier()            trainer = SMTPTrainer(bayes, state)            BayesSMTPProxyListener('localhost', 8025, ('', 8026), trainer)            Dibbler.run()        thread.start_new_thread(runTestServer, ())        thread.start_new_thread(runProxy, ())        sb_test_support.unittest_main(argv=sys.argv + ['suite'])if __name__ == '__main__':    run()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -