📄 dibbler.py
字号:
timeNow = time.gmtime(time.time()) httpNow = time.strftime('%a, %d %b %Y %H:%M:%S GMT', timeNow) headers = [] headers.append("HTTP/1.1 200 OK") headers.append("Connection: close") headers.append('Content-Type: %s; charset="utf-8"' % contentType) headers.append("Date: %s" % httpNow) for name, value in extraHeaders.items(): headers.append("%s: %s" % (name, value)) headers.append("") headers.append("") self._bufferedHeaders = headers def writeError(self, code, message): """Reflected from `HTTPPlugin`s.""" # Writing an error overrides any buffered headers, but obviously # doesn't want to write any headers if some have already gone. headers = [] if not self._headersWritten: headers.append("HTTP/1.0 %d Error" % code) headers.append("Connection: close") headers.append('Content-Type: text/html; charset="utf-8"') headers.append("") headers.append("") self.push("%s<html><body>%s</body></html>" % \ ('\r\n'.join(headers), message)) def write(self, content): """Reflected from `HTTPPlugin`s.""" # The methlet is writing, so write any buffered headers first. headers = [] if self._bufferedHeaders: headers = self._bufferedHeaders self._bufferedHeaders = None self._headersWritten = True # `write(None)` just flushes buffered headers. if content is None: content = '' self.push('\r\n'.join(headers) + str(content)) def writeUnauthorizedAccess(self, authenticationMode): """Access is protected by HTTP authentication.""" if authenticationMode == HTTPServer.BASIC_AUTHENTICATION: authString = self._getBasicAuthString() elif authenticationMode == HTTPServer.DIGEST_AUTHENTICATION: authString = self._getDigestAuthString() else: self.writeError(500, "Inconsistent authentication mode.") return headers = [] headers.append('HTTP/1.0 401 Unauthorized') headers.append('WWW-Authenticate: ' + authString) headers.append('Connection: close') headers.append('Content-Type: text/html; charset="utf-8"') headers.append('') headers.append('') self.write('\r\n'.join(headers) + self._server.getCancelMessage()) self.close_when_done() def _getDigestAuthString(self): """Builds the WWW-Authenticate header for Digest authentication.""" authString = 'Digest realm="' + self._server.getRealm() + '"' authString += ', nonce="' + self._getCurrentNonce() + '"' authString += ', opaque="0000000000000000"' authString += ', stale="false"' authString += ', algorithm="MD5"' authString += ', qop="auth"' return authString def _getBasicAuthString(self): """Builds the WWW-Authenticate header for Basic authentication.""" return 'Basic realm="' + self._server.getRealm() + '"' def _getCurrentNonce(self): """Returns the current nonce value. This value is a Base64 encoding of current time plus 20 minutes. This means the nonce will expire 20 minutes from now.""" timeString = time.asctime(time.localtime(time.time() + 20*60)) if RSTRIP_CHARS_AVAILABLE: return base64.encodestring(timeString).rstrip('\n=') else: # Python pre 2.2.2, so can't do a rstrip(chars). Do it # manually instead. def rstrip(s, chars): if not s: return s if s[-1] in chars: return rstrip(s[:-1]) return s return rstrip(base64.encodestring(timeString), '\n=') def _isValidNonce(self, nonce): """Check if the specified nonce is still valid. A nonce is invalid when its time converted value is lower than current time.""" padAmount = len(nonce) % 4 if padAmount > 0: padAmount = 4 - padAmount nonce += '=' * (len(nonce) + padAmount) decoded = base64.decodestring(nonce) return time.time() < time.mktime(time.strptime(decoded)) def _basicAuthentication(self, login): """Performs a Basic HTTP authentication. Returns True when the user has logged in successfully, False otherwise.""" userName, password = base64.decodestring(login).split(':') return self._server.isValidUser(userName, password) def _digestAuthentication(self, login, method): """Performs a Digest HTTP authentication. Returns True when the user has logged in successfully, False otherwise.""" def stripQuotes(s): return (s[0] == '"' and s[-1] == '"') and s[1:-1] or s options = dict(self._login_splitter.findall(login)) userName = stripQuotes(options["username"]) password = self._server.getPasswordForUser(userName) nonce = stripQuotes(options["nonce"]) # The following computations are based upon RFC 2617. A1 = "%s:%s:%s" % (userName, self._server.getRealm(), password) HA1 = md5.new(A1).hexdigest() A2 = "%s:%s" % (method, stripQuotes(options["uri"])) HA2 = md5.new(A2).hexdigest() unhashedDigest = "" if options.has_key("qop"): # IE 6.0 doesn't give nc back correctly? if not options["nc"]: options["nc"] = "00000001" # Firefox 1.0 doesn't give qop back correctly? if not options["qop"]: options["qop"] = "auth" unhashedDigest = "%s:%s:%s:%s:%s:%s" % \ (HA1, nonce, stripQuotes(options["nc"]), stripQuotes(options["cnonce"]), stripQuotes(options["qop"]), HA2) else: unhashedDigest = "%s:%s:%s" % (HA1, nonce, HA2) hashedDigest = md5.new(unhashedDigest).hexdigest() return (stripQuotes(options["response"]) == hashedDigest and self._isValidNonce(nonce))class HTTPPlugin: """Base class for HTTP server plugins. See the main documentation for details.""" def __init__(self): # self._handler is filled in by `HTTPHandler.found_terminator()`. pass def onIncomingConnection(self, clientSocket): """Implement this and return False to veto incoming connections.""" return True def writeOKHeaders(self, contentType, extraHeaders={}): """A methlet should call this with the Content-Type and optionally a dictionary of extra headers (eg. Expires) before calling `write()`.""" return self._handler.writeOKHeaders(contentType, extraHeaders) def writeError(self, code, message): """A methlet should call this instead of `writeOKHeaders()` / `write()` to report an HTTP error (eg. 403 Forbidden).""" return self._handler.writeError(code, message) def write(self, content): """A methlet should call this after `writeOKHeaders` to write the page's content.""" return self._handler.write(content) def flush(self): """A methlet can call this after calling `write`, to ensure that the content is written immediately to the browser. This isn't necessary most of the time, but if you're writing "Please wait..." before performing a long operation, calling `flush()` is a good idea.""" return self._handler.flush() def close(self, flush=True): """Closes the connection to the browser. You should call `close()` before calling `sys.exit()` in any 'shutdown' methlets you write.""" if flush: self.flush() return self._handler.close()def run(launchBrowser=False, context=_defaultContext): """Runs a `Dibbler` application. Servers listen for incoming connections and route requests through to plugins until a plugin calls `sys.exit()` or raises a `SystemExit` exception.""" if launchBrowser: try: url = "http://localhost:%d/" % context._HTTPPort webbrowser.open_new(url) except webbrowser.Error, e: print "\n%s.\nPlease point your web browser at %s." % (e, url) asyncore.loop(map=context._map)def runTestServer(readyEvent=None): """Runs the calendar server example, with an added `/shutdown` URL.""" import Dibbler, calendar class Calendar(Dibbler.HTTPPlugin): _form = '''<html><body><h3>Calendar Server</h3> <form action='/'> Year: <input type='text' name='year' size='4'> <input type='submit' value='Go'></form> <pre>%s</pre></body></html>''' def onHome(self, year=None): if year: result = calendar.calendar(int(year)) else: result = "" self.writeOKHeaders('text/html') self.write(self._form % result) def onShutdown(self): self.writeOKHeaders('text/html') self.write("<html><body><p>OK.</p></body></html>") self.close() sys.exit() httpServer = Dibbler.HTTPServer(8888) httpServer.register(Calendar()) if readyEvent: # Tell the self-test code that the test server is up and running. readyEvent.set() Dibbler.run(launchBrowser=True)def test(): """Run a self-test.""" # Run the calendar server in a separate thread. import threading, urllib testServerReady = threading.Event() threading.Thread(target=runTestServer, args=(testServerReady,)).start() testServerReady.wait() # Connect to the server and ask for a calendar. page = urllib.urlopen("http://localhost:8888/?year=2003").read() if page.find('January') != -1: print "Self test passed." else: print "Self-test failed!" # Wait for a key while the user plays with his browser. raw_input("Press any key to shut down the application server...") # Ask the server to shut down. page = urllib.urlopen("http://localhost:8888/shutdown").read() if page.find('OK') != -1: print "Shutdown OK." else: print "Shutdown failed!"if __name__ == '__main__': test()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -