urllib.py
来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 1,466 行 · 第 1/4 页
PY
1,466 行
"""Open an arbitrary URL.

See the following document for more info on URLs:
"Names and Addresses, URIs, URLs, URNs, URCs", at
http://www.w3.org/pub/WWW/Addressing/Overview.html

See also the HTTP spec (from which the error codes are derived):
"HTTP - Hypertext Transfer Protocol", at
http://www.w3.org/pub/WWW/Protocols/

Related standards and specs:
- RFC1808: the "relative URL" spec. (authoritative status)
- RFC1738 - the "URL standard". (authoritative status)
- RFC1630 - the "URI spec". (informational status)

The object returned by URLopener().open(file) will differ per
protocol.  All you know is that it has methods read(), readline(),
readlines(), fileno(), close() and info().  The read*(), fileno()
and close() methods work like those of open files.
The info() method returns a mimetools.Message object which can be
used to query various info about the object, if available.
(mimetools.Message objects are queried with the getheader() method.)
"""

import string
import socket
import os
import stat
import time
import sys
import types

__all__ = ["urlopen", "URLopener", "FancyURLopener", "urlretrieve",
           "urlcleanup", "quote", "quote_plus", "unquote", "unquote_plus",
           "urlencode", "url2pathname", "pathname2url", "splittag",
           "localhost", "thishost", "ftperrors", "basejoin", "unwrap",
           "splittype", "splithost", "splituser", "splitpasswd", "splitport",
           "splitnport", "splitquery", "splitattr", "splitvalue",
           "splitgophertype", "getproxies"]

__version__ = '1.15'    # XXX This version is not always updated :-(

MAXFTPCACHE = 10        # Trim the ftp cache beyond this size

# Helper for non-unix systems: platform modules supply the URL <-> path
# conversions; everywhere else plain (un)quoting is used.
if os.name == 'mac':
    from macurl2path import url2pathname, pathname2url
elif os.name == 'nt':
    from nturl2path import url2pathname, pathname2url
elif os.name == 'riscos':
    from rourl2path import url2pathname, pathname2url
else:
    def url2pathname(pathname):
        """Convert the path part of a URL to a local path by unquoting it."""
        return unquote(pathname)

    def pathname2url(pathname):
        """Convert a local path to the path part of a URL by quoting it."""
        return quote(pathname)

# This really consists of two pieces:
# (1) a class which handles opening of all sorts of URLs
#     (plus assorted utilities etc.)
# (2) a set of functions for parsing URLs
# XXX Should these be separated out into different modules?


# Shortcut for basic usage: one module-wide opener, created on first use.
_urlopener = None


def urlopen(url, data=None):
    """urlopen(url [, data]) -> open file-like object

    Delegates to the shared module-level FancyURLopener, creating it on
    first use.  `data` is forwarded to the opener only when it is not None.
    """
    global _urlopener
    if not _urlopener:
        _urlopener = FancyURLopener()
    opener = _urlopener
    if data is not None:
        return opener.open(url, data)
    return opener.open(url)


def urlretrieve(url, filename=None, reporthook=None, data=None):
    """Fetch `url` through the shared opener's retrieve() method.

    All arguments are passed through unchanged; the shared opener is
    created on first use.
    """
    global _urlopener
    if not _urlopener:
        _urlopener = FancyURLopener()
    opener = _urlopener
    return opener.retrieve(url, filename, reporthook, data)


def urlcleanup():
    """Run cleanup() on the shared opener, if one has been created."""
    if not _urlopener:
        return
    _urlopener.cleanup()


# FTP connection cache shared by every opener unless it is reassigned
# per instance (see URLopener.__init__).
ftpcache = {}


class URLopener:
    """Class to open URLs.

    This is a class rather than just a subroutine because we may need
    more than one set of global protocol-specific options.
    Note -- this is a base class for those who don't want the
    automatic handling of errors type 302 (relocated) and 401
    (authorization needed).
    """

    # Class-level default so the attribute exists even on an instance
    # whose __init__ did not run to completion.
    __tempfiles = None

    version = "Python-urllib/%s" % __version__

    # Constructor
    def __init__(self, proxies=None, **x509):
        """Set up proxy mapping, SSL file names, headers and caches.

        proxies -- mapping from URL scheme to proxy URL; when None the
                   result of getproxies() is used.
        x509    -- optional 'key_file' and 'cert_file' keyword arguments;
                   any other keywords are ignored.
        """
        if proxies is None:
            proxies = getproxies()
        assert hasattr(proxies, 'has_key'), "proxies must be a mapping"
        self.proxies = proxies

        # Missing keywords simply become None.
        self.cert_file = x509.get('cert_file')
        self.key_file = x509.get('key_file')

        self.addheaders = [('User-agent', self.version)]

        # Names of temporary files to remove later; a private reference
        # to os.unlink is kept alongside them.  See cleanup().
        self.__unlink = os.unlink
        self.__tempfiles = []

        # Undocumented feature: if you assign {} to tempcache,
        # it is used to cache files retrieved with
        # self.retrieve().  This is not enabled by default
        # since it does not work for changing documents (and I
        # haven't got the logic to check expiration headers
        # yet).
        self.tempcache = None

        # Undocumented feature: you can use a different
        # ftp cache by assigning to the .ftpcache member;
        # in case you want logically independent URL openers
        # XXX This is not threadsafe.  Bah.
        self.ftpcache = ftpcache

def __del__(self): self.close() def close(self): self.cleanup() def cleanup(self): # This code sometimes runs when the rest of this module # has already been deleted, so it can't use any globals # or import anything. if self.__tempfiles: for file in self.__tempfiles: try: self.__unlink(file) except OSError: pass del self.__tempfiles[:] if self.tempcache: self.tempcache.clear() def addheader(self, *args): """Add a header to be used by the HTTP interface only e.g. u.addheader('Accept', 'sound/basic')""" self.addheaders.append(args) # External interface def open(self, fullurl, data=None): """Use URLopener().open(file) instead of open(file, 'r').""" fullurl = unwrap(toBytes(fullurl)) if self.tempcache and self.tempcache.has_key(fullurl): filename, headers = self.tempcache[fullurl] fp = open(filename, 'rb') return addinfourl(fp, headers, fullurl) urltype, url = splittype(fullurl) if not urltype: urltype = 'file' if self.proxies.has_key(urltype): proxy = self.proxies[urltype] urltype, proxyhost = splittype(proxy) host, selector = splithost(proxyhost) url = (host, fullurl) # Signal special case to open_*() else: proxy = None name = 'open_' + urltype self.type = urltype if '-' in name: # replace - with _ name = '_'.join(name.split('-')) if not hasattr(self, name): if proxy: return self.open_unknown_proxy(proxy, fullurl, data) else: return self.open_unknown(fullurl, data) try: if data is None: return getattr(self, name)(url) else: return getattr(self, name)(url, data) except socket.error, msg: raise IOError, ('socket error', msg), sys.exc_info()[2] def open_unknown(self, fullurl, data=None): """Overridable interface to open unknown URL type.""" type, url = splittype(fullurl) raise IOError, ('url error', 'unknown url type', type) def open_unknown_proxy(self, proxy, fullurl, data=None): """Overridable interface to open unknown URL type.""" type, url = splittype(fullurl) raise IOError, ('url error', 'invalid proxy for %s' % type, proxy) # External interface def retrieve(self, 
url, filename=None, reporthook=None, data=None): """retrieve(url) returns (filename, headers) for a local object or (tempfilename, headers) for a remote object.""" url = unwrap(toBytes(url)) if self.tempcache and self.tempcache.has_key(url): return self.tempcache[url] type, url1 = splittype(url) if not filename and (not type or type == 'file'): try: fp = self.open_local_file(url1) hdrs = fp.info() del fp return url2pathname(splithost(url1)[1]), hdrs except IOError, msg: pass fp = self.open(url, data) headers = fp.info() if not filename: import tempfile garbage, path = splittype(url) garbage, path = splithost(path or "") path, garbage = splitquery(path or "") path, garbage = splitattr(path or "") suffix = os.path.splitext(path)[1] filename = tempfile.mktemp(suffix) self.__tempfiles.append(filename) result = filename, headers if self.tempcache is not None: self.tempcache[url] = result tfp = open(filename, 'wb') bs = 1024*8 size = -1 blocknum = 1 if reporthook: if headers.has_key("content-length"): size = int(headers["Content-Length"]) reporthook(0, bs, size) block = fp.read(bs) if reporthook: reporthook(1, bs, size) while block: tfp.write(block) block = fp.read(bs) blocknum = blocknum + 1 if reporthook: reporthook(blocknum, bs, size) fp.close() tfp.close() del fp del tfp return result # Each method named open_<type> knows how to open that type of URL def open_http(self, url, data=None): """Use HTTP protocol.""" import httplib user_passwd = None if type(url) is types.StringType: host, selector = splithost(url) if host: user_passwd, host = splituser(host) host = unquote(host) realhost = host else: host, selector = url urltype, rest = splittype(selector) url = rest user_passwd = None if urltype.lower() != 'http': realhost = None else: realhost, rest = splithost(rest) if realhost: user_passwd, realhost = splituser(realhost) if user_passwd: selector = "%s://%s%s" % (urltype, realhost, rest) if proxy_bypass(realhost): host = realhost #print "proxy via http:", host, 
selector if not host: raise IOError, ('http error', 'no host given') if user_passwd: import base64 auth = base64.encodestring(user_passwd).strip() else: auth = None h = httplib.HTTP(host) if data is not None: h.putrequest('POST', selector) h.putheader('Content-type', 'application/x-www-form-urlencoded') h.putheader('Content-length', '%d' % len(data)) else: h.putrequest('GET', selector) if auth: h.putheader('Authorization', 'Basic %s' % auth) if realhost: h.putheader('Host', realhost) for args in self.addheaders: apply(h.putheader, args) h.endheaders() if data is not None: h.send(data) errcode, errmsg, headers = h.getreply() fp = h.getfile() if errcode == 200: return addinfourl(fp, headers, "http:" + url) else: if data is None: return self.http_error(url, fp, errcode, errmsg, headers) else: return self.http_error(url, fp, errcode, errmsg, headers, data) def http_error(self, url, fp, errcode, errmsg, headers, data=None): """Handle http errors. Derived class can override this, or provide specific handlers named http_error_DDD where DDD is the 3-digit error code.""" # First check if there's a specific handler for this error name = 'http_error_%d' % errcode if hasattr(self, name): method = getattr(self, name) if data is None: result = method(url, fp, errcode, errmsg, headers) else: result = method(url, fp, errcode, errmsg, headers, data) if result: return result return self.http_error_default(url, fp, errcode, errmsg, headers) def http_error_default(self, url, fp, errcode, errmsg, headers): """Default error handler: close the connection and raise IOError.""" void = fp.read() fp.close() raise IOError, ('http error', errcode, errmsg, headers) if hasattr(socket, "ssl"): def open_https(self, url, data=None): """Use HTTPS protocol.""" import httplib user_passwd = None if type(url) is types.StringType: host, selector = splithost(url) if host: user_passwd, host = splituser(host) host = unquote(host) realhost = host else: host, selector = url urltype, rest = splittype(selector) 
url = rest user_passwd = None if urltype.lower() != 'https': realhost = None else: realhost, rest = splithost(rest) if realhost: user_passwd, realhost = splituser(realhost) if user_passwd: selector = "%s://%s%s" % (urltype, realhost, rest) #print "proxy via https:", host, selector if not host: raise IOError, ('https error', 'no host given') if user_passwd: import base64 auth = base64.encodestring(user_passwd).strip() else: auth = None h = httplib.HTTPS(host, 0, key_file=self.key_file, cert_file=self.cert_file) if data is not None: h.putrequest('POST', selector) h.putheader('Content-type', 'application/x-www-form-urlencoded') h.putheader('Content-length', '%d' % len(data)) else: h.putrequest('GET', selector) if auth: h.putheader('Authorization: Basic %s' % auth)
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?