📄 gps.py
字号:
#!/usr/bin/env python
# $Id: gps.py 4565 2007-12-14 03:28:28Z esr $
#
# gps.py -- Python interface to GPSD.
#
import calendar
import math
import select
import socket
import sys
import time

# Old Pythons (at least up to 2.3.4) do not implement PEP 754
# (http://python.fyxm.net/peps/pep-0754.html), so there is no portable way
# to spell a real NaN there.  IEEE754 says NaN == NaN is always false, so
# comparing against a NaN constant cannot work either.  A large finite
# sentinel is used as a proxy instead; this is not mathematically correct
# but good enough for altitude.
NaN = 1e10


def isnan(x):
    """Return true when x is the "unknown" sentinel or a genuine IEEE NaN.

    Any value above 1e9 counts as unknown (the module-level NaN is 1e10).
    A real NaN is the only float that differs from itself, so `x != x`
    catches it without needing math.isnan.
    """
    return x != x or x > 1e9


# Bit flags, one per kind of data; gpsdata.valid is a mask of these.
ONLINE_SET = 0x00000001
TIME_SET = 0x00000002
TIMERR_SET = 0x00000004
LATLON_SET = 0x00000008
ALTITUDE_SET = 0x00000010
SPEED_SET = 0x00000020
TRACK_SET = 0x00000040
CLIMB_SET = 0x00000080
STATUS_SET = 0x00000100
MODE_SET = 0x00000200
HDOP_SET = 0x00000400
VDOP_SET = 0x00000800
PDOP_SET = 0x00001000
TDOP_SET = 0x00002000
GDOP_SET = 0x00004000
HERR_SET = 0x00008000
VERR_SET = 0x00010000
PERR_SET = 0x00020000
SATELLITE_SET = 0x00040000
SPEEDERR_SET = 0x00080000
TRACKERR_SET = 0x00100000
CLIMBERR_SET = 0x00200000
DEVICE_SET = 0x00400000
DEVICELIST_SET = 0x00800000
DEVICEID_SET = 0x01000000
ERROR_SET = 0x02000000
CYCLE_START_SET = 0x04000000
FIX_SET = (TIME_SET|MODE_SET|TIMERR_SET|LATLON_SET|HERR_SET|ALTITUDE_SET|VERR_SET|TRACK_SET|TRACKERR_SET|SPEED_SET|SPEEDERR_SET|CLIMB_SET|CLIMBERR_SET)

# Values of gpsdata.status.
STATUS_NO_FIX = 0
STATUS_FIX = 1
STATUS_DGPS_FIX = 2

# Values of gpsfix.mode.
MODE_NO_FIX = 1
MODE_2D = 2
MODE_3D = 3

MAXCHANNELS = 12
SIGNAL_STRENGTH_UNKNOWN = NaN

# The spec says 82, but some receivers (TN-200, GSW 2.3.2) output 86 characters
NMEA_MAX = 86

GPSD_PORT = 2947


class gpstimings:
    "Timing figures for one sentence (held in gpsdata.timings)."

    def __init__(self):
        "Start with an empty tag and every counter zeroed."
        self.sentence_tag = ""
        self.sentence_length = 0
        self.sentence_time = 0.0
        self.d_xmit_time = 0.0
        self.d_recv_time = 0.0
        self.d_decode_time = 0.0
        self.emit_time = 0.0
        self.poll_time = 0.0
        self.c_recv_time = 0.0
        self.c_decode_time = 0.0

    def d_received(self):
        """Return d_recv_time offset by the sentence time.

        Falls back to d_xmit_time as the offset when sentence_time is zero.
        """
        if self.sentence_time:
            return self.d_recv_time + self.sentence_time
        return self.d_recv_time + self.d_xmit_time

    def collect(self, tag, length, sentence_time, xmit_time, recv_time, decode_time, poll_time, emit_time):
        """Store one set of timing fields, coercing them to numbers.

        `length` is converted with int(), every time value with float(),
        so string fields can be passed straight in.  Raises ValueError if
        a field is not numeric.  c_recv_time and c_decode_time are not
        touched here.
        """
        self.sentence_tag = tag
        self.sentence_length = int(length)
        self.sentence_time = float(sentence_time)
        self.d_xmit_time = float(xmit_time)
        self.d_recv_time = float(recv_time)
        self.d_decode_time = float(decode_time)
        self.poll_time = float(poll_time)
        self.emit_time = float(emit_time)

    def __str__(self):
        "Return all ten fields as one tab-separated, newline-terminated line."
        return "%s\t%2d\t%2.6f\t%2.6f\t%2.6f\t%2.6f\t%2.6f\t%2.6f\t%2.6f\t%2.6f\n" \
               % (self.sentence_tag,
                  self.sentence_length,
                  self.sentence_time,
                  self.d_xmit_time,
                  self.d_recv_time,
                  self.d_decode_time,
                  self.poll_time,
                  self.emit_time,
                  self.c_recv_time,
                  self.c_decode_time)


class gpsfix:
    "One position fix; fields not yet known hold the NaN sentinel."

    def __init__(self):
        "Start in MODE_NO_FIX with lat/lon at 0.0 and the rest unknown."
        self.mode = MODE_NO_FIX
        self.time = NaN
        self.ept = NaN
        self.latitude = self.longitude = 0.0
        self.eph = NaN
        self.altitude = NaN         # Meters
        self.epv = NaN
        self.track = NaN            # Degrees from true north
        self.speed = NaN            # Knots
        self.climb = NaN            # Meters per second
        self.epd = NaN
        self.eps = NaN
        self.epc = NaN


class gpsdata:
    "Position, track, velocity and status information returned by a GPS."

    class satellite:
        "One satellite in view."

        def __init__(self, PRN, elevation, azimuth, ss, used=None):
            "Record the satellite's fields; `used` may be left as None."
            self.PRN = PRN
            self.elevation = elevation
            self.azimuth = azimuth
            self.ss = ss
            self.used = used

        def __repr__(self):
            "Return a one-line summary ending in Used: y or Used: n."
            # bool() keeps the index at 0 or 1: the default used=None (or
            # any other non-index value) would otherwise raise on "ny"[...].
            return "PRN: %3d E: %3d Az: %3d Ss: %d Used: %s" % (self.PRN,self.elevation,self.azimuth,self.ss,"ny"[bool(self.used)])

    def __init__(self):
        "Initialize all data members."
        self.online = 0                 # NZ if GPS on, zero if not

        self.valid = 0
        self.fix = gpsfix()

        self.status = STATUS_NO_FIX
        self.utc = ""

        self.satellites_used = 0        # Satellites used in last fix
        self.pdop = self.hdop = self.vdop = self.tdop = self.gdop = 0.0

        self.epe = 0.0

        self.satellites = []            # satellite objects in view
        # `await` became a reserved word in Python 3.7, so the attribute
        # cannot be written as `self.await`; setattr keeps its name intact.
        setattr(self, "await", 0)
        self.parts = 0

        self.gps_id = None
        self.driver_mode = 0
        self.profiling = False
        self.timings = gpstimings()
        self.baudrate = 0
        self.stopbits = 0
        self.cycle = 0
        self.mincycle = 0
        self.device = None
        self.devices = []

    def __repr__(self):
        """Return a multi-line, human-readable dump of the current state.

        Altitude, speed and track are shown as "?" when isnan() says they
        are unknown.  Raises IndexError if status or fix.mode is outside
        the range of the name tables below.
        """
        def reading(label, value):
            # One "Label: value" line, with "?" standing in for unknowns.
            if isnan(value):
                return "%s: ?\n" % label
            return "%s: %f\n" % (label, value)

        lines = [
            "Lat/lon: %f %f\n" % (self.fix.latitude, self.fix.longitude),
            reading("Altitude", self.fix.altitude),
            reading("Speed", self.fix.speed),
            reading("Track", self.fix.track),
            "Status: STATUS_%s\n" % ("NO_FIX","FIX","DGPS_FIX")[self.status],
            "Mode: MODE_" + ("ZERO", "NO_FIX", "2D","3D")[self.fix.mode] + "\n",
            "Quality: %d p=%2.2f h=%2.2f v=%2.2f t=%2.2f g=%2.2f\n" % \
                (self.satellites_used, self.pdop, self.hdop, self.vdop, self.tdop, self.gdop),
            "Y: %s satellites in view:\n" % len(self.satellites),
        ]
        for sat in self.satellites:
            lines.append(" " + repr(sat) + "\n")
        return "".join(lines)


class gps(gpsdata):
    "Client interface to a running gpsd instance."
def __init__(self, host="127.0.0.1", port="2947", verbose=0): gpsdata.__init__(self) self.sock = None # in case we blow up in connect self.sockfile = None self.connect(host, port) self.verbose = verbose self.raw_hook = None def connect(self, host, port): """Connect to a host on a given port. If the hostname ends with a colon (`:') followed by a number, and there is no port specified, that suffix will be stripped off and the number interpreted as the port number to use. """ if not port and (host.find(':') == host.rfind(':')): i = host.rfind(':') if i >= 0: host, port = host[:i], host[i+1:] try: port = int(port) except ValueError: raise socket.error, "nonnumeric port" if not port: port = GPSD_PORT #if self.debuglevel > 0: print 'connect:', (host, port) msg = "getaddrinfo returns an empty list" self.sock = None self.sockfile = None for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM): af, socktype, proto, canonname, sa = res try: self.sock = socket.socket(af, socktype, proto) #if self.debuglevel > 0: print 'connect:', (host, port) self.sock.connect(sa) self.sockfile = self.sock.makefile() except socket.error, msg: #if self.debuglevel > 0: print 'connect fail:', (host, port) self.close() continue break if not self.sock: raise socket.error, msg def set_raw_hook(self, hook): self.raw_hook = hook def close(self): if self.sockfile: self.sockfile.close() if self.sock: self.sock.close() self.sock = None self.sockfile = None def __del__(self): self.close() def __unpack(self, buf): # unpack a daemon response into the instance members self.gps_time = 0.0 fields = buf.strip().split(",") if fields[0] == "GPSD": for field in fields[1:]: if not field or field[1] != '=': continue cmd = field[0] data = field[2:] if data[0] == "?": continue if cmd in ('A', 'a'): self.fix.altitude = float(data) self.valid |= ALTITUDE_SET elif cmd in ('B', 'b'): if data == '?': self.baudrate = self.stopbits = 0 self.device = None else: (f1, f2, f3, f4) = data.split() self.baudrate = int(f1) 
self.stopbits = int(f4) elif cmd in ('C', 'c'):
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -