📄 garmin.py
字号:
#!/usr/bin/env python
"""
   garmin

   This module implements the protocol used for communication by the
   Garmin GPS receivers. It is based on the official description
   available from Garmin at

   http://www.garmin.com/support/commProtocol.html

   There are lots of variations in the protocols employed by different
   Garmin products. This module tries to cover most of them, which is
   why it looks so big! Only a small subset of the module will be used
   by any particular model. It can easily be extended to cover any
   models not currently included.

   For documentation, see the source, and the included index.html file.

   This is released under the Gnu General Public Licence. A copy of
   this can be found at http://www.opensource.org/licenses/gpl-license.html

   For the latest information about PyGarmin, please see
   http://pygarmin.sourceforge.net/

   (c) 2001 Quentin Stafford-Fraser <quentin@uk.research.att.com>
   (c) 2000 James A. H. Skillen <jahs@skillen.org.uk>
   (c) 2001 Raymond Penners <raymond@dotsphinx.com>
   (c) 2001 Tom Grydeland <Tom.Grydeland@phys.uit.no>

"""

import os
import string
import sys
import time

import newstruct

# The rest of the module refers to the packing helpers as "struct".
struct = newstruct

# Set this value to > 0 for some debugging output, and the higher
# the number, the more you'll get.

debug = 0

# Introduction =====================================================

# There are 3 levels of protocol documented:
#
#       Application  (highest level)
#       Link
#       Physical     (lowest level)
#
# Garmin documents the various versions of these under labels of
# Pxxx, Lxxx, Axxx etc, and this convention is followed here.
# There are also various data types, named Dxxx.

# Roughly speaking, the Physical protocols specify RS232, the Link
# protocols specify a packet structure for sending messages to and
# fro, and the Application protocol specify what can actually go in
# those packets.


# secs from Unix epoch (start of 1970) to Sun Dec 31 00:00:00 1989
TimeEpoch = 631065600

# Physical protocols ===============================================

# See the Garmin docs for this.
# At the time of writing, the only
# documented physical layer is P000 which is roughly RS232 at 9600
# baud, 8 data bits, no parity, 1 stop bit. Unlike pure RS232, no
# negative voltages are used, but that is normally not too important.

# In software, we model this as something that has read and write
# methods, which can be used by the higher protocol levels. Later, we
# subclass this as something which handles Unix serial ports.


class P000:
    " Physical layer for communicating with Garmin "

    def read(self, n):
        """Placeholder: subclasses return n bytes read from the device."""
        pass

    def write(self, n):
        """Placeholder: subclasses write the given byte string to the device."""
        pass


def _byte(value):
    """Return the one-byte string whose ordinal is value (0..255).

    Going through bytearray gives the same result as chr() on Python 2
    and a bytes object on Python 3. Raises ValueError outside 0..255.
    """
    return bytes(bytearray((value,)))


# The following is handy for debugging:

def hexdump(data):
    """Return the bytes of data as space-separated two-digit hex values."""
    return " ".join(["%02x" % octet for octet in bytearray(data)])


# Link protocols ===================================================

# This used to be the string exception "Link Error". String exceptions
# cannot be raised on Python 2.6 and later, so it is now a class;
# "except LinkException:" clauses keep working unchanged.
class LinkException(Exception):
    "Link Error"


class L000:
    "Basic Link Protocol"
    Pid_Ack_Byte = 6
    Pid_Nak_Byte = 21
    Pid_Protocol_Array = 253
    Pid_Product_Rqst = 254
    Pid_Product_Data = 255

    # DataLinkEscape etc
    DLE = b"\x10"
    ETX = b"\x03"
    EOM = DLE + ETX

    def __init__(self, physicalLayer):
        """Store the physical layer whose read/write methods carry the bytes."""
        self.phys = physicalLayer

    def sendPacket(self, ptype, data, readAck=1):
        """Send a message. By default this will also wait for the ack.

        ptype   -- packet id (0..255)
        data    -- payload: a byte string, or an integer which is packed
                   as a little-endian 16-bit value
        readAck -- when true, read and check the acknowledge packet

        Raises LinkException (via readAcknowledge) on a bad acknowledge.
        """
        if isinstance(data, bytes):
            ld = _byte(len(data))
        else:
            # XXX assume 16-bit integer for now
            ld = _byte(2)
            data = struct.pack("<h", data)
        tp = _byte(ptype)
        chk = self.checksum(tp + ld + data)
        # Only length, payload and checksum are DLE-stuffed; the packet
        # id and the framing bytes are written as they are.
        escline = self.escape(ld + data + chk)
        packet = self.DLE + tp + escline + self.EOM
        self.phys.write(packet)
        if debug > 5:
            print("< packet %3d :  %s" % (ptype, hexdump(data)))
        if readAck:
            self.readAcknowledge(ptype)

    def readPacket(self, sendAck=1):
        """Read a message. By default this will also send the ack.

        Returns a (ptype, data) tuple: the integer packet id and the
        unescaped payload. Raises LinkException when the checksum or the
        end-of-message marker is wrong.
        """
        dle = self.phys.read(1)
        # Find the start of a message
        while dle != self.DLE:
            print("resync - expected DLE and got something else")
            dle = self.phys.read(1)
        # We've now found either the start or the end of a msg
        # Try reading the type.
        tp = self.phys.read(1)
        if tp == self.ETX:
            # It was the end!
            dle = self.phys.read(1)
            tp = self.phys.read(1)
        # Now we should be synchronised
        ptype = ord(tp)
        ld = self.readEscapedByte()
        datalen = ord(ld)
        data = b"".join([self.readEscapedByte() for _ in range(datalen)])
        ck = self.readEscapedByte()
        if ck != self.checksum(tp + ld + data):
            raise LinkException("Invalid checksum")
        eom = self.phys.read(2)
        # This was "assert(eom==self.EOM, 'Invalid EOM seen')", which
        # asserts a non-empty tuple and therefore could never fail.
        if eom != self.EOM:
            raise LinkException("Invalid EOM seen")
        if debug > 5:
            print("> packet %3d :  %s" % (ptype, hexdump(data)))
        if sendAck:
            self.sendAcknowledge(ptype)
        return (ptype, data)

    def expectPacket(self, ptype):
        """Expect and read a particular msg type. Return data.

        Raises LinkException when the packet read has a different id.
        """
        tp, data = self.readPacket()
        if tp != ptype:
            raise LinkException("Expected msg type %d, got %d" % (ptype, tp))
        return data

    def readAcknowledge(self, ptype):
        """Read an ack msg in response to a particular sent msg.

        Raises LinkException unless the packet is an ack whose first
        payload byte equals ptype.
        """
        if debug > 5:
            # No newline, so the packet dump that follows shares the line.
            sys.stdout.write("(>ack) ")
        tp, data = self.readPacket(0)
        if ((tp & 0xff) != self.Pid_Ack_Byte or not data
                or ord(data[0:1]) != ptype):
            raise LinkException("Acknowledge error")

    def sendAcknowledge(self, ptype):
        """Send an ack packet for ptype without waiting for a reply."""
        if debug > 5:
            sys.stdout.write("(<ack) ")
        self.sendPacket(self.Pid_Ack_Byte, struct.pack("<h", ptype), 0)

    def readEscapedByte(self):
        """Read one byte, collapsing a DLE-escaped pair into one byte."""
        c = self.phys.read(1)
        if c == self.DLE:
            c = self.phys.read(1)
        return c

    def checksum(self, data):
        """Return, as one byte, the two's complement of the byte sum of data."""
        total = sum(bytearray(data)) % 256
        return _byte((256 - total) % 256)

    def escape(self, data):
        "Escape any DLE characters"
        return data.replace(self.DLE, self.DLE + self.DLE)


# L001 builds on L000

class L001(L000):
    "Link protocol 1"
    Pid_Command_Data = 10
    Pid_Xfer_Cmplt = 12
    Pid_Date_Time_Data = 14
    Pid_Position_Data = 17
    Pid_Prx_Wpt_Data = 19
    Pid_Records = 27
    Pid_Rte_Hdr = 29
    Pid_Rte_Wpt_Data = 30
    Pid_Almanac_Data = 31
    Pid_Trk_Data = 34
    Pid_Wpt_Data = 35
    Pid_Pvt_Data = 51
    Pid_Rte_Link_Data = 98
    Pid_Trk_Hdr = 99


# L002 builds on L000

class L002(L000):
    "Link Protocol 2"
    Pid_Almanac_Data = 4
    Pid_Command_Data = 11
    Pid_Xfer_Cmplt = 12
    Pid_Date_Time_Data = 20
    Pid_Position_Data = 24
    Pid_Records = 35
    Pid_Rte_Hdr = 37
    Pid_Rte_Wpt_Data = 39
    Pid_Wpt_Data = 43


# Application Protocols =======================================

# Formerly the string exception "Protocol Error"; now a class for the
# same reason as LinkException.
class ProtocolException(Exception):
    "Protocol Error"


# A000 and A001 are used to find out what is on the other end of the
# wire, and hence which other protocols we can use.

class A000:
    "Product Data Protocol"

    def __init__(self, linkLayer):
        """Store the link layer used to exchange packets."""
        self.link = linkLayer

    def getProductData(self):
        """Request the product data packet and decode it.

        Returns (prod_id, soft_ver, prod_descs): the product id, the
        software version divided by 100.0, and the list of
        NUL-separated byte strings that follow the two 16-bit fields.
        """
        fmt = "<hh"
        self.link.sendPacket(self.link.Pid_Product_Rqst, b"")
        data = self.link.expectPacket(self.link.Pid_Product_Data)
        (prod_id, soft_ver) = struct.unpack(fmt, data[:4])
        # data[4:-1] drops the final byte before splitting on NUL.
        prod_descs = data[4:-1].split(b"\0")
        return (prod_id, soft_ver / 100.0, prod_descs)


class A001:
    "Protocol Capabilities Protocol"

    def __init__(self, linkLayer):
        """Store the link layer used to exchange packets."""
        self.link = linkLayer

    def getProtocols(self):
        """Read the protocol array and return names such as "L001".

        Each 3-byte entry is a one-character tag followed by a
        little-endian 16-bit number, formatted as tag + 3-digit number.
        """
        # may raise LinkException here
        if debug > 3:
            print("Try reading protocols using PCP")
        data = self.link.expectPacket(self.link.Pid_Protocol_Array)
        num = len(data) // 3
        fmt = "<" + num * "ch"
        tup = struct.unpack(fmt, data)
        protocols = []
        for i in range(0, 2 * num, 2):
            tag = tup[i]
            if not isinstance(tag, str):
                # Python 3 unpacks "c" as bytes; names are text.
                tag = tag.decode("latin-1")
            protocols.append(tag + "%03d" % tup[i + 1])
        if debug > 0:
            print("Protocols reported by A001: %s" % (protocols,))
        return protocols


# Commands  ---------------------------------------------------

class A010:
    "Device Command Protocol 1"
    Cmnd_Abort_Transfer = 0        # abort current transfer
    Cmnd_Transfer_Alm = 1          # transfer almanac
    Cmnd_Transfer_Posn = 2         # transfer position
    Cmnd_Transfer_Prx = 3          # transfer proximity waypoints
    Cmnd_Transfer_Rte = 4          # transfer routes
    Cmnd_Transfer_Time = 5         # transfer time
    Cmnd_Transfer_Trk = 6          # transfer track log
    Cmnd_Transfer_Wpt = 7          # transfer waypoints
    Cmnd_Turn_Off_Pwr = 8          # turn off power
    Cmnd_Start_Pvt_Data = 49       # start transmitting PVT data
    Cmnd_Stop_Pvt_Data = 50        # stop transmitting PVT data


class A011:
    "Device Command Protocol 2"
    Cmnd_Abort_Transfer = 0        # abort current transfer
    Cmnd_Transfer_Alm = 4          # transfer almanac
    Cmnd_Transfer_Rte = 8          # transfer routes
    Cmnd_Transfer_Time = 20        # transfer time
    Cmnd_Transfer_Wpt = 21         # transfer waypoints
    Cmnd_Turn_Off_Pwr = 26         # turn off power


# Transfer Protocols -------------------------------------------

# Most of the following protocols transfer groups of records of a
# particular format. The exact format depends on the product in use.
# Some records may have sub-groups within the transfer (eg. routes)
# each with their own header.

# The three base classes below deliberately have no class docstring:
# the debug output prints self.__doc__, so adding one would change it.

class TransferProtocol:

    def __init__(self, link, cmdproto, datatypes):
        """Store the link layer, command protocol and record classes."""
        self.link = link
        self.cmdproto = cmdproto
        self.datatypes = datatypes

    def getData(self, cmd, *pids):
        """Placeholder overridden by the single/multi transfer classes."""
        pass

    def putData(self, cmd, data_pid, records):
        """Send records to the device.

        Sends the record count, then each record's pack() result under
        data_pid, then a transfer-complete packet carrying cmd.
        """
        numrecords = len(records)
        if debug > 3:
            print("%s Sending %d records" % (self.__doc__, numrecords))
        self.link.sendPacket(self.link.Pid_Records, numrecords)
        for record in records:
            self.link.sendPacket(data_pid, record.pack())
        self.link.sendPacket(self.link.Pid_Xfer_Cmplt, cmd)


class SingleTransferProtocol(TransferProtocol):

    def getData(self, cmd, pid):
        """Send cmd and read the announced number of pid records.

        Each packet is unpacked into a new self.datatypes[0] instance;
        the list of those instances is returned.
        """
        self.link.sendPacket(self.link.Pid_Command_Data, cmd)
        data = self.link.expectPacket(self.link.Pid_Records)
        (numrecords,) = struct.unpack("<h", data)
        if debug > 3:
            print("%s Expecting %d records" % (self.__doc__, numrecords))
        result = []
        for _ in range(numrecords):
            data = self.link.expectPacket(pid)
            p = self.datatypes[0]()
            p.unpack(data)
            result.append(p)
        self.link.expectPacket(self.link.Pid_Xfer_Cmplt)
        return result


class MultiTransferProtocol(TransferProtocol):

    def getData(self, cmd, hdr_pid, *data_pids):
        """Send cmd and read the announced number of packets, grouped.

        A packet with id hdr_pid starts a new group and is unpacked with
        self.datatypes[0]; a packet whose id is data_pids[k] is unpacked
        with self.datatypes[k + 1]. Returns a list of groups, each a list
        of unpacked records. Raises ProtocolException on any other id.
        """
        self.link.sendPacket(self.link.Pid_Command_Data, cmd)
        data = self.link.expectPacket(self.link.Pid_Records)
        (numrecords,) = struct.unpack("<h", data)
        if debug > 3:
            print("%s Expecting %d records" % (self.__doc__, numrecords))
        data_pids = list(data_pids)
        result = []
        last = []
        for _ in range(numrecords):
            tp, data = self.link.readPacket()
            if tp == hdr_pid:
                # A header closes the group collected so far.
                if last:
                    result.append(last)
                    last = []
                index = 0
            else:
                try:
                    index = data_pids.index(tp) + 1
                except ValueError:
                    raise ProtocolException("Expected header or point")
            p = self.datatypes[index]()
            p.unpack(data)
            last.append(p)
        self.link.expectPacket(self.link.Pid_Xfer_Cmplt)
        if last:
            result.append(last)
        return result


class A100(SingleTransferProtocol):
    "Waypoint Transfer Protocol"

    def getData(self):
        """Fetch all waypoints from the device."""
        return SingleTransferProtocol.getData(self,
                                              self.cmdproto.Cmnd_Transfer_Wpt,
                                              self.link.Pid_Wpt_Data)

    def putData(self, data):
        """Send the given waypoint records to the device."""
        return SingleTransferProtocol.putData(self,
                                              self.cmdproto.Cmnd_Transfer_Wpt,
                                              self.link.Pid_Wpt_Data,
                                              data)


class A200(MultiTransferProtocol):
    "Route Transfer Protocol"

    def getData(self):
        """Fetch all routes as groups of header plus route waypoints."""
        return MultiTransferProtocol.getData(self,
                                             self.cmdproto.Cmnd_Transfer_Rte,
                                             self.link.Pid_Rte_Hdr,
                                             self.link.Pid_Rte_Wpt_Data)


class A201(MultiTransferProtocol):
    "Route Transfer Protocol"

    def getData(self):
        """Fetch all routes; groups may also contain route link records."""
        return MultiTransferProtocol.getData(self,
                                             self.cmdproto.Cmnd_Transfer_Rte,
                                             self.link.Pid_Rte_Hdr,
                                             self.link.Pid_Rte_Wpt_Data,
                                             self.link.Pid_Rte_Link_Data)


# NOTE(review): the source is cut off at this point. The visible start of
# the next class is kept verbatim below; its remainder is not available
# and has not been guessed at.
#
# class A300(SingleTransferProtocol):
#     "Track Log Transfer Protocol"
#     def getData(self):
#         return SingleTransferProtocol.getData(self,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -