📄 hodring.py
字号:
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements. See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership. The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
#!/usr/bin/env python
"""hodring launches hadoop commands on work node and
cleans up all the work dirs afterward
"""
# -*- python -*-
import os, sys, time, shutil, getpass, xml.dom.minidom, xml.dom.pulldom
import socket, sets, urllib, csv, signal, pprint, random, re, httplib

from xml.dom import getDOMImplementation
from pprint import pformat
from optparse import OptionParser
from urlparse import urlparse
from hodlib.Common.util import local_fqdn, parseEquals, getMapredSystemDirectory, isProcessRunning
from hodlib.Common.tcp import tcpSocket, tcpError

# sys.path[0] is the directory of the running script; its parent directory is
# appended to the module search path (presumably where hodlib lives - confirm).
binfile = sys.path[0]
libdir = os.path.dirname(binfile)
sys.path.append(libdir)

import hodlib.Common.logger

from hodlib.GridServices.service import *
from hodlib.Common.util import *
from hodlib.Common.socketServers import threadedHTTPServer
from hodlib.Common.hodsvc import hodBaseService
from hodlib.Common.threads import simpleCommand
from hodlib.Common.xmlrpc import hodXRClient

mswindows = (sys.platform == "win32")
originalcwd = os.getcwd()

# Captures (host:port, path) from an hdfs:// URI. Raw string so "\d" is a
# regex escape, not an invalid string escape.
reHdfsURI = re.compile(r"hdfs://(.*?:\d+)(.*)")


class CommandDesc:
    """A class that represents the commands that are run by hodring.

    Wraps a description dict. 'name', 'program' and 'pkgdirs' are required;
    the other keys are filled in with these defaults when absent:

      argv            []        version         None
      envs            {}        workdirs        []
      attrs           {}        final-attrs     {}
      fg              False     ignorefailures  False
      stdin           None
    """

    def __init__(self, dict, log=None):
        """Fill in defaults on `dict` (in place) and validate it.

        dict -- the command description; it is mutated and kept as self.dict.
        log  -- logger used for debug messages. When None (e.g. a caller of
                parseDesc that supplies no logger) a standard-library logger
                is used instead.

        Raises ValueError if a required key is missing.
        """
        if log is None:
            # Imported locally so no name brought in by the module's wildcard
            # imports can shadow it.
            import logging
            log = logging.getLogger(__name__)
        self.log = log
        self.log.debug("In command desc")
        self.log.debug("Done in command desc")
        dict.setdefault('argv', [])
        dict.setdefault('version', None)
        dict.setdefault('envs', {})
        dict.setdefault('workdirs', [])
        dict.setdefault('attrs', {})
        dict.setdefault('final-attrs', {})
        dict.setdefault('fg', False)
        dict.setdefault('ignorefailures', False)
        dict.setdefault('stdin', None)
        self.log.debug("Printing dict")
        self._checkRequired(dict)
        self.dict = dict

    def _checkRequired(self, dict):
        """Raise ValueError naming the first missing required key."""
        for key in ('name', 'program', 'pkgdirs'):
            if key not in dict:
                raise ValueError("Command description lacks '%s'" % key)

    def getName(self):
        """Return the command name."""
        return self.dict['name']

    def getProgram(self):
        """Return the program to run."""
        return self.dict['program']

    def getArgv(self):
        """Return the argument list (default [])."""
        return self.dict['argv']

    def getVersion(self):
        """Return the version (default None)."""
        return self.dict['version']

    def getEnvs(self):
        """Return the environment mapping (default {})."""
        return self.dict['envs']

    def getPkgDirs(self):
        """Return the package directory list."""
        return self.dict['pkgdirs']

    def getWorkDirs(self):
        """Return the work directory list (default [])."""
        return self.dict['workdirs']

    def getAttrs(self):
        """Return the attribute mapping (default {})."""
        return self.dict['attrs']

    def getfinalAttrs(self):
        """Return the final-attribute mapping (default {})."""
        return self.dict['final-attrs']

    def isForeground(self):
        """Return the 'fg' flag (default False)."""
        return self.dict['fg']

    def isIgnoreFailures(self):
        """Return the 'ignorefailures' flag (default False)."""
        return self.dict['ignorefailures']

    def getStdin(self):
        """Return the stdin value (default None)."""
        return self.dict['stdin']

    def parseDesc(str, log=None):
        """Build a CommandDesc from its serialized string form.

        `str` is a ','-separated list of key=value pairs; a backslash escapes
        a literal ','. Nested values are parsed further:
          argv                     -> list, ','-separated
          envs, attrs, final-attrs -> dict of key=value pairs, ','-separated
          pkgdirs, workdirs        -> list, ':'-separated
        Nested ',' separators must therefore be backslash-escaped in `str`.
        Absent optional keys are defaulted by CommandDesc.__init__; absent
        required keys make it raise ValueError.

        `log` is handed to the constructor (optional).
        """
        desc = CommandDesc._parseMap(str)
        # Only parse keys that are present, so missing optional keys fall
        # through to the constructor's defaults instead of raising KeyError.
        if 'argv' in desc:
            desc['argv'] = CommandDesc._parseList(desc['argv'])
        for key in ('envs', 'attrs', 'final-attrs'):
            if key in desc:
                desc[key] = CommandDesc._parseMap(desc[key])
        for key in ('pkgdirs', 'workdirs'):
            if key in desc:
                desc[key] = CommandDesc._parseList(desc[key], ':')
        # The constructor needs a logger; passing it here fixes the former
        # CommandDesc(dict) call, which always failed with TypeError.
        return CommandDesc(desc, log)
    parseDesc = staticmethod(parseDesc)

    def _parseList(str, delim = ','):
        """Split `str` on `delim` (a backslash escapes a literal delimiter)."""
        items = []
        for row in csv.reader([str], delimiter=delim, escapechar='\\',
                              quoting=csv.QUOTE_NONE, doublequote=False):
            items.extend(row)
        return items
    _parseList = staticmethod(_parseList)

    def _parseMap(str):
        """Parses key value pairs.

        `str` is a ','-separated list of key=value fields (a backslash escapes
        a literal ','). Only the first '=' in a field separates key from
        value. Raises ValueError for a field that contains no '='.
        """
        result = {}
        for row in csv.reader([str], escapechar='\\', quoting=csv.QUOTE_NONE,
                              doublequote=False):
            for field in row:
                if '=' not in field:
                    raise ValueError(
                        "Malformed key=value pair in command description: %r"
                        % field)
                key, value = field.split('=', 1)
                result[key] = value
        return result
    _parseMap = staticmethod(_parseMap)


class MRSystemDirectoryManager:
    """Class that is responsible for managing the MapReduce system directory"""

    def __init__(self, jtPid, mrSysDir, fsName, hadoopPath, log, retries=120):
        """Store what is needed to remove the system directory later.

        jtPid      -- pid of the JobTracker process to wait for.
        mrSysDir   -- the MapReduce system directory to remove.
        fsName     -- filesystem authority, used as hdfs://<fsName>.
        hadoopPath -- path of the hadoop executable.
        log        -- logger.
        retries    -- number of 0.5 s polls to wait for the JobTracker to exit.
        """
        self.__jtPid = jtPid
        self.__mrSysDir = mrSysDir
        self.__fsName = fsName
        self.__hadoopPath = hadoopPath
        self.__log = log
        self.__retries = retries

    def toCleanupArgs(self):
        """Return the manager's settings as a command-line option string."""
        return " --jt-pid %s --mr-sys-dir %s --fs-name %s --hadoop-path %s " \
            % (self.__jtPid, self.__mrSysDir, self.__fsName, self.__hadoopPath)

    def removeMRSystemDirectory(self):
        """Wait for the JobTracker to exit, then delete the system directory.

        Polls isProcessRunning() every 0.5 s, up to `retries` times (one
        minute with the default of 120). If the process is still running
        afterwards, logs a warning and returns without deleting anything.
        Otherwise runs `hadoop dfs -fs hdfs://<fsName> -rmr <mrSysDir>`.
        Failures are logged, never raised.
        """
        jtActive = isProcessRunning(self.__jtPid)
        count = 0
        # try for a max of `retries` polls (a minute by default) for the
        # process to end
        while jtActive and (count < self.__retries):
            time.sleep(0.5)
            jtActive = isProcessRunning(self.__jtPid)
            count += 1

        # Decide on the process state, not the poll count: the JobTracker may
        # have exited on the very last poll (or retries may be 0).
        if jtActive:
            self.__log.warn('Job Tracker did not exit even after a minute. Not going to try and cleanup the system directory')
            return

        self.__log.debug('jt is now inactive')

        cmd = "%s dfs -fs hdfs://%s -rmr %s" % (self.__hadoopPath, self.__fsName, \
                                                 self.__mrSysDir)
        self.__log.debug('Command to run to remove system directory: %s' % (cmd))
        try:
            hadoopCommand = simpleCommand('mr-sys-dir-cleaner', cmd)
            hadoopCommand.start()
            hadoopCommand.wait()
            hadoopCommand.join()
            ret = hadoopCommand.exit_code()
            if ret != 0:
                self.__log.warn("Error in removing MapReduce system directory '%s' from '%s' using path '%s'" \
                                % (self.__mrSysDir, self.__fsName, self.__hadoopPath))
                self.__log.warn(pprint.pformat(hadoopCommand.output()))
            else:
                self.__log.info("Removed MapReduce system directory successfully.")
        except Exception:
            # Best-effort cleanup: log and carry on. Narrowed from a bare
            # except so KeyboardInterrupt/SystemExit still propagate.
            # get_exception_*_string presumably come from the wildcard
            # hodlib.Common.util import.
            self.__log.error('Exception while cleaning up MapReduce system directory. May not be cleaned up. %s', \
                             get_exception_error_string())
            self.__log.debug(get_exception_string())


def createMRSystemDirectoryManager(dict, log):
    """Build an MRSystemDirectoryManager from a settings dict.

    Returns None unless every one of 'jt-pid', 'mr-sys-dir', 'fs-name' and
    'hadoop-path' is present with a non-None value. 'jt-pid' is converted
    with int(), so a non-numeric value raises ValueError.
    """
    keys = ['jt-pid', 'mr-sys-dir', 'fs-name', 'hadoop-path']
    for key in keys:
        # dict.get() yields None both for a missing key and a None value.
        if dict.get(key) is None:
            return None

    mrSysDirManager = MRSystemDirectoryManager(int(dict['jt-pid']), dict['mr-sys-dir'], \
                                               dict['fs-name'], dict['hadoop-path'], log)
    return mrSysDirManager


class HadoopCommand:
    """Runs a single hadoop command

    NOTE(review): this file's copy of the class is truncated. The methods
    _createHadoopLogDir and _createXmlElement are called below but are not
    defined here, and _createHadoopSiteXml stops part-way through.
    """

    def __init__(self, id, desc, tempdir, tardir, log, javahome, mrSysDir, restart=False):
        """Set up directories and configuration for one command.

        id       -- integer used in the per-command directory name
                    ('%d-%s' % (id, name)) under `tempdir`.
        desc     -- the CommandDesc to run.
        tempdir  -- base directory for confdir/ and logdir/.
        tardir   -- not used in the visible code.
        log      -- logger.
        javahome -- stored as self.javahome.
        mrSysDir -- value substituted for "fillindir" attribute values.
        restart  -- when True, an existing confdir is tolerated.
        """
        self.desc = desc
        self.log = log
        self.javahome = javahome
        self.__mrSysDir = mrSysDir
        self.program = desc.getProgram()
        self.name = desc.getName()
        self.workdirs = desc.getWorkDirs()
        self.hadoopdir = tempdir
        self.confdir = os.path.join(self.hadoopdir, '%d-%s' % (id, self.name),
                                    "confdir")
        self.logdir = os.path.join(self.hadoopdir, '%d-%s' % (id, self.name),
                                   "logdir")
        self.out = os.path.join(self.logdir, '%s.out' % self.name)
        self.err = os.path.join(self.logdir, '%s.err' % self.name)

        self.child = None
        self.restart = restart
        self.filledInKeyVals = []
        self._createWorkDirs()
        self._createHadoopSiteXml()
        self._createHadoopLogDir()
        self.__hadoopThread = None
        self.stdErrContents = "" # store list of contents for returning to user

    def _createWorkDirs(self):
        """Ensure every work directory exists and is usable.

        Raises ValueError if an existing directory lacks rwx access.
        Missing directories are created on a best-effort basis.
        """
        for workdir in self.workdirs:
            if os.path.exists(workdir):
                if not os.access(workdir, os.F_OK | os.R_OK | os.W_OK | os.X_OK):
                    raise ValueError("Workdir %s does not allow rwx permission." % (workdir))
                continue
            try:
                os.makedirs(workdir)
            except Exception:
                # Best-effort as before, but leave a trace instead of
                # swallowing the failure silently.
                self.log.debug('Could not create workdir %s: %s'
                               % (workdir, sys.exc_info()[1]))

    def getFilledInKeyValues(self):
        """Return the "key=value" strings recorded by createXML."""
        return self.filledInKeyVals

    def createXML(self, doc, attr, topElement, final):
        """Append one property element per attribute to `topElement`.

        doc        -- DOM document passed to _createXmlElement.
        attr       -- mapping of property name -> value; a tuple/list value
                      produces one element per item. Scalar values are
                      assumed to be strings (they are concatenated with '+').
        topElement -- DOM node the created elements are appended to.
        final      -- passed through to _createXmlElement.

        Placeholder values are substituted, in this order:
          "fillinport"     -> a port from ServiceUtil.getUniqRandomPort(low=50000)
          "fillinhostport" -> "<local_fqdn()>:<port from the same helper>"
          "fillindir"      -> the mrSysDir given to __init__
        'fs.default.name' values are written with an "hdfs://" prefix.
        """
        for k, v in attr.iteritems():
            self.log.debug('_createHadoopSiteXml: ' + str(k) + " " + str(v))
            if ( v == "fillinport" ):
                v = "%d" % (ServiceUtil.getUniqRandomPort(low=50000, log=self.log))

            keyvalpair = ''
            if isinstance(v, (tuple, list)):
                for item in v:
                    keyvalpair = "%s%s=%s," % (keyvalpair, k, item)
                keyvalpair = keyvalpair[:-1]
            else:
                keyvalpair = k + '=' + v

            # NOTE(review): every key is recorded at least twice (here and
            # again after the "fillinhostport" substitution below); the first
            # entry can still hold the placeholder. Consumers presumably rely
            # on the later entry - confirm before deduplicating.
            self.filledInKeyVals.append(keyvalpair)
            if(k == "mapred.job.tracker"): # total hack for time's sake
                keyvalpair = k + "=" + v
                self.filledInKeyVals.append(keyvalpair)

            if ( v == "fillinhostport"):
                port = "%d" % (ServiceUtil.getUniqRandomPort(low=50000, log=self.log))
                self.log.debug('Setting hostname to: %s' % local_fqdn())
                v = local_fqdn() + ':' + port

            keyvalpair = ''
            if isinstance(v, (tuple, list)):
                for item in v:
                    keyvalpair = "%s%s=%s," % (keyvalpair, k, item)
                keyvalpair = keyvalpair[:-1]
            else:
                keyvalpair = k + '=' + v

            self.filledInKeyVals.append(keyvalpair)
            # Substituted after recording, so filledInKeyVals keeps the
            # literal "fillindir" placeholder for this key.
            if ( v == "fillindir"):
                v = self.__mrSysDir

            prop = None
            if isinstance(v, (tuple, list)):
                for item in v:
                    prop = self._createXmlElement(doc, k, item, "No description", final)
                    topElement.appendChild(prop)
            else:
                if k == 'fs.default.name':
                    prop = self._createXmlElement(doc, k, "hdfs://" + v, "No description", final)
                else:
                    prop = self._createXmlElement(doc, k, v, "No description", final)
                topElement.appendChild(prop)

    def _createHadoopSiteXml(self):
        """Create the configuration directory and begin building the site XML.

        On a restart an existing confdir is reused; otherwise it must not
        exist yet.
        """
        if self.restart:
            if not os.path.exists(self.confdir):
                os.makedirs(self.confdir)
        else:
            # NOTE(review): assert is stripped under python -O.
            assert not os.path.exists(self.confdir)
            os.makedirs(self.confdir)

        implementation = getDOMImplementation()
        # NOTE(review): the source in this file is truncated here; the rest of
        # this method (and of the class) is missing.
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -