⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 hodring.py

📁 HADOOP 0.18.0安装源代码头文件
💻 PY
📖 第 1 页 / 共 3 页
字号:
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements.  See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership.  The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License.  You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
#!/usr/bin/env python
"""hodring launches hadoop commands on work node and
 cleans up all the work dirs afterward
"""
# -*- python -*-
import os, sys, time, shutil, getpass, xml.dom.minidom, xml.dom.pulldom
import socket, sets, urllib, csv, signal, pprint, random, re, httplib

from xml.dom import getDOMImplementation
from pprint import pformat
from optparse import OptionParser
from urlparse import urlparse
from hodlib.Common.util import local_fqdn, parseEquals, getMapredSystemDirectory, isProcessRunning
from hodlib.Common.tcp import tcpSocket, tcpError

# Make the directory above the script's own directory importable before the
# remaining hodlib imports below are executed.
binfile = sys.path[0]
libdir = os.path.dirname(binfile)
sys.path.append(libdir)

import hodlib.Common.logger

from hodlib.GridServices.service import *
from hodlib.Common.util import *
from hodlib.Common.socketServers import threadedHTTPServer
from hodlib.Common.hodsvc import hodBaseService
from hodlib.Common.threads import simpleCommand
from hodlib.Common.xmlrpc import hodXRClient

mswindows = (sys.platform == "win32")
originalcwd = os.getcwd()

# Splits "hdfs://host:port/path" into ("host:port", "/path").
reHdfsURI = re.compile(r"hdfs://(.*?:\d+)(.*)")

class CommandDesc:
  """A class that represents the commands that
  are run by hodring.

  The description is held as a dictionary. 'name', 'program' and 'pkgdirs'
  are mandatory; every other recognised key receives a default value in the
  constructor.
  """

  def __init__(self, dict, log):
    """Store the description after filling in defaults and validating it.

    dict -- command description; it is modified in place (defaults are
            added with setdefault) and kept by reference.
    log  -- logger used for debug messages; may be None, in which case the
            debug messages are skipped.

    Raises ValueError when a mandatory key is missing.
    """
    self.log = log
    self._debug("In command desc")
    self._debug("Done in command desc")
    dict.setdefault('argv', [])
    dict.setdefault('version', None)
    dict.setdefault('envs', {})
    dict.setdefault('workdirs', [])
    dict.setdefault('attrs', {})
    dict.setdefault('final-attrs', {})
    dict.setdefault('fg', False)
    dict.setdefault('ignorefailures', False)
    dict.setdefault('stdin', None)
    self._debug("Printing dict")
    self._checkRequired(dict)
    self.dict = dict

  def _debug(self, msg):
    """Emit a debug message if a logger was supplied."""
    if self.log is not None:
      self.log.debug(msg)

  def _checkRequired(self, dict):
    """Raise ValueError unless 'name', 'program' and 'pkgdirs' are present."""
    if 'name' not in dict:
      raise ValueError("Command description lacks 'name'")
    if 'program' not in dict:
      raise ValueError("Command description lacks 'program'")
    if 'pkgdirs' not in dict:
      raise ValueError("Command description lacks 'pkgdirs'")

  def getName(self):
    return self.dict['name']

  def getProgram(self):
    return self.dict['program']

  def getArgv(self):
    return self.dict['argv']

  def getVersion(self):
    return self.dict['version']

  def getEnvs(self):
    return self.dict['envs']

  def getPkgDirs(self):
    return self.dict['pkgdirs']

  def getWorkDirs(self):
    return self.dict['workdirs']

  def getAttrs(self):
    return self.dict['attrs']

  def getfinalAttrs(self):
    return self.dict['final-attrs']

  def isForeground(self):
    return self.dict['fg']

  def isIgnoreFailures(self):
    return self.dict['ignorefailures']

  def getStdin(self):
    return self.dict['stdin']

  def parseDesc(str, log=None):
    """Build a CommandDesc from its serialised 'key=value,...' form.

    str -- comma separated key=value pairs (backslash escapes a separator).
           'argv' is itself a comma separated list, 'pkgdirs' and 'workdirs'
           are colon separated lists, and 'envs', 'attrs' and 'final-attrs'
           are nested key=value maps.
    log -- optional logger handed to the CommandDesc constructor.

    Keys that are absent from the input are left for the constructor to
    default or reject. Raises ValueError for a malformed pair or a missing
    mandatory key.
    """
    dict = CommandDesc._parseMap(str)

    if 'argv' in dict:
      dict['argv'] = CommandDesc._parseList(dict['argv'])
    for key in ('pkgdirs', 'workdirs'):
      if key in dict:
        dict[key] = CommandDesc._parseList(dict[key], ':')
    for key in ('envs', 'attrs', 'final-attrs'):
      if key in dict:
        dict[key] = CommandDesc._parseMap(dict[key])

    # The constructor requires a logger argument; the original call omitted
    # it and therefore always failed with TypeError.
    return CommandDesc(dict, log)

  parseDesc = staticmethod(parseDesc)

  def _parseList(str, delim = ','):
    """Split str on delim, honouring backslash escapes; returns a list."""
    items = []
    for row in csv.reader([str], delimiter=delim, escapechar='\\',
                          quoting=csv.QUOTE_NONE, doublequote=False):
      items.extend(row)
    return items

  _parseList = staticmethod(_parseList)

  def _parseMap(str):
    """Parses key value pairs"""
    pairs = {}
    for row in csv.reader([str], escapechar='\\', quoting=csv.QUOTE_NONE,
                          doublequote=False):
      for field in row:
        # Only the first '=' separates key from value; a field without any
        # '=' makes the unpacking raise ValueError.
        [k, v] = field.split('=', 1)
        pairs[k] = v
    return pairs

  _parseMap = staticmethod(_parseMap)

class MRSystemDirectoryManager:
  """Class that is responsible for managing the MapReduce system directory"""

  def __init__(self, jtPid, mrSysDir, fsName, hadoopPath, log, retries=120):
    """Remember what is needed to remove the system directory later.

    jtPid      -- process id passed to isProcessRunning() while waiting
    mrSysDir   -- path given to 'dfs -rmr'
    fsName     -- file system name, used as 'hdfs://<fsName>'
    hadoopPath -- hadoop executable used to run the dfs command
    log        -- logger
    retries    -- number of 0.5 second polls to wait for the process to end
    """
    self.__jtPid = jtPid
    self.__mrSysDir = mrSysDir
    self.__fsName = fsName
    self.__hadoopPath = hadoopPath
    self.__log = log
    self.__retries = retries

  def toCleanupArgs(self):
    """Return the stored settings formatted as command line options."""
    return " --jt-pid %s --mr-sys-dir %s --fs-name %s --hadoop-path %s " \
              % (self.__jtPid, self.__mrSysDir, self.__fsName, self.__hadoopPath)

  def removeMRSystemDirectory(self):
    """Wait for the process with the stored pid to exit, then run
    'dfs -rmr' on the MapReduce system directory.

    The process is polled every 0.5 seconds, at most 'retries' times (a
    minute with the default of 120). If it is still running afterwards
    nothing is removed. Failures of the removal command are logged, not
    raised.
    """
    jtActive = isProcessRunning(self.__jtPid)
    count = 0 # try for a max of a minute for the process to end
    while jtActive and (count<self.__retries):
      time.sleep(0.5)
      jtActive = isProcessRunning(self.__jtPid)
      count += 1

    # Decide on the process state itself rather than on the poll counter:
    # the counter also reaches 'retries' when the process exits on the very
    # last poll, and equals it trivially when retries is 0.
    if jtActive:
      self.__log.warn('Job Tracker did not exit even after a minute. Not going to try and cleanup the system directory')
      return

    self.__log.debug('jt is now inactive')

    cmd = "%s dfs -fs hdfs://%s -rmr %s" % (self.__hadoopPath, self.__fsName, \
                                            self.__mrSysDir)
    self.__log.debug('Command to run to remove system directory: %s' % (cmd))
    try:
      hadoopCommand = simpleCommand('mr-sys-dir-cleaner', cmd)
      hadoopCommand.start()
      hadoopCommand.wait()
      hadoopCommand.join()
      ret = hadoopCommand.exit_code()
      if ret != 0:
        self.__log.warn("Error in removing MapReduce system directory '%s' from '%s' using path '%s'" \
                          % (self.__mrSysDir, self.__fsName, self.__hadoopPath))
        self.__log.warn(pprint.pformat(hadoopCommand.output()))
      else:
        self.__log.info("Removed MapReduce system directory successfully.")
    except:
      # Best-effort cleanup: report the problem and carry on.
      self.__log.error('Exception while cleaning up MapReduce system directory. May not be cleaned up. %s', \
                          get_exception_error_string())
      self.__log.debug(get_exception_string())

def createMRSystemDirectoryManager(dict, log):
  """Create an MRSystemDirectoryManager from a mapping of settings.

  dict must provide non-None values for 'jt-pid', 'mr-sys-dir', 'fs-name'
  and 'hadoop-path'; otherwise None is returned. 'jt-pid' is converted
  with int().
  """
  keys = [ 'jt-pid', 'mr-sys-dir', 'fs-name', 'hadoop-path' ]
  for key in keys:
    if (key not in dict) or (dict[key] is None):
      return None

  mrSysDirManager = MRSystemDirectoryManager(int(dict['jt-pid']), dict['mr-sys-dir'], \
                                              dict['fs-name'], dict['hadoop-path'], log)
  return mrSysDirManager

class HadoopCommand:
  """Runs a single hadoop command"""

  def __init__(self, id, desc, tempdir, tardir, log, javahome,
                mrSysDir, restart=False):
    """Record the command settings and prepare its directories.

    id       -- integer used as prefix of the per-command directory name
    desc     -- command description offering getProgram(), getName() and
                getWorkDirs()
    tempdir  -- base directory; '<id>-<name>/confdir' and
                '<id>-<name>/logdir' are placed below it
    tardir   -- not used by the statements of this constructor
    log      -- logger
    javahome -- stored as self.javahome
    mrSysDir -- value substituted for "fillindir" attributes in createXML
    restart  -- when true an already existing confdir is accepted
    """
    self.desc = desc
    self.log = log
    self.javahome = javahome
    self.__mrSysDir = mrSysDir
    self.program = desc.getProgram()
    self.name = desc.getName()
    self.workdirs = desc.getWorkDirs()
    self.hadoopdir = tempdir
    self.confdir = os.path.join(self.hadoopdir, '%d-%s' % (id, self.name),
                                "confdir")
    self.logdir = os.path.join(self.hadoopdir, '%d-%s' % (id, self.name),
                               "logdir")
    self.out = os.path.join(self.logdir, '%s.out' % self.name)
    self.err = os.path.join(self.logdir, '%s.err' % self.name)
    self.child = None
    self.restart = restart
    self.filledInKeyVals = []
    self._createWorkDirs()
    self._createHadoopSiteXml()
    self._createHadoopLogDir()
    self.__hadoopThread = None
    self.stdErrContents = "" # store list of contents for returning to user

  def _createWorkDirs(self):
    """Create each work directory that does not exist yet.

    Raises ValueError when an existing work directory is not readable,
    writable and searchable. Errors from creating a directory are ignored.
    """
    for dir in self.workdirs:
      if os.path.exists(dir):
        if not os.access(dir, os.F_OK | os.R_OK | os.W_OK | os.X_OK):
          raise ValueError("Workdir %s does not allow rwx permission." % (dir))
        continue
      try:
        os.makedirs(dir)
      except:
        pass

  def getFilledInKeyValues(self):
    """Return the list of 'key=value' strings recorded by createXML."""
    return self.filledInKeyVals

  def createXML(self, doc, attr, topElement, final):
    """Append one property element per attribute to topElement.

    Placeholder values are replaced first: "fillinport" by a port obtained
    from ServiceUtil.getUniqRandomPort, "fillinhostport" by
    '<local_fqdn()>:<port>' and "fillindir" by the MapReduce system
    directory. Elements are built with self._createXmlElement; the value of
    'fs.default.name' is prefixed with "hdfs://".

    NOTE(review): every attribute is recorded in self.filledInKeyVals once
    before and once after the "fillinhostport" substitution (plus one more
    time for 'mapred.job.tracker'), and the "fillindir" substitution happens
    after recording. Kept exactly as found -- confirm whether callers rely
    on it.
    """
    for k,v in attr.iteritems():
      self.log.debug('_createHadoopSiteXml: ' + str(k) + " " + str(v))
      if ( v == "fillinport" ):
        v = "%d" % (ServiceUtil.getUniqRandomPort(low=50000, log=self.log))

      # A tuple/list value becomes "k=item1,k=item2,..."
      keyvalpair = ''
      if isinstance(v, (tuple, list)):
        for item in v:
          keyvalpair = "%s%s=%s," % (keyvalpair, k, item)
        keyvalpair = keyvalpair[:-1]
      else:
        keyvalpair = k + '=' + v

      self.filledInKeyVals.append(keyvalpair)
      if(k == "mapred.job.tracker"): # total hack for time's sake
        keyvalpair = k + "=" + v
        self.filledInKeyVals.append(keyvalpair)

      if ( v == "fillinhostport"):
        port = "%d" % (ServiceUtil.getUniqRandomPort(low=50000, log=self.log))
        self.log.debug('Setting hostname to: %s' % local_fqdn())
        v = local_fqdn() + ':' + port

      keyvalpair = ''
      if isinstance(v, (tuple, list)):
        for item in v:
          keyvalpair = "%s%s=%s," % (keyvalpair, k, item)
        keyvalpair = keyvalpair[:-1]
      else:
        keyvalpair = k + '=' + v

      self.filledInKeyVals.append(keyvalpair)
      if ( v == "fillindir"):
        v = self.__mrSysDir
        pass

      prop = None
      if isinstance(v, (tuple, list)):
        for item in v:
          prop = self._createXmlElement(doc, k, item, "No description", final)
          topElement.appendChild(prop)
      else:
        if k == 'fs.default.name':
          prop = self._createXmlElement(doc, k, "hdfs://" + v, "No description", final)
        else:
          prop = self._createXmlElement(doc, k, v, "No description", final)
        topElement.appendChild(prop)

  def _createHadoopSiteXml(self):
    # On a restart the configuration directory may already exist; otherwise
    # it must not exist yet and is created here.
    if self.restart:
      if not os.path.exists(self.confdir):
        os.makedirs(self.confdir)
    else:
      assert os.path.exists(self.confdir) == False
      os.makedirs(self.confdir)

    implementation = getDOMImplementation()

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -