📄 tos-nwprog
字号:
#!/usr/bin/env python
# Copyright (c) 2007 Johns Hopkins University.
# All rights reserved.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose, without fee, and without written
# agreement is hereby granted, provided that the above copyright
# notice, the (updated) modification history and the author appear in
# all copies of this source code.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS `AS IS'
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS
# BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, LOSS OF USE, DATA,
# OR PROFITS) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
# THE POSSIBILITY OF SUCH DAMAGE.
# @author Razvan Musaloiu-E. <razvanm@cs.jhu.edu>
# @author Chieh-Jan Mike Liang <cliang4@cs.jhu.edu>
# b6lowpan/nwprog port:
# @author Stephen Dawson-Haggerty <stevedh@cs.berkeley.edu>
import sys, stat, struct, subprocess, time, os.path, socket, getopt, re
try:
import tos
except ImportError:
import posix
sys.path = [os.path.join(posix.environ['TOSROOT'], 'support', 'sdk', 'python')] + sys.path
import tos
from datetime import datetime
# Path to the python script that builds Deluge image from XML.
# It is looked up in the same directory as this script (dirname of argv[0]).
PATH_PY_BUILD_IMAGE = os.path.join(os.path.dirname(sys.argv[0]), 'tos-build-deluge-image')
# Commands for NWProg: the first byte of every request header.
NWPROG_CMD_ERASE = 1
NWPROG_CMD_WRITE = 2
NWPROG_CMD_READ = 3
# Deluge parameters
# NOTE(review): the three DELUGE_* values are not referenced anywhere in the
# visible part of this file.
DELUGE_MAX_PAGES = 128
DELUGE_IDENT_OFFSET = 0
DELUGE_IDENT_SIZE = 128
# UDP port on the mote that requests are sent to.
NWPROG_PORT = 5213
# Maximum number of payload bytes placed in one write request; the -p
# command-line option overrides it.
NWPROG_PKT_SIZE = 64
# Request header, network byte order: command (B), image number (B), offset (H).
NWPROG_REQ_FMT = "!BBH"
# Reply header, network byte order; unpacked by this script as
# (error, pack, cmd, imgno, offset).
NWPROG_REPLY_FMT = "!BBBBH"
# Value of the reply's error field that is treated as success.
ERROR_SUCCESS = 0
# Initial number of tries granted to each mote, and the number of passes the
# main loop makes over the mote list.
nRetries = 3
class CommandFailedException(Exception):
    """Exception type reserved for failed NWProg commands.

    It derives from Exception so that ``except Exception`` handlers can
    catch it; a plain base-less class would slip past them. Nothing in the
    visible part of this file raises it.
    """
    pass
def send_command(cmd_str, retries):
    """Send one NWProg request and wait for the matching reply.

    The request goes out on the module-level UDP socket ``s`` to the
    module-level address ``remote``; both are set by the main loop before
    any command function is called.

    cmd_str -- packed request: a NWPROG_REQ_FMT header (command, image
               number, offset), optionally followed by payload bytes.
    retries -- number of extra attempts allowed after the first one.

    Returns the raw reply datagram (header plus any payload) when the mote
    answers with ERROR_SUCCESS and echoes the request's image number and
    offset. Returns False when every attempt timed out or was answered with
    an error, or as soon as a datagram arrives from a different host.
    """
    req_hdr_len = struct.calcsize(NWPROG_REQ_FMT)
    reply_hdr_len = struct.calcsize(NWPROG_REPLY_FMT)
    (_real_cmd, real_imgno, real_offset) = struct.unpack(
        NWPROG_REQ_FMT, cmd_str[0:req_hdr_len])

    # One initial attempt plus ``retries`` more; a negative count still
    # yields a single attempt.
    attempts_left = max(retries, 0) + 1
    while attempts_left > 0:
        attempts_left -= 1
        s.sendto(cmd_str, (remote, NWPROG_PORT))
        s.settimeout(3)
        try:
            data, addr = s.recvfrom(1024)
        except socket.timeout:
            # No answer within the timeout: try again.
            continue

        # make sure this is the guy we're programming
        if addr[0] != remote:
            print("WARNING: received unexpected reply from %s" % addr[0])
            return False

        # Replies may carry payload after the header (read replies do), so
        # only the header bytes are unpacked. A reply shorter than the
        # header is treated like an error reply.
        if len(data) >= reply_hdr_len:
            (error, pack, cmd, imgno, offset) = struct.unpack(
                NWPROG_REPLY_FMT, data[0:reply_hdr_len])
            if (error == ERROR_SUCCESS and offset == real_offset
                    and imgno == real_imgno):
                return data
        print("WARNING: received error while sending block; retrying")
    return False
def erase(imgNum, none=None):
    """Ask the mote to erase image volume ``imgNum``.

    The second parameter is ignored; the main loop calls every command
    function with two arguments.

    Returns the result of send_command: the reply datagram on success,
    False on failure. One retry is allowed.
    """
    start_offset = 0  # erase requests always carry offset 0
    request = struct.pack(NWPROG_REQ_FMT, NWPROG_CMD_ERASE, imgNum, start_offset)
    reply = send_command(request, 1)
    return reply
def read(imgNum, unused=None):
    """Read back the first 40000 bytes of image volume ``imgNum``.

    Every payload byte received is written to stderr as a decimal number,
    one per line. The second parameter is ignored; the main loop calls
    every command function with two arguments.

    Returns True once all bytes were read. Returns False when a request got
    no valid reply or a reply carried no payload, instead of re-sending the
    same request forever. Exits the process with status 1 if a reply's
    offset does not match the requested one.
    """
    hdr_len = struct.calcsize(NWPROG_REPLY_FMT)
    length = 40000
    pkt_offset = 0
    while length > 0:
        sreqpkt = struct.pack(NWPROG_REQ_FMT, NWPROG_CMD_READ, imgNum, pkt_offset)
        data = send_command(sreqpkt, 5)
        if not data:
            print("ERROR: No valid reply from mote: aborting")
            return False

        (error, pack, cmd, imgno, offset) = struct.unpack(NWPROG_REPLY_FMT, data[0:hdr_len])
        if offset != pkt_offset:
            print("ERROR: Out of sequence data: aborting")
            sys.exit(1)

        payload = data[hdr_len:]
        if not payload:
            # Without payload the offset cannot advance and the loop would
            # never terminate.
            print("ERROR: Reply carried no data: aborting")
            return False

        for c in payload:
            sys.stderr.write("%d\n" % ord(c))
        pkt_offset += len(payload)
        length -= len(payload)
    return True
def write(imgNum, data):
    """Send ``data`` to image volume ``imgNum`` in NWPROG_PKT_SIZE chunks.

    imgNum -- image volume number placed in every request header.
    data   -- sequence of integers in 0..255, one per byte to write.

    A text progress bar is drawn on stdout while sending, followed by a
    throughput summary. Returns True when every chunk was acknowledged,
    False as soon as send_command gives up on one.
    """
    total_length = len(data)
    length = total_length      # bytes still to send
    next_tick = 100            # For progress bar
    start_time = time.time()
    sys.stdout.write("[0% 25% 50% 75% 100%]\r[")

    pkt_offset = 0
    while length > 0:
        # Draw at most one tick per packet as the remaining share drops.
        if (length * 100) // total_length < next_tick:
            next_tick = next_tick - 2
            sys.stdout.write('-')
            sys.stdout.flush()

        # Payload size for the current packet: a full packet or the tail.
        pkt_length = min(length, NWPROG_PKT_SIZE)
        chunk = data[pkt_offset:pkt_offset + pkt_length]

        sreqpkt = struct.pack(NWPROG_REQ_FMT,
                              NWPROG_CMD_WRITE, imgNum, pkt_offset)
        # Pack the whole chunk in one call instead of appending per byte.
        sreqpkt += struct.pack("%dB" % len(chunk), *chunk)

        # Sends the packet to the mote over the UDP socket
        if not send_command(sreqpkt, 5):
            print("\nReceived error from mote while programming")
            print("Perhaps the block size is too large, or the flash is broken?")
            return False

        length -= pkt_length
        pkt_offset += pkt_length

    # Blank out the progress bar before printing the summary over it.
    sys.stdout.write('\r' + ' ' * 52)
    elapsed_time = time.time() - start_time
    # A coarse clock can report zero elapsed time; avoid dividing by it.
    if elapsed_time > 0:
        rate = total_length / elapsed_time
    else:
        rate = 0.0
    print("\r%s bytes in %.2f seconds (%.4f bytes/s)" % (total_length, elapsed_time, rate))
    return True
# Injects an image (specified by tos_image_xml) to an image volume
def upload(imgNum, tos_image_xml):
    """Build a binary image from ``tos_image_xml`` and program it into volume ``imgNum``.

    The image is produced by running PATH_PY_BUILD_IMAGE as a subprocess;
    its stdout is taken as the image bytes and its stderr is echoed. The
    volume is then erased and the image written.

    Returns the result of write() on success. Returns False when either
    file is missing, the build utility exits with a non-zero status, the
    image is empty, or the erase fails.
    """
    # Checks for valid file paths
    try:
        os.stat(tos_image_xml)
    except OSError:
        print("ERROR: Unable to find the TOS image XML, \"%s\"" % tos_image_xml)
        return False
    try:
        os.stat(PATH_PY_BUILD_IMAGE)
    except OSError:
        print("ERROR: Unable to find the image building utility, \"%s\"" % PATH_PY_BUILD_IMAGE)
        return False

    # Creates binary image from the TOS image XML
    print("--------------------------------------------------")
    cmd = [PATH_PY_BUILD_IMAGE, "-i", str(imgNum), tos_image_xml]
    print("Create image: %s" % ' '.join(cmd))
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate(None)
    if err:
        # Echo the utility's diagnostics and keep the separator on its own line.
        sys.stdout.write(err)
        if not err.endswith("\n"):
            sys.stdout.write("\n")
    print("--------------------------------------------------")

    # Do not program a possibly truncated image if the utility failed.
    if p.returncode != 0:
        print("ERROR: image building utility failed with exit status %d" % p.returncode)
        return False

    # Writes the new binary image; write() expects one integer per byte.
    image = [ord(c) for c in out]
    if len(image) > 0 and erase(imgNum):
        return write(imgNum, image)
    print("Could not proceed: image size is zero or erase failed")
    return False
def print_usage():
    """Print the command-line help text to stdout.

    The current values of nRetries and NWPROG_PKT_SIZE are shown as the
    defaults. ``-r`` is listed only for --read, because the option parser
    always interprets ``-r`` as the read command.
    """
    print("")
    print("Usage: %s <(-e|-u|-r) image_number> <-f app_xml> [options] [ip_address]" % sys.argv[0])
    print(" -u --upload Upload a compiled TinyOS application")
    print(" -r --read Read back a volume")
    print(" -e --erase Erase an image in the external flash")
    print(" -f --appfile The tos_image.xml file to upload")
    print(" -m --motelist A file containing a list of IPv6 addresses to upload to")
    print("    --retries The number of times to retry each operation (currently %i)" % nRetries)
    print(" -p --payload-sz How much payload to include in every packet (currently %i)" % NWPROG_PKT_SIZE)
    print(" -d --dudfile File to write list of motes which did not program (default: stdout)")
    print("")
def checkImgNum(imgNum):
    """Validate an image number and return it as an int.

    imgNum -- the value to check, typically a command-line string.

    Prints an error and exits the process with status -1 when the value is
    not an integer or does not fit the 0..255 range.
    """
    try:
        value = int(imgNum)
    except (TypeError, ValueError):
        value = None
    # The request header carries the image number as one unsigned byte, so
    # anything outside 0..255 cannot be sent.
    if value is None or not 0 <= value <= 255:
        print("ERROR: Image number is not valid")
        sys.exit(-1)
    return value
# ======== MAIN ======== #
if __name__ == '__main__':
    # Command-line driver: parse the options, build the list of motes, run
    # the selected command against each mote (with retries) and finally
    # report the motes that never succeeded.

    def _positive_int(opt, text):
        # Parse an option argument that must be an integer >= 1.
        try:
            value = int(text)
        except ValueError:
            value = 0
        if value < 1:
            print("ERROR: option %s needs a positive integer, got \"%s\"" % (opt, text))
            print_usage()
            sys.exit(1)
        return value

    try:
        # getopt wants long names without the leading "--", and with a
        # trailing "=" when the option takes an argument. "-r" is declared
        # once and means --read; retries is available as --retries.
        opts, args = getopt.getopt(sys.argv[1:], "e:u:r:m:f:p:d:",
                                   ["erase=", "upload=", "read=", "motelist=",
                                    "appfile=", "retries=", "payload-sz=",
                                    "dudfile="])
    except getopt.GetoptError:
        # sys.exc_info() is used so this works on every Python 2 release.
        err = sys.exc_info()[1]
        print(str(err))
        print_usage()
        sys.exit(1)

    imgNum = None
    cmd = None
    uploadFile = None
    appFile = None
    dudFile = None
    for o, a in opts:
        if o in ["-e", "--erase"]:
            imgNum = checkImgNum(a)
            # "eras" so that the "%sing"/"%sed" messages read erasing/erased
            cmd = "eras"
        elif o in ["-u", "--upload"]:
            imgNum = checkImgNum(a)
            cmd = "upload"
        elif o in ["-r", "--read"]:
            imgNum = checkImgNum(a)
            cmd = "read"
        elif o in ["-m", "--motelist"]:
            uploadFile = a
        elif o in ["-f", "--appfile"]:
            appFile = a
        elif o == "--retries":
            nRetries = _positive_int(o, a)
        elif o in ["-p", "--payload-sz"]:
            # A payload size below 1 would make write() loop forever.
            NWPROG_PKT_SIZE = _positive_int(o, a)
        elif o in ["-d", "--dudfile"]:
            dudFile = a

    # A command is mandatory; only upload additionally needs the image XML.
    if imgNum is None or (cmd == "upload" and appFile is None):
        print_usage()
        sys.exit(1)

    # Each entry is (address, tries_left); -1 marks a mote that succeeded.
    upload_list = []
    if uploadFile is None:
        upload_list = [(ip, nRetries) for ip in args]
    else:
        fp = open(uploadFile, "r")
        try:
            for line in fp:
                # Lines containing '#' are comments; blank lines are skipped.
                if '#' in line:
                    continue
                ip = line.strip().lower()
                if ip:
                    upload_list.append((ip, nRetries))
        finally:
            fp.close()

    if cmd == 'upload':
        cmd_fn = upload
    elif cmd == 'read':
        cmd_fn = read
    else:
        cmd_fn = erase

    print("%sing %i motes" % (cmd, len(upload_list)))
    print("retries: %i payload: %i" % (nRetries, NWPROG_PKT_SIZE))
    for t in range(0, nRetries):
        for i in range(0, len(upload_list)):
            # ``remote`` and ``s`` are module-level names read by send_command.
            remote, tries_left = upload_list[i]
            if tries_left <= 0:
                continue
            print("%sing %s, %i tries remaining ..." % (cmd, remote, tries_left))
            s = None
            succeeded = False
            try:
                try:
                    s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
                    succeeded = cmd_fn(imgNum, appFile)
                except KeyboardInterrupt:
                    print("Interrupted; exiting")
                    sys.exit(2)
                except Exception:
                    e = sys.exc_info()[1]
                    print("Received unexpected exception while programming")
                    print(str(e))
            finally:
                # Close only a socket that was actually created.
                if s is not None:
                    s.close()
            if succeeded:
                upload_list[i] = (remote, -1)
                print("Success!")
            else:
                # An exception counts as a failed try too, so the mote ends
                # up in the dud report instead of being silently dropped.
                upload_list[i] = (remote, tries_left - 1)

    if dudFile is not None:
        dudFp = open(dudFile, "w")
    else:
        dudFp = sys.stdout
    try:
        printedHeading = False
        for remote, tries_left in upload_list:
            # Anything not marked -1 never completed successfully.
            if tries_left < 0:
                continue
            if not printedHeading:
                printedHeading = True
                print("WARNING: not all motes were succesfully %sed!" % cmd)
            dudFp.write("%s\n" % remote)
    finally:
        if dudFp is not sys.stdout:
            dudFp.close()
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -