⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 web_display.py

📁 Urwid is a Python library for making text console applications. It has many features including fluid
💻 PY
📖 第 1 页 / 共 2 页
字号:
	width: 100%; height: 100%;	margin: 3px 0 0 0; border: 1px solid #999; }#page { position: relative;  width: 100%;height: 100%;}"""# HTML Initial Page_html_page = ["""<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"><html><head><title>Urwid Web Display - ""","""</title><style type="text/css">""" + _css_style + """</style></head><body id="body" onload="load_web_display()"><div style="position:absolute; visibility:hidden;"><br id="br"\> <pre>The quick brown fox jumps over the lazy dog.<span id="testchar">X</span><span id="testchar2">Y</span></pre></div>Urwid Web Display - <b>""","""</b> -Status: <span id="status">Set up</span><script type="text/javascript">//<![CDATA[""" + _js_code +"""//]]></script><pre id="text"></pre></body></html>"""]class Screen:	def __init__(self):		self.palette = {}		self.has_color = True		def register_palette( self, l ):		"""Register a list of palette entries.		l -- list of (name, foreground, background) or		     (name, same_as_other_name) palette entries.		calls self.register_palette_entry for each item in l		"""				for item in l:			if len(item) in (3,4):				self.register_palette_entry( *item )				continue			assert len(item) == 2, "Invalid register_palette usage"			name, like_name = item			if not self.palette.has_key(like_name):				raise Exception("palette entry '%s' doesn't exist"%like_name)			self.palette[name] = self.palette[like_name]	def register_palette_entry( self, name, foreground, background, 		mono=None):		"""Register a single palette entry.		name -- new entry/attribute name		foreground -- foreground colour		background -- background colour		mono -- monochrome terminal attribute		See curses_display.register_palette_entry for more info.		"""		if foreground == "default":			foreground = "black"		if background == "default":			background = "light gray"		self.palette[name] = (foreground, background, mono)	def set_mouse_tracking(self):		"""Not yet implemented"""		pass	def start(self):		"""			This function reads the initial screen size, generates a		unique id and handles cleanup when fn exits.				web_display.set_preferences(..) must be called before calling		this function for the preferences to take effect		"""		global _prefs				client_init = sys.stdin.read(50)		assert client_init.startswith("window resize "),client_init		ignore1,ignore2,x,y = client_init.split(" ",3)		x = int(x)		y = int(y)		self._set_screen_size( x, y )		self.last_screen = {}		self.last_screen_width = 0			self.update_method = os.environ["HTTP_X_URWID_METHOD"]		assert self.update_method in ("multipart","polling")			if self.update_method == "polling" and not _prefs.allow_polling:			sys.stdout.write("Status: 403 Forbidden\r\n\r\n")			sys.exit(0)				clients = glob.glob(os.path.join(_prefs.pipe_dir,"urwid*.in"))		if len(clients) >= _prefs.max_clients:			sys.stdout.write("Status: 503 Sever Busy\r\n\r\n")			sys.exit(0)				urwid_id = "%09d%09d"%(random.randrange(10**9),			random.randrange(10**9))		self.pipe_name = os.path.join(_prefs.pipe_dir,"urwid"+urwid_id)		os.mkfifo(self.pipe_name+".in",0600)		signal.signal(signal.SIGTERM,self._cleanup_pipe)				self.input_fd = os.open(self.pipe_name+".in", 			os.O_NONBLOCK | os.O_RDONLY)		self.input_tail = ""		self.content_head = ("Content-type: "			"multipart/x-mixed-replace;boundary=ZZ\r\n"			"X-Urwid-ID: "+urwid_id+"\r\n"			"\r\n\r\n"			"--ZZ\r\n")		if self.update_method=="polling":			self.content_head = (				"Content-type: text/plain\r\n"				"X-Urwid-ID: "+urwid_id+"\r\n"				"\r\n\r\n")				signal.signal(signal.SIGALRM,self._handle_alarm)		signal.alarm( ALARM_DELAY )	def stop(self):		"""		Restore settings and clean up.  		"""		try:			self._close_connection()		except:			pass		signal.signal(signal.SIGTERM,signal.SIG_DFL)		self._cleanup_pipe()			def run_wrapper(self,fn):		"""		Run the application main loop, calling start() first		and stop() on exit.		"""		try:			self.start()			return fn()		finally:			self.stop()				def _close_connection(self):		if self.update_method == "polling child":			self.server_socket.settimeout(0)			socket, addr = self.server_socket.accept()			socket.sendall("Z")			socket.close()				if self.update_method == "multipart":			sys.stdout.write("\r\nZ"				"\r\n--ZZ--\r\n")			sys.stdout.flush()					def _cleanup_pipe(self, *args):		if not self.pipe_name: return		try:			os.remove(self.pipe_name+".in")			os.remove(self.pipe_name+".update")		except:			pass	def _set_screen_size(self, cols, rows ):		"""Set the screen size (within max size)."""				if cols > MAX_COLS:			cols = MAX_COLS		if rows > MAX_ROWS:			rows = MAX_ROWS		self.screen_size = cols, rows			def draw_screen(self, (cols, rows), r ):		"""Send a screen update to the client."""				if cols != self.last_screen_width:			self.last_screen = {}			sendq = [self.content_head]				if self.update_method == "polling":			send = sendq.append		elif self.update_method == "polling child":			signal.alarm( 0 )			try:				s, addr = self.server_socket.accept()			except socket.timeout, e:				sys.exit(0)			send = s.sendall		else:			signal.alarm( 0 )			send = sendq.append			send("\r\n")			self.content_head = ""				assert r.rows() == rows			if r.cursor is not None:			cx, cy = r.cursor		else:			cx = cy = None				new_screen = {}				y = -1		for row in r.content():			y += 1			row = list(row)						l = []						sig = tuple(row)			if y == cy: sig = sig + (cx,)			new_screen[sig] = new_screen.get(sig,[]) + [y]			old_line_numbers = self.last_screen.get(sig, None)			if old_line_numbers is not None:				if y in old_line_numbers:					old_line = y				else:					old_line = old_line_numbers[0]				send( "<%d\n"%old_line )				continue						col = 0			for (a, cs, run) in row:				run = run.translate(_trans_table)				if a is None:					fg,bg,mono = "black", "light gray", None				else:					fg,bg,mono = self.palette[a]				if y == cy and col <= cx:					run_width = util.calc_width(run, 0, 						len(run))					if col+run_width > cx:						l.append(code_span(run, fg, bg,							cx-col))					else:						l.append(code_span(run, fg, bg))					col += run_width				else:					l.append(code_span(run, fg, bg))			send("".join(l)+"\n")		self.last_screen = new_screen		self.last_screen_width = cols				if self.update_method == "polling":			sys.stdout.write("".join(sendq))			sys.stdout.flush()			sys.stdout.close()			self._fork_child()		elif self.update_method == "polling child":			s.close()		else: # update_method == "multipart"			send("\r\n--ZZ\r\n")			sys.stdout.write("".join(sendq))			sys.stdout.flush()				signal.alarm( ALARM_DELAY )		def clear(self):		"""		Force the screen to be completely repainted on the next		call to draw_screen().		(does nothing for web_display)		"""		pass	def _fork_child(self):		"""		Fork a child to run CGI disconnected for polling update method.		Force parent process to exit.		"""		daemonize( self.pipe_name +".err" )		self.input_fd = os.open(self.pipe_name+".in", 			os.O_NONBLOCK | os.O_RDONLY)		self.update_method = "polling child"		s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)		s.bind( self.pipe_name+".update" )		s.listen(1)		s.settimeout(POLL_CONNECT)		self.server_socket = s		def _handle_alarm(self, sig, frame):		assert self.update_method in ("multipart","polling child")		if self.update_method == "polling child":			# send empty update			try:				s, addr = self.server_socket.accept()				s.close()			except socket.timeout, e:				sys.exit(0)		else:			# send empty update			sys.stdout.write("\r\n\r\n--ZZ\r\n")			sys.stdout.flush()		signal.alarm( ALARM_DELAY )						def get_cols_rows(self):		"""Return the screen size."""		return self.screen_size	def get_input(self, raw_keys=False):		"""Return pending input as a list."""		l = []		resized = False				try:			iready,oready,eready = select.select(				[self.input_fd],[],[],0.5)		except select.error, e:			# return on interruptions			if e.args[0] == 4: 				if raw_keys:					return [],[]				return []			raise		if not iready:			if raw_keys:				return [],[]			return []				keydata = os.read(self.input_fd, MAX_READ)		os.close(self.input_fd)		self.input_fd = os.open(self.pipe_name+".in", 			os.O_NONBLOCK | os.O_RDONLY)		#sys.stderr.write( `keydata,self.input_tail`+"\n" )		keys = keydata.split("\n")		keys[0] = self.input_tail + keys[0]		self.input_tail = keys[-1]				for k in keys[:-1]:			if k.startswith("window resize "):				ign1,ign2,x,y = k.split(" ",3)				x = int(x)				y = int(y)				self._set_screen_size(x, y)				resized = True			else:				l.append(k)		if resized:			l.append("window resize")				if raw_keys:			return l, []		return l	def code_span( s, fg, bg, cursor = -1):	code_fg = _code_colours[ fg ]	code_bg = _code_colours[ bg ]		if cursor >= 0:		c_off, _ign = util.calc_text_pos(s, 0, len(s), cursor)		c2_off = util.move_next_char(s, c_off, len(s))		return ( code_fg + code_bg + s[:c_off] + "\n" +			 code_bg + code_fg + s[c_off:c2_off] + "\n" +			 code_fg + code_bg + s[c2_off:] + "\n")	else:		return code_fg + code_bg + s + "\n"def html_escape(text):	"""Escape text so that it will be displayed safely within HTML"""	text = text.replace('&','&amp;')	text = text.replace('<','&lt;')	text = text.replace('>','&gt;')	return text	def is_web_request():	"""	Return True if this is a CGI web request.	"""	return os.environ.has_key('REQUEST_METHOD')def handle_short_request():	"""	Handle short requests such as passing keystrokes to the application	or sending the initial html page.  If returns True, then this	function recognised and handled a short request, and the calling	script should immediately exit.	web_display.set_preferences(..) should be called before calling this	function for the preferences to take effect	"""	global _prefs		if not is_web_request():		return False			if os.environ['REQUEST_METHOD'] == "GET":		# Initial request, send the HTML and javascript.		sys.stdout.write("Content-type: text/html\r\n\r\n" +			html_escape(_prefs.app_name).join(_html_page))		return True		if os.environ['REQUEST_METHOD'] != "POST":		# Don't know what to do with head requests etc.		return False		if not os.environ.has_key('HTTP_X_URWID_ID'):		# If no urwid id, then the application should be started.		return False	urwid_id = os.environ['HTTP_X_URWID_ID']	if len(urwid_id)>20:		#invalid. handle by ignoring		#assert 0, "urwid id too long!"		sys.stdout.write("Status: 414 URI Too Long\r\n\r\n")		return True	for c in urwid_id:		if c not in "0123456789":			# invald. handle by ignoring			#assert 0, "invalid chars in id!"			sys.stdout.write("Status: 403 Forbidden\r\n\r\n")			return True		if os.environ.get('HTTP_X_URWID_METHOD',None) == "polling":		# this is a screen update request		s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)		try:			s.connect( os.path.join(_prefs.pipe_dir,				"urwid"+urwid_id+".update") )			data = "Content-type: text/plain\r\n\r\n"+s.recv(BUF_SZ)			while data:				sys.stdout.write(data)				data = s.recv(BUF_SZ)			return True		except socket.error,e:			sys.stdout.write("Status: 404 Not Found\r\n\r\n")			return True						# this is a keyboard input request	try:		fd = os.open((os.path.join(_prefs.pipe_dir,			"urwid"+urwid_id+".in")), os.O_WRONLY)	except OSError,e:		sys.stdout.write("Status: 404 Not Found\r\n\r\n")		return True			keydata = sys.stdin.read(MAX_READ)	os.write(fd,keydata)	os.close(fd)	sys.stdout.write("Content-type: text/plain\r\n\r\n")		return Trueclass _Preferences:	app_name = "Unnamed Application"	pipe_dir = "/tmp"	allow_polling = True	max_clients = 20_prefs = _Preferences()def set_preferences( app_name, pipe_dir="/tmp", allow_polling=True, 	max_clients=20 ):	"""	Set web_display preferences.		app_name -- application name to appear in html interface	pipe_dir -- directory for input pipes, daemon update sockets 	            and daemon error logs	allow_polling -- allow creation of daemon processes for 	                 browsers without multipart support 	max_clients -- maximum concurrent client connections. This		       pool is shared by all urwid applications		       using the same pipe_dir	"""	global _prefs	_prefs.app_name = app_name	_prefs.pipe_dir = pipe_dir	_prefs.allow_polling = allow_polling	_prefs.max_clients = max_clientsclass ErrorLog:	def __init__(self, errfile ):		self.errfile = errfile	def write(self, err):		open(self.errfile,"a").write(err)def daemonize( errfile ):	"""	Detach process and become a daemon.	"""	pid = os.fork()	if pid:		os._exit(0)	os.setsid()	signal.signal(signal.SIGHUP, signal.SIG_IGN)	os.umask(0)	pid = os.fork()	if pid:		os._exit(0)	os.chdir("/")	for fd in range(0,20):		try:			os.close(fd)		except OSError:			pass	sys.stdin = open("/dev/null","r")	sys.stdout = open("/dev/null","w")	sys.stderr = ErrorLog( errfile )

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -