⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 tester.py

📁 用python实现的邮件过滤器
💻 PY
📖 第 1 页 / 共 2 页
字号:
        spam_msg.Delete()    print "Created a Spam message, and saw it get filtered and trained."def _DoTestHamTrain(driver, folder1, folder2):    # [ 780612 ] Outlook incorrectly trains on moved messages    # Should not train when previously classified message is moved by the user    # from one watch folder to another.    bayes = driver.manager.classifier_data.bayes    nham = bayes.nham    nspam = bayes.nspam    # Create a ham message in the Inbox - it wont get filtered if the other    # tests pass, but we do need to wait for it to be scored.    msg, words = driver.CreateTestMessageInFolder(HAM, folder1)    # sleep to ensure filtering.    WaitForFilters()    # It should still be in the Inbox.    if driver.FindTestMessage(folder1) is None:        TestFailed("The test ham message appeared to have been filtered!")    # Manually move it to folder2    msg.Move(folder2)    # sleep to any processing in this folder.    WaitForFilters()    # re-find it in folder2    msg = driver.FindTestMessage(folder2)    if driver.FindTestMessage(folder2) is None:        TestFailed("Couldn't find the ham message we just moved")    if nspam != bayes.nspam or nham != bayes.nham:        TestFailed("Move of existing ham caused a train")    msg.Delete()def _DoTestHamFilter(driver, folder):    # Create a ham message in the Inbox - it should not get filtered    msg, words = driver.CreateTestMessageInFolder(HAM, folder)    # sleep to ensure filtering.    WaitForFilters()    # It should still be in the Inbox.    if driver.FindTestMessage(folder) is None:        TestFailed("The test ham message appeared to have been filtered!")    msg.Delete()def TestHamFilter(driver):    # Execute the 'ham' test in every folder we watch.    mgr = driver.manager    gen = mgr.message_store.GetFolderGenerator(                        mgr.config.filter.watch_folder_ids,                        mgr.config.filter.watch_include_sub)    num = 0    folders = []    for f in gen:        print "Running ham filter tests on folder '%s'" % f.GetFQName()        f = f.GetOutlookItem()        _DoTestHamFilter(driver, f)        num += 1        folders.append(f)    # Now test incremental train logic, between all these folders.    if len(folders)<2:        print "NOTE: Can't do incremental training tests as only 1 watch folder is in place"    else:        for f in folders:            # 'targets' is a list of all folders except this            targets = folders[:]            targets.remove(f)            for t in targets:                _DoTestHamTrain(driver, f, t)    print "Created a Ham message, and saw it remain in place (in %d watch folders.)" % numdef TestUnsureFilter(driver):    # Create a spam message in the Inbox - it should get immediately filtered    for msf_watch, folder_watch in driver.GetWatchFolderGenerator():        print "Performing Spam test on watch folder '%s'..." % msf_watch.GetFQName()        msg, words = driver.CreateTestMessageInFolder(UNSURE, folder_watch)        # sleep to ensure filtering.        WaitForFilters()        # It should no longer be in the Inbox.        driver.CheckMessageFilteredFrom(folder_watch)        # It should be in the "unsure" folder.        spam_msg = driver.FindTestMessage(driver.folder_unsure)        if spam_msg is None:            TestFailed("The test message vanished from the Inbox, but didn't appear in Unsure")        spam_msg.Delete()    print "Created an unsure message, and saw it get filtered"def run_tests(manager):    "Filtering tests"    driver = Driver(manager)    manager.Save() # necessary after a full retrain    assert driver.manager.config.filter.enabled, "Filtering must be enabled for these tests"    assert driver.manager.config.training.train_recovered_spam and \           driver.manager.config.training.train_manual_spam, "Incremental training must be enabled for these tests"    driver.CleanAllTestMessages()    TestSpamFilter(driver)    TestUnsureFilter(driver)    TestHamFilter(driver)    driver.CleanAllTestMessages()def run_filter_tests(manager):    # setup config to save info with the message, and test    apply_with_new_config(manager,                          {"Filter.timer_enabled": False,                           "Filter.save_spam_info" : True,                          },                          run_tests, manager)    apply_with_new_config(manager,                          {"Filter.timer_enabled": True,                           "Filter.save_spam_info" : True,                          },                          run_tests, manager)    apply_with_new_config(manager,                          {"Filter.timer_enabled": False,                           "Filter.save_spam_info" : False,                          },                          run_tests, manager)    apply_with_new_config(manager,                          {"Filter.timer_enabled": True,                           "Filter.save_spam_info" : False,                          },                          run_tests, manager)def apply_with_new_config(manager, new_config_dict, func, *args):    old_config = {}    friendly_opts = []    for name, val in new_config_dict.items():        sect_name, opt_name = name.split(".")        old_config[sect_name, opt_name] = manager.options.get(sect_name, opt_name)        manager.options.set(sect_name, opt_name, val)        friendly_opts.append("%s=%s" % (name, val))    manager.addin.FiltersChanged() # to ensure correct filtler in place    try:        test_name = getattr(func, "__doc__", None)        if not test_name: test_name = func.__name__        print "*" * 10, "Running '%s' with %s" % (test_name, ", ".join(friendly_opts))        func(*args)    finally:        for (sect_name, opt_name), val in old_config.items():            manager.options.set(sect_name, opt_name, val)################################################################################ "Non-filter" tests are those that don't require us to create messages and# see them get filtered.def run_nonfilter_tests(manager):    # And now some other 'sanity' checks.    # Check messages we are unable to score.    # Must enable the filtering code for this test    msgstore.test_suite_running = False    try:        print "Scanning all your good mail and spam for some sanity checks..."        num_found = num_looked = 0        num_without_headers = num_without_body = num_without_html_body = 0        for folder_ids, include_sub in [            (manager.config.filter.watch_folder_ids, manager.config.filter.watch_include_sub),            ([manager.config.filter.spam_folder_id], False),            ]:            for folder in manager.message_store.GetFolderGenerator(folder_ids, include_sub):                for message in folder.GetMessageGenerator(False):                    # If not ipm.note, then no point reporting - but any                    # ipm.note messages we don't want to filter should be                    # reported.                    num_looked += 1                    if num_looked % 500 == 0: print " scanned", num_looked, "messages..."                    if not message.IsFilterCandidate() and \                        message.msgclass.lower().startswith("ipm.note"):                        if num_found == 0:                            print "*" * 80                            print "WARNING: We found the following messages in your folders that would not be filtered by the addin"                            print "If any of these messages should be filtered, we have a bug!"                        num_found += 1                        print " %s/%s" % (folder.name, message.subject)                    headers, body, html_body = message._GetMessageTextParts()                    if not headers: num_without_headers += 1                    if not body: num_without_body += 1                    # for HTML, we only check multi-part                    temp_obj = rfc822.Message(cStringIO.StringIO(headers+"\n\n"))                    content_type = temp_obj.get("content-type", '')                    if content_type.lower().startswith("multipart"):                        if not html_body: num_without_html_body += 1        print "Checked %d items, %d non-filterable items found" % (num_looked, num_found)        print "of these items, %d had no headers, %d had no text body and %d had no HTML" % \                (num_without_headers, num_without_body, num_without_html_body)    finally:        msgstore.test_suite_running = Truedef run_invalid_id_tests(manager):    # Do some tests with invalid message and folder IDs.    print "Doing some 'invalid ID' tests - you should see a couple of warning, but no errors or tracebacks"    id_no_item = ('0000','0000') # this ID is 'valid' - but there will be no such item    id_invalid = ('xxxx','xxxx') # this ID is 'invalid' in that the hex-bin conversion fails    id_empty1 = ('','')    id_empty2 = ()    bad_ids = id_no_item, id_invalid, id_empty1, id_empty2    for id in bad_ids:        AssertRaises(msgstore.MsgStoreException, manager.message_store.GetMessage, id)    # Test 'GetFolderGenerator' works with invalid ids.    for id in bad_ids:        AssertRaises(msgstore.MsgStoreException, manager.message_store.GetFolder, id)        ids = manager.config.filter.watch_folder_ids[:]        ids.append(id)        found = 0        for f in manager.message_store.GetFolderGenerator(ids, False):            found += 1        if found > len(manager.config.filter.watch_folder_ids):            raise TestFailed("Seemed to find the extra folder")        names = manager.FormatFolderNames(ids, False)        if names.find("<unknown") < 0:            raise TestFailed("Couldn't find unknown folder in names '%s'" % names)    print "Finished 'invalid ID' tests"################################################################################ "Failure" tests - execute some tests while provoking the msgstore to simulate# various MAPI errors.  Although not complete, it does help exercise our code# paths through the code.def _restore_mapi_failure():    msgstore.test_suite_failure = None    msgstore.test_suite_failure_request = Nonedef _setup_for_mapi_failure(checkpoint, hr, fail_count = None):    assert msgstore.test_suite_running, "msgstore should already know its running"    assert not msgstore.test_suite_failure, "should already have torn down previous failure"    msgstore.test_suite_failure = pythoncom.com_error, \                         (hr, "testsuite generated error", None, -1)    msgstore.test_suite_failure_request = checkpoint    msgstore.test_suite_failure_count = fail_countdef _setup_mapi_notfound_failure(checkpoint):    _setup_for_mapi_failure(checkpoint, mapi.MAPI_E_NOT_FOUND)def _do_single_failure_ham_test(driver, checkpoint, hr, fail_count = None):    _do_single_failure_test(driver, True, checkpoint, hr, fail_count)def _do_single_failure_spam_test(driver, checkpoint, hr, fail_count = None):    _do_single_failure_test(driver, False, checkpoint, hr, fail_count)def _do_single_failure_test(driver, is_ham, checkpoint, hr, fail_count):    print "-> Testing MAPI error '%s' in %s" % (mapiutil.GetScodeString(hr),                                              checkpoint)    # message moved after we have ID, but before opening.    for msf, folder in driver.GetWatchFolderGenerator():        print "Testing in folder '%s'" % msf.GetFQName()        if is_ham:            msg, words = driver.CreateTestMessageInFolder(HAM, folder)        else:            msg, words = driver.CreateTestMessageInFolder(SPAM, folder)        try:            _setup_for_mapi_failure(checkpoint, hr, fail_count)            try:                # sleep to ensure filtering.                WaitForFilters()            finally:                _restore_mapi_failure()            if driver.FindTestMessage(folder) is None:                TestFailed("We appear to have filtered a message even though we forced 'not found' failure")        finally:            if msg is not None:                msg.Delete()    print "<- Finished MAPI error '%s' in %s" % (mapiutil.GetScodeString(hr),                                                 checkpoint)def do_failure_tests(manager):    # We setup msgstore to fail for us, then try a few tests.  The idea is to    # ensure we gracefully degrade in these failures.    # We set verbosity to min of 1, as this helps us see how the filters handle    # the errors.    driver = Driver(manager)    driver.CleanAllTestMessages()    old_verbose = manager.verbose    manager.verbose = max(1, old_verbose)    try:        _do_single_failure_ham_test(driver, "MAPIMsgStoreMsg._EnsureObject", mapi.MAPI_E_NOT_FOUND)        _do_single_failure_ham_test(driver, "MAPIMsgStoreMsg.SetField", -2146644781)        _do_single_failure_ham_test(driver, "MAPIMsgStoreMsg.Save", -2146644781)        _do_single_failure_ham_test(driver, "MAPIMsgStoreMsg.Save",                                    mapi.MAPI_E_OBJECT_CHANGED, fail_count=1)        # SetReadState???        _do_single_failure_spam_test(driver, "MAPIMsgStoreMsg._DoCopyMove", mapi.MAPI_E_TABLE_TOO_BIG)    finally:        manager.verbose = old_verbosedef run_failure_tests(manager):    "Forced MAPI failure tests"    apply_with_new_config(manager,                          {"Filter.timer_enabled": True,                          },                          do_failure_tests, manager)    apply_with_new_config(manager,                          {"Filter.timer_enabled": False,                          },                          do_failure_tests, manager)def filter_message_with_event(msg, mgr, all_actions=True):    import filter    ret = filter._original_filter_message(msg, mgr, all_actions)    if ret != "Failed":        filter_event.set() # only set if it works    return retdef test(manager):    from dialogs import SetWaitCursor    SetWaitCursor(1)    import filter    if "_original_filter_message" not in filter.__dict__:        filter._original_filter_message = filter.filter_message        filter.filter_message = filter_message_with_event    try: # restore the plugin config at exit.        assert not msgstore.test_suite_running, "already running??"        msgstore.test_suite_running = True        assert not manager.test_suite_running, "already running??"        manager.test_suite_running = True        run_filter_tests(manager)        run_failure_tests(manager)        run_invalid_id_tests(manager)        # non-filter tests take alot of time - ask if you want to do them        if manager.AskQuestion("Do you want to run the non-filter tests?" \                               "\r\n\r\nThese may take some time"):            run_nonfilter_tests(manager)        print "*" * 20        print "Test suite finished without error!"        print "*" * 20    finally:        print "Restoring standard configuration..."        # Always restore configuration to how we started.        msgstore.test_suite_running = False        manager.test_suite_running = False        manager.LoadConfig()        manager.addin.FiltersChanged() # restore original filters.        manager.addin.ProcessMissedMessages()        SetWaitCursor(0)if __name__=='__main__':    print "NOTE: This will NOT work from the command line"    print "(it nearly will, and is useful for debugging the tests"    print "themselves, so we will run them anyway!)"    test()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -