# a_test_console.py
# (Code-viewer page residue, not part of the script: "Font size:" control label)
try:
print("\ngethostbyaddress giving DNS: ")
(a, b, c) = gethostbyaddr("www.nokia.com")
print a, (str(b)), c
LF.write("\ngethostbyaddress giving DNS. Name is: %s %s %s " %(a,(str(b)),(str(c))))
LF.flush()
except:
print("\nException risen")
LF.write("\nexception risen")
type, value, tb = sys.exc_info()
LF.write("\nEXCEPTION:")
LF.write((str(type)+'\n'+str(value)))
print((str(type)+'\n'+str(value)))
LF.close()
print "\nExecuted"
# ***** UDP client/server
def udp_server(lock):
    """Run the server half of the UDP loopback test, logging to udpSrv.txt.

    Binds a UDP socket to port 1001 on all interfaces, starts udp_client()
    in a new thread, waits for one datagram of at most 14 bytes and records
    the payload and the sender's address in the log file.  Exceptions raised
    by the socket calls are written to the log rather than propagated.

    lock -- object whose signal() method is called when the test is over
            (udp_cl_srv() passes the e32.Ao_lock it is waiting on).  The
            signal is sent even when the test fails, so the waiting caller
            is never left blocked.
    """
    try:
        LF = open(u"C:\\testSocket\\udpSrv.txt", "w")
        try:
            s = None
            try:
                # Creating the socket inside the try means a failure here is
                # logged like any other error instead of killing the thread.
                s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
                LF.write("\nUDP socket created")
                LF.flush()
                s.bind(('0.0.0.0', 1001))
                LF.write("\nbind called")
                LF.flush()
                # The client is started only after bind() has returned.
                thread.start_new_thread(udp_client, ())
                data, address = s.recvfrom(14)
                addr, port = address
                LF.write("\nrecvfrom called")
                LF.flush()
                if not data:
                    LF.write("\nClient has exited")
                else:
                    LF.write("\nreceived message: %s" % data)
                    LF.write("\ntuple: %s" % (str(address)))
                    LF.write("\nremote address: %s" % addr)
                    LF.write("\nremote port: %d" % port)
            except:
                # Deliberately broad: this is a test console and every
                # failure is meant to end up in the log file.
                LF.write("\nexception risen")
                # Only type and value are kept; binding the traceback to a
                # local would create a reference cycle with this frame.
                exc_type, exc_value = sys.exc_info()[:2]
                LF.write("\nEXCEPTION:")
                LF.write(str(exc_type) + '\n' + str(exc_value))
            LF.write("\ntest ended")
            # Drop the socket reference before the caller is signalled.
            del s
        finally:
            LF.close()
    finally:
        lock.signal()
def udp_client():
    """Send one test datagram to the local UDP server, logging to udpCl.txt.

    Creates a UDP socket and calls sendto() towards 127.0.0.1:1001 until it
    returns a non-zero value, writing each return value to the log file.
    Exceptions are not handled here; they propagate to the caller after the
    log file has been closed.
    """
    LF = open(u"C:\\testSocket\\udpCl.txt", "w")
    try:
        s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
        LF.write("UDP client socket created")
        LF.flush()
        i = 0
        LF.write("\nsendto return: ")
        LF.flush()
        # Retry for as long as sendto() reports that nothing was sent.
        while i == 0:
            # The payload is 14 bytes long, matching recvfrom(14) in
            # udp_server().
            i = s.sendto("hello michele\n", ('127.0.0.1', 1001))
            LF.write("%d" % i)
            LF.flush()
        if i > 0:
            LF.write("\nSent data: %d" % i)
        LF.write("\ntest ended")
        del s
    finally:
        # Close the log even when socket() or sendto() raises.
        LF.close()
def udp_cl_srv():
    """Run the UDP client/server test and block until the server is done.

    Starts udp_server() in a new thread, handing it an e32.Ao_lock, then
    waits on that lock before reporting completion on the console.
    """
    print("UDP client/server test...")
    finished = e32.Ao_lock()
    server_args = (finished,)
    thread.start_new_thread(udp_server, server_args)
    # udp_server() calls signal() on this lock when it has finished.
    finished.wait()
    del server_args, finished
    print(" executed")
#********* UDP using connect
def udp_conn_srv(lock):
    """Run the server half of the UDP connect() test, logging to udpSrv.txt.

    Binds a UDP socket to port 1002 on all interfaces, starts udp_conn_cl()
    in a new thread and reads one datagram of at most 14 bytes with recv().
    Exceptions raised by the socket calls are written to the log rather than
    propagated.

    lock -- object whose signal() method is called when the test is over
            (udp_connect() passes the e32.Ao_lock it is waiting on).  The
            signal is sent even when the test fails, so the waiting caller
            is never left blocked.
    """
    try:
        LF = open(u"C:\\testSocket\\udpSrv.txt", "w")
        try:
            s = None
            try:
                # Creating the socket inside the try means a failure here is
                # logged like any other error instead of killing the thread.
                s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
                LF.write("\nUDP socket created")
                LF.flush()
                s.bind(('0.0.0.0', 1002))
                LF.write("\nbind called")
                LF.flush()
                # The client is started only after bind() has returned.
                thread.start_new_thread(udp_conn_cl, ())
                LF.write("\nnew thread started")
                LF.flush()
                data = s.recv(14)
                LF.write("\nrecvfrom called and retreived: %s" % (str(data)))
                LF.flush()
                if not data:
                    LF.write("\nClient has exited")
                else:
                    LF.write("\nreceived message: %s" % data)
            except:
                # Deliberately broad: this is a test console and every
                # failure is meant to end up in the log file.
                LF.write("\nexception risen")
                # Only type and value are kept; binding the traceback to a
                # local would create a reference cycle with this frame.
                exc_type, exc_value = sys.exc_info()[:2]
                LF.write("\nEXCEPTION:")
                LF.write(str(exc_type) + '\n' + str(exc_value))
            LF.write("\ntest ended")
            # Drop the socket reference before the caller is signalled.
            del s
        finally:
            LF.close()
    finally:
        lock.signal()
def udp_conn_cl():
    """Send one datagram over a connected UDP socket, logging to udpConnectCL.txt.

    Creates a UDP socket, connect()s it to 127.0.0.1:1002 and sends a single
    message with send(), writing the return value to the log file.
    Exceptions are not handled here; they propagate to the caller after the
    log file has been closed.
    """
    LF = open(u"C:\\testSocket\\udpConnectCL.txt", "w")
    try:
        s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)
        LF.write("UDP client socket created")
        LF.flush()
        s.connect(('127.0.0.1', 1002))
        LF.write("UDP socket connect() called")
        LF.flush()
        # The payload is 14 bytes long, matching recv(14) in udp_conn_srv().
        ret = s.send("hello michele\n")
        LF.write("\nData Sent. Return: %s" % (str(ret)))
        LF.write("\ntest ended")
        del s
    finally:
        # Close the log even when socket(), connect() or send() raises.
        LF.close()
def udp_connect():
    """Run the UDP connect() client/server test and wait for it to end.

    Starts udp_conn_srv() in a new thread, handing it an e32.Ao_lock, then
    waits on that lock before reporting completion on the console.
    """
    print("UDP connect CL/SRV test...")
    finished = e32.Ao_lock()
    server_args = (finished,)
    thread.start_new_thread(udp_conn_srv, server_args)
    # udp_conn_srv() calls signal() on this lock when it has finished.
    finished.wait()
    del server_args, finished
    print(" executed")
#****** getservice
# Disabled test: the getservice() definition below is wrapped in a string
# literal, so the function is never defined or run.  Its text looks up a
# service by port number (80) and by name ("http") through
# e32socket.getservice() and prints the results.
"""def getservice():
import e32socket
try:
print ("getservice by number")
name = e32socket.getservice(80, e32socket.AF_INET, e32socket.SOCK_STREAM, e32socket.IPPROTO_TCP)
print (str(name))
print ("getservice by name")
port = e32socket.getservice("http", e32socket.AF_INET, e32socket.SOCK_STREAM, e32socket.IPPROTO_TCP)
print (str(port))
except:
type, value, tb = sys.exc_info()
print("\nEXCEPTION:")
print((str(type)+'\n'+str(value)))
"""
# ***** Stressing GPRS
def _log_active_exception(log):
    """Write the type and value of the exception being handled to log."""
    # Only the first two items are used; binding the traceback to a local
    # would create a reference cycle with this frame.
    exc_type, exc_value = sys.exc_info()[:2]
    log.write(str(exc_type) + '\n' + str(exc_value))


def _save_http_body(path, response):
    """Write response to path as binary data, without any HTTP header."""
    body = response
    if response.startswith('HTTP/'):
        header_end = response.find('\x0d\x0a\x0d\x0a')
        # Strip the header only when its terminating blank line is present;
        # find() returns -1 otherwise and slicing would cut into the data.
        if header_end >= 0:
            body = response[header_end + 4:]
    # Binary mode keeps image bytes from going through newline translation.
    out = open(path, "wb")
    try:
        out.write(body)
    finally:
        out.close()


def gprs_stress():
    """Exercise socket creation, connect, send and read; log to stressingGPRS.txt.

    Steps, each recorded in the log file:
      1. create sockets with the single argument -1 .. 18;
      2. create sockets with argument lists the test expects to be rejected;
      3. connect to an empty host name (expected to fail);
      4. request a GIF over TCP, check that read(0) fails, then read 3765
         bytes and save them to pythonHi.gif;
      5. request a large JPEG, fetch it with read_all() and save it to
         bigFile.jpg.

    Downloaded files are written once, in binary mode, and only when the
    corresponding read succeeded.  Errors from the unguarded connect()/send()
    in step 4, and from saving the JPEG in step 5, propagate to the caller
    after the log has been closed.
    """
    LF = open(u"C:\\testSocket\\stressingGPRS.txt", "w")
    try:
        LF.write("Socket creation")
        for family in range(-1, 19):
            try:
                s = socket(family)
                del s
            except:
                LF.write("\nexception risen with %d" % family)
                LF.write("\nEXCEPTION:")
                _log_active_exception(LF)
            LF.flush()

        LF.write("\n***OPEN")
        # Argument lists that the test expects socket() to refuse, each
        # paired with the log line written when it does.
        rejected_creations = (
            ((), "\nexception risen for bad creation()"),
            ((AF_INET,), "\nexception risen for Socket(AF_INET)"),
            ((AF_INET, 2, IPPROTO_TCP),
             "\nexception risen for Socket(AF_INET,2,IPPROTO_TCP) "),
        )
        for args, note in rejected_creations:
            try:
                s = socket(*args)
                LF.write("\nSHOULD NEVER HAPPEN")
            except:
                LF.write(note)
                _log_active_exception(LF)
            LF.flush()

        LF.write("\n***CONNECT")
        # try to connect before opening
        s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
        try:
            s.connect(('', 80))
            LF.write("\nTest FAILED: connected")
        except:
            LF.write("\nEXCEPTION: ")
            _log_active_exception(LF)

        LF.write("\n***WRITE")
        LF.flush()
        s.connect(('194.109.137.226', 80))
        s.send("GET http://www.python.org/pics/pythonHi.gif\x0d\x0a")

        LF.write("\n***READ")
        try:
            # A zero-length read is expected to raise.
            s.read(0)
            LF.write("\nTest FAILED: send")
        except:
            LF.write("\nEXCEPTION: ")
            _log_active_exception(LF)
        LF.flush()

        try:
            msg = s.read(3765)
            LF.write("\n%d" % len(msg))
            _save_http_body(u"C:\\testSocket\\pythonHi.gif", msg)
            LF.write(" data read & file written...")
        except:
            LF.write("\nEXCEPTION: ")
            _log_active_exception(LF)
        LF.flush()
        del s

        # testing read_all() towards a big file
        print("\nTest downloading of a big file")
        LF.write("\n\n=======\ntesting read_all() towards a big file")
        LF.flush()
        s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
        try:
            s.connect(('65.114.4.69', 80))
            s.send("GET http://www.comics.com/comics/dilbert/archive/images/dilbert2004080130741.jpg\x0d\x0a")
        except:
            LF.write("\nEXCEPTION before real stuff: ")
            _log_active_exception(LF)
        LF.flush()

        # Stays None when read_all() fails, so that data left over from the
        # GIF download is never written to the JPEG file.
        big = None
        try:
            big = s.read_all()
            LF.write("\nread_all() read big data")
        except:
            LF.write("\nEXCEPTION in read_all(): ")
            _log_active_exception(LF)
        LF.flush()
        if big is not None:
            _save_http_body(u"C:\\testSocket\\bigFile.jpg", big)
    finally:
        LF.close()
    print("Done.")
#**** other
def other():
    """Console test of connect_ex(), fileno() and close() on a TCP socket.

    Each step runs in its own try block and prints the exception type and
    value to the console when it fails.  The "YOU SHOULD NOT SEE THIS"
    lines show that the test expects connect_ex() and fileno() to raise
    (presumably because the platform does not implement them -- confirm).
    """
    try:
        print("other test: connect_ex")
        s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
        print("\ntcp socket created")
        s.connect_ex(('216.239.59.99', 80))
        print("YOU SHOULD NOT SEE THIS")
    except:
        print("\nexception risen")
        # Only type and value are kept; binding the traceback to a local
        # would create a reference cycle with this frame.
        exc_type, exc_value = sys.exc_info()[:2]
        print(str(exc_type) + '\n' + str(exc_value))

    try:
        print("\nother test: fileno")
        s.fileno()
        print("YOU SHOULD NOT SEE THIS")
    except:
        print("\nexception risen")
        exc_type, exc_value = sys.exc_info()[:2]
        print(str(exc_type) + '\n' + str(exc_value))

    try:
        # close must be called; a bare attribute reference does nothing.
        s.close()
        del s
    except:
        print("\nexception risen")
        exc_type, exc_value = sys.exc_info()[:2]
        print(str(exc_type) + '\n' + str(exc_value))
def exit_key_handler():
    """Put the saved application title back and signal script_lock."""
    global script_lock
    appuifw.app.title = old_title
    # The module-level code blocks in script_lock.wait() until this signal.
    script_lock.signal()
# Script entry: set up the console UI, wait until the user is done, then
# restore the application state that was changed.
old_title = appuifw.app.title
appuifw.app.title = u"Socket testing console"

# Work inside the test directory, creating it when chdir() fails.
try:
    os.chdir("C:\\testSocket")
except OSError:
    # Only a failed chdir() should trigger directory creation; other
    # errors are no longer swallowed here.
    os.mkdir("C:\\testSocket")
    os.chdir("C:\\testSocket")

main_menu_setup()
#appuifw.app.exit_key_handler = exit_key_handler
# Block until script_lock is signalled.
script_lock.wait()

if script_exit_flag == True:
    appuifw.app.set_exit()
# NOTE(review): indentation was lost in this copy of the file; the menu and
# title restoration is assumed to run whether or not the exit flag is set.
appuifw.app.menu = []
appuifw.app.title = old_title
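# --- Code-viewer page residue (not part of the script): keyboard shortcuts ---
# Copy code: Ctrl + C
# Search code: Ctrl + F
# Full-screen mode: F11
# Toggle theme: Ctrl + Shift + D
# Show shortcuts: ?
# Increase font size: Ctrl + =
# Decrease font size: Ctrl + -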