📄 classifier.py

📁 A mail filter implemented in Python
💻 PY
📖 Page 1 of 3
    # NOTE: the module-level imports and helpers used below (os, sys, socket,
    # pickle, urllib2, options, message_from_string, slurp_wordstream,
    # DOMAIN_AND_PORT_RE, URL_KEY_RE, HTTP_ERROR_RE) are defined earlier in
    # classifier.py and are not shown in this excerpt.
    def _generate_slurp(self):
        # We don't want to do this recursively and check URLs
        # on webpages, so we have this little cheat.
        if not hasattr(self, "setup_done"):
            self.setup()
            self.setup_done = True
        if not hasattr(self, "do_slurp") or self.do_slurp:
            if slurp_wordstream:
                self.do_slurp = False

                tokens = self.slurp(*slurp_wordstream)
                self.do_slurp = True
                self._save_caches()
                return tokens
        return []

    def setup(self):
        # Can't import this at the top because it's circular.
        # XXX Someone smarter than me, please figure out the right
        # XXX way to do this.
        from spambayes.FileCorpus import ExpiryFileCorpus, FileMessageFactory
        username = options["globals", "proxy_username"]
        password = options["globals", "proxy_password"]
        server = options["globals", "proxy_server"]
        if server.find(":") != -1:
            server, port = server.split(':', 1)
            port = int(port)  # split() returns a string; the %d below needs an int
        else:
            port = 8080
        if server:
            # Build a new opener that uses a proxy requiring authorization
            proxy_support = urllib2.ProxyHandler({"http" : \
                                                  "http://%s:%s@%s:%d" % \
                                                  (username, password,
                                                   server, port)})
            opener = urllib2.build_opener(proxy_support,
                                          urllib2.HTTPHandler)
        else:
            # Build a new opener without any proxy information.
            opener = urllib2.build_opener(urllib2.HTTPHandler)
        # Install it
        urllib2.install_opener(opener)

        # Setup the cache for retrieved urls
        age = options["URLRetriever", "x-cache_expiry_days"]*24*60*60
        dir = options["URLRetriever", "x-cache_directory"]
        if not os.path.exists(dir):
            # Create the directory.
            if options["globals", "verbose"]:
                print >>sys.stderr, "Creating URL cache directory"
            os.makedirs(dir)

        self.urlCorpus = ExpiryFileCorpus(age, FileMessageFactory(),
                                          dir, cacheSize=20)
        # Kill any old information in the cache
        self.urlCorpus.removeExpiredMessages()

        # Setup caches for unretrievable urls
        self.bad_url_cache_name = os.path.join(dir, "bad_urls.pck")
        self.http_error_cache_name = os.path.join(dir, "http_error_urls.pck")
        if os.path.exists(self.bad_url_cache_name):
            b_file = file(self.bad_url_cache_name, "r")
            try:
                self.bad_urls = pickle.load(b_file)
            except (IOError, ValueError):
                # Something went wrong loading it (bad pickle,
                # probably).  Start afresh.
                if options["globals", "verbose"]:
                    print >>sys.stderr, "Bad URL pickle, using new."
                self.bad_urls = {"url:non_resolving": (),
                                 "url:non_html": (),
                                 "url:unknown_error": ()}
            b_file.close()
        else:
            if options["globals", "verbose"]:
                print "URL caches don't exist: creating"
            self.bad_urls = {"url:non_resolving": (),
                             "url:non_html": (),
                             "url:unknown_error": ()}
        if os.path.exists(self.http_error_cache_name):
            h_file = file(self.http_error_cache_name, "r")
            try:
                self.http_error_urls = pickle.load(h_file)
            except (IOError, ValueError):
                # Something went wrong loading it (bad pickle,
                # probably).  Start afresh.
                if options["globals", "verbose"]:
                    print >>sys.stderr, "Bad HTTP error pickle, using new."
                self.http_error_urls = {}
            h_file.close()
        else:
            self.http_error_urls = {}

    def _save_caches(self):
        # XXX Note that these caches are never refreshed, which might not
        # XXX be a good thing long-term (if a previously invalid URL
        # XXX becomes valid, for example).
        for name, data in [(self.bad_url_cache_name, self.bad_urls),
                           (self.http_error_cache_name, self.http_error_urls),]:
            # Save to a temp file first, in case something goes wrong.
            cache = open(name + ".tmp", "w")
            pickle.dump(data, cache)
            cache.close()
            try:
                os.rename(name + ".tmp", name)
            except OSError:
                # Atomic replace isn't possible with win32, so just
                # remove and rename.
                os.remove(name)
                os.rename(name + ".tmp", name)

    def slurp(self, proto, url):
        # We generate these tokens:
        #  url:non_resolving
        #  url:non_html
        #  url:http_XXX (for each type of http error encountered,
        #                for example 404, 403, ...)
        # And tokenise the received page (but we do not slurp this).
        # Actually, the special url: tokens barely showed up in my testing,
        # although I would have thought that they would show up more - this
        # might be due to an error, although they do turn up on occasion.  In
        # any case, we have to do the test, so generating an extra token
        # doesn't cost us anything apart from another entry in the db, and
        # it's only two entries, plus one for each type of http error
        # encountered, so it's pretty negligible.
        # If there is no content in the URL, then just return immediately.
        # "http://)" will trigger this.
        if not url:
            return ["url:non_resolving"]

        from spambayes.tokenizer import Tokenizer

        if options["URLRetriever", "x-only_slurp_base"]:
            url = self._base_url(url)

        # Check the unretrievable caches
        for err in self.bad_urls.keys():
            if url in self.bad_urls[err]:
                return [err]
        if self.http_error_urls.has_key(url):
            return self.http_error_urls[url]

        # We check if the url will resolve first
        mo = DOMAIN_AND_PORT_RE.match(url)
        domain = mo.group(1)
        if mo.group(3) is None:
            port = 80
        else:
            port = mo.group(3)
        try:
            not_used = socket.getaddrinfo(domain, port)
        except socket.error:
            self.bad_urls["url:non_resolving"] += (url,)
            return ["url:non_resolving"]

        # If the message is in our cache, then we can just skip over
        # retrieving it from the network, and get it from there, instead.
        url_key = URL_KEY_RE.sub('_', url)
        cached_message = self.urlCorpus.get(url_key)
        if cached_message is None:
            # We're going to ignore everything that isn't text/html,
            # so we might as well not bother retrieving anything with
            # these extensions.
            parts = url.split('.')
            if parts[-1] in ('jpg', 'gif', 'png', 'css', 'js'):
                self.bad_urls["url:non_html"] += (url,)
                return ["url:non_html"]

            # Waiting for the default timeout period slows everything
            # down far too much, so try and reduce it for just this
            # call (this will only work with Python 2.3 and above).
            try:
                timeout = socket.getdefaulttimeout()
                socket.setdefaulttimeout(5)
            except AttributeError:
                # Probably Python 2.2.
                pass
            try:
                if options["globals", "verbose"]:
                    print >>sys.stderr, "Slurping", url
                f = urllib2.urlopen("%s://%s" % (proto, url))
            except (urllib2.URLError, socket.error), details:
                mo = HTTP_ERROR_RE.match(str(details))
                if mo:
                    self.http_error_urls[url] = "url:http_" + mo.group(1)
                    return ["url:http_" + mo.group(1)]
                self.bad_urls["url:unknown_error"] += (url,)
                return ["url:unknown_error"]

            # Restore the timeout
            try:
                socket.setdefaulttimeout(timeout)
            except AttributeError:
                # Probably Python 2.2.
                pass

            try:
                # Anything that isn't text/html is ignored
                content_type = f.info().get('content-type')
                if content_type is None or \
                   not content_type.startswith("text/html"):
                    self.bad_urls["url:non_html"] += (url,)
                    return ["url:non_html"]

                page = f.read()
                headers = str(f.info())
                f.close()
            except socket.error:
                # This is probably a temporary error, like a timeout.
                # For now, just bail out.
                return []

            fake_message_string = headers + "\r\n" + page

            # Retrieving the same messages over and over again will tire
            # us out, so we store them in our own wee cache.
            message = self.urlCorpus.makeMessage(url_key,
                                                 fake_message_string)
            self.urlCorpus.addMessage(message)
        else:
            fake_message_string = cached_message.as_string()

        msg = message_from_string(fake_message_string)

        # We don't want to do full header tokenising, as this is
        # optimised for messages, not webpages, so we just do the
        # basic stuff.
        bht = options["Tokenizer", "basic_header_tokenize"]
        bhto = options["Tokenizer", "basic_header_tokenize_only"]
        options["Tokenizer", "basic_header_tokenize"] = True
        options["Tokenizer", "basic_header_tokenize_only"] = True

        tokens = Tokenizer().tokenize(msg)
        pf = options["URLRetriever", "x-web_prefix"]
        tokens = ["%s%s" % (pf, tok) for tok in tokens]

        # Undo the changes
        options["Tokenizer", "basic_header_tokenize"] = bht
        options["Tokenizer", "basic_header_tokenize_only"] = bhto
        return tokens

    def _base_url(self, url):
        # To try and speed things up, and to avoid following
        # unique URLS, we convert the URL to as basic a form
        # as we can - so http://www.massey.ac.nz/~tameyer/index.html?you=me
        # would become http://massey.ac.nz and http://id.example.com
        # would become http://example.com
        url += '/'
        domain, garbage = url.split('/', 1)
        parts = domain.split('.')
        if len(parts) > 2:
            base_domain = parts[-2] + '.' + parts[-1]
            if len(parts[-1]) < 3:
                base_domain = parts[-3] + '.' + base_domain
        else:
            base_domain = domain
        return base_domain

    def _add_slurped(self, wordstream):
        """Add tokens generated by 'slurping' (i.e. tokenizing
        the text at the web pages pointed to by URLs in messages)
        to the wordstream."""
        for token in wordstream:
            yield token
        slurped_tokens = self._generate_slurp()
        for token in slurped_tokens:
            yield token

    def _wordinfokeys(self):
        return self.wordinfo.keys()


Bayes = Classifier
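The methods above belong to the Classifier class, which the last line aliases as Bayes. Below is a minimal usage sketch, not part of classifier.py itself: it assumes the full (Python 2 era) spambayes package is importable, and it simply mirrors the message_from_string / Tokenizer().tokenize() pattern that slurp() uses, letting _add_slurped() append any slurped tokens to the normal wordstream. The message text is a made-up example.

# Minimal usage sketch (not part of the original file).  Assumes the
# spambayes package (Python 2) is installed.  slurp_wordstream is a
# module-level global defined earlier in classifier.py and only set when
# URL slurping is enabled, so by default no extra tokens are appended.
from email import message_from_string

from spambayes.classifier import Bayes      # Bayes is an alias for Classifier
from spambayes.tokenizer import Tokenizer

bayes = Bayes()
raw = ("Subject: test\r\n"
       "Content-Type: text/html\r\n\r\n"
       "<a href='http://www.example.com/page.html'>click</a>\r\n")
msg = message_from_string(raw)

# Tokenize the message, then let _add_slurped() append whatever
# _generate_slurp() produces.  The first call runs setup(), which installs
# a urllib2 opener and creates the URL cache directory named by the
# URLRetriever options.
tokens = list(bayes._add_slurped(Tokenizer().tokenize(msg)))

# _base_url() reduces a URL (protocol already stripped) to its base domain:
# "www.massey.ac.nz/~tameyer/index.html?you=me" -> "massey.ac.nz"
print(bayes._base_url("www.massey.ac.nz/~tameyer/index.html?you=me"))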
