📄 gpsfake.py
字号:
iflag |= termios.INPCK cflag |= termios.PARENB elif parity == 'O': iflag |= termios.INPCK cflag |= termios.PARENB | termios.PARODD ispeed = ospeed = speed termios.tcsetattr(ttyfp.fileno(), termios.TCSANOW, [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]) def read(self): "Discard control strings written by gpsd." # A tcflush implementation works on Linux but fails on OpenBSD 4. termios.tcflush(self.master_fd, termios.TCIFLUSH) # Alas, the FIONREAD version also works on Linux and fails on OpenBSD. #try: # buf = array.array('i', [0]) # fcntl.ioctl(self.master_fd, termios.FIONREAD, buf, True) # n = struct.unpack('i', buf)[0] # os.read(self.master_fd, n) #except IOError: # pass def feed(self): "Feed a line from the contents of the GPS log to the daemon." line = self.testload.sentences[self.index % len(self.testload.sentences)] os.write(self.master_fd, line) time.sleep((WRITE_PAD * len(line)) / self.speed) self.index += 1class DaemonError(exceptions.Exception): def __init__(self, msg): self.msg = msg def __str__(self): return repr(self.msg)class DaemonInstance: "Control a gpsd instance." def __init__(self, control_socket=None): self.sockfile = None self.pid = None if control_socket: self.control_socket = control_socket else: self.control_socket = "/tmp/gpsfake-%d.sock" % os.getpid() self.pidfile = "/tmp/gpsfake_pid-%s" % os.getpid() def spawn(self, options, port, background=False, prefix=""): "Spawn a daemon instance." self.spawncmd = None if not '/usr/sbin' in os.environ['PATH']: os.environ['PATH']=os.environ['PATH'] + ":/usr/sbin" for path in os.environ['PATH'].split(':'): _spawncmd = "%s/gpsd" % path if os.path.isfile(_spawncmd) and os.access(_spawncmd, os.X_OK): self.spawncmd = _spawncmd break if not self.spawncmd: raise DaemonError("Cannot execute gpsd: executable not found.") # The -b option to suppress hanging on probe returns is needed to cope # with OpenBSD (and possibly other non-Linux systems) that don't support # anything we can use to implement the FakeGPS.read() method self.spawncmd += " -b -N -S %s -F %s -P %s %s" % (port, self.control_socket, self.pidfile, options) if prefix: self.spawncmd = prefix + " " + self.spawncmd.strip() if background: self.spawncmd += " &" status = os.system(self.spawncmd) if os.WIFSIGNALED(status) or os.WEXITSTATUS(status): raise DaemonError("daemon exited with status %d" % status) def wait_pid(self): "Wait for the daemon, get its PID and a control-socket connection." while True: try: fp = open(self.pidfile) except IOError: time.sleep(0.5) continue try: fp.seek(0) pidstr = fp.read() self.pid = int(pidstr) except ValueError: time.sleep(0.5) continue # Avoid race condition -- PID not yet written fp.close() break def __get_control_socket(self): # Now we know it's running, get a connection to the control socket. if not os.path.exists(self.control_socket): return None try: self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0) self.sock.connect(self.control_socket) except socket.error: if self.sock: self.sock.close() self.sock = None return self.sock def is_alive(self): "Is the daemon still alive?" try: os.kill(self.pid, 0) return True except OSError: return False def add_device(self, path): "Add a device to the daemon's internal search list." if self.__get_control_socket(): self.sock.sendall("+%s\r\n" % path) self.sock.recv(12) self.sock.close() def remove_device(self, path): "Remove a device from the daemon's internal search list." if self.__get_control_socket(): self.sock.sendall("-%s\r\n" % path) self.sock.recv(12) self.sock.close() def kill(self): "Kill the daemon instance." if self.pid: try: os.kill(self.pid, signal.SIGTERM) except OSError: pass self.pid = None time.sleep(1) # Give signal time to landclass TestSessionError(exceptions.Exception): def __init__(self, msg): self.msg = msgclass TestSession: "Manage a session including a daemon with fake GPSes and clients." CLOSE_DELAY = 1 def __init__(self, prefix=None, port=None, options=None, verbose=False, predump=False): "Initialize the test session by launching the daemon." self.verbose = verbose self.predump = predump self.daemon = DaemonInstance() self.fakegpslist = {} self.client_id = 0 self.readers = 0 self.writers = 0 self.runqueue = [] self.index = 0 if port: self.port = port else: self.port = gps.GPSD_PORT self.progress = lambda x: None self.reporter = lambda x: None for sig in (signal.SIGQUIT, signal.SIGINT, signal.SIGTERM): signal.signal(sig, lambda signal, frame: self.cleanup()) self.daemon.spawn(background=True, prefix=prefix, port=self.port, options=options) self.daemon.wait_pid() self.default_predicate = None self.fd_set = [] self.threadlock = None def set_predicate(self, pred): "Set a default go predicate for the session." self.default_predicate = pred def gps_add(self, logfile, speed=4800, pred=None): "Add a simulated GPS being fed by the specified logfile." self.progress("gpsfake: gps_add(%s, %d)\n" % (logfile, speed)) if logfile not in self.fakegpslist: newgps = FakeGPS(logfile, speed=speed, verbose=self.verbose, predump=self.predump) if pred: newgps.go_predicate = pred elif self.default_predicate: newgps.go_predicate = self.default_predicate self.fakegpslist[newgps.slave] = newgps self.append(newgps) newgps.exhausted = 0 self.daemon.add_device(newgps.slave) return newgps.slave def gps_remove(self, name): "Remove a simulated GPS from the daemon's search list." self.progress("gpsfake: gps_remove(%s)\n" % name) self.remove(self.fakegpslist[name]) self.daemon.remove_device(name) del self.fakegpslist[name] def client_add(self, commands): "Initiate a client session and force connection to a fake GPS." self.progress("gpsfake: client_add()\n") newclient = gps.gps(port=self.port) self.append(newclient) newclient.id = self.client_id + 1 self.client_id += 1 self.progress("gpsfake: client %d has %s\n" % (self.client_id,newclient.device)) if commands: self.initialize(newclient, commands) return self.client_id def client_query(self, id, commands): "Ship a command to a client channel, get a response (threaded mode only)." self.progress("gpsfake: client_query(%d, %s)\n" % (id, `commands`)) for obj in self.runqueue: if isinstance(obj, gps.gps) and obj.id == id: obj.query(commands) return obj.response return None def client_remove(self, cid): "Terminate a client session." self.progress("gpsfake: client_remove(%d)\n" % cid) for obj in self.runqueue: if isinstance(obj, gps.gps) and obj.id == cid: self.remove(obj) return True else: return False def wait(self, seconds): "Wait, doing nothing." self.progress("gpsfake: wait(%d)\n" % seconds) time.sleep(seconds) def gather(self, seconds): "Wait, doing nothing but watching for sentences." self.progress("gpsfake: gather(%d)\n" % seconds) #mark = time.time() time.sleep(seconds) #if self.timings.c_recv_time <= mark: # TestSessionError("no sentences received\n") def cleanup(self): "We're done, kill the daemon." self.progress("gpsfake: cleanup()\n") if self.daemon: self.daemon.kill() self.daemon = None def run(self): "Run the tests." try: while self.daemon: # We have to read anything that gpsd might have tried # to send to the GPS here -- under OpenBSD the # TIOCDRAIN will hang, otherwise. for device in self.runqueue: if isinstance(device, FakeGPS): device.read() had_output = False chosen = self.choose() if isinstance(chosen, FakeGPS): # Delay a few seconds after a GPS source is exhauseted # to remove it. This should give its subscribers time # to get gpsd's response before we call cleanup() if chosen.exhausted and (time.time() - chosen.exhausted > TestSession.CLOSE_DELAY): self.remove(chosen) self.progress("gpsfake: GPS %s removed\n" % chosen.slave) elif not chosen.go_predicate(chosen.index, chosen): if chosen.exhausted == 0: chosen.exhausted = time.time() self.progress("gpsfake: GPS %s ran out of input\n" % chosen.slave) else: chosen.feed() elif isinstance(chosen, gps.gps): if chosen.enqueued: chosen.send(chosen.enqueued) chosen.enqueued = "" while chosen.waiting(): chosen.poll() self.reporter(chosen.response) had_output = True else: raise TestSessionError("test object of unknown type") if not self.writers and not had_output: break finally: self.cleanup() # All knowledge about locks and threading is below this line, # except for the bare fact that self.threadlock is set to None # in the class init method. def append(self, obj): "Add a producer or consumer to the object list." if self.threadlock: self.threadlock.acquire() self.runqueue.append(obj) if isinstance(obj, FakeGPS): self.writers += 1 elif isinstance(obj, gps.gps): self.readers += 1 if self.threadlock: self.threadlock.release() def remove(self, obj): "Remove a producer or consumer from the object list." if self.threadlock: self.threadlock.acquire() self.runqueue.remove(obj) if isinstance(obj, FakeGPS): self.writers -= 1 elif isinstance(obj, gps.gps): self.readers -= 1 self.index = min(len(self.runqueue)-1, self.index) if self.threadlock: self.threadlock.release() def choose(self): "Atomically get the next object scheduled to do something." if self.threadlock: self.threadlock.acquire() chosen = self.index self.index += 1 self.index %= len(self.runqueue) if self.threadlock: self.threadlock.release() return self.runqueue[chosen] def initialize(self, client, commands): "Arrange for client to ship specified commands when it goes active." client.enqueued = "" if not self.threadlock: client.query(commands) else: client.enqueued = commands def start(self): self.threadlock = threading.Lock() threading.Thread(target=self.run)# End
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -