⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 socket.py

📁 python s60 1.4.5版本的源代码
💻 PY
📖 第 1 页 / 共 2 页
字号:
# Copyright (c) 2005 - 2007 Nokia Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Wrapper module for e32socket (_socket), providing some additional
# facilities implemented in Python.

from e32socket import *

import os
import e32

_default_access_point = None

__all__ = ["getfqdn","getservbyname","getaddrinfo","gethostname"]
import e32socket
__all__.extend(os._get_exports_list(e32socket))
__all__ += ('_socketobject','_fileobject')

# Release 1.0 wanted the e32socket.socket object as the
# argument for some methods. To preserve compatibility,
# we accept both that and the proper type (_socketobject).
def _unwrap(sock):
    if isinstance(sock,_socketobject):
        return sock._sock
    else:
        return sock
def bt_advertise_service(name, socket, flag, class_):
    return e32socket.bt_advertise_service(name,_unwrap(socket),flag,class_)
def bt_obex_receive(socket, filename):
    return e32socket.bt_obex_receive(_unwrap(socket), filename)
def bt_rfcomm_get_available_server_channel(socket):
    return e32socket.bt_rfcomm_get_available_server_channel(_unwrap(socket)) 
def set_security(socket,mode):
    return e32socket.set_security(_unwrap(socket),mode) 
def gethostname():
    return "localhost"

def gethostbyaddr(addr):
    if not _isnumericipaddr(addr):
        addr = gethostbyname(addr)
    (hostname, aliaslist, ipaddrlist) = e32socket.gethostbyaddr(addr)
    return (str(hostname), aliaslist, ipaddrlist)

_realsocketcall = e32socket.socket

def socket(family, type, proto=0, apo=None):
    return _socketobject(_realsocketcall(family, type, proto, apo), family)

try:
    _realsslcall = e32socket.ssl
except AttributeError:
    pass # No ssl
else:
    def ssl(sock, keyfile=None, certfile=None, hostname=None):
        realsock=getattr(sock, "_sock", sock)
        if e32.s60_version_info>=(3,0):
            # On S60 3rd Ed secure sockets must be given the expected
            # hostname before connecting or error -7547 occurs. This
            # is not needed on 2nd edition. See known issue KIS000322.
            if hostname is None:
                # To make things convenient, if a hostname was given
                # to .connect it is stored in the socket object.
                if hasattr(sock, "_getconnectname"):
                    hostname=sock._getconnectname()
                if hostname is None:
                    raise RuntimeError("Expected hostname must be given either to socket .connect() or as 4th parameter of ssl() call. See S60 known issue KIS000322.")
        return _realsslcall(realsock, keyfile, certfile, hostname)
    # Note: this is just a stopgap hack while waiting for proper SSL error handling.
    # Until that time, SSL operations _will not_ raise sslerror properly as they should.
    SSL_ERROR_NONE=0
    SSL_ERROR_SSL=1
    SSL_ERROR_WANT_READ=2
    SSL_ERROR_WANT_WRITE=3
    SSL_ERROR_WANT_X509_LOOKUP=4 
    SSL_ERROR_SYSCALL=5
    SSL_ERROR_ZERO_RETURN=6
    SSL_ERROR_WANT_CONNECT=7
    SSL_ERROR_EOF=8
    SSL_ERROR_INVALID_ERROR_CODE=9
    class sslerror(Exception):
        pass

del os

AF_UNSPEC = 0

GAI_ANY = 0

EAI_ADDRFAMILY = 1
EAI_AGAIN = 2
EAI_BADFLAGS = 3
EAI_FAIL = 4
EAI_FAMILY = 5
EAI_MEMORY = 6
EAI_NODATA = 7
EAI_NONAME = 8
EAI_SERVICE = 9
EAI_SOCKTYPE = 10
EAI_SYSTEM = 11
EAI_BADHINTS = 12
EAI_PROTOCOL = 13
EAI_MAX = 14

AI_PASSIVE = 0x00000001
AI_CANONNAME = 0x00000002
AI_NUMERICHOST = 0x00000004

def _isnumeric(value):
    try:
        tmp = int(value)
    except:
        return False
    return True

def _isnumericipaddr(addr):
    for x in addr.split('.'):
        if not _isnumeric(x):
            return False
    return True

service_db = {'echo':[('tcp',7),('udp',7)],
              'ftp-data':[('tcp',20),('udp',20)],
              'ftp':[('tcp',21),('udp',21)],
              'ssh':[('tcp',22),('udp',22)],
              'telnet':[('tcp',23),('udp',23)],
              'smtp':[('tcp',25),('udp',25)],
              'time':[('tcp',37),('udp',37)],
              'domain':[('tcp',53),('udp',53)],
              'tftp':[('tcp',69),('udp',69)],
              'http':[('tcp',80),('udp',80)],
              'www-http':[('tcp',80),('udp',80)],
              'pop2':[('tcp',109),('udp',109)],
              'pop3':[('tcp',110),('udp',110)],
              'sftp':[('tcp',115),('udp',115)],
              'nntp':[('tcp',119),('udp',119)]}

def getservbyname(service, proto):
    service_record = service_db[service.lower()]
    for x in service_record:
        if x[0] == proto:
            return x[1]
    raise error("service/proto not found")

def getaddrinfo(host, port, fam=AF_UNSPEC, socktype=GAI_ANY, proto=0, flags=0):
    if host == None and port == None:
        raise gaierror(EAI_NONAME)

    if fam not in [AF_UNSPEC, AF_INET]:
        raise gaierror(EAI_FAMILY)

    if flags & AI_NUMERICHOST and host and not _isnumericipaddr(host):
        raise gaierror(EAI_NONAME)

    r_family = AF_INET
    r_socktype = GAI_ANY
    r_proto = GAI_ANY
    r_canonname = None
    r_host = None
    r_port = GAI_ANY
        
    if socktype == GAI_ANY:
        if proto == IPPROTO_UDP:
            r_socktype = SOCK_DGRAM
        elif proto == IPPROTO_TCP:
            r_socktype = SOCK_STREAM
    elif socktype == SOCK_DGRAM:
        if not proto in [IPPROTO_UDP, GAI_ANY]:
            raise gaierror(EAI_BADHINTS)
        r_socktype = SOCK_DGRAM
        r_proto = IPPROTO_UDP
    elif socktype == SOCK_STREAM:
        if not proto in [IPPROTO_TCP, GAI_ANY]:
            raise gaierror(EAI_BADHINTS)
        r_socktype = SOCK_STREAM
        r_proto = IPPROTO_TCP
    else:
        raise gaierror(EAI_SOCKTYPE)

    if port:
        if _isnumeric(port):
            if r_socktype == GAI_ANY:
                r_socktype = SOCK_DGRAM
                r_proto = IPPROTO_UDP
            r_port = port
        else:
            if r_socktype == GAI_ANY:
                r_port = getservbyname(port, 'tcp')
                if r_port:
                    r_socktype = SOCK_STREAM
                    r_proto = IPPROTO_TCP
                else:
                    r_port = getservbyname(port, 'udp')
                    if r_port:
                        r_socktype = SOCK_DGRAM
                        r_proto = IPPROTO_UDP
                        
            elif  r_socktype == SOCK_DGRAM:
                r_port = getservbyname(port, 'udp')
            elif  r_socktype == SOCK_STREAM:
                r_port = getservbyname(port, 'tcp')

            if not r_port:
                raise gaierror(EAI_SERVICE)

    if not host:
        if flags & AI_PASSIVE:
            r_host = '0.0.0.0'
        else:
            r_host = '127.0.0.1'
    elif _isnumericipaddr(host):
        r_host = host
        if flags & AI_CANONNAME:
            if flags & AI_NUMERICHOST:
                r_canonname = host
            else:
                r_canonname, aliases, ipaddrs = gethostbyaddr(host)
    else:
        r_host = gethostbyname(host)
        if flags & AI_CANONNAME:
            r_canonname = host   # hmmm...

    return [(r_family, r_socktype, r_proto, r_canonname, (r_host, r_port))]

def getfqdn(name=''):
    name = name.strip()
    if not name or name == '0.0.0.0':
        name = gethostname()
    try:
        hostname, aliases, ipaddrs = gethostbyaddr(name)
    except error:
        pass
    else:
        aliases.insert(0, hostname)
        for name in aliases:
            if '.' in name:
                break
        else:
            name = hostname
    return name

_socketmethods = (
        'bind', 'connect_ex', 'fileno', 'listen',
        'getpeername', 'getsockname', 'getsockopt', 'setsockopt',
        'sendall', 'sendto', 'shutdown')
    
def raise_error(*args,**kwargs):
    raise error(9, 'Bad file descriptor')

class _socketobject(object):
    def __init__(self, sock, family):
        self._internalsocket=_internalsocketobject(sock,family)
        self._sock=sock
        for k in dir(self._internalsocket):
            if not k.startswith('__') and k != 'close':
                value=getattr(self._internalsocket,k)
                if callable(value):
                    setattr(self,k,value)
    def close(self):

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -