⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 urllib2.py

📁 mallet是自然语言处理、机器学习领域的一个开源项目。
💻 PY
📖 第 1 页 / 共 3 页
字号:
        self.http_error_auth_reqed('proxy-authenticate', host, req, headers)def encode_digest(digest):    hexrep = []    for c in digest:        n = (ord(c) >> 4) & 0xf        hexrep.append(hex(n)[-1])        n = ord(c) & 0xf        hexrep.append(hex(n)[-1])    return ''.join(hexrep)class AbstractHTTPHandler(BaseHandler):    def do_open(self, http_class, req):        host = req.get_host()        if not host:            raise URLError('no host given')        h = http_class(host) # will parse host:port        if req.has_data():            data = req.get_data()            h.putrequest('POST', req.get_selector())            if not req.headers.has_key('Content-type'):                h.putheader('Content-type',                            'application/x-www-form-urlencoded')            if not req.headers.has_key('Content-length'):                h.putheader('Content-length', '%d' % len(data))        else:            h.putrequest('GET', req.get_selector())        scheme, sel = splittype(req.get_selector())        sel_host, sel_path = splithost(sel)        h.putheader('Host', sel_host or host)        for args in self.parent.addheaders:            h.putheader(*args)        for k, v in req.headers.items():            h.putheader(k, v)        # httplib will attempt to connect() here.  be prepared        # to convert a socket error to a URLError.        try:            h.endheaders()        except socket.error, err:            raise URLError(err)        if req.has_data():            h.send(data)        code, msg, hdrs = h.getreply()        fp = h.getfile()        if code == 200:            return addinfourl(fp, hdrs, req.get_full_url())        else:            return self.parent.error('http', req, fp, code, msg, hdrs)class HTTPHandler(AbstractHTTPHandler):    def http_open(self, req):        return self.do_open(httplib.HTTP, req)if hasattr(httplib, 'HTTPS'):    class HTTPSHandler(AbstractHTTPHandler):        def https_open(self, req):            return self.do_open(httplib.HTTPS, req)class UnknownHandler(BaseHandler):    def unknown_open(self, req):        type = req.get_type()        raise URLError('unknown url type: %s' % type)def parse_keqv_list(l):    """Parse list of key=value strings where keys are not duplicated."""    parsed = {}    for elt in l:        k, v = elt.split('=', 1)        if v[0] == '"' and v[-1] == '"':            v = v[1:-1]        parsed[k] = v    return parseddef parse_http_list(s):    """Parse lists as described by RFC 2068 Section 2.    In particular, parse comman-separated lists where the elements of    the list may include quoted-strings.  A quoted-string could    contain a comma.    """    # XXX this function could probably use more testing    list = []    end = len(s)    i = 0    inquote = 0    start = 0    while i < end:        cur = s[i:]        c = cur.find(',')        q = cur.find('"')        if c == -1:            list.append(s[start:])            break        if q == -1:            if inquote:                raise ValueError, "unbalanced quotes"            else:                list.append(s[start:i+c])                i = i + c + 1                continue        if inquote:            if q < c:                list.append(s[start:i+c])                i = i + c + 1                start = i                inquote = 0            else:                i = i + q        else:            if c < q:                list.append(s[start:i+c])                i = i + c + 1                start = i            else:                inquote = 1                i = i + q + 1    return map(lambda x: x.strip(), list)class FileHandler(BaseHandler):    # Use local file or FTP depending on form of URL    def file_open(self, req):        url = req.get_selector()        if url[:2] == '//' and url[2:3] != '/':            req.type = 'ftp'            return self.parent.open(req)        else:            return self.open_local_file(req)    # names for the localhost    names = None    def get_names(self):        if FileHandler.names is None:            FileHandler.names = (socket.gethostbyname('localhost'),                                 socket.gethostbyname(socket.gethostname()))        return FileHandler.names    # not entirely sure what the rules are here    def open_local_file(self, req):        host = req.get_host()        file = req.get_selector()        localfile = url2pathname(file)        stats = os.stat(localfile)        size = stats[stat.ST_SIZE]        modified = rfc822.formatdate(stats[stat.ST_MTIME])        mtype = mimetypes.guess_type(file)[0]        stats = os.stat(localfile)        headers = mimetools.Message(StringIO(            'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' %            (mtype or 'text/plain', size, modified)))        if host:            host, port = splitport(host)        if not host or \           (not port and socket.gethostbyname(host) in self.get_names()):            return addinfourl(open(localfile, 'rb'),                              headers, 'file:'+file)        raise URLError('file not on local host')class FTPHandler(BaseHandler):    def ftp_open(self, req):        host = req.get_host()        if not host:            raise IOError, ('ftp error', 'no host given')        # XXX handle custom username & password        try:            host = socket.gethostbyname(host)        except socket.error, msg:            raise URLError(msg)        host, port = splitport(host)        if port is None:            port = ftplib.FTP_PORT        path, attrs = splitattr(req.get_selector())        path = unquote(path)        dirs = path.split('/')        dirs, file = dirs[:-1], dirs[-1]        if dirs and not dirs[0]:            dirs = dirs[1:]        user = passwd = '' # XXX        try:            fw = self.connect_ftp(user, passwd, host, port, dirs)            type = file and 'I' or 'D'            for attr in attrs:                attr, value = splitattr(attr)                if attr.lower() == 'type' and \                   value in ('a', 'A', 'i', 'I', 'd', 'D'):                    type = value.upper()            fp, retrlen = fw.retrfile(file, type)            headers = ""            mtype = mimetypes.guess_type(req.get_full_url())[0]            if mtype:                headers += "Content-Type: %s\n" % mtype            if retrlen is not None and retrlen >= 0:                headers += "Content-Length: %d\n" % retrlen            sf = StringIO(headers)            headers = mimetools.Message(sf)            return addinfourl(fp, headers, req.get_full_url())        except ftplib.all_errors, msg:            raise IOError, ('ftp error', msg), sys.exc_info()[2]    def connect_ftp(self, user, passwd, host, port, dirs):        fw = ftpwrapper(user, passwd, host, port, dirs)##        fw.ftp.set_debuglevel(1)        return fwclass CacheFTPHandler(FTPHandler):    # XXX would be nice to have pluggable cache strategies    # XXX this stuff is definitely not thread safe    def __init__(self):        self.cache = {}        self.timeout = {}        self.soonest = 0        self.delay = 60        self.max_conns = 16    def setTimeout(self, t):        self.delay = t    def setMaxConns(self, m):        self.max_conns = m    def connect_ftp(self, user, passwd, host, port, dirs):        key = user, passwd, host, port        if self.cache.has_key(key):            self.timeout[key] = time.time() + self.delay        else:            self.cache[key] = ftpwrapper(user, passwd, host, port, dirs)            self.timeout[key] = time.time() + self.delay        self.check_cache()        return self.cache[key]    def check_cache(self):        # first check for old ones        t = time.time()        if self.soonest <= t:            for k, v in self.timeout.items():                if v < t:                    self.cache[k].close()                    del self.cache[k]                    del self.timeout[k]        self.soonest = min(self.timeout.values())        # then check the size        if len(self.cache) == self.max_conns:            for k, v in self.timeout.items():                if v == self.soonest:                    del self.cache[k]                    del self.timeout[k]                    break            self.soonest = min(self.timeout.values())class GopherHandler(BaseHandler):    def gopher_open(self, req):        host = req.get_host()        if not host:            raise GopherError('no host given')        host = unquote(host)        selector = req.get_selector()        type, selector = splitgophertype(selector)        selector, query = splitquery(selector)        selector = unquote(selector)        if query:            query = unquote(query)            fp = gopherlib.send_query(selector, query, host)        else:            fp = gopherlib.send_selector(selector, host)        return addinfourl(fp, noheaders(), req.get_full_url())#bleck! don't use this yetclass OpenerFactory:    default_handlers = [UnknownHandler, HTTPHandler,                        HTTPDefaultErrorHandler, HTTPRedirectHandler,                        FTPHandler, FileHandler]    proxy_handlers = [ProxyHandler]    handlers = []    replacement_handlers = []    def add_proxy_handler(self, ph):        self.proxy_handlers = self.proxy_handlers + [ph]    def add_handler(self, h):        self.handlers = self.handlers + [h]    def replace_handler(self, h):        pass    def build_opener(self):        opener = OpenerDirector()        for ph in self.proxy_handlers:            if inspect.isclass(ph):                ph = ph()            opener.add_handler(ph)if __name__ == "__main__":    # XXX some of the test code depends on machine configurations that    # are internal to CNRI.   Need to set up a public server with the    # right authentication configuration for test purposes.    if socket.gethostname() == 'bitdiddle':        localhost = 'bitdiddle.cnri.reston.va.us'    elif socket.gethostname() == 'bitdiddle.concentric.net':        localhost = 'localhost'    else:        localhost = None    urls = [        # Thanks to Fred for finding these!        'gopher://gopher.lib.ncsu.edu/11/library/stacks/Alex',        'gopher://gopher.vt.edu:10010/10/33',        'file:/etc/passwd',        'file://nonsensename/etc/passwd',        'ftp://www.python.org/pub/python/misc/sousa.au',        'ftp://www.python.org/pub/tmp/blat',        'http://www.espn.com/', # redirect        'http://www.python.org/Spanish/Inquistion/',        ('http://www.python.org/cgi-bin/faqw.py',         'query=pythonistas&querytype=simple&casefold=yes&req=search'),        'http://www.python.org/',        'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC/research-reports/00README-Legal-Rules-Regs',            ]##    if localhost is not None:##        urls = urls + [##            'file://%s/etc/passwd' % localhost,##            'http://%s/simple/' % localhost,##            'http://%s/digest/' % localhost,##            'http://%s/not/found.h' % localhost,##            ]##        bauth = HTTPBasicAuthHandler()##        bauth.add_password('basic_test_realm', localhost, 'jhylton',##                           'password')##        dauth = HTTPDigestAuthHandler()##        dauth.add_password('digest_test_realm', localhost, 'jhylton',##                           'password')    cfh = CacheFTPHandler()    cfh.setTimeout(1)##    # XXX try out some custom proxy objects too!##    def at_cnri(req):##        host = req.get_host()##        print host##        if host[-18:] == '.cnri.reston.va.us':##            return 1##    p = CustomProxy('http', at_cnri, 'proxy.cnri.reston.va.us')##    ph = CustomProxyHandler(p)##    install_opener(build_opener(dauth, bauth, cfh, GopherHandler, ph))    install_opener(build_opener(cfh, GopherHandler))    for url in urls:        if isinstance(url, types.TupleType):            url, req = url        else:            req = None        print url        try:            f = urlopen(url, req)        except IOError, err:            print "IOError:", err        except socket.error, err:            print "socket.error:", err        else:            buf = f.read()            f.close()            print "read %d bytes" % len(buf)        print        time.sleep(0.1)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -