
hdfs.py

From the collection: HADOOP 0.18.0 installation source code (header files)
Language: Python
#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements.  See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership.  The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License.  You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.
"""define Hdfs as subclass of Service"""
# -*- python -*-

import os

from service import *
from hodlib.Hod.nodePool import *
from hodlib.Common.desc import CommandDesc
from hodlib.Common.util import get_exception_string, parseEquals

class HdfsExternal(MasterSlave):
  """dummy proxy to external HDFS instance"""

  def __init__(self, serviceDesc, workDirs, version):
    MasterSlave.__init__(self, serviceDesc, workDirs, None)
    self.launchedMaster = True
    self.masterInitialized = True
    self.version = version

  def getMasterRequest(self):
    return None

  def getMasterCommands(self, serviceDict):
    return []

  def getAdminCommands(self, serviceDict):
    return []

  def getWorkerCommands(self, serviceDict):
    return []

  def getMasterAddrs(self):
    attrs = self.serviceDesc.getfinalAttrs()
    addr = attrs['fs.default.name']
    return [addr]

  def setMasterParams(self, dict):
    self.serviceDesc.dict['final-attrs']['fs.default.name'] = "%s:%s" % \
      (dict['host'], dict['fs_port'])
    if self.version < 16:
      self.serviceDesc.dict['final-attrs']['dfs.info.port'] = \
                                      str(self.serviceDesc.dict['info_port'])
    else:
      # After Hadoop-2185
      self.serviceDesc.dict['final-attrs']['dfs.http.address'] = "%s:%s" % \
        (dict['host'], dict['info_port'])

  def getInfoAddrs(self):
    attrs = self.serviceDesc.getfinalAttrs()
    if self.version < 16:
      addr = attrs['fs.default.name']
      k, v = addr.split(":")
      infoaddr = k + ':' + attrs['dfs.info.port']
    else:
      # After Hadoop-2185
      infoaddr = attrs['dfs.http.address']
    return [infoaddr]

class Hdfs(MasterSlave):

  def __init__(self, serviceDesc, nodePool, required_node, version, \
                                        format=True, upgrade=False):
    MasterSlave.__init__(self, serviceDesc, nodePool, required_node)
    self.masterNode = None
    self.masterAddr = None
    self.runAdminCommands = True
    self.infoAddr = None
    self._isLost = False
    self.format = format
    self.upgrade = upgrade
    self.workers = []
    self.version = version

  def getMasterRequest(self):
    req = NodeRequest(1, [], False)
    return req

  def getMasterCommands(self, serviceDict):
    masterCommands = []
    if self.format:
      masterCommands.append(self._getNameNodeCommand(True))

    if self.upgrade:
      masterCommands.append(self._getNameNodeCommand(False, True))
    else:
      masterCommands.append(self._getNameNodeCommand(False))

    return masterCommands

  def getAdminCommands(self, serviceDict):
    adminCommands = []
    if self.upgrade and self.runAdminCommands:
      adminCommands.append(self._getNameNodeAdminCommand('-safemode wait'))
      adminCommands.append(self._getNameNodeAdminCommand('-finalizeUpgrade',
                                                          True, True))
    self.runAdminCommands = False
    return adminCommands

  def getWorkerCommands(self, serviceDict):
    cmdDesc = self._getDataNodeCommand()
    return [cmdDesc]

  def setMasterNodes(self, list):
    node = list[0]
    self.masterNode = node

  def getMasterAddrs(self):
    return [self.masterAddr]

  def getInfoAddrs(self):
    return [self.infoAddr]

  def getWorkers(self):
    return self.workers

  def setMasterParams(self, list):
    dict = self._parseEquals(list)
    self.masterAddr = dict['fs.default.name']
    k, v = self.masterAddr.split(":")
    self.masterNode = k
    if self.version < 16:
      self.infoAddr = self.masterNode + ':' + dict['dfs.info.port']
    else:
      # After Hadoop-2185
      self.infoAddr = dict['dfs.http.address']

  def _parseEquals(self, list):
    return parseEquals(list)

  def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
    namedir = None
    hadooptmpdir = None
    datadir = []

    for p in parentDirs:
      workDirs.append(p)
      workDirs.append(os.path.join(p, subDir))
      dir = os.path.join(p, subDir, 'dfs-data')
      datadir.append(dir)
      if not hadooptmpdir:
        # Not used currently, generating hadooptmpdir just in case
        hadooptmpdir = os.path.join(p, subDir, 'hadoop-tmp')
      if not namedir:
        namedir = os.path.join(p, subDir, 'dfs-name')

    workDirs.append(namedir)
    workDirs.extend(datadir)

    # FIXME!! use csv
    attrs['dfs.name.dir'] = namedir
    attrs['hadoop.tmp.dir'] = hadooptmpdir
    attrs['dfs.data.dir'] = ','.join(datadir)
    # FIXME -- change dfs.client.buffer.dir
    envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"

  def _getNameNodeCommand(self, format=False, upgrade=False):
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()

    if 'fs.default.name' not in attrs:
      attrs['fs.default.name'] = 'fillinhostport'

    if self.version < 16:
      if 'dfs.info.port' not in attrs:
        attrs['dfs.info.port'] = 'fillinport'
    else:
      # Addressing Hadoop-2185, added the following. Earlier versions don't
      # care about this
      if 'dfs.http.address' not in attrs:
        attrs['dfs.http.address'] = 'fillinhostport'

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-nn')

    dict = { 'name' : 'namenode' }
    dict['program'] = os.path.join('bin', 'hadoop')
    argv = ['namenode']
    if format:
      argv.append('-format')
    elif upgrade:
      argv.append('-upgrade')
    dict['argv'] = argv
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()
    if format:
      dict['fg'] = 'true'
      dict['stdin'] = 'Y'
    cmd = CommandDesc(dict)
    return cmd

  def _getNameNodeAdminCommand(self, adminCommand, wait=True, ignoreFailures=False):
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()
    nn = self.masterAddr

    if nn == None:
      raise ValueError, "Can't get namenode address"

    attrs['fs.default.name'] = nn

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-nn')

    dict = { 'name' : 'dfsadmin' }
    dict['program'] = os.path.join('bin', 'hadoop')
    argv = ['dfsadmin']
    argv.append(adminCommand)
    dict['argv'] = argv
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()
    if wait:
      dict['fg'] = 'true'
      dict['stdin'] = 'Y'
    if ignoreFailures:
      dict['ignorefailures'] = 'Y'
    cmd = CommandDesc(dict)
    return cmd

  def _getDataNodeCommand(self):
    sd = self.serviceDesc

    parentDirs = self.workDirs
    workDirs = []
    attrs = sd.getfinalAttrs().copy()
    envs = sd.getEnvs().copy()
    nn = self.masterAddr

    if nn == None:
      raise ValueError, "Can't get namenode address"

    attrs['fs.default.name'] = nn

    if self.version < 16:
      if 'dfs.datanode.port' not in attrs:
        attrs['dfs.datanode.port'] = 'fillinport'
      if 'dfs.datanode.info.port' not in attrs:
        attrs['dfs.datanode.info.port'] = 'fillinport'
    else:
      # Adding the following. Hadoop-2185
      if 'dfs.datanode.address' not in attrs:
        attrs['dfs.datanode.address'] = 'fillinhostport'
      if 'dfs.datanode.http.address' not in attrs:
        attrs['dfs.datanode.http.address'] = 'fillinhostport'

    if self.version >= 18:
      # After HADOOP-3283
      # TODO: check for major as well as minor versions
      attrs['dfs.datanode.ipc.address'] = 'fillinhostport'

    self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-dn')

    dict = { 'name' : 'datanode' }
    dict['program'] = os.path.join('bin', 'hadoop')
    dict['argv'] = ['datanode']
    dict['envs'] = envs
    dict['pkgdirs'] = sd.getPkgDirs()
    dict['workdirs'] = workDirs
    dict['final-attrs'] = attrs
    dict['attrs'] = sd.getAttrs()

    cmd = CommandDesc(dict)
    return cmd
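A minimal usage sketch, not part of the original hdfs.py: it only illustrates how HdfsExternal fills in fs.default.name and dfs.http.address from a host/port dictionary via setMasterParams. FakeServiceDesc is a hypothetical stand-in for the real hodlib descriptor class and mimics only the two pieces HdfsExternal touches; running it assumes the listing above is saved as hdfs.py and that its service/hodlib dependencies are importable (and that MasterSlave stores its first argument as self.serviceDesc, as the methods above imply).

# Hypothetical stand-in for hodlib's service descriptor; it exposes only the
# 'dict' attribute and getfinalAttrs() that HdfsExternal reads and writes.
class FakeServiceDesc:
  def __init__(self):
    self.dict = {'final-attrs': {}}

  def getfinalAttrs(self):
    return self.dict['final-attrs']

from hdfs import HdfsExternal   # the module listed above

# version=18 takes the post-Hadoop-2185 branch (dfs.http.address).
hdfs_proxy = HdfsExternal(FakeServiceDesc(), [], 18)
hdfs_proxy.setMasterParams({'host': 'nn.example.com',
                            'fs_port': 8020,
                            'info_port': 50070})

print hdfs_proxy.getMasterAddrs()   # ['nn.example.com:8020']
print hdfs_proxy.getInfoAddrs()     # ['nn.example.com:50070']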
