⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 test_filecorpus.py

📁 用python实现的邮件过滤器
💻 PY
字号:
# Test the FileCorpus module.

import os
import sys
import time
import gzip
import errno
import unittest

import sb_test_support
sb_test_support.fix_sys_path()

from spambayes import storage
from spambayes.FileCorpus import ExpiryFileCorpus
from spambayes.FileCorpus import FileCorpus, FileMessage, GzipFileMessage
from spambayes.FileCorpus import FileMessageFactory, GzipFileMessageFactory

# We borrow the test messages that test_sb_server uses.
from test_sb_server import good1, spam1, malformed1


class _FactoryBaseTest(unittest.TestCase):
    """Shared tests for the message factories.

    Concrete subclasses set ``factory`` to the factory class under test.
    """

    # Subclass must define a concrete factory.
    factory = None

    def test_create_no_content(self):
        # Without content the message only records where it lives.
        f = self.factory()
        key = "testmessage"
        directory = "fctesthamcorpus"
        msg = f.create(key, directory)
        self.assertEqual(msg.file_name, key)
        self.assertEqual(msg.directory, directory)
        self.assertEqual(msg.loaded, False)

    def test_create_with_content(self):
        # With content the message is marked loaded and as_string()
        # is expected to return the text with CRLF line endings.
        f = self.factory()
        key = "testmessage"
        directory = "fctesthamcorpus"
        content = good1
        msg = f.create(key, directory, content=content)
        self.assertEqual(msg.file_name, key)
        self.assertEqual(msg.directory, directory)
        self.assertEqual(msg.loaded, True)
        self.assertEqual(msg.as_string(), content.replace("\n", "\r\n"))


class FileMessageFactoryTest(_FactoryBaseTest):
    """Run the factory tests against FileMessageFactory."""

    factory = FileMessageFactory

    def test_klass(self):
        self.assertEqual(self.factory.klass, FileMessage)


class GzipFileMessageFactoryTest(_FactoryBaseTest):
    """Run the factory tests against GzipFileMessageFactory."""

    factory = GzipFileMessageFactory

    def test_klass(self):
        self.assertEqual(self.factory.klass, GzipFileMessage)


class _FileCorpusBaseTest(unittest.TestCase):
    """Fixture base class: creates the corpus directories before each
    test and removes them (and the test database files) afterwards.
    """

    # Directories created in setUp and removed in tearDown.
    _corpus_dirs = ('fctestspamcorpus',
                    'fctesthamcorpus',
                    'fctestunsurecorpus')

    # Database files removed in tearDown if they exist.
    _db_files = ('fctestmisc.bayes', 'fctestclass.bayes')

    def _setUpDirectory(self, dirname):
        """Create *dirname*, tolerating a directory that already exists."""
        try:
            os.mkdir(dirname)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise

    def setUp(self):
        # Make corpus directories.
        for dirname in self._corpus_dirs:
            self._setUpDirectory(dirname)

    def _tearDownDirectory(self, dirname):
        """Delete every file in *dirname*, then *dirname* itself.

        A directory that is already gone is not an error.
        """
        try:
            flist = os.listdir(dirname)
        except OSError as e:
            # A missing directory is reported as ENOENT.  The literal 3
            # is kept from the original check; presumably it is the
            # Windows "path not found" code seen on old Python builds
            # -- TODO confirm before dropping it.
            if e.errno not in (errno.ENOENT, 3):
                raise
        else:
            for filename in flist:
                os.unlink(os.path.join(dirname, filename))
        try:
            os.rmdir(dirname)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

    def tearDown(self):
        for dirname in self._corpus_dirs:
            self._tearDownDirectory(dirname)
        for dbname in self._db_files:
            try:
                os.unlink(dbname)
            except OSError as e:
                if e.errno != errno.ENOENT:
                    raise


class _FileMessageBaseTest(_FileCorpusBaseTest):
    """Shared tests for the message classes.

    setUp writes ``spam1`` to a plain file that ``self.msg`` points at,
    and stores a second message through ``wrong_klass`` so that loading
    from a mixed corpus can be tested.
    """

    # Subclass must define a concrete message klass, and wrong_klass.
    klass = None
    wrong_klass = None

    def setUp(self):
        _FileCorpusBaseTest.setUp(self)
        self.filename = "testmessage"
        self.directory = "fctestspamcorpus"
        fn = os.path.join(self.directory, self.filename)
        try:
            os.remove(fn)
        except OSError:
            pass
        with open(fn, "w") as f:
            # Recorded right after the file is created; compared with
            # the message's createTimestamp() in test_createTimestamp.
            self.created_time = time.time()
            f.write(spam1)
        self.msg = self.klass(self.filename, self.directory)

        # Message of wrong type, to test mixed corpus.
        self.wrongname = "wrongmessage"

        def good_as_string():
            return good1

        wrong_msg = self.wrong_klass(self.wrongname, self.directory)
        wrong_msg.as_string = good_as_string
        wrong_msg.store()

    def tearDown(self):
        # Best-effort removal of both fixture files; the base class
        # tearDown then clears out the directories.
        for name in (self.filename, self.wrongname):
            try:
                os.remove(os.path.join(self.directory, name))
            except OSError:
                pass
        _FileCorpusBaseTest.tearDown(self)

    def test___init__(self):
        self.assertEqual(self.msg.file_name, self.filename)
        self.assertEqual(self.msg.directory, self.directory)
        self.assertEqual(self.msg.loaded, False)

    def test_as_string(self):
        self.assertEqual(self.msg.as_string(), spam1.replace("\n", "\r\n"))

    def test_pathname(self):
        self.assertEqual(self.msg.pathname(),
                         os.path.join(self.directory, self.filename))

    def test_name(self):
        self.assertEqual(self.msg.name(), self.filename)

    def test_key(self):
        self.assertEqual(self.msg.key(), self.filename)

    def test_createTimestamp(self):
        timestamp = self.msg.createTimestamp()
        # As long as they are equal to the nearest second, that will do.
        self.assertEqual(int(timestamp), int(self.created_time))

    def test_remove(self):
        pathname = os.path.join(self.directory, self.filename)
        self.assertEqual(os.path.exists(pathname), True)
        self.msg.remove()
        self.assertEqual(os.path.exists(pathname), False)

    def test_remove_not_there(self):
        # remove() must cope with the file having already been deleted.
        pathname = os.path.join(self.directory, self.filename)
        self.assertEqual(os.path.exists(pathname), True)
        os.remove(pathname)
        self.msg.remove()
        self.assertEqual(os.path.exists(pathname), False)

    def test_load(self):
        # Load correct type.
        self.assertEqual(self.msg.loaded, False)
        self.msg.load()
        self.assertEqual(self.msg.loaded, True)
        self.assertEqual(self.msg.as_string(), spam1.replace("\n", "\r\n"))

    def test_load_wrong(self):
        # Load incorrect type.
        self.msg.file_name = self.wrongname
        self.assertEqual(self.msg.loaded, False)
        self.msg.load()
        self.assertEqual(self.msg.loaded, True)
        self.assertEqual(self.msg.as_string(), good1.replace("\n", "\r\n"))

    def test_load_already_loaded(self):
        # Shouldn't do anything if already loaded.
        self.msg.file_name = None
        self.msg.loaded = True
        # This will raise an error if a load from storage is attempted.
        self.msg.load()


class FileMessageTest(_FileMessageBaseTest):
    """Message tests for FileMessage, with GzipFileMessage as the
    "wrong" type stored alongside it.
    """

    klass = FileMessage
    wrong_klass = GzipFileMessage

    def test_store(self):
        def good_as_string():
            return good1

        self.msg.as_string = good_as_string
        self.msg.store()
        pathname = os.path.join(self.directory, self.filename)
        with open(pathname) as f:
            content = f.read()
        self.assertEqual(content, good1)


class GzipFileMessageTest(_FileMessageBaseTest):
    """Message tests for GzipFileMessage, with FileMessage as the
    "wrong" type stored alongside it.
    """

    klass = GzipFileMessage
    wrong_klass = FileMessage

    def test_store(self):
        def good_as_string():
            return good1

        self.msg.as_string = good_as_string
        self.msg.store()
        pathname = os.path.join(self.directory, self.filename)
        # try/finally rather than "with": gzip file objects are not
        # context managers on every Python version.
        f = gzip.open(pathname)
        try:
            content = f.read()
        finally:
            f.close()
        self.assertEqual(content, good1)


class FileCorpusTest(_FileCorpusBaseTest):
    """Tests for FileCorpus over a directory pre-filled by stuff_corpus()."""

    def setUp(self):
        _FileCorpusBaseTest.setUp(self)
        self.directory = 'fctesthamcorpus'
        self.cache_size = 100
        self.factory = FileMessageFactory()
        self.stuff_corpus()
        self.corpus = FileCorpus(self.factory, self.directory,
                                 '?', self.cache_size)

    def stuff_corpus(self):
        """Put messages in the corpus"""
        # Keys "0", "1", "2"; self.msg is left pointing at the last one,
        # which test_removeMessage relies on.
        for i, content in enumerate([good1, spam1, malformed1]):
            self.msg = self.factory.create(str(i), self.directory, content)
            self.msg.store()

        # Put in a message that won't match the filter.
        msg = self.factory.create("10", self.directory, good1)
        msg.store()

    def test___init__(self):
        self.assertEqual(self.corpus.directory, self.directory)
        self.assertEqual(self.corpus.filter, '?')
        self.assertEqual(self.corpus.cacheSize, self.cache_size)

    def test_filter(self):
        # The '?' filter is expected to pick up the three
        # single-character keys but not "10".
        self.assertEqual(len(self.corpus.msgs), 3)
        # Try again, with all messages.
        self.corpus = FileCorpus(self.factory, self.directory,
                                 '*', self.cache_size)
        self.assertEqual(len(self.corpus.msgs), 4)

    def test_makeMessage_no_content(self):
        # Only checks that the call does not raise.
        key = "testmake"
        self.corpus.makeMessage(key)

    def test_makeMessage_with_content(self):
        key = "testmake"
        content = spam1
        msg = self.corpus.makeMessage(key, content)
        self.assertEqual(msg.key(), key)
        self.assertEqual(msg.as_string(), content.replace("\n", "\r\n"))

    def test_addMessage_invalid(self):
        # An object that only provides key() must be rejected.
        class msg(object):
            def key(self):
                return 'aa'

        self.assertRaises(ValueError, self.corpus.addMessage, msg())

    def test_addMessage(self):
        # The message is created for a different directory; adding it
        # is expected to move it into this corpus's directory on disk.
        msg = self.factory.create("9", 'fctestspamcorpus', good1)
        self.corpus.addMessage(msg)
        self.assertEqual(msg.directory, self.directory)
        fn = os.path.join(self.directory, "9")
        with open(fn) as f:
            content = f.read()
        self.assertEqual(content, good1)

    def test_removeMessage(self):
        fn = self.msg.pathname()
        self.assertEqual(os.path.exists(fn), True)
        self.corpus.removeMessage(self.msg)
        self.assertEqual(os.path.exists(fn), False)


class ExpiryFileCorpusTest(FileCorpusTest):
    """Re-run every FileCorpusTest test against an ExpiryFileCorpus."""

    def setUp(self):
        # Deliberately skips FileCorpusTest.setUp so that the corpus is
        # built as an ExpiryFileCorpus instead of a FileCorpus.
        _FileCorpusBaseTest.setUp(self)
        self.cache_size = 100
        self.directory = 'fctesthamcorpus'
        self.factory = FileMessageFactory()
        self.stuff_corpus()
        # NOTE(review): the leading 1.0 is presumably the expiry time --
        # confirm against ExpiryFileCorpus.
        self.corpus = ExpiryFileCorpus(1.0, self.factory, self.directory,
                                       '?', self.cache_size)


def suite():
    """Return a TestSuite holding every concrete test class in this module."""
    tests = unittest.TestSuite()
    clses = (FileMessageFactoryTest,
             GzipFileMessageFactoryTest,
             FileMessageTest,
             GzipFileMessageTest,
             FileCorpusTest,
             ExpiryFileCorpusTest,
             )
    for cls in clses:
        tests.addTest(unittest.makeSuite(cls))
    return tests


if __name__ == '__main__':
    sb_test_support.unittest_main(argv=sys.argv + ['suite'])

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -