📄 test_message.py
字号:
options["Headers", "notate_subject"] = (self.ham, self.spam, self.unsure) self.msg.addSBHeaders(self.s_prob, self.clues) disp, orig = self.msg["Subject"].split(',', 1) self.assertEqual(orig, subject) self.assertEqual(disp, self.spam) def test_notate_to_changed(self): saved_ham = options["Headers", "header_ham_string"] notate_to = options.get_option("Headers", "notate_to") saved_to = notate_to.allowed_values try: options["Headers", "header_ham_string"] = "bacon" header_strings = (options["Headers", "header_ham_string"], options["Headers", "header_spam_string"], options["Headers", "header_unsure_string"]) notate_to = options.get_option("Headers", "notate_to") notate_to.allowed_values = header_strings self.ham = options["Headers", "header_ham_string"] result = self.test_notate_to_ham() # Just be sure that it's using the new value. self.assertEqual(self.msg["To"].split(',', 1)[0], "bacon@spambayes.invalid") finally: # If we leave these changed, then lots of other tests will # fail. options["Headers", "header_ham_string"] = saved_ham self.ham = saved_ham notate_to.allowed_values = saved_to return result def test_id_header(self): options['Headers','add_unique_id'] = True id = "test" self.msg.id = id self.msg.addSBHeaders(self.g_prob, self.clues) self.assertEqual(self.msg[options['Headers', 'mailid_header_name']], id) def test_id_header_off(self): options['Headers','add_unique_id'] = False id = "test" self.msg.id = id self.msg.addSBHeaders(self.g_prob, self.clues) self.assertEqual(self.msg[options['Headers', 'mailid_header_name']], None) def test_currentSBHeaders(self): sbheaders = self.msg.currentSBHeaders() self.assertEqual({}, sbheaders) headers = {options['Headers', 'classification_header_name'] : '1', options['Headers', 'mailid_header_name'] : '2', options['Headers', 'classification_header_name'] + "-ID" : '3', options['Headers', 'thermostat_header_name'] : '4', options['Headers', 'evidence_header_name'] : '5', options['Headers', 'score_header_name'] : '6', options['Headers', 'trained_header_name'] : '7', } for name, val in headers.items(): self.msg[name] = val sbheaders = self.msg.currentSBHeaders() self.assertEqual(headers, sbheaders) def test_delSBHeaders(self): headers = (options['Headers', 'classification_header_name'], options['Headers', 'mailid_header_name'], options['Headers', 'classification_header_name'] + "-ID", options['Headers', 'thermostat_header_name'], options['Headers', 'evidence_header_name'], options['Headers', 'score_header_name'], options['Headers', 'trained_header_name'],) for header in headers: self.msg[header] = "test" for header in headers: self.assert_(header in self.msg.keys()) self.msg.delSBHeaders() for header in headers: self.assert_(header not in self.msg.keys()) def test_delNotations(self): # Add each type of notation to each header and check that it # is removed. for headername in ["subject", "to"]: for disp in (self.ham, self.spam, self.unsure): # Add a notation to the header header = self.msg[headername] self.assertEqual(header.find(disp), -1) options["Headers", "notate_%s" % (headername,)] = \ (self.ham, self.unsure, self.spam) prob = {self.ham:self.g_prob, self.spam:self.s_prob, self.unsure:self.u_prob}[disp] self.msg.addSBHeaders(prob, self.clues) self.assertNotEqual(self.msg[headername].find(disp), -1) # Remove it self.msg.delNotations() self.assertEqual(self.msg[headername], header) def test_delNotations_missing(self): # Check that nothing is removed if the disposition is not # there. for headername in ["subject", "to"]: for disp in (self.ham, self.spam, self.unsure): # Add a notation to the header header = self.msg[headername] self.assertEqual(header.find(disp), -1) options["Headers", "notate_%s" % (headername,)] = () prob = {self.ham:self.g_prob, self.spam:self.s_prob, self.unsure:self.u_prob}[disp] self.msg.addSBHeaders(prob, self.clues) self.assertEqual(self.msg[headername].find(disp), -1) # Remove it self.msg.delNotations() self.assertEqual(self.msg[headername], header) def test_delNotations_no_header(self): # Check that it works if there is no subject/to header. for headername in ["subject", "to"]: for disp in (self.ham, self.spam, self.unsure): del self.msg[headername] options["Headers", "notate_%s" % (headername,)] = \ (self.ham, self.unsure, self.spam) self.msg.delNotations() self.assertEqual(self.msg[headername], None) def test_delNotations_only_once_subject(self): self._test_delNotations_only_once("subject") def test_delNotations_only_once_to(self): self._test_delNotations_only_once("to") def _test_delNotations_only_once(self, headername): # Check that only one disposition is removed, even if more than # one is present. for disp in (self.ham, self.spam, self.unsure): # Add a notation to the header header = self.msg[headername] self.assertEqual(header.find(disp), -1) options["Headers", "notate_%s" % (headername,)] = \ (self.ham, self.unsure, self.spam) prob = {self.ham:self.g_prob, self.spam:self.s_prob, self.unsure:self.u_prob}[disp] self.msg.addSBHeaders(prob, self.clues) self.assertNotEqual(self.msg[headername].find(disp), -1) header2 = self.msg[headername] # Add a second notation self.msg.addSBHeaders(prob, self.clues) self.assertNotEqual(self.msg[headername].\ replace(disp, "", 1).find(disp), -1) # Remove it self.msg.delNotations() self.assertEqual(self.msg[headername], header2) # Restore for next time round the loop self.msg.replace_header(headername, header)class MessageInfoBaseTest(unittest.TestCase): def setUp(self, fn=TEMP_PICKLE_NAME): self.db = self.klass(fn, self.mode) def test_mode(self): self.assertEqual(self.mode, self.db.mode) def test_load_msg_missing(self): msg = email.message_from_string(good1, _class=Message) msg.id = "Test" dummy_values = "a", "b" msg.c, msg.t = dummy_values self.db.load_msg(msg) self.assertEqual((msg.c, msg.t), dummy_values) def test_load_msg_compat(self): msg = email.message_from_string(good1, _class=Message) msg.id = "Test" dummy_values = "a", "b" self.db.db[msg.id] = dummy_values self.db.load_msg(msg) self.assertEqual((msg.c, msg.t), dummy_values) def test_load_msg(self): msg = email.message_from_string(good1, _class=Message) msg.id = "Test" dummy_values = [('a', 1), ('b', 2)] self.db.db[msg.id] = dummy_values self.db.load_msg(msg) for att, val in dummy_values: self.assertEqual(getattr(msg, att), val) def test_store_msg(self): msg = email.message_from_string(good1, _class=Message) msg.id = "Test" saved = self.db.store self.done = False try: self.db.store = self._fake_store self.db.store_msg(msg) finally: self.db.store = saved self.assertEqual(self.done, True) correct = [(att, getattr(msg, att)) \ for att in msg.stored_attributes] db_version = dict(self.db.db[msg.id]) correct_version = dict(correct) correct_version["date_modified"], time.time() self.assertEqual(db_version, correct_version) def _fake_store(self): self.done = True def test_remove_msg(self): msg = email.message_from_string(good1, _class=Message) msg.id = "Test" self.db.db[msg.id] = "test" saved = self.db.store self.done = False try: self.db.store = self._fake_store self.db.remove_msg(msg) finally: self.db.store = saved self.assertEqual(self.done, True) self.assertRaises(KeyError, self.db.db.__getitem__, msg.id) def test_load(self): # Create a db to try and load. data = {"1" : ('a', 'b', 'c'), "2" : ('d', 'e', 'f'), "3" : "test"} for k, v in data.items(): self.db.db[k] = v self.db.store() fn = self.db.db_name self.db.close() db2 = self.klass(fn, self.mode) try: self.assertEqual(len(db2.db.keys()), len(data.keys())) for k, v in data.items(): self.assertEqual(db2.db[k], v) finally: db2.close() def test_load_new(self): # Load from a non-existing db (i.e. create new). self.assertEqual(self.db.db.keys(), [])class MessageInfoPickleTest(MessageInfoBaseTest): def setUp(self): self.mode = 1 self.klass = MessageInfoPickle MessageInfoBaseTest.setUp(self, TEMP_PICKLE_NAME) def tearDown(self): try: os.remove(TEMP_PICKLE_NAME) except OSError: pass def store(self): if self.db is not None: self.db.sync()class MessageInfoDBTest(MessageInfoBaseTest): def setUp(self): self.mode = 'c' self.klass = MessageInfoDB MessageInfoBaseTest.setUp(self, TEMP_DBM_NAME) def tearDown(self): self.db.close() try: os.remove(TEMP_DBM_NAME) except OSError: pass def store(self): if self.db is not None: self.db.sync() def _fake_close(self): self.done += 1 def test_close(self): saved_db = self.db.db.close saved_dbm = self.db.dbm.close try: self.done = 0 self.db.db.close = self._fake_close self.db.dbm.close = self._fake_close self.db.close() self.assertEqual(self.done, 2) finally: # If we don't put these back (whatever happens), then # the db isn't closed and can't be deleted in tearDown. self.db.db.close = saved_db self.db.dbm.close = saved_dbmclass UtilitiesTest(unittest.TestCase): def _verify_details(self, details): loc = details.find(__file__) self.assertNotEqual(loc, -1) loc = details.find("Exception: Test") self.assertNotEqual(loc, -1) def _verify_exception_header(self, msg, details): msg = email.message_from_string(msg) details = "\r\n.".join(details.strip().split('\n')) headerName = 'X-Spambayes-Exception' header = email.Header.Header(details, header_name=headerName) self.assertEqual(msg[headerName].replace('\r\n', '\n'), str(header).replace('\r\n', '\n')) def test_insert_exception_header(self): # Cause an exception to insert. try: raise Exception("Test") except Exception: pass msg, details = insert_exception_header(good1) self._verify_details(details) self._verify_exception_header(msg, details) def test_insert_exception_header_and_id(self): # Cause an exception to insert. try: raise Exception("Test") except Exception: pass id = "Message ID" msg, details = insert_exception_header(good1, id) self._verify_details(details) self._verify_exception_header(msg, details) # Check that ID header is inserted. msg = email.message_from_string(msg) headerName = options["Headers", "mailid_header_name"] header = email.Header.Header(id, header_name=headerName) self.assertEqual(msg[headerName], str(header).replace('\n', '\r\n')) def test_insert_exception_header_no_separator(self): # Cause an exception to insert. try: raise Exception("Test") except Exception: pass msg, details = insert_exception_header(malformed1) self._verify_details(details) self._verify_exception_header(msg, details) def suite(): suite = unittest.TestSuite() classes = (MessageTest, SBHeaderMessageTest, MessageInfoPickleTest, UtilitiesTest, ) from spambayes import dbmstorage try: dbmstorage.open_best() except dbmstorage.error: print "Skipping MessageInfoDBTest - no dbm module available" from spambayes import message def always_pickle(): return "__test.pik", "pickle" message.database_type = always_pickle except TypeError: # We need an argument, so TypeError will be raised # when it *is* available. classes += (MessageInfoDBTest,) for cls in classes: suite.addTest(unittest.makeSuite(cls)) return suiteif __name__=='__main__': sb_test_support.unittest_main(argv=sys.argv + ['suite'])
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -