⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dibbler.py

📁 用python实现的邮件过滤器
💻 PY
📖 第 1 页 / 共 3 页
字号:
        timeNow = time.gmtime(time.time())        httpNow = time.strftime('%a, %d %b %Y %H:%M:%S GMT', timeNow)        headers = []        headers.append("HTTP/1.1 200 OK")        headers.append("Connection: close")        headers.append('Content-Type: %s; charset="utf-8"' % contentType)        headers.append("Date: %s" % httpNow)        for name, value in extraHeaders.items():            headers.append("%s: %s" % (name, value))        headers.append("")        headers.append("")        self._bufferedHeaders = headers    def writeError(self, code, message):        """Reflected from `HTTPPlugin`s."""        # Writing an error overrides any buffered headers, but obviously        # doesn't want to write any headers if some have already gone.        headers = []        if not self._headersWritten:            headers.append("HTTP/1.0 %d Error" % code)            headers.append("Connection: close")            headers.append('Content-Type: text/html; charset="utf-8"')            headers.append("")            headers.append("")        self.push("%s<html><body>%s</body></html>" % \                  ('\r\n'.join(headers), message))    def write(self, content):        """Reflected from `HTTPPlugin`s."""        # The methlet is writing, so write any buffered headers first.        headers = []        if self._bufferedHeaders:            headers = self._bufferedHeaders            self._bufferedHeaders = None            self._headersWritten = True        # `write(None)` just flushes buffered headers.        if content is None:            content = ''        self.push('\r\n'.join(headers) + str(content))    def writeUnauthorizedAccess(self, authenticationMode):        """Access is protected by HTTP authentication."""        if authenticationMode == HTTPServer.BASIC_AUTHENTICATION:            authString = self._getBasicAuthString()        elif authenticationMode == HTTPServer.DIGEST_AUTHENTICATION:            authString = self._getDigestAuthString()        else:            self.writeError(500, "Inconsistent authentication mode.")            return        headers = []        headers.append('HTTP/1.0 401 Unauthorized')        headers.append('WWW-Authenticate: ' + authString)        headers.append('Connection: close')        headers.append('Content-Type: text/html; charset="utf-8"')        headers.append('')        headers.append('')        self.write('\r\n'.join(headers) + self._server.getCancelMessage())        self.close_when_done()    def _getDigestAuthString(self):        """Builds the WWW-Authenticate header for Digest authentication."""        authString  = 'Digest realm="' + self._server.getRealm() + '"'        authString += ', nonce="' + self._getCurrentNonce() + '"'        authString += ', opaque="0000000000000000"'        authString += ', stale="false"'        authString += ', algorithm="MD5"'        authString += ', qop="auth"'        return authString    def _getBasicAuthString(self):        """Builds the WWW-Authenticate header for Basic authentication."""        return 'Basic realm="' + self._server.getRealm() + '"'    def _getCurrentNonce(self):        """Returns the current nonce value. This value is a Base64 encoding        of current time plus 20 minutes. This means the nonce will expire 20        minutes from now."""        timeString = time.asctime(time.localtime(time.time() + 20*60))        if RSTRIP_CHARS_AVAILABLE:            return base64.encodestring(timeString).rstrip('\n=')        else:            # Python pre 2.2.2, so can't do a rstrip(chars).  Do it            # manually instead.            def rstrip(s, chars):                if not s:                    return s                if s[-1] in chars:                    return rstrip(s[:-1])                return s            return rstrip(base64.encodestring(timeString), '\n=')    def _isValidNonce(self, nonce):        """Check if the specified nonce is still valid. A nonce is invalid        when its time converted value is lower than current time."""        padAmount = len(nonce) % 4        if padAmount > 0: padAmount = 4 - padAmount        nonce += '=' * (len(nonce) + padAmount)        decoded = base64.decodestring(nonce)        return time.time() < time.mktime(time.strptime(decoded))    def _basicAuthentication(self, login):        """Performs a Basic HTTP authentication. Returns True when the user        has logged in successfully, False otherwise."""        userName, password = base64.decodestring(login).split(':')        return self._server.isValidUser(userName, password)    def _digestAuthentication(self, login, method):        """Performs a Digest HTTP authentication. Returns True when the user        has logged in successfully, False otherwise."""        def stripQuotes(s):            return (s[0] == '"' and s[-1] == '"') and s[1:-1] or s        options  = dict(self._login_splitter.findall(login))        userName = stripQuotes(options["username"])        password = self._server.getPasswordForUser(userName)        nonce    = stripQuotes(options["nonce"])        # The following computations are based upon RFC 2617.        A1  = "%s:%s:%s" % (userName, self._server.getRealm(), password)        HA1 = md5.new(A1).hexdigest()        A2  = "%s:%s" % (method, stripQuotes(options["uri"]))        HA2 = md5.new(A2).hexdigest()        unhashedDigest = ""        if options.has_key("qop"):            # IE 6.0 doesn't give nc back correctly?            if not options["nc"]:                options["nc"] = "00000001"            # Firefox 1.0 doesn't give qop back correctly?            if not options["qop"]:                options["qop"] = "auth"            unhashedDigest = "%s:%s:%s:%s:%s:%s" % \                            (HA1, nonce,                             stripQuotes(options["nc"]),                             stripQuotes(options["cnonce"]),                             stripQuotes(options["qop"]), HA2)        else:            unhashedDigest = "%s:%s:%s" % (HA1, nonce, HA2)        hashedDigest = md5.new(unhashedDigest).hexdigest()        return (stripQuotes(options["response"]) == hashedDigest and                self._isValidNonce(nonce))class HTTPPlugin:    """Base class for HTTP server plugins.  See the main documentation for    details."""    def __init__(self):        # self._handler is filled in by `HTTPHandler.found_terminator()`.        pass    def onIncomingConnection(self, clientSocket):        """Implement this and return False to veto incoming connections."""        return True    def writeOKHeaders(self, contentType, extraHeaders={}):        """A methlet should call this with the Content-Type and optionally        a dictionary of extra headers (eg. Expires) before calling        `write()`."""        return self._handler.writeOKHeaders(contentType, extraHeaders)    def writeError(self, code, message):        """A methlet should call this instead of `writeOKHeaders()` /        `write()` to report an HTTP error (eg. 403 Forbidden)."""        return self._handler.writeError(code, message)    def write(self, content):        """A methlet should call this after `writeOKHeaders` to write the        page's content."""        return self._handler.write(content)    def flush(self):        """A methlet can call this after calling `write`, to ensure that        the content is written immediately to the browser.  This isn't        necessary most of the time, but if you're writing "Please wait..."        before performing a long operation, calling `flush()` is a good        idea."""        return self._handler.flush()    def close(self, flush=True):        """Closes the connection to the browser.  You should call `close()`        before calling `sys.exit()` in any 'shutdown' methlets you write."""        if flush:            self.flush()        return self._handler.close()def run(launchBrowser=False, context=_defaultContext):    """Runs a `Dibbler` application.  Servers listen for incoming connections    and route requests through to plugins until a plugin calls `sys.exit()`    or raises a `SystemExit` exception."""    if launchBrowser:        try:            url = "http://localhost:%d/" % context._HTTPPort            webbrowser.open_new(url)        except webbrowser.Error, e:            print "\n%s.\nPlease point your web browser at %s." % (e, url)    asyncore.loop(map=context._map)def runTestServer(readyEvent=None):    """Runs the calendar server example, with an added `/shutdown` URL."""    import Dibbler, calendar    class Calendar(Dibbler.HTTPPlugin):        _form = '''<html><body><h3>Calendar Server</h3>                   <form action='/'>                   Year: <input type='text' name='year' size='4'>                   <input type='submit' value='Go'></form>                   <pre>%s</pre></body></html>'''        def onHome(self, year=None):            if year:                result = calendar.calendar(int(year))            else:                result = ""            self.writeOKHeaders('text/html')            self.write(self._form % result)        def onShutdown(self):            self.writeOKHeaders('text/html')            self.write("<html><body><p>OK.</p></body></html>")            self.close()            sys.exit()    httpServer = Dibbler.HTTPServer(8888)    httpServer.register(Calendar())    if readyEvent:        # Tell the self-test code that the test server is up and running.        readyEvent.set()    Dibbler.run(launchBrowser=True)def test():    """Run a self-test."""    # Run the calendar server in a separate thread.    import threading, urllib    testServerReady = threading.Event()    threading.Thread(target=runTestServer, args=(testServerReady,)).start()    testServerReady.wait()    # Connect to the server and ask for a calendar.    page = urllib.urlopen("http://localhost:8888/?year=2003").read()    if page.find('January') != -1:        print "Self test passed."    else:        print "Self-test failed!"    # Wait for a key while the user plays with his browser.    raw_input("Press any key to shut down the application server...")    # Ask the server to shut down.    page = urllib.urlopen("http://localhost:8888/shutdown").read()    if page.find('OK') != -1:        print "Shutdown OK."    else:        print "Shutdown failed!"if __name__ == '__main__':    test()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -