⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 garmin.py

📁 gps pygarmin-0[1].6.tgz
💻 PY
📖 第 1 页 / 共 4 页
字号:
#!/usr/bin/env python"""   garmin      This module implements the protocol used for communication by the   Garmin GPS receivers. It is based on the official description   available from Garmin at      http://www.garmin.com/support/commProtocol.html   There are lots of variations in the protocols employed by different   Garmin products. This module tries to cover most of them, which is   why it looks so big! Only a small subset of the module will be used   by any particular model. It can easily be extended to cover any   models not currently included.   For documentation, see the source, and the included index.html file.   This is released under the Gnu General Public Licence. A copy of   this can be found at http://www.opensource.org/licenses/gpl-license.html   For the latest information about PyGarmin, please see   http://pygarmin.sourceforge.net/   (c) 2001 Quentin Stafford-Fraser <quentin@uk.research.att.com>   (c) 2000 James A. H. Skillen <jahs@skillen.org.uk>   (c) 2001 Raymond Penners <raymond@dotsphinx.com>   (c) 2001 Tom Grydeland <Tom.Grydeland@phys.uit.no>   """import os, string, newstruct, time, sysstruct = newstruct# Set this value to > 0 for some debugging output, and the higher# the number, the more you'll get.debug = 0# Introduction =====================================================# There are 3 levels of protocol documented:##       Application  (highest level)#       Link#       Physical     (lowest level)## Garmin documents the various versions of these under labels of# Pxxx, Lxxx, Axxx etc, and this convention is followed here.# There are also various data types, named Dxxx.# Roughly speaking, the Physical protocols specify RS232, the Link# protocols specify a packet structure for sending messages to and# fro, and the Application protocol specify what can actually go in# those packets.# secs from Unix epoch (start of 1970) to Sun Dec 31 00:00:00 1989TimeEpoch = 631065600   # Physical protocols ===============================================# 
# See the Garmin docs for this. At the time of writing, the only
# documented physical layer is P000 which is roughly RS232 at 9600
# baud, 8 data bits, no parity, 1 stop bit. Unlike pure RS232, no
# negative voltages are used, but that is normally not too important.

# In software, we model this as something that has read and write
# methods, which can be used by the higher protocol levels. Later, we
# subclass this as something which handles Unix serial ports.

class P000:
   " Physical layer for communicating with Garmin "

   def read(self, n):
      "Read n bytes from the device. This base class does nothing."
      pass

   def write(self, n):
      "Write the string n to the device. This base class does nothing."
      pass


# The following is handy for debugging:

def hexdump(data):
   "Return data as space-separated two-digit hex values, one per byte."
   return " ".join(["%02x" % ord(x) for x in data])


# Link protocols ===================================================

# String exceptions stopped working in Python 2.6 (raising one is a
# TypeError), so this is a real exception class.  Existing
# "raise LinkException, msg" and "except LinkException" code elsewhere
# keeps working unchanged with a class.
class LinkException(Exception):
   "Link Error"


class L000:
   "Basic Link Protocol"

   Pid_Ack_Byte = 6
   Pid_Nak_Byte = 21
   Pid_Protocol_Array = 253
   Pid_Product_Rqst = 254
   Pid_Product_Data = 255

   # DataLinkEscape etc
   DLE                  = "\x10"
   ETX                  = "\x03"
   EOM                  = DLE+ETX

   def __init__(self, physicalLayer):
      "physicalLayer is any object offering read(n) and write(data)."
      self.phys = physicalLayer

   def sendPacket(self, ptype, data, readAck=1):
      """Send a message. By default this will also wait for the ack.

      ptype is the numeric packet id.  data is either a string payload
      or an integer, which is packed as a little-endian 16-bit value.
      The frame written is DLE, type, then the escaped length, payload
      and checksum, then DLE ETX.
      """
      if isinstance(data, str):
         ld = chr(len(data))
      else: # XXX assume 16-bit integer for now
         ld = chr(2)
         data = struct.pack("<h", data)
      tp = chr(ptype)
      chk = self.checksum(tp + ld + data)
      # Only length, payload and checksum are DLE-stuffed; the type byte
      # and the framing bytes go out as they are.
      escline = self.escape(ld + data + chk)
      packet = self.DLE + tp + escline + self.EOM
      self.phys.write(packet)
      if debug > 5: print("< packet %3d :  %s" % (ptype, hexdump(data)))
      if readAck:
         self.readAcknowledge(ptype)

   def readPacket(self, sendAck=1):
      """Read a message. By default this will also send the ack.

      Returns a (ptype, data) tuple.  Raises LinkException when the
      checksum does not match or the packet is not terminated by
      DLE ETX.
      """
      dle = self.phys.read(1)
      # Find the start of a message
      while dle != self.DLE:
         print("resync - expected DLE and got something else")
         dle = self.phys.read(1)
      # We've now found either the start or the end of a msg
      # Try reading the type.
      tp = self.phys.read(1)
      if tp == self.ETX:
         # It was the end!
         dle = self.phys.read(1)
         tp = self.phys.read(1)
      # Now we should be synchronised
      ptype = ord(tp)
      ld = self.readEscapedByte()
      datalen = ord(ld)
      data = "".join([self.readEscapedByte() for i in range(datalen)])
      ck = self.readEscapedByte()
      if ck != self.checksum(tp + ld + data):
         raise LinkException("Invalid checksum")
      eom = self.phys.read(2)
      # This used to be "assert(cond, msg)", which asserts a non-empty
      # tuple and therefore could never fail.
      if eom != self.EOM:
         raise LinkException("Invalid EOM seen")
      if debug > 5: print("> packet %3d :  %s" % (ptype, hexdump(data)))
      if sendAck:
         self.sendAcknowledge(ptype)
      return (ptype, data)

   def expectPacket(self, ptype):
      """Expect and read a particular msg type. Return data.

      Raises LinkException if a packet of a different type arrives.
      """
      tp, data = self.readPacket()
      if tp != ptype:
         raise LinkException("Expected msg type %d, got %d" % (ptype, tp))
      return data

   def readAcknowledge(self, ptype):
      "Read an ack msg in response to a particular sent msg"
      # write() rather than print so the tag stays on the same line as
      # the packet dump that follows it.
      if debug > 5: sys.stdout.write("(>ack) ")
      tp, data = self.readPacket(0)
      if (tp & 0xff) != self.Pid_Ack_Byte or ord(data[0]) != ptype:
         raise LinkException("Acknowledge error")

   def sendAcknowledge(self, ptype):
      "Send an ack for a received packet of type ptype; no ack is awaited."
      if debug > 5: sys.stdout.write("(<ack) ")
      self.sendPacket(self.Pid_Ack_Byte, struct.pack("<h", ptype), 0)

   def readEscapedByte(self):
      "Read one byte; a DLE is dropped and the byte after it returned."
      c = self.phys.read(1)
      if c == self.DLE:
         c = self.phys.read(1)
      return c

   def checksum(self, data):
      "Return the one-character two's complement of the byte sum of data."
      total = sum([ord(c) for c in data]) % 256
      return chr((256 - total) % 256)

   def escape(self, data):
      "Escape any DLE characters"
      return data.replace(self.DLE, self.DLE + self.DLE)


# L001 builds on L000

class L001(L000):
   "Link protocol 1"
   Pid_Command_Data = 10
   Pid_Xfer_Cmplt = 12
   Pid_Date_Time_Data = 14
   Pid_Position_Data = 17
   Pid_Prx_Wpt_Data = 19
   Pid_Records = 27
   Pid_Rte_Hdr = 29
   Pid_Rte_Wpt_Data = 30
   Pid_Almanac_Data = 31
   Pid_Trk_Data = 34
   Pid_Wpt_Data = 35
   Pid_Pvt_Data = 51
   Pid_Rte_Link_Data = 98
   Pid_Trk_Hdr = 99


# L002 builds on L000

class L002(L000):
   "Link Protocol 2"
   Pid_Almanac_Data = 4
   Pid_Command_Data = 11
   Pid_Xfer_Cmplt = 12
   Pid_Date_Time_Data = 20
   Pid_Position_Data = 24
   Pid_Records = 35
   Pid_Rte_Hdr = 37
   Pid_Rte_Wpt_Data = 39
   Pid_Wpt_Data = 43


# Application Protocols =======================================

# An exception class rather than a string, for the same reason as
# LinkException.
class ProtocolException(Exception):
   "Protocol Error"


# A000 and A001 are used to find out what is on the other end of the
# wire, and hence which other protocols we can use.

class A000:
   "Product Data Protocol"

   def __init__(self, linkLayer):
      self.link = linkLayer

   def getProductData(self):
      """Request the product data packet and decode it.

      Returns (product id, software version / 100.0, list of the
      NUL-separated description strings).
      """
      fmt = "<hh"
      self.link.sendPacket(self.link.Pid_Product_Rqst, "")
      data = self.link.expectPacket(self.link.Pid_Product_Data)
      (prod_id, soft_ver) = struct.unpack(fmt, data[:4])
      # The last byte is left out so that it does not produce an
      # empty trailing entry.
      prod_descs = data[4:-1].split("\0")
      return (prod_id, soft_ver/100.0, prod_descs)


class A001:
   "Protocol Capabilities Protocol"

   def __init__(self, linkLayer):
      self.link = linkLayer

   def getProtocols(self):
      """Read the protocol array packet.

      Returns a list of names such as "L001", each built from a
      one-character tag and a 16-bit number.
      """
      # may raise LinkException here
      if debug > 3: print("Try reading protocols using PCP")
      data = self.link.expectPacket(self.link.Pid_Protocol_Array)
      # Each entry is three bytes: a tag character and a 16-bit number.
      num = len(data) // 3
      fmt = "<" + num*"ch"
      tup = struct.unpack(fmt, data)
      protocols = []
      for i in range(0, 2*num, 2):
         protocols.append(tup[i] + "%03d" % tup[i+1])
      if debug > 0:
         print("Protocols reported by A001: %s" % (protocols,))
      return protocols


# Commands  ---------------------------------------------------

class A010:
   "Device Command Protocol 1"
   Cmnd_Abort_Transfer = 0   # abort current transfer
   Cmnd_Transfer_Alm = 1     # transfer almanac
   Cmnd_Transfer_Posn = 2    # transfer position
   Cmnd_Transfer_Prx = 3     # transfer proximity waypoints
   Cmnd_Transfer_Rte = 4     # transfer routes
   Cmnd_Transfer_Time = 5    # transfer time
   Cmnd_Transfer_Trk = 6     # transfer track log
   Cmnd_Transfer_Wpt = 7     # transfer waypoints
   Cmnd_Turn_Off_Pwr = 8     # turn off power
   Cmnd_Start_Pvt_Data = 49  # start transmitting PVT data
   Cmnd_Stop_Pvt_Data = 50   # stop transmitting PVT data


class A011:
   "Device Command Protocol 2"
   Cmnd_Abort_Transfer = 0   # abort current transfer
   Cmnd_Transfer_Alm = 4     # transfer almanac
   Cmnd_Transfer_Rte = 8     # transfer routes
   Cmnd_Transfer_Time = 20   # transfer time
   Cmnd_Transfer_Wpt = 21    # transfer waypoints
   Cmnd_Turn_Off_Pwr = 26    # turn off power


# Transfer Protocols -------------------------------------------

# Most of the following protocols transfer groups of records of a
# particular format.
The exact format depends on the product in use.# Some records may have sub-groups within the transfer (eg. routes)# each with their own header.class TransferProtocol:   def __init__(self, link, cmdproto, datatypes):      self.link = link      self.cmdproto = cmdproto      self.datatypes = datatypes   def getData(self, cmd, *pids):      pass   def putData(self, cmd, data_pid, records):      numrecords = len(records)      if debug > 3: print self.__doc__, "Sending %d records" % numrecords      self.link.sendPacket(self.link.Pid_Records, numrecords)      for i in records:         self.link.sendPacket(data_pid, i.pack())      self.link.sendPacket(self.link.Pid_Xfer_Cmplt, cmd)class SingleTransferProtocol(TransferProtocol):   def getData(self, cmd, pid):      self.link.sendPacket(self.link.Pid_Command_Data, cmd)      data = self.link.expectPacket(self.link.Pid_Records)      (numrecords,) = struct.unpack("<h", data)      if debug > 3: print self.__doc__, "Expecting %d records" % numrecords      result = []      for i in range(numrecords):         data = self.link.expectPacket(pid)         p = self.datatypes[0]()         p.unpack(data)         result.append(p)      self.link.expectPacket(self.link.Pid_Xfer_Cmplt)      return resultclass MultiTransferProtocol(TransferProtocol):   def getData(self, cmd, hdr_pid, *data_pids):      self.link.sendPacket(self.link.Pid_Command_Data, cmd)      data = self.link.expectPacket(self.link.Pid_Records)      (numrecords,) = struct.unpack("<h", data)      if debug > 3: print self.__doc__, "Expecting %d records" % numrecords      data_pids = list(data_pids)      result = []      last = []      for i in range(numrecords):         tp, data = self.link.readPacket()         if tp == hdr_pid:            if last:               result.append(last)               last = []            index = 0         else:            try:               index = data_pids.index(tp) + 1            except ValueError:               raise ProtocolException, "Expected 
header or point"         p = self.datatypes[index]()         p.unpack(data)         last.append(p)      self.link.expectPacket(self.link.Pid_Xfer_Cmplt)      if last:         result.append(last)      return resultclass A100(SingleTransferProtocol):   "Waypoint Transfer Protocol"   def getData(self):      return SingleTransferProtocol.getData(self,                                            self.cmdproto.Cmnd_Transfer_Wpt,                                            self.link.Pid_Wpt_Data)   def putData(self,data):      return SingleTransferProtocol.putData(self,                                            self.cmdproto.Cmnd_Transfer_Wpt,                                            self.link.Pid_Wpt_Data,                                            data)      class A200(MultiTransferProtocol):   "Route Transfer Protocol"   def getData(self):      return MultiTransferProtocol.getData(self,                                           self.cmdproto.Cmnd_Transfer_Rte,                                           self.link.Pid_Rte_Hdr,                                           self.link.Pid_Rte_Wpt_Data)class A201(MultiTransferProtocol):   "Route Transfer Protocol"   def getData(self):      return MultiTransferProtocol.getData(self,                                           self.cmdproto.Cmnd_Transfer_Rte,                                           self.link.Pid_Rte_Hdr,                                           self.link.Pid_Rte_Wpt_Data,                                           self.link.Pid_Rte_Link_Data)class A300(SingleTransferProtocol):   "Track Log Transfer Protocol"   def getData(self):      return SingleTransferProtocol.getData(self,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -