📄 tos.py
字号:
# Copyright (c) 2008 Johns Hopkins University.
# All rights reserved.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose, without fee, and without written
# agreement is hereby granted, provided that the above copyright
# notice, the (updated) modification history and the author appear in
# all copies of this source code.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS `AS IS'
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS
# BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, LOSS OF USE, DATA,
# OR PROFITS) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
# THE POSSIBILITY OF SUCH DAMAGE.
# @author Razvan Musaloiu-E. <razvanm@cs.jhu.edu>
# @author David Purdy <david@radioretail.co.za>
"""
A library that implements the T2 serial communication.
This library has two parts: one that deals with sending and receiving
packets using the serial format from T2 (TEP113) and a second one that
tries to simplifies the work with arbitrary packets.
"""
import sys, struct, time, socket, operator, os
import traceback
try:
import serial
except ImportError, e:
print "Please install PySerial first."
sys.exit(1)
__version__ = "$Id: tos.py,v 1.10 2009/11/02 21:28:49 razvanm Exp $"
__all__ = ['Serial', 'AM',
'Packet', 'RawPacket',
'AckFrame', 'DataFrame', 'NoAckDataFrame',
'ActiveMessage']
HDLC_FLAG_BYTE = 0x7e
HDLC_CTLESC_BYTE = 0x7d
TOS_SERIAL_ACTIVE_MESSAGE_ID = 0
TOS_SERIAL_CC1000_ID = 1
TOS_SERIAL_802_15_4_ID = 2
TOS_SERIAL_UNKNOWN_ID = 255
SERIAL_PROTO_ACK = 67
SERIAL_PROTO_PACKET_ACK = 68
SERIAL_PROTO_PACKET_NOACK = 69
SERIAL_PROTO_PACKET_UNKNOWN = 255
def list2hex(v):
return " ".join(["%02x" % p for p in v])
class Timeout(Exception):
pass
def getSource(comm):
source = comm.split('@')
params = source[1].split(':')
debug = '--debug' in sys.argv
if source[0] == 'serial':
try:
return Serial(params[0], int(params[1]), flush=True, debug=debug)
except:
print "ERROR: Unable to initialize a serial connection to", comm
raise Exception
elif source[0] == 'network':
try:
return SerialMIB600(params[0], int(params[1]), debug=debug)
except:
print "ERROR: Unable to initialize a network connection to", comm
print "ERROR:", traceback.format_exc()
raise Exception
raise Exception
class Serial:
def __init__(self, port, baudrate, flush=False, debug=False, readTimeout=None, ackTimeout=0.02):
self.debug = debug
self.readTimeout = readTimeout
self.ackTimeout = ackTimeout
self._ts = None
if port.startswith('COM') or port.startswith('com'):
port = int(port[3:]) - 1
elif port.isdigit():
port = int(port) - 1
self._s = serial.Serial(port, int(baudrate), rtscts=0, timeout=0.5)
self._s.flushInput()
if flush:
print >>sys.stdout, "Flushing the serial port",
endtime = time.time() + 1
while time.time() < endtime:
self._s.read()
sys.stdout.write(".")
if not self.debug:
sys.stdout.write("\n")
self._s.close()
self._s = serial.Serial(port, baudrate, rtscts=0, timeout=readTimeout)
def getByte(self):
c = self._s.read()
if c == '':
raise Timeout
#print 'Serial:getByte: 0x%02x' % ord(c)
return ord(c)
def putBytes(self, data):
#print "DEBUG: putBytes:", data
for b in data:
self._s.write(struct.pack('B', b))
time.sleep(0.000001)
def getTimeout(self):
return self._s.timeout
def setTimeout(self, timeout):
self._s.timeout = timeout
class SerialMIB600:
def __init__(self, host, port=10002, debug=False, readTimeout=None, ackTimeout=0.5):
self.debug = debug
self.readTimeout = readTimeout
self.ackTimeout = ackTimeout
self._ts = None
self._s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._s.connect((host, port))
print "Connected"
def getByte(self):
try:
c = self._s.recv(1)
except socket.timeout:
c = ''
if c == '':
raise Timeout
#print 'Serial:getByte: 0x%02x' % ord(c)
return ord(c)
def putBytes(self, data):
#print "DEBUG: putBytes:", data
for b in data:
self._s.send(struct.pack('B', b))
def getTimeout(self):
return self._s.gettimeout()
def setTimeout(self, timeout):
self._s.settimeout(timeout)
class HDLC:
"""
An HDLC object offers a way to send and receive data on a byte
source using a HDLC-like formating.
"""
def __init__(self, source):
self._s = source
# Returns the next incoming serial packet
def read(self, timeout=None):
"""Wait for a packet and return it as a RawPacket."""
# Developer notes:
#
# Packet data read is in this format:
# [HDLC_FLAG_BYTE][Escaped data][HDLC_FLAG_BYTE]
#
# [Escaped data] is encoded so that [HDLC_FLAG_BYTE] byte
# values cannot occur within it. When [Escaped data] has been
# unescaped, the last 2 bytes are a 16-bit CRC of the earlier
# part of the packet (excluding the initial HDLC_FLAG_BYTE
# byte)
#
# It's also possible that the serial device was half-way
# through transmitting a packet when this function was called
# (app was just started). So we also neeed to handle this
# case:
#
# [Incomplete escaped data][HDLC_FLAG_BYTE][HDLC_FLAG_BYTE][Escaped data][HDLC_FLAG_BYTE]
#
# In this case we skip over the first (incomplete) packet.
#
if self._s.getTimeout() != timeout and timeout != None:
self.log("Set the timeout to %s, previous one was %s" % (timeout, self._s.getTimeout()))
self._s.setTimeout(timeout)
# +--- FLAG -----+
# | | ___________
# v | / |
# >(1)-- !FLAG -->(2)<-- !FLAG --+
# |
# FLAG
# | ___________
# v / |
# (3)<-- FLAG ---+
# |
# !FLAG
# | ___________
# v / |
# (4)<-- !FLAG --+
# |
# FLAG
# |
# v
# (5)
try:
# Read bytes until we get to a HDLC_FLAG_BYTE value
# (either the end of a packet, or the start of a new one)
d = self._s.getByte()
ts = time.time()
if d != HDLC_FLAG_BYTE:
self.log("Skipping byte %d" % d)
while d != HDLC_FLAG_BYTE:
d = self._s.getByte()
self.log("Skipping byte %d" % d)
ts = time.time()
# Store HDLC_FLAG_BYTE at the start of the retrieved packet
# data:
packet = [d]
# Is the next byte also HDLC_FLAG_BYTE?
d = self._s.getByte()
while d == HDLC_FLAG_BYTE:
d = self._s.getByte()
ts = time.time()
# We are now on the 2nd byte of the packet. Add it to
# our retrieved packet data:
packet.append(d)
# Read bytes from serial until we read another HDLC_FLAG_BYTE
# value (end of the current packet):
while d != HDLC_FLAG_BYTE:
d = self._s.getByte()
packet.append(d)
# Done reading a whole packet from serial
self.log("SimpleSerial:_read: unescaped %s" % packet)
# Decode the packet, and check CRC:
packet = self._unescape(packet)
crc = self._crc16(0, packet[1:-3])
packet_crc = self._decode(packet[-3:-1])
if crc != packet_crc:
print "Warning: wrong CRC! %x != %x %s" % (crc, packet_crc, ["%2x" % i for i in packet])
if not self._s._ts:
self._s._ts = ts
self.log("Serial:_read: %.4f (%.4f) Recv: %s" % (ts, ts - self._s._ts, self._format(packet[1:-3])))
self._ts = ts
# Packet was successfully retrieved, so return it in a
# RawPacket wrapper object (but leave out the HDLC_FLAG_BYTE
# and CRC bytes)
return RawPacket(ts, packet[1:-3])
except Timeout:
return None
def write(self, payload, seqno):
"""
Write a packet. If the payload argument is a list, it is
assumed to be exactly the payload. Otherwise the payload is
assume to be a Packet and the real payload is obtain by
calling the .payload().
"""
if isinstance(payload, Packet):
payload = payload.payload()
packet = DataFrame();
# We need to always request for acks
packet.protocol = SERIAL_PROTO_PACKET_ACK
packet.seqno = seqno
packet.dispatch = 0
packet.data = payload
packet = packet.payload()
crc = self._crc16(0, packet)
packet.append(crc & 0xff)
packet.append((crc >> 8) & 0xff)
packet = [HDLC_FLAG_BYTE] + self._escape(packet) + [HDLC_FLAG_BYTE]
self.log("Serial: write %s" % packet)
self._s.putBytes(packet)
def _format(self, payload):
f = NoAckDataFrame(payload)
if f.protocol == SERIAL_PROTO_ACK:
rpacket = AckFrame(payload)
return "Ack seqno: %d" % (rpacket.seqno)
else:
rpacket = ActiveMessage(f.data)
return "D: %04x S: %04x L: %02x G: %02x T: %02x | %s" % \
(rpacket.destination, rpacket.source,
rpacket.length, rpacket.group, rpacket.type,
list2hex(rpacket.data))
def _crc16(self, base_crc, frame_data):
crc = base_crc
for b in frame_data:
crc = crc ^ (b << 8)
for i in range(0, 8):
if crc & 0x8000 == 0x8000:
crc = (crc << 1) ^ 0x1021
else:
crc = crc << 1
crc = crc & 0xffff
return crc
def _encode(self, val, dim):
output = []
for i in range(dim):
output.append(val & 0xFF)
val = val >> 8
return output
def _decode(self, v):
r = long(0)
for i in v[::-1]:
r = (r << 8) + i
return r
def _unescape(self, packet):
r = []
esc = False
for b in packet:
if esc:
r.append(b ^ 0x20)
esc = False
elif b == HDLC_CTLESC_BYTE:
esc = True
else:
r.append(b)
return r
def _escape(self, packet):
r = []
for b in packet:
if b == HDLC_FLAG_BYTE or b == HDLC_CTLESC_BYTE:
r.append(HDLC_CTLESC_BYTE)
r.append(b ^ 0x20)
else:
r.append(b)
return r
def log(self, s):
if self._s.debug:
print s
class SimpleAM(object):
def __init__(self, source, oobHook=None):
self._source = source
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -