⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 a_test_console.py

📁 python s60 1.4.5版本的源代码
💻 PY
📖 第 1 页 / 共 3 页
字号:
#
# test_socket.py
#
# A script for testing Series 60 Python socket module.
#
# NOTE: scripts with "*" are supposed to be run versus another client/server
#
# Copyright (c) 2005 Nokia Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import appuifw
import e32
#from e32socket import *
from socket import *
import os
import sys
from key_codes import *
import thread

script_exit_flag = False
script_lock = e32.Ao_lock()

def main_menu_setup():
    appuifw.app.menu = [#(u"getservice", getservice),\
                        (u"GPRS client", gprs_client),\
                        (u"IP client/server", ip),\
                        (u"UDP cl/srv", udp_cl_srv),\
                        (u"UDP connect", udp_connect),\
                        (u"Host Resolver", host_resolver),\
                        (u"SSL", ssl_cl),\
                        (u"OBEX sending", obex_send),\
                        (u"RFCOMM sending", rfcomm_send),\
                        (u"* RFCOMM server *", rfcomm_srv),\
                        (u"* RFCOMM client *", rfcomm_cl),\
                        (u"* RFCOMM Known CL*", rfcomm_known_cl),\
                        (u"* OBEX client *", obex_cl),\
                        (u"* OBEX Known CL*", obex_known_cl),\
                        (u"* OBEX server *", obex_srv),\
                        (u"Stressing GRPS", gprs_stress),\
                        (u"other minor", other),]
    appuifw.app.exit_key_handler = exit_key_handler


# ***** GPRS
def gprs_client():
    print "GPRS test..."
    s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
    print "socket created"
    s.connect(('194.109.137.226', 80))  #dev
#    s.connect(('www.connecting.europe.nokia.com', 80))
#    s.connect(('131.228.55.140', 8080))  #emulator
    print "connected"
    s.send("GET http://www.python.org/pics/pythonHi.gif\x0d\x0a")
    print "send called"
    msg = s.read_all()
    print "read_all called"
    f = open(u"C:\\testSocket\\pythonHi.gif", "w")
    if (msg.startswith('HTTP/')):
        index = msg.find('\x0d\x0a\x0d\x0a')
        f.write(msg[index+4:])
    else:
        f.write(msg)            
    f.close()            
    del s
#    try:
#        content_handler = appuifw.Content_handler()
#        r = content_handler.open(u'c:\\testSocket\\pythonHi.gif')
#    except:
#        print "couldn't open data received"
    print "done."


# ***** IP client/server
def ip_server(lock):
        LF = open(u'c:\\testSocket\\logIPSrv.txt', 'w')
	s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
        LF.write("create\n")
	LF.flush()
	s.bind(('0.0.0.0', 3333))
#	s.bind(('localhost', 3333))  #this doesn't work
        LF.write("bind\n")
	LF.flush()	
	state = 1
	while state:
		s.listen(5)
		LF.write("listen\n")
		LF.flush()		
                try:
                    bs, address = s.accept()
                    remAdd, port = address
                    LF.write("\ns accepted")
                    LF.flush()
                    LF.write("\ntuple: %s"%(str(address)))
                    LF.write("\nremAddr: %s"%remAdd)
                    LF.write("\nremPort: %d"%port)
                except:
                    LF.write("\nexception risen for accept")
                    type, value, tb = sys.exc_info()
                    LF.write((str(type)+'\n'+str(value)))    
		LF.flush()		
		avail = 1
		echo = ''
		while avail:
			msg = bs.read()
			if msg == '\n':
				avail = 0
				continue
			echo = echo + msg
		LF.write("bs read msg\n")
		LF.flush()		
		if echo == '':
			state = 0
			LF.write("nothing read\n")
			LF.flush()
			continue
		else:
			LF.write("Server has received: ")
			LF.write(echo)
			LF.flush()
			bs.send("A msg from server\n")
		LF.write("\nbs write")
		LF.flush()
		del bs
		del s
		LF.write("\n END")
		LF.close()
                state = 0
        lock.signal()		

def ip_client():   #lock):
    LF = open(u'c:\\testSocket\\logIPCli.txt', 'w')
    index = 0

    while index == 0:	
        msg = "hello\n"
        s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
        LF.write("cl created\n")
        LF.flush()
        s.connect(('127.0.0.1', 3333))
        LF.write("cl connect\n")
        LF.flush()		
        s.send(msg)
        LF.write("cl write\n")
        LF.flush()
        state = 1
        msg2 = ''
        while state:
            reply = s.read()
            if reply == '\n':
                state = 0
                continue
            msg2 = msg2 + reply

        LF.write("cl read: \n")
        LF.write(msg2)
        LF.flush()
        del s
        LF.write("\n END \n")
        LF.close()
#    lock.signal()		    

def ip():
    """this script runs ip client and ip server in different threads"""
    print "IP client/server test..."
    syncS = e32.Ao_lock()
#    syncC = e32.Ao_lock()
    thread.start_new_thread(ip_server, (syncS, ))
    thread.start_new_thread(ip_client, ())  #syncC, ))
    syncS.wait()
#    syncC.wait()
    del syncS
#    del syncC
    print " executed"

def ssl_cl():
    LF = open(u"C:\\testSocket\\ssl.txt", "w")  
    try:
        print "ssl client"
        s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)
        print ("\ntcp socket created")
#        s.connect(('198.65.143.195', 443))			 #DEVICE
        s.connect(('www.connecting.europe.nokia.com', 443))	 #EMULATOR

        print ("\ntcp socket connected")
        LF.write("\ntcp socket connected")
        LF.flush()
      
        sec = ssl(s)
        print ("\nssl socket created")
        LF.write("\nssl socket created")
        LF.flush()

#        sec.write("GET https://www.fortify.net/sslcheck.html\x0d\x0a")		#DEV
        sec.write("GET https://www.connecting.europe.nokia.com/mnc/resources/images/mnc/ncp_logo_blue.gif\x0d\x0a") 	#EM
        print ("\nssl.write")
        LF.write("\nssl.write")
        LF.flush()

#        html = sec.read()
        html = sec.read(8192)
        print ("\nssl.read")
        LF.write("\nssl.read")
        LF.flush()
	        
        HTML = open(u"C:\\testSocket\\sslTEST.html","w")
        HTML.write(html)
        HTML.close()
        
        del sec
        del s
        print ("\nboth socket deleted")
        LF.write("\nboth socket deleted")
        LF.flush()     
    except:
        print "\nexception risen"
        type, value, tb = sys.exc_info()
        print((str(type)+'\n'+str(value)))
        LF.write((str(type)+'\n'+str(value)))
        LF.flush()
    LF.close()
        
# ***** RFCOM Send towards pc
def rfcomm_send():
    print "RFCOMM client test 2 pc"
    LF = open(u"C:\\testSocket\\rfcommCliPC.txt", "w")  
    s = socket(AF_BT, SOCK_STREAM, BTPROTO_RFCOMM)
    LF.write("s created")
    LF.flush()
    try:
        addr, serv = bt_discover()
        print("\nbt_discover called")
        print("\naddress: %s, len dict %d"%(addr, len(serv)))
	print("\n%s"%serv)
        LF.write("\nbt_discover called")
        LF.write("\naddress: %s, len dict %d"%(addr, len(serv)))
	LF.write("\n%s"%serv)
        LF.flush()
        list = serv.keys()
        channel = appuifw.popup_menu(list, u"Choose a remote service")
        s.connect((addr, serv[(list[channel])]))

#        s.connect((addr, serv[u"Py RFCOMM service"]))
        print "connected"
        print "port used: %d"%(serv[(list[channel])])      
        LF.write("\nConnected")
        LF.flush()
        s.send("Greetings from Python!\n")
        print "wrote..."
        LF.write("\ns write called. Next deletion")
        LF.flush()       
    except:
        LF.write("\nexception risen for bt_discover")
        type, value, tb = sys.exc_info()
        LF.write((str(type)+'\n'+str(value)))    
        LF.flush()       
    LF.close()
    del s
    print "done"


# ***** RFCOMM server
def rfcomm_srv():
    LF = open(u"C:\\testSocket\\rfcommSrv.txt", "w")  
    s = socket(AF_BT, SOCK_STREAM, BTPROTO_RFCOMM)
    print("s created and opened")
    LF.write("s created and opened")
    LF.flush()
    ch = bt_rfcomm_get_available_server_channel(s)
    print("\ns asked for available channel: %d"%ch)
    LF.write("\ns asked for available channel: %d"%ch)
    LF.flush()
    try:
        s.bind(("whatever", ch))
    except:
        print "exception for bind!"
        LF.write("\nexception risen for s.bind")
        type, value, tb = sys.exc_info()
        LF.write((str(type)+'\n'+str(value)))
        LF.flush()         
    try:
        s.listen(5)
    except:
        print "exception for listen!"
        LF.write("\nexception risen for s.listen")
        type, value, tb = sys.exc_info()
        LF.write((str(type)+'\n'+str(value)))
        LF.flush()         

    try:    
        bt_advertise_service(u"Py RFCOMM service", s, True, RFCOMM)
        print "service advertised"
        LF.write("\nservice advertised")
        LF.flush()
    except:
        print "exception for service advertising!"
        LF.write("\nexception risen for advertise service")
        type, value, tb = sys.exc_info()
        LF.write((str(type)+'\n'+str(value)))
        LF.flush()         

    try:
        mode = AUTH
        set_security(s, mode)
        print "set security"
        LF.write("\nsecurity set")
        LF.flush()
    except:
        print "exception for security set!"
        LF.write("\nexception risen for set security")
        type, value, tb = sys.exc_info()
        LF.write((str(type)+'\n'+str(value)))
        LF.flush()         
       
    try:
        bs, remAdd = s.accept()
        print "accepted"
        LF.write("\ns accepted. Rem.Addr.: %s"%remAdd)
        LF.flush()
        LF.write("\nremAddr: %s"%remAdd)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -