urllib.py

来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 1,466 行 · 第 1/4 页

PY
1,466
字号
"""Open an arbitrary URL.See the following document for more info on URLs:"Names and Addresses, URIs, URLs, URNs, URCs", athttp://www.w3.org/pub/WWW/Addressing/Overview.htmlSee also the HTTP spec (from which the error codes are derived):"HTTP - Hypertext Transfer Protocol", athttp://www.w3.org/pub/WWW/Protocols/Related standards and specs:- RFC1808: the "relative URL" spec. (authoritative status)- RFC1738 - the "URL standard". (authoritative status)- RFC1630 - the "URI spec". (informational status)The object returned by URLopener().open(file) will differ perprotocol.  All you know is that is has methods read(), readline(),readlines(), fileno(), close() and info().  The read*(), fileno()and close() methods work like those of open files.The info() method returns a mimetools.Message object which can beused to query various info about the object, if available.(mimetools.Message objects are queried with the getheader() method.)"""import stringimport socketimport osimport statimport timeimport sysimport types__all__ = ["urlopen", "URLopener", "FancyURLopener", "urlretrieve",           "urlcleanup", "quote", "quote_plus", "unquote", "unquote_plus",           "urlencode", "url2pathname", "pathname2url", "splittag",           "localhost", "thishost", "ftperrors", "basejoin", "unwrap",           "splittype", "splithost", "splituser", "splitpasswd", "splitport",           "splitnport", "splitquery", "splitattr", "splitvalue",           "splitgophertype", "getproxies"]__version__ = '1.15'    # XXX This version is not always updated :-(MAXFTPCACHE = 10        # Trim the ftp cache beyond this size# Helper for non-unix systemsif os.name == 'mac':    from macurl2path import url2pathname, pathname2urlelif os.name == 'nt':    from nturl2path import url2pathname, pathname2urlelif os.name == 'riscos':    from rourl2path import url2pathname, pathname2urlelse:    def url2pathname(pathname):        return unquote(pathname)    def pathname2url(pathname):        return quote(pathname)# This really consists of two pieces:# (1) a class which handles opening of all sorts of URLs#     (plus assorted utilities etc.)# (2) a set of functions for parsing URLs# XXX Should these be separated out into different modules?# Shortcut for basic usage_urlopener = Nonedef urlopen(url, data=None):    """urlopen(url [, data]) -> open file-like object"""    global _urlopener    if not _urlopener:        _urlopener = FancyURLopener()    if data is None:        return _urlopener.open(url)    else:        return _urlopener.open(url, data)def urlretrieve(url, filename=None, reporthook=None, data=None):    global _urlopener    if not _urlopener:        _urlopener = FancyURLopener()    return _urlopener.retrieve(url, filename, reporthook, data)def urlcleanup():    if _urlopener:        _urlopener.cleanup()ftpcache = {}class URLopener:    """Class to open URLs.    This is a class rather than just a subroutine because we may need    more than one set of global protocol-specific options.    Note -- this is a base class for those who don't want the    automatic handling of errors type 302 (relocated) and 401    (authorization needed)."""    __tempfiles = None    version = "Python-urllib/%s" % __version__    # Constructor    def __init__(self, proxies=None, **x509):        if proxies is None:            proxies = getproxies()        assert hasattr(proxies, 'has_key'), "proxies must be a mapping"        self.proxies = proxies        self.key_file = x509.get('key_file')        self.cert_file = x509.get('cert_file')        self.addheaders = [('User-agent', self.version)]        self.__tempfiles = []        self.__unlink = os.unlink # See cleanup()        self.tempcache = None        # Undocumented feature: if you assign {} to tempcache,        # it is used to cache files retrieved with        # self.retrieve().  This is not enabled by default        # since it does not work for changing documents (and I        # haven't got the logic to check expiration headers        # yet).        self.ftpcache = ftpcache        # Undocumented feature: you can use a different        # ftp cache by assigning to the .ftpcache member;        # in case you want logically independent URL openers        # XXX This is not threadsafe.  Bah.    def __del__(self):        self.close()    def close(self):        self.cleanup()    def cleanup(self):        # This code sometimes runs when the rest of this module        # has already been deleted, so it can't use any globals        # or import anything.        if self.__tempfiles:            for file in self.__tempfiles:                try:                    self.__unlink(file)                except OSError:                    pass            del self.__tempfiles[:]        if self.tempcache:            self.tempcache.clear()    def addheader(self, *args):        """Add a header to be used by the HTTP interface only        e.g. u.addheader('Accept', 'sound/basic')"""        self.addheaders.append(args)    # External interface    def open(self, fullurl, data=None):        """Use URLopener().open(file) instead of open(file, 'r')."""        fullurl = unwrap(toBytes(fullurl))        if self.tempcache and self.tempcache.has_key(fullurl):            filename, headers = self.tempcache[fullurl]            fp = open(filename, 'rb')            return addinfourl(fp, headers, fullurl)        urltype, url = splittype(fullurl)        if not urltype:            urltype = 'file'        if self.proxies.has_key(urltype):            proxy = self.proxies[urltype]            urltype, proxyhost = splittype(proxy)            host, selector = splithost(proxyhost)            url = (host, fullurl) # Signal special case to open_*()        else:            proxy = None        name = 'open_' + urltype        self.type = urltype        if '-' in name:            # replace - with _            name = '_'.join(name.split('-'))        if not hasattr(self, name):            if proxy:                return self.open_unknown_proxy(proxy, fullurl, data)            else:                return self.open_unknown(fullurl, data)        try:            if data is None:                return getattr(self, name)(url)            else:                return getattr(self, name)(url, data)        except socket.error, msg:            raise IOError, ('socket error', msg), sys.exc_info()[2]    def open_unknown(self, fullurl, data=None):        """Overridable interface to open unknown URL type."""        type, url = splittype(fullurl)        raise IOError, ('url error', 'unknown url type', type)    def open_unknown_proxy(self, proxy, fullurl, data=None):        """Overridable interface to open unknown URL type."""        type, url = splittype(fullurl)        raise IOError, ('url error', 'invalid proxy for %s' % type, proxy)    # External interface    def retrieve(self, url, filename=None, reporthook=None, data=None):        """retrieve(url) returns (filename, headers) for a local object        or (tempfilename, headers) for a remote object."""        url = unwrap(toBytes(url))        if self.tempcache and self.tempcache.has_key(url):            return self.tempcache[url]        type, url1 = splittype(url)        if not filename and (not type or type == 'file'):            try:                fp = self.open_local_file(url1)                hdrs = fp.info()                del fp                return url2pathname(splithost(url1)[1]), hdrs            except IOError, msg:                pass        fp = self.open(url, data)        headers = fp.info()        if not filename:            import tempfile            garbage, path = splittype(url)            garbage, path = splithost(path or "")            path, garbage = splitquery(path or "")            path, garbage = splitattr(path or "")            suffix = os.path.splitext(path)[1]            filename = tempfile.mktemp(suffix)            self.__tempfiles.append(filename)        result = filename, headers        if self.tempcache is not None:            self.tempcache[url] = result        tfp = open(filename, 'wb')        bs = 1024*8        size = -1        blocknum = 1        if reporthook:            if headers.has_key("content-length"):                size = int(headers["Content-Length"])            reporthook(0, bs, size)        block = fp.read(bs)        if reporthook:            reporthook(1, bs, size)        while block:            tfp.write(block)            block = fp.read(bs)            blocknum = blocknum + 1            if reporthook:                reporthook(blocknum, bs, size)        fp.close()        tfp.close()        del fp        del tfp        return result    # Each method named open_<type> knows how to open that type of URL    def open_http(self, url, data=None):        """Use HTTP protocol."""        import httplib        user_passwd = None        if type(url) is types.StringType:            host, selector = splithost(url)            if host:                user_passwd, host = splituser(host)                host = unquote(host)            realhost = host        else:            host, selector = url            urltype, rest = splittype(selector)            url = rest            user_passwd = None            if urltype.lower() != 'http':                realhost = None            else:                realhost, rest = splithost(rest)                if realhost:                    user_passwd, realhost = splituser(realhost)                if user_passwd:                    selector = "%s://%s%s" % (urltype, realhost, rest)                if proxy_bypass(realhost):                    host = realhost            #print "proxy via http:", host, selector        if not host: raise IOError, ('http error', 'no host given')        if user_passwd:            import base64            auth = base64.encodestring(user_passwd).strip()        else:            auth = None        h = httplib.HTTP(host)        if data is not None:            h.putrequest('POST', selector)            h.putheader('Content-type', 'application/x-www-form-urlencoded')            h.putheader('Content-length', '%d' % len(data))        else:            h.putrequest('GET', selector)        if auth: h.putheader('Authorization', 'Basic %s' % auth)        if realhost: h.putheader('Host', realhost)        for args in self.addheaders: apply(h.putheader, args)        h.endheaders()        if data is not None:            h.send(data)        errcode, errmsg, headers = h.getreply()        fp = h.getfile()        if errcode == 200:            return addinfourl(fp, headers, "http:" + url)        else:            if data is None:                return self.http_error(url, fp, errcode, errmsg, headers)            else:                return self.http_error(url, fp, errcode, errmsg, headers, data)    def http_error(self, url, fp, errcode, errmsg, headers, data=None):        """Handle http errors.        Derived class can override this, or provide specific handlers        named http_error_DDD where DDD is the 3-digit error code."""        # First check if there's a specific handler for this error        name = 'http_error_%d' % errcode        if hasattr(self, name):            method = getattr(self, name)            if data is None:                result = method(url, fp, errcode, errmsg, headers)            else:                result = method(url, fp, errcode, errmsg, headers, data)            if result: return result        return self.http_error_default(url, fp, errcode, errmsg, headers)    def http_error_default(self, url, fp, errcode, errmsg, headers):        """Default error handler: close the connection and raise IOError."""        void = fp.read()        fp.close()        raise IOError, ('http error', errcode, errmsg, headers)    if hasattr(socket, "ssl"):        def open_https(self, url, data=None):            """Use HTTPS protocol."""            import httplib            user_passwd = None            if type(url) is types.StringType:                host, selector = splithost(url)                if host:                    user_passwd, host = splituser(host)                    host = unquote(host)                realhost = host            else:                host, selector = url                urltype, rest = splittype(selector)                url = rest                user_passwd = None                if urltype.lower() != 'https':                    realhost = None                else:                    realhost, rest = splithost(rest)                    if realhost:                        user_passwd, realhost = splituser(realhost)                    if user_passwd:                        selector = "%s://%s%s" % (urltype, realhost, rest)                #print "proxy via https:", host, selector            if not host: raise IOError, ('https error', 'no host given')            if user_passwd:                import base64                auth = base64.encodestring(user_passwd).strip()            else:                auth = None            h = httplib.HTTPS(host, 0,                              key_file=self.key_file,                              cert_file=self.cert_file)            if data is not None:                h.putrequest('POST', selector)                h.putheader('Content-type',                            'application/x-www-form-urlencoded')                h.putheader('Content-length', '%d' % len(data))            else:                h.putrequest('GET', selector)            if auth: h.putheader('Authorization: Basic %s' % auth)

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?