📄 a_test_console.py
字号:
#
# test_socket.py
#
# A script for testing Series 60 Python socket module.
#
# NOTE: menu entries marked with "*" are meant to be run against another client/server peer
#
# Copyright (c) 2005 Nokia Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import appuifw
import e32
#from e32socket import *
from socket import *
import os
import sys
from key_codes import *
import thread
# Module-level exit state, created once at import time.
# NOTE(review): neither name is used in the visible part of this file;
# presumably exit_key_handler (defined outside this view) sets the flag and
# signals the lock -- confirm against the rest of the script.
script_exit_flag = False
# Active-object lock from the e32 module.
script_lock = e32.Ao_lock()
def main_menu_setup():
    """Install the test menu and the exit-key handler on the application.

    Every menu entry pairs a label with the module-level function that runs
    that test.  Entries whose label is wrapped in "*" are the ones the file
    header describes as needing a peer client/server.
    """
    entries = [
        # (u"getservice", getservice),
        (u"GPRS client", gprs_client),
        (u"IP client/server", ip),
        (u"UDP cl/srv", udp_cl_srv),
        (u"UDP connect", udp_connect),
        (u"Host Resolver", host_resolver),
        (u"SSL", ssl_cl),
        (u"OBEX sending", obex_send),
        (u"RFCOMM sending", rfcomm_send),
        (u"* RFCOMM server *", rfcomm_srv),
        (u"* RFCOMM client *", rfcomm_cl),
        (u"* RFCOMM Known CL*", rfcomm_known_cl),
        (u"* OBEX client *", obex_cl),
        (u"* OBEX Known CL*", obex_known_cl),
        (u"* OBEX server *", obex_srv),
        (u"Stressing GRPS", gprs_stress),
        (u"other minor", other),
    ]
    appuifw.app.menu = entries
    appuifw.app.exit_key_handler = exit_key_handler
# ***** GPRS
def gprs_client():
    """Fetch a GIF over a plain TCP socket and save it to the test directory.

    Connects to a hard-coded host on port 80, sends a bare GET request line,
    reads the whole reply with read_all() and writes the payload to
    pythonHi.gif under C:/testSocket.  If the reply starts with an HTTP
    status line, everything up to and including the blank line that ends the
    header is stripped first.

    Any socket or file error propagates to the caller; the socket reference
    and the output file are released either way.
    """
    print("GPRS test...")
    s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
    print("socket created")
    try:
        s.connect(('194.109.137.226', 80))  # dev
        # s.connect(('www.connecting.europe.nokia.com', 80))
        # s.connect(('131.228.55.140', 8080))  # emulator
        print("connected")
        s.send("GET http://www.python.org/pics/pythonHi.gif\x0d\x0a")
        print("send called")
        msg = s.read_all()
        print("read_all called")
    finally:
        # Drop the socket reference even when the exchange fails.
        del s

    payload = msg
    if msg.startswith('HTTP/'):
        header_end = msg.find('\x0d\x0a\x0d\x0a')
        # find() returns -1 when the header terminator is missing; slicing
        # with -1 + 4 would silently drop the first three bytes, so only
        # strip when the terminator was really found.
        if header_end != -1:
            payload = msg[header_end + 4:]

    # Binary mode: the payload is image data, not text.
    f = open(u"C:\\testSocket\\pythonHi.gif", "wb")
    try:
        f.write(payload)
    finally:
        f.close()
    # The saved file can be viewed on the device by opening it with
    # appuifw.Content_handler().
    print("done.")
# ***** IP client/server
def ip_server(lock):
    """Serve a single TCP connection on port 3333, logging each step.

    Intended to run in its own thread.  Binds to 0.0.0.0:3333, accepts one
    connection, collects what the peer sends up to a newline, answers with
    a fixed message and finishes.  Progress is written to logIPSrv.txt under
    c:/testSocket.

    lock -- object with a signal() method (an e32.Ao_lock in this file).
            It is signalled when the server is finished, *including* on
            failure, so a caller blocked in wait() can never hang.

    Errors from accept() are logged and end the run; any other exception
    propagates out of the thread after the log is closed and the lock is
    signalled.
    """
    log = open(u'c:\\testSocket\\logIPSrv.txt', 'w')
    listener = None
    conn = None
    try:
        listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
        log.write("create\n")
        log.flush()
        listener.bind(('0.0.0.0', 3333))
        # listener.bind(('localhost', 3333))  # this doesn't work
        log.write("bind\n")
        log.flush()

        listener.listen(5)
        log.write("listen\n")
        log.flush()
        # Deliberately broad: this is a best-effort diagnostic log and the
        # platform may raise non-standard exception types.
        try:
            conn, address = listener.accept()
            remote_addr, remote_port = address
            log.write("\ns accepted")
            log.flush()
            log.write("\ntuple: %s" % (str(address)))
            log.write("\nremAddr: %s" % remote_addr)
            log.write("\nremPort: %d" % remote_port)
        except:
            log.write("\nexception risen for accept")
            exc_type, exc_value = sys.exc_info()[:2]
            log.write((str(exc_type) + '\n' + str(exc_value)))
            log.flush()
        if conn is None:
            # accept() itself failed: there is no connection to read from.
            return

        # Collect the request up to the terminating newline.
        # NOTE(review): assumes read() hands back the data piecewise and
        # returns '' once the peer has closed -- confirm against the
        # platform socket documentation.  Treating '' as end-of-stream
        # keeps this loop from spinning forever in that case.
        echo = ''
        msg = conn.read()
        while msg != '\n' and msg != '':
            echo = echo + msg
            msg = conn.read()
        log.write("bs read msg\n")
        log.flush()

        if echo == '':
            log.write("nothing read\n")
            log.flush()
            return

        log.write("Server has received: ")
        log.write(echo)
        log.flush()
        conn.send("A msg from server\n")
        log.write("\nbs write")
        log.flush()
        log.write("\n END")
    finally:
        # Drop the socket references (the original used ``del``), then make
        # sure the log is closed and the waiting thread is released no
        # matter how we got here.
        conn = None
        listener = None
        try:
            log.close()
        finally:
            lock.signal()
def ip_client():  # lock):
    """Run one request/reply exchange against the local test server.

    Connects to 127.0.0.1:3333, sends "hello" followed by a newline, reads
    the reply up to its terminating newline and records every step in
    logIPCli.txt under c:/testSocket.

    The original wrapped this in ``while index == 0`` without ever changing
    ``index``, so it reconnected until connect() raised and never reached
    the code that closes the log.  A single exchange is performed instead.

    Socket errors propagate out of the thread; the log file is closed in
    every case.
    """
    log = open(u'c:\\testSocket\\logIPCli.txt', 'w')
    try:
        s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
        log.write("cl created\n")
        log.flush()
        s.connect(('127.0.0.1', 3333))
        log.write("cl connect\n")
        log.flush()
        s.send("hello\n")
        log.write("cl write\n")
        log.flush()

        # NOTE(review): assumes read() returns '' once the peer has closed
        # -- confirm.  Stopping on '' avoids spinning forever if the server
        # goes away without sending the newline.
        reply = ''
        chunk = s.read()
        while chunk != '\n' and chunk != '':
            reply = reply + chunk
            chunk = s.read()
        log.write("cl read: \n")
        log.write(reply)
        log.flush()
        del s
        log.write("\n END \n")
    finally:
        log.close()
def ip():
    """Start ip_server and ip_client in separate threads and wait for the server.

    Only the server is given a lock to signal; the client thread is started
    but not waited for.
    """
    print("IP client/server test...")
    server_done = e32.Ao_lock()
    thread.start_new_thread(ip_server, (server_done,))
    thread.start_new_thread(ip_client, ())
    # Block here until ip_server signals the lock.
    server_done.wait()
    del server_done
    print(" executed")
def ssl_cl():
    """Fetch a resource over SSL and save the first 8192 bytes read.

    Opens a TCP connection to port 443 of a hard-coded host, wraps it with
    ssl(), sends a bare GET request line, performs one read of up to 8192
    bytes and stores the result in sslTEST.html under C:/testSocket.
    Progress is echoed to the console and to ssl.txt in the same directory.

    Nothing is raised to the caller: any exception is printed and logged.
    The log file and the output file are always closed.
    """
    log = open(u"C:\\testSocket\\ssl.txt", "w")
    try:
        # Deliberately broad: this is a diagnostic script and the platform
        # may raise non-standard exception types.
        try:
            print("ssl client")
            s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
            print("\ntcp socket created")
            # s.connect(('198.65.143.195', 443))  # DEVICE
            s.connect(('www.connecting.europe.nokia.com', 443))  # EMULATOR
            print("\ntcp socket connected")
            log.write("\ntcp socket connected")
            log.flush()

            sec = ssl(s)
            print("\nssl socket created")
            log.write("\nssl socket created")
            log.flush()

            # sec.write("GET https://www.fortify.net/sslcheck.html\x0d\x0a")  # DEV
            sec.write("GET https://www.connecting.europe.nokia.com/mnc/resources/images/mnc/ncp_logo_blue.gif\x0d\x0a")  # EM
            print("\nssl.write")
            log.write("\nssl.write")
            log.flush()

            # html = sec.read()
            html = sec.read(8192)
            print("\nssl.read")
            log.write("\nssl.read")
            log.flush()

            # Binary mode: the requested resource is an image, so the bytes
            # must be stored untouched.
            out = open(u"C:\\testSocket\\sslTEST.html", "wb")
            try:
                out.write(html)
            finally:
                out.close()

            del sec
            del s
            print("\nboth socket deleted")
            log.write("\nboth socket deleted")
            log.flush()
        except:
            print("\nexception risen")
            exc_type, exc_value = sys.exc_info()[:2]
            print((str(exc_type) + '\n' + str(exc_value)))
            log.write((str(exc_type) + '\n' + str(exc_value)))
            log.flush()
    finally:
        log.close()
# ***** RFCOMM send towards PC
def rfcomm_send():
    """Send a greeting over Bluetooth RFCOMM to a service picked by the user.

    Calls bt_discover(), which yields a device address and a mapping of
    service name to port, lets the user choose one service from a popup
    menu, connects to it and sends a single line of text.  Progress is
    written to rfcommCliPC.txt under C:/testSocket.

    Nothing is raised to the caller from the discover/connect/send sequence:
    any exception there is written to the log.  If the user dismisses the
    service menu, nothing is sent and that is logged as well.  The log file
    is always closed.
    """
    print("RFCOMM client test 2 pc")
    log = open(u"C:\\testSocket\\rfcommCliPC.txt", "w")
    try:
        s = socket(AF_BT, SOCK_STREAM, BTPROTO_RFCOMM)
        log.write("s created")
        log.flush()
        # Deliberately broad: this is a diagnostic script and the platform
        # may raise non-standard exception types.
        try:
            addr, services = bt_discover()
            print("\nbt_discover called")
            print("\naddress: %s, len dict %d" % (addr, len(services)))
            print("\n%s" % services)
            log.write("\nbt_discover called")
            log.write("\naddress: %s, len dict %d" % (addr, len(services)))
            log.write("\n%s" % services)
            log.flush()

            names = services.keys()
            choice = appuifw.popup_menu(names, u"Choose a remote service")
            if choice is None:
                # The menu was dismissed; indexing with None would raise a
                # TypeError that used to be logged as a discovery failure.
                print("no service selected")
                log.write("\nno service selected")
                log.flush()
            else:
                port = services[names[choice]]
                s.connect((addr, port))
                # s.connect((addr, services[u"Py RFCOMM service"]))
                print("connected")
                print("port used: %d" % port)
                log.write("\nConnected")
                log.flush()
                s.send("Greetings from Python!\n")
                print("wrote...")
                log.write("\ns write called. Next deletion")
                log.flush()
        except:
            log.write("\nexception risen for bt_discover")
            exc_type, exc_value = sys.exc_info()[:2]
            log.write((str(exc_type) + '\n' + str(exc_value)))
            log.flush()
        del s
    finally:
        log.close()
    print("done")
# ***** RFCOMM server
def rfcomm_srv():
LF = open(u"C:\\testSocket\\rfcommSrv.txt", "w")
s = socket(AF_BT, SOCK_STREAM, BTPROTO_RFCOMM)
print("s created and opened")
LF.write("s created and opened")
LF.flush()
ch = bt_rfcomm_get_available_server_channel(s)
print("\ns asked for available channel: %d"%ch)
LF.write("\ns asked for available channel: %d"%ch)
LF.flush()
try:
s.bind(("whatever", ch))
except:
print "exception for bind!"
LF.write("\nexception risen for s.bind")
type, value, tb = sys.exc_info()
LF.write((str(type)+'\n'+str(value)))
LF.flush()
try:
s.listen(5)
except:
print "exception for listen!"
LF.write("\nexception risen for s.listen")
type, value, tb = sys.exc_info()
LF.write((str(type)+'\n'+str(value)))
LF.flush()
try:
bt_advertise_service(u"Py RFCOMM service", s, True, RFCOMM)
print "service advertised"
LF.write("\nservice advertised")
LF.flush()
except:
print "exception for service advertising!"
LF.write("\nexception risen for advertise service")
type, value, tb = sys.exc_info()
LF.write((str(type)+'\n'+str(value)))
LF.flush()
try:
mode = AUTH
set_security(s, mode)
print "set security"
LF.write("\nsecurity set")
LF.flush()
except:
print "exception for security set!"
LF.write("\nexception risen for set security")
type, value, tb = sys.exc_info()
LF.write((str(type)+'\n'+str(value)))
LF.flush()
try:
bs, remAdd = s.accept()
print "accepted"
LF.write("\ns accepted. Rem.Addr.: %s"%remAdd)
LF.flush()
LF.write("\nremAddr: %s"%remAdd)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -