⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 a_test_console.py

📁 python s60 1.4.5版本的源代码
💻 PY
📖 第 1 页 / 共 3 页
字号:

    try:
        print("\ngethostbyaddress giving DNS: ")
        (a, b, c) =  gethostbyaddr("www.nokia.com")        
        print a, (str(b)), c
        LF.write("\ngethostbyaddress giving DNS. Name is: %s %s %s " %(a,(str(b)),(str(c))))
        LF.flush()
    except:
        print("\nException risen")
        LF.write("\nexception risen")
        type, value, tb = sys.exc_info()
        LF.write("\nEXCEPTION:")
        LF.write((str(type)+'\n'+str(value)))    
        print((str(type)+'\n'+str(value)))    

    LF.close()
    print "\nExecuted"

# ***** UDP client/server
def udp_server(lock):
    """Run the server half of the UDP loopback test, logging to udpSrv.txt.

    Creates a UDP socket, binds it to port 1001 on all interfaces, starts
    udp_client in a new thread, receives one datagram of at most 14 bytes
    and writes the payload and the sender's address to the log file.

    lock -- object providing signal(); udp_cl_srv passes an e32.Ao_lock and
            blocks on it until this function signals.

    Socket errors are written to the log instead of being raised. The lock
    is signalled on every exit path (including a failure to open the log or
    to create the socket) so the waiting caller can never be left blocked.
    """
    s = None
    try:
        LF = open(u"C:\\testSocket\\udpSrv.txt", "w")
        try:
            try:
                s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
                LF.write("\nUDP socket created")
                LF.flush()
                s.bind(('0.0.0.0', 1001))
                LF.write("\nbind called")
                LF.flush()
                # The client is started only once the port is bound.
                thread.start_new_thread(udp_client, ())
                data, address = s.recvfrom(14)
                addr, port = address
                LF.write("\nrecvfrom called")
                LF.flush()
                if not data:
                    LF.write("\nClient has exited")
                else:
                    LF.write("\nreceived message: %s" % data)
                    LF.write("\ntuple: %s" % (str(address)))
                    LF.write("\nremote address: %s" % addr)
                    LF.write("\nremote port: %d" % port)
            except:
                # Best-effort test harness: record whatever went wrong.
                LF.write("\nexception risen")
                exc_type, exc_value = sys.exc_info()[:2]
                LF.write("\nEXCEPTION:")
                LF.write(str(exc_type) + '\n' + str(exc_value))
            LF.write("\ntest ended")
        finally:
            LF.close()
    finally:
        # Drop the socket reference before waking the caller, then always
        # signal so that udp_cl_srv's wait() returns.
        del s
        lock.signal()

def udp_client():
    """Client half of the UDP loopback test; logs to udpCl.txt.

    Sends the text 'hello michele' plus a newline to 127.0.0.1:1001 and
    keeps re-sending for as long as sendto() returns 0. Every return value
    is appended to the log.
    """
    log = open(u"C:\\testSocket\\udpCl.txt", "w")
    sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
    log.write("UDP client socket created")
    log.flush()
    log.write("\nsendto return: ")
    log.flush()

    destination = ('127.0.0.1', 1001)
    while 1:
        sent = sock.sendto("hello michele\n", destination)
        log.write("%d" % sent)
        log.flush()
        if sent > 0:
            log.write("\nSent data: %d" % sent)
        if sent != 0:
            # Anything other than 0 ends the retry loop.
            break

    log.write("\ntest ended")
    log.close()
    del sock

def udp_cl_srv():
    """Run the UDP client/server test and wait for the server to finish.

    Starts udp_server in a new thread, handing it an e32.Ao_lock, and
    blocks on that lock until the server signals it.
    """
    print("UDP client/server test...")
    finished = e32.Ao_lock()
    server_args = (finished, )
    thread.start_new_thread(udp_server, server_args)
    finished.wait()
    del server_args
    del finished
    print(" executed")

#********* UDP using connect
def udp_conn_srv(lock):
    """Run the server half of the connected-UDP test, logging to udpSrv.txt.

    Creates a UDP socket, binds it to port 1002 on all interfaces, starts
    udp_conn_cl in a new thread, receives at most 14 bytes with recv() and
    writes the payload to the log file (the same file name that udp_server
    uses).

    lock -- object providing signal(); udp_connect passes an e32.Ao_lock
            and blocks on it until this function signals.

    Socket errors are written to the log instead of being raised. The lock
    is signalled on every exit path (including a failure to open the log or
    to create the socket) so the waiting caller can never be left blocked.
    """
    s = None
    try:
        LF = open(u"C:\\testSocket\\udpSrv.txt", "w")
        try:
            try:
                s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
                LF.write("\nUDP socket created")
                LF.flush()
                s.bind(('0.0.0.0', 1002))
                LF.write("\nbind called")
                LF.flush()
                # The client is started only once the port is bound.
                thread.start_new_thread(udp_conn_cl, ())
                LF.write("\nnew thread started")
                LF.flush()

                data = s.recv(14)
                LF.write("\nrecvfrom called and retreived: %s" % (str(data)))
                LF.flush()
                if not data:
                    LF.write("\nClient has exited")
                else:
                    LF.write("\nreceived message: %s" % data)
            except:
                # Best-effort test harness: record whatever went wrong.
                LF.write("\nexception risen")
                exc_type, exc_value = sys.exc_info()[:2]
                LF.write("\nEXCEPTION:")
                LF.write(str(exc_type) + '\n' + str(exc_value))
            LF.write("\ntest ended")
        finally:
            LF.close()
    finally:
        # Drop the socket reference before waking the caller, then always
        # signal so that udp_connect's wait() returns.
        del s
        lock.signal()

def udp_conn_cl():
    """Client half of the connected-UDP test; logs to udpConnectCL.txt.

    Connects a UDP socket to 127.0.0.1:1002, sends the text 'hello michele'
    plus a newline with send() and logs the value send() returned.
    """
    log = open(u"C:\\testSocket\\udpConnectCL.txt", "w")
    sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
    log.write("UDP client socket created")
    log.flush()

    server = ('127.0.0.1', 1002)
    sock.connect(server)
    log.write("UDP socket connect() called")
    log.flush()

    outcome = str(sock.send("hello michele\n"))
    log.write("\nData Sent. Return: %s" % outcome)
    log.write("\ntest ended")
    log.close()
    del sock

def udp_connect():
    """Run the connected-UDP client/server test and wait for it to finish.

    Starts udp_conn_srv in a new thread, handing it an e32.Ao_lock, and
    blocks on that lock until the server signals it.
    """
    print("UDP connect CL/SRV test...")
    finished = e32.Ao_lock()
    server_args = (finished, )
    thread.start_new_thread(udp_conn_srv, server_args)
    finished.wait()
    del server_args
    del finished
    print(" executed")


#****** getservice
# NOTE: this test is disabled. The whole function below is wrapped in a
# module-level triple-quoted string, so it is evaluated as a bare string
# expression and getservice() is never defined or run. The disabled code
# would look up a service by port number (80) and by name ("http") through
# e32socket.getservice and print the results.
"""def getservice():
    import e32socket
    try:
        print ("getservice by number")
        name = e32socket.getservice(80, e32socket.AF_INET, e32socket.SOCK_STREAM, e32socket.IPPROTO_TCP)
        print (str(name))
        print ("getservice by name")
        port =  e32socket.getservice("http", e32socket.AF_INET, e32socket.SOCK_STREAM, e32socket.IPPROTO_TCP)
        print (str(port))
    except:
        type, value, tb = sys.exc_info()
        print("\nEXCEPTION:")
        print((str(type)+'\n'+str(value)))
"""

# ***** Stressing GPRS
def _log_exc(log, prefix):
    """Write prefix, then the type and value of the exception being handled."""
    # Only the first two items are taken so no traceback reference is kept.
    exc_type, exc_value = sys.exc_info()[:2]
    log.write(prefix)
    log.write(str(exc_type) + '\n' + str(exc_value))


def _save_http_payload(path, msg):
    """Write msg to path in binary mode, minus a leading HTTP header block.

    When msg starts with 'HTTP/' and contains the CRLF CRLF header
    terminator, only the bytes after the terminator are written. In every
    other case (including a missing terminator) msg is written unchanged.
    """
    body = msg
    if msg.startswith('HTTP/'):
        cut = msg.find('\x0d\x0a\x0d\x0a')
        if cut >= 0:
            body = msg[cut + 4:]
    # "wb": the payload is image data and must not be newline-translated.
    f = open(path, "wb")
    try:
        f.write(body)
    finally:
        f.close()


def _stress_socket_creation(LF):
    """Create sockets with a series of argument combinations, logging failures."""
    LF.write("Socket creation")
    for i in range(20):
        family = i - 1
        try:
            s = socket(family)
            del s
        except:
            LF.write("\nexception risen with %d" % family)
            _log_exc(LF, "\nEXCEPTION:")

    LF.flush()
    LF.write("\n***OPEN")

    # Each of these calls is expected to raise; a normal return is logged
    # as "SHOULD NEVER HAPPEN".
    try:
        s = socket()
        LF.write("\nSHOULD NEVER HAPPEN")
    except:
        _log_exc(LF, "\nexception risen for bad creation()")
    LF.flush()
    try:
        s = socket(AF_INET)
        LF.write("\nSHOULD NEVER HAPPEN")
    except:
        _log_exc(LF, "\nexception risen for Socket(AF_INET)")
    LF.flush()
    try:
        s = socket(AF_INET, 2, IPPROTO_TCP)
        LF.write("\nSHOULD NEVER HAPPEN")
    except:
        _log_exc(LF, "\nexception risen for Socket(AF_INET,2,IPPROTO_TCP) ")
    LF.flush()


def _stress_small_download(LF):
    """Exercise connect/send/read on one TCP socket and save pythonHi.gif."""
    LF.write("\n***CONNECT")

    s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)

    # Connecting to an empty host name is expected to fail.
    try:
        s.connect(('', 80))
        LF.write("\nTest FAILED: connected")
    except:
        _log_exc(LF, "\nEXCEPTION: ")

    LF.write("\n***WRITE")
    LF.flush()
    # Not guarded, as before: a failure here aborts the whole test.
    s.connect(('194.109.137.226', 80))
    s.send("GET http://www.python.org/pics/pythonHi.gif\x0d\x0a")

    LF.write("\n***READ")
    # A zero-length read is expected to raise.
    try:
        msg = s.read(0)
        LF.write("\nTest FAILED: send")
    except:
        _log_exc(LF, "\nEXCEPTION: ")
    LF.flush()

    msg = ''
    fetched = False
    try:
        msg = s.read(3765)
        LF.write("\n%d" % len(msg))
        fetched = True
    except:
        _log_exc(LF, "\nEXCEPTION: ")
    # The file is written exactly once; it is still created (empty) when
    # the read failed, as the original code did.
    _save_http_payload(u"C:\\testSocket\\pythonHi.gif", msg)
    if fetched:
        LF.write(" data read & file written...")
    LF.flush()
    del s


def _stress_big_download(LF):
    """Fetch a large image with read_all() and save it as bigFile.jpg."""
    print("\nTest downloading of a big file")
    LF.write("\n\n=======\ntesting read_all() towards a big file")
    LF.flush()
    s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
    try:
        s.connect(('65.114.4.69', 80))
        s.send("GET http://www.comics.com/comics/dilbert/archive/images/dilbert2004080130741.jpg\x0d\x0a")
    except:
        _log_exc(LF, "\nEXCEPTION before real stuff: ")
        LF.flush()

    # Start from an empty payload so that a failed read_all() cannot leave
    # data from an earlier download to be saved under the wrong name.
    msg = ''
    try:
        msg = s.read_all()
        LF.write("\nread_all() read big data")
        LF.flush()
    except:
        _log_exc(LF, "\nEXCEPTION in read_all(): ")
        LF.flush()

    _save_http_payload(u"C:\\testSocket\\bigFile.jpg", msg)


def gprs_stress():
    """Stress the socket layer and log the outcome to stressingGPRS.txt.

    Runs three phases in order: socket creation with assorted arguments,
    a small HTTP download saved as pythonHi.gif, and a large HTTP download
    through read_all() saved as bigFile.jpg. Expected failures are logged;
    an error from the unguarded connect/send of the small download still
    propagates to the caller, but the log file is now closed in every case.
    "Done." is printed only when all phases ran to the end.
    """
    LF = open(u"C:\\testSocket\\stressingGPRS.txt", "w")
    try:
        _stress_socket_creation(LF)
        _stress_small_download(LF)
        _stress_big_download(LF)
    finally:
        LF.close()
    print("Done.")

#**** other
def _print_current_exception():
    """Print the type and value of the exception currently being handled."""
    exc_type, exc_value = sys.exc_info()[:2]
    print("\nexception risen")
    print(str(exc_type) + '\n' + str(exc_value))


def other():
    """Exercise connect_ex() and fileno() on a TCP socket, then close it.

    Both calls are treated as expected failures: "YOU SHOULD NOT SEE THIS"
    is printed if either one returns normally. Every exception is caught and
    its type and value are printed, so this function does not raise.
    """
    try:
        print("other test: connect_ex")
        s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
        print("\ntcp socket created")
        s.connect_ex(('216.239.59.99', 80))
        print("YOU SHOULD NOT SEE THIS")
    except:
        _print_current_exception()

    try:
        print("\nother test: fileno")
        s.fileno()
        print("YOU SHOULD NOT SEE THIS")
    except:
        _print_current_exception()

    try:
        # The original read "s.close" without parentheses, which only looked
        # the method up and never closed the socket.
        s.close()
        del s
    except:
        _print_current_exception()

def exit_key_handler():
    """Put back the saved application title and signal the script lock."""
    global script_lock
    application = appuifw.app
    application.title = old_title
    script_lock.signal()

# Script entry point. Remember the current application title so it can be
# restored later, then show a title for this test console.
old_title = appuifw.app.title
appuifw.app.title = u"Socket testing console"

# The test functions in this file open their log files under C:\testSocket.
# Switch to that directory, creating it when the first chdir fails.
# NOTE(review): the bare except treats every chdir failure as "directory
# missing"; confirm no other failure mode needs separate handling.
try:
    os.chdir("C:\\testSocket")
except:    
    os.mkdir("C:\\testSocket")
    os.chdir("C:\\testSocket")
# main_menu_setup, script_lock and script_exit_flag are defined outside
# this part of the file.
main_menu_setup()
# The exit-key handler is not installed (the assignment is commented out).
#appuifw.app.exit_key_handler = exit_key_handler
# Block here until script_lock is signalled.
script_lock.wait()
# On a requested exit: ask the application to exit, clear the menu and
# restore the title saved above.
if script_exit_flag == True:
    appuifw.app.set_exit()
    appuifw.app.menu = []
    appuifw.app.title = old_title

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -