urllib.py

来自「mallet是自然语言处理、机器学习领域的一个开源项目。」· Python 代码 · 共 1,466 行 · 第 1/4 页

PY
1,466
字号
            if realhost: h.putheader('Host', realhost)            for args in self.addheaders: apply(h.putheader, args)            h.endheaders()            if data is not None:                h.send(data)            errcode, errmsg, headers = h.getreply()            fp = h.getfile()            if errcode == 200:                return addinfourl(fp, headers, "https:" + url)            else:                if data is None:                    return self.http_error(url, fp, errcode, errmsg, headers)                else:                    return self.http_error(url, fp, errcode, errmsg, headers,                                           data)    def open_gopher(self, url):        """Use Gopher protocol."""        import gopherlib        host, selector = splithost(url)        if not host: raise IOError, ('gopher error', 'no host given')        host = unquote(host)        type, selector = splitgophertype(selector)        selector, query = splitquery(selector)        selector = unquote(selector)        if query:            query = unquote(query)            fp = gopherlib.send_query(selector, query, host)        else:            fp = gopherlib.send_selector(selector, host)        return addinfourl(fp, noheaders(), "gopher:" + url)    def open_file(self, url):        """Use local file or FTP depending on form of URL."""        if url[:2] == '//' and url[2:3] != '/' and url[2:12].lower() != 'localhost/':            return self.open_ftp(url)        else:            return self.open_local_file(url)    def open_local_file(self, url):        """Use local file."""        import mimetypes, mimetools, rfc822, StringIO        host, file = splithost(url)        localname = url2pathname(file)        try:            stats = os.stat(localname)        except OSError, e:            raise IOError(e.errno, e.strerror, e.filename)        size = stats[stat.ST_SIZE]        modified = rfc822.formatdate(stats[stat.ST_MTIME])        mtype = mimetypes.guess_type(url)[0]        headers = mimetools.Message(StringIO.StringIO(            'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' %            (mtype or 'text/plain', size, modified)))        if not host:            urlfile = file            if file[:1] == '/':                urlfile = 'file://' + file            return addinfourl(open(localname, 'rb'),                              headers, urlfile)        host, port = splitport(host)        if not port \           and socket.gethostbyname(host) in (localhost(), thishost()):            urlfile = file            if file[:1] == '/':                urlfile = 'file://' + file            return addinfourl(open(localname, 'rb'),                              headers, urlfile)        raise IOError, ('local file error', 'not on local host')    def open_ftp(self, url):        """Use FTP protocol."""        import mimetypes, mimetools, StringIO        host, path = splithost(url)        if not host: raise IOError, ('ftp error', 'no host given')        host, port = splitport(host)        user, host = splituser(host)        if user: user, passwd = splitpasswd(user)        else: passwd = None        host = unquote(host)        user = unquote(user or '')        passwd = unquote(passwd or '')        host = socket.gethostbyname(host)        if not port:            import ftplib            port = ftplib.FTP_PORT        else:            port = int(port)        path, attrs = splitattr(path)        path = unquote(path)        dirs = path.split('/')        dirs, file = dirs[:-1], dirs[-1]        if dirs and not dirs[0]: dirs = dirs[1:]        if dirs and not dirs[0]: dirs[0] = '/'        key = user, host, port, '/'.join(dirs)        # XXX thread unsafe!        if len(self.ftpcache) > MAXFTPCACHE:            # Prune the cache, rather arbitrarily            for k in self.ftpcache.keys():                if k != key:                    v = self.ftpcache[k]                    del self.ftpcache[k]                    v.close()        try:            if not self.ftpcache.has_key(key):                self.ftpcache[key] = \                    ftpwrapper(user, passwd, host, port, dirs)            if not file: type = 'D'            else: type = 'I'            for attr in attrs:                attr, value = splitvalue(attr)                if attr.lower() == 'type' and \                   value in ('a', 'A', 'i', 'I', 'd', 'D'):                    type = value.upper()            (fp, retrlen) = self.ftpcache[key].retrfile(file, type)            mtype = mimetypes.guess_type("ftp:" + url)[0]            headers = ""            if mtype:                headers += "Content-Type: %s\n" % mtype            if retrlen is not None and retrlen >= 0:                headers += "Content-Length: %d\n" % retrlen            headers = mimetools.Message(StringIO.StringIO(headers))            return addinfourl(fp, headers, "ftp:" + url)        except ftperrors(), msg:            raise IOError, ('ftp error', msg), sys.exc_info()[2]    def open_data(self, url, data=None):        """Use "data" URL."""        # ignore POSTed data        #        # syntax of data URLs:        # dataurl   := "data:" [ mediatype ] [ ";base64" ] "," data        # mediatype := [ type "/" subtype ] *( ";" parameter )        # data      := *urlchar        # parameter := attribute "=" value        import StringIO, mimetools, time        try:            [type, data] = url.split(',', 1)        except ValueError:            raise IOError, ('data error', 'bad data URL')        if not type:            type = 'text/plain;charset=US-ASCII'        semi = type.rfind(';')        if semi >= 0 and '=' not in type[semi:]:            encoding = type[semi+1:]            type = type[:semi]        else:            encoding = ''        msg = []        msg.append('Date: %s'%time.strftime('%a, %d %b %Y %T GMT',                                            time.gmtime(time.time())))        msg.append('Content-type: %s' % type)        if encoding == 'base64':            import base64            data = base64.decodestring(data)        else:            data = unquote(data)        msg.append('Content-length: %d' % len(data))        msg.append('')        msg.append(data)        msg = '\n'.join(msg)        f = StringIO.StringIO(msg)        headers = mimetools.Message(f, 0)        f.fileno = None     # needed for addinfourl        return addinfourl(f, headers, url)class FancyURLopener(URLopener):    """Derived class with handlers for errors we can handle (perhaps)."""    def __init__(self, *args):        apply(URLopener.__init__, (self,) + args)        self.auth_cache = {}        self.tries = 0        self.maxtries = 10    def http_error_default(self, url, fp, errcode, errmsg, headers):        """Default error handling -- don't raise an exception."""        return addinfourl(fp, headers, "http:" + url)    def http_error_302(self, url, fp, errcode, errmsg, headers, data=None):        """Error 302 -- relocated (temporarily)."""        self.tries += 1        if self.maxtries and self.tries >= self.maxtries:            if hasattr(self, "http_error_500"):                meth = self.http_error_500            else:                meth = self.http_error_default            self.tries = 0            return meth(url, fp, 500,                        "Internal Server Error: Redirect Recursion", headers)        result = self.redirect_internal(url, fp, errcode, errmsg, headers,                                        data)        self.tries = 0        return result    def redirect_internal(self, url, fp, errcode, errmsg, headers, data):        if headers.has_key('location'):            newurl = headers['location']        elif headers.has_key('uri'):            newurl = headers['uri']        else:            return        void = fp.read()        fp.close()        # In case the server sent a relative URL, join with original:        newurl = basejoin(self.type + ":" + url, newurl)        if data is None:            return self.open(newurl)        else:            return self.open(newurl, data)    def http_error_301(self, url, fp, errcode, errmsg, headers, data=None):        """Error 301 -- also relocated (permanently)."""        return self.http_error_302(url, fp, errcode, errmsg, headers, data)    def http_error_303(self, url, fp, errcode, errmsg, headers, data=None):        """Error 303 -- also relocated (essentially identical to 302)."""        return self.http_error_302(url, fp, errcode, errmsg, headers, data)    def http_error_401(self, url, fp, errcode, errmsg, headers, data=None):        """Error 401 -- authentication required.        See this URL for a description of the basic authentication scheme:        http://www.ics.uci.edu/pub/ietf/http/draft-ietf-http-v10-spec-00.txt"""        if not headers.has_key('www-authenticate'):            URLopener.http_error_default(self, url, fp,                                         errcode, errmsg, headers)        stuff = headers['www-authenticate']        import re        match = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', stuff)        if not match:            URLopener.http_error_default(self, url, fp,                                         errcode, errmsg, headers)        scheme, realm = match.groups()        if scheme.lower() != 'basic':            URLopener.http_error_default(self, url, fp,                                         errcode, errmsg, headers)        name = 'retry_' + self.type + '_basic_auth'        if data is None:            return getattr(self,name)(url, realm)        else:            return getattr(self,name)(url, realm, data)    def retry_http_basic_auth(self, url, realm, data=None):        host, selector = splithost(url)        i = host.find('@') + 1        host = host[i:]        user, passwd = self.get_user_passwd(host, realm, i)        if not (user or passwd): return None        host = quote(user, safe='') + ':' + quote(passwd, safe='') + '@' + host        newurl = 'http://' + host + selector        if data is None:            return self.open(newurl)        else:            return self.open(newurl, data)    def retry_https_basic_auth(self, url, realm, data=None):        host, selector = splithost(url)        i = host.find('@') + 1        host = host[i:]        user, passwd = self.get_user_passwd(host, realm, i)        if not (user or passwd): return None        host = quote(user, safe='') + ':' + quote(passwd, safe='') + '@' + host        newurl = '//' + host + selector        return self.open_https(newurl, data)    def get_user_passwd(self, host, realm, clear_cache = 0):        key = realm + '@' + host.lower()        if self.auth_cache.has_key(key):            if clear_cache:                del self.auth_cache[key]            else:                return self.auth_cache[key]        user, passwd = self.prompt_user_passwd(host, realm)        if user or passwd: self.auth_cache[key] = (user, passwd)        return user, passwd    def prompt_user_passwd(self, host, realm):        """Override this in a GUI environment!"""        import getpass        try:            user = raw_input("Enter username for %s at %s: " % (realm,                                                                host))            passwd = getpass.getpass("Enter password for %s in %s at %s: " %                (user, realm, host))            return user, passwd        except KeyboardInterrupt:            print            return None, None# Utility functions_localhost = Nonedef localhost():    """Return the IP address of the magic hostname 'localhost'."""    global _localhost    if not _localhost:        _localhost = socket.gethostbyname('localhost')    return _localhost_thishost = Nonedef thishost():    """Return the IP address of the current host."""    global _thishost    if not _thishost:        _thishost = socket.gethostbyname(socket.gethostname())    return _thishost_ftperrors = Nonedef ftperrors():    """Return the set of errors raised by the FTP class."""    global _ftperrors    if not _ftperrors:        import ftplib        _ftperrors = ftplib.all_errors    return _ftperrors_noheaders = Nonedef noheaders():    """Return an empty mimetools.Message object."""    global _noheaders    if not _noheaders:        import mimetools        import StringIO        _noheaders = mimetools.Message(StringIO.StringIO(), 0)        _noheaders.fp.close()   # Recycle file descriptor    return _noheaders# Utility classesclass ftpwrapper:    """Class used by open_ftp() for cache of open FTP connections."""    def __init__(self, user, passwd, host, port, dirs):        self.user = user        self.passwd = passwd        self.host = host        self.port = port        self.dirs = dirs        self.init()    def init(self):        import ftplib        self.busy = 0        self.ftp = ftplib.FTP()        self.ftp.connect(self.host, self.port)        self.ftp.login(self.user, self.passwd)        for dir in self.dirs:            self.ftp.cwd(dir)    def retrfile(self, file, type):        import ftplib        self.endtransfer()        if type in ('d', 'D'): cmd = 'TYPE A'; isdir = 1        else: cmd = 'TYPE ' + type; isdir = 0        try:            self.ftp.voidcmd(cmd)        except ftplib.all_errors:            self.init()            self.ftp.voidcmd(cmd)        conn = None        if file and not isdir:            # Use nlst to see if the file exists at all            try:                self.ftp.nlst(file)

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?