📄 urllib2.py
字号:
self.http_error_auth_reqed('proxy-authenticate', host, req, headers)def encode_digest(digest): hexrep = [] for c in digest: n = (ord(c) >> 4) & 0xf hexrep.append(hex(n)[-1]) n = ord(c) & 0xf hexrep.append(hex(n)[-1]) return ''.join(hexrep)class AbstractHTTPHandler(BaseHandler): def do_open(self, http_class, req): host = req.get_host() if not host: raise URLError('no host given') h = http_class(host) # will parse host:port if req.has_data(): data = req.get_data() h.putrequest('POST', req.get_selector()) if not req.headers.has_key('Content-type'): h.putheader('Content-type', 'application/x-www-form-urlencoded') if not req.headers.has_key('Content-length'): h.putheader('Content-length', '%d' % len(data)) else: h.putrequest('GET', req.get_selector()) scheme, sel = splittype(req.get_selector()) sel_host, sel_path = splithost(sel) h.putheader('Host', sel_host or host) for args in self.parent.addheaders: h.putheader(*args) for k, v in req.headers.items(): h.putheader(k, v) # httplib will attempt to connect() here. be prepared # to convert a socket error to a URLError. try: h.endheaders() except socket.error, err: raise URLError(err) if req.has_data(): h.send(data) code, msg, hdrs = h.getreply() fp = h.getfile() if code == 200: return addinfourl(fp, hdrs, req.get_full_url()) else: return self.parent.error('http', req, fp, code, msg, hdrs)class HTTPHandler(AbstractHTTPHandler): def http_open(self, req): return self.do_open(httplib.HTTP, req)if hasattr(httplib, 'HTTPS'): class HTTPSHandler(AbstractHTTPHandler): def https_open(self, req): return self.do_open(httplib.HTTPS, req)class UnknownHandler(BaseHandler): def unknown_open(self, req): type = req.get_type() raise URLError('unknown url type: %s' % type)def parse_keqv_list(l): """Parse list of key=value strings where keys are not duplicated.""" parsed = {} for elt in l: k, v = elt.split('=', 1) if v[0] == '"' and v[-1] == '"': v = v[1:-1] parsed[k] = v return parseddef parse_http_list(s): """Parse lists as described by RFC 2068 Section 2. In particular, parse comman-separated lists where the elements of the list may include quoted-strings. A quoted-string could contain a comma. """ # XXX this function could probably use more testing list = [] end = len(s) i = 0 inquote = 0 start = 0 while i < end: cur = s[i:] c = cur.find(',') q = cur.find('"') if c == -1: list.append(s[start:]) break if q == -1: if inquote: raise ValueError, "unbalanced quotes" else: list.append(s[start:i+c]) i = i + c + 1 continue if inquote: if q < c: list.append(s[start:i+c]) i = i + c + 1 start = i inquote = 0 else: i = i + q else: if c < q: list.append(s[start:i+c]) i = i + c + 1 start = i else: inquote = 1 i = i + q + 1 return map(lambda x: x.strip(), list)class FileHandler(BaseHandler): # Use local file or FTP depending on form of URL def file_open(self, req): url = req.get_selector() if url[:2] == '//' and url[2:3] != '/': req.type = 'ftp' return self.parent.open(req) else: return self.open_local_file(req) # names for the localhost names = None def get_names(self): if FileHandler.names is None: FileHandler.names = (socket.gethostbyname('localhost'), socket.gethostbyname(socket.gethostname())) return FileHandler.names # not entirely sure what the rules are here def open_local_file(self, req): host = req.get_host() file = req.get_selector() localfile = url2pathname(file) stats = os.stat(localfile) size = stats[stat.ST_SIZE] modified = rfc822.formatdate(stats[stat.ST_MTIME]) mtype = mimetypes.guess_type(file)[0] stats = os.stat(localfile) headers = mimetools.Message(StringIO( 'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' % (mtype or 'text/plain', size, modified))) if host: host, port = splitport(host) if not host or \ (not port and socket.gethostbyname(host) in self.get_names()): return addinfourl(open(localfile, 'rb'), headers, 'file:'+file) raise URLError('file not on local host')class FTPHandler(BaseHandler): def ftp_open(self, req): host = req.get_host() if not host: raise IOError, ('ftp error', 'no host given') # XXX handle custom username & password try: host = socket.gethostbyname(host) except socket.error, msg: raise URLError(msg) host, port = splitport(host) if port is None: port = ftplib.FTP_PORT path, attrs = splitattr(req.get_selector()) path = unquote(path) dirs = path.split('/') dirs, file = dirs[:-1], dirs[-1] if dirs and not dirs[0]: dirs = dirs[1:] user = passwd = '' # XXX try: fw = self.connect_ftp(user, passwd, host, port, dirs) type = file and 'I' or 'D' for attr in attrs: attr, value = splitattr(attr) if attr.lower() == 'type' and \ value in ('a', 'A', 'i', 'I', 'd', 'D'): type = value.upper() fp, retrlen = fw.retrfile(file, type) headers = "" mtype = mimetypes.guess_type(req.get_full_url())[0] if mtype: headers += "Content-Type: %s\n" % mtype if retrlen is not None and retrlen >= 0: headers += "Content-Length: %d\n" % retrlen sf = StringIO(headers) headers = mimetools.Message(sf) return addinfourl(fp, headers, req.get_full_url()) except ftplib.all_errors, msg: raise IOError, ('ftp error', msg), sys.exc_info()[2] def connect_ftp(self, user, passwd, host, port, dirs): fw = ftpwrapper(user, passwd, host, port, dirs)## fw.ftp.set_debuglevel(1) return fwclass CacheFTPHandler(FTPHandler): # XXX would be nice to have pluggable cache strategies # XXX this stuff is definitely not thread safe def __init__(self): self.cache = {} self.timeout = {} self.soonest = 0 self.delay = 60 self.max_conns = 16 def setTimeout(self, t): self.delay = t def setMaxConns(self, m): self.max_conns = m def connect_ftp(self, user, passwd, host, port, dirs): key = user, passwd, host, port if self.cache.has_key(key): self.timeout[key] = time.time() + self.delay else: self.cache[key] = ftpwrapper(user, passwd, host, port, dirs) self.timeout[key] = time.time() + self.delay self.check_cache() return self.cache[key] def check_cache(self): # first check for old ones t = time.time() if self.soonest <= t: for k, v in self.timeout.items(): if v < t: self.cache[k].close() del self.cache[k] del self.timeout[k] self.soonest = min(self.timeout.values()) # then check the size if len(self.cache) == self.max_conns: for k, v in self.timeout.items(): if v == self.soonest: del self.cache[k] del self.timeout[k] break self.soonest = min(self.timeout.values())class GopherHandler(BaseHandler): def gopher_open(self, req): host = req.get_host() if not host: raise GopherError('no host given') host = unquote(host) selector = req.get_selector() type, selector = splitgophertype(selector) selector, query = splitquery(selector) selector = unquote(selector) if query: query = unquote(query) fp = gopherlib.send_query(selector, query, host) else: fp = gopherlib.send_selector(selector, host) return addinfourl(fp, noheaders(), req.get_full_url())#bleck! don't use this yetclass OpenerFactory: default_handlers = [UnknownHandler, HTTPHandler, HTTPDefaultErrorHandler, HTTPRedirectHandler, FTPHandler, FileHandler] proxy_handlers = [ProxyHandler] handlers = [] replacement_handlers = [] def add_proxy_handler(self, ph): self.proxy_handlers = self.proxy_handlers + [ph] def add_handler(self, h): self.handlers = self.handlers + [h] def replace_handler(self, h): pass def build_opener(self): opener = OpenerDirector() for ph in self.proxy_handlers: if inspect.isclass(ph): ph = ph() opener.add_handler(ph)if __name__ == "__main__": # XXX some of the test code depends on machine configurations that # are internal to CNRI. Need to set up a public server with the # right authentication configuration for test purposes. if socket.gethostname() == 'bitdiddle': localhost = 'bitdiddle.cnri.reston.va.us' elif socket.gethostname() == 'bitdiddle.concentric.net': localhost = 'localhost' else: localhost = None urls = [ # Thanks to Fred for finding these! 'gopher://gopher.lib.ncsu.edu/11/library/stacks/Alex', 'gopher://gopher.vt.edu:10010/10/33', 'file:/etc/passwd', 'file://nonsensename/etc/passwd', 'ftp://www.python.org/pub/python/misc/sousa.au', 'ftp://www.python.org/pub/tmp/blat', 'http://www.espn.com/', # redirect 'http://www.python.org/Spanish/Inquistion/', ('http://www.python.org/cgi-bin/faqw.py', 'query=pythonistas&querytype=simple&casefold=yes&req=search'), 'http://www.python.org/', 'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC/research-reports/00README-Legal-Rules-Regs', ]## if localhost is not None:## urls = urls + [## 'file://%s/etc/passwd' % localhost,## 'http://%s/simple/' % localhost,## 'http://%s/digest/' % localhost,## 'http://%s/not/found.h' % localhost,## ]## bauth = HTTPBasicAuthHandler()## bauth.add_password('basic_test_realm', localhost, 'jhylton',## 'password')## dauth = HTTPDigestAuthHandler()## dauth.add_password('digest_test_realm', localhost, 'jhylton',## 'password') cfh = CacheFTPHandler() cfh.setTimeout(1)## # XXX try out some custom proxy objects too!## def at_cnri(req):## host = req.get_host()## print host## if host[-18:] == '.cnri.reston.va.us':## return 1## p = CustomProxy('http', at_cnri, 'proxy.cnri.reston.va.us')## ph = CustomProxyHandler(p)## install_opener(build_opener(dauth, bauth, cfh, GopherHandler, ph)) install_opener(build_opener(cfh, GopherHandler)) for url in urls: if isinstance(url, types.TupleType): url, req = url else: req = None print url try: f = urlopen(url, req) except IOError, err: print "IOError:", err except socket.error, err: print "socket.error:", err else: buf = f.read() f.close() print "read %d bytes" % len(buf) print time.sleep(0.1)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -