2013年3月23日土曜日

EMRってなんじゃ?(Bootstrap Actionで実行時メモリ使用量を設定する)

EMRを実行するときにメモリが足りなくてエラーになる場合があります。
EMRではクラスタのJavaのメモリ指定ができるのでやってみます。

EMRはジョブフローを起動するときに、Bootstrapアクションを指定出来ます。
AWSコンソールでジョブフローを起動する場合は、ジョブフロー作成のダイアログ中にBootstrapアクションを設定できます。

「BOOTSTRAP ACTIONS」のフェーズで「Configure your Bootstrap Actions」を選択すると以下のような画面が出てきます。



Action Typeにはいくつかあり、監視ツールの「Install Ganglia」や「Memory Intensive Configuration」などのプリセットとカスタムアクションがあります。
「Amazon S3 Location」欄にS3上のアドレスを記載すると、そこに置かれたシェルスクリプトなどがジョブの起動時に実行されるようです。


Memory Intensive Configuration


 ここでは、メモリ集中型設定というのをやってみます。 Action Typeから「Memory Intensive Configuration」という項目のを選びます。 「Amazon S3 Location」欄にS3アドレスが表示されます。



このアドレスに該当するスクリプトを取得すると以下のような内容になっています。
$ curl http://elasticmapreduce.s3.amazonaws.com/bootstrap-actions/configurations/latest/memory-intensive
#!/usr/bin/ruby

## Copyright 2010-2010 Amazon.com, Inc. or its affiliates. All Rights Reserved.
## Licensed under the Apache License, Version 2.0 (the "Liense"). You may not use this file except in compliance with the License.
## A copy of the License is located at http://aws.Amazon/apache2.0/
## or in the "license" file accompanying this file.
## This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and limitations under the License.

require 'json'
require 'hpricot'
require 'tempfile'

CONFIG_HEADER = "<?xml version=\"1.0\"?>\n<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>"

conf_fields = [
  { :field => "mapred.child.java.opts={VAR}",                   :roles => [:slave]          },
  { :field => "mapred.tasktracker.map.tasks.maximum={VAR}",     :roles => [:master, :slave] },
  { :field => "mapred.tasktracker.reduce.tasks.maximum={VAR}",  :roles => [:master, :slave] }
]

configs = {
  "m1.small"    => ["-Xmx512m",  "2",  "1"],
  "m1.large"    => ["-Xmx1024m", "3",  "1"],
  "m1.xlarge"   => ["-Xmx1024m", "8",  "3"],
  "c1.medium"   => ["-Xmx512m",  "2",  "1"],
  "c1.xlarge"   => ["-Xmx512m",  "7",  "2"],
  "m2.xlarge"   => ["-Xmx3072m", "3",  "1"],
  "m2.2xlarge"  => ["-Xmx4096m", "6",  "2"],
  "m2.4xlarge"  => ["-Xmx4096m", "14", "4"]
}

heap_fields = [
  { :field => "HADOOP_NAMENODE_HEAPSIZE={VAR}",    :roles => [:master, :slave] },
  { :field => "HADOOP_JOBTRACKER_HEAPSIZE={VAR}",  :roles => [:master, :slave] },
  { :field => "HADOOP_TASKTRACKER_HEAPSIZE={VAR}", :roles => [:master, :slave] },
  { :field => "HADOOP_DATANODE_HEAPSIZE={VAR}",    :roles => [:master, :slave] }
]

heaps = {
  "m1.small"    => ["512",  "512",  "256",  "128"],
  "m1.large"    => ["1024", "3072", "512",  "512"],
  "m1.xlarge"   => ["3072", "9216", "512",  "512"],
  "c1.medium"   => ["512",  "768",  "256",  "128"],
  "c1.xlarge"   => ["1024", "2048", "512",  "512"],
  "m2.xlarge"   => ["2048", "4096", "512",  "512"],
  "m2.2xlarge"  => ["2048", "8192", "1024", "1024"],
  "m2.4xlarge"  => ["8192", "8192", "1024", "1024"]
}

def parse_config_file(config_file_path)
  ret = []
  if File.exist?(config_file_path) then
    doc = open(config_file_path) { |f| Hpricot(f) }
    (doc/"configuration"/"property").each do |property|
      val = {:name => (property/"name").inner_html, :value => (property/"value").inner_html }
      if (property/"final").inner_html != "" then
        val[:final] =  (property/"final").inner_html
      end
      ret << val
    end
  else
    puts "#{config_file_path} does not exist, assuming empty configuration"
  end
  return ret
end

def dump_config_file(file_name, config)
  open(file_name, 'w') do |f|
    f.puts CONFIG_HEADER
    f.puts '<configuration>'
    for entry in config
      f.print "  <property><name>#{entry[:name]}</name><value>#{entry[:value]}</value>"
      if entry[:final] then
        f.print "<final>#{entry[:final]}</final>"
      end
      f.puts '</property>'
    end
    f.puts '</configuration>'
  end
end

def merge_config(default, overwrite)
  for entry in overwrite
    cells = default.select { |x| x[:name] == entry[:name]}
    if cells.size == 0 then
      puts "'#{entry[:name]}': default does not have key, appending value '#{entry[:value]}'"
      default << entry
    elsif cells.size == 1 then
      puts "'#{entry[:name]}': new value '#{entry[:value]}' overwriting '#{cells[0][:value]}'"
      cells[0].replace(entry)
    else
      raise "'#{entry[:name]}': default has #{cells.size} keys"
    end
  end
end

def do_overwrites(conf_list, heap_list)
  file = "/home/hadoop/conf/mapred-site.xml"
  default = parse_config_file(file)
  for arg in conf_list
    puts "Processing default file #{file} with overwrite #{arg}"
    key   = arg.split('=', 2)[0]
    value = arg.split('=', 2)[1]
    overwrite = [{:name => key, :value => value }]
    merge_config(default,overwrite)
  end
  dump_config_file(file + ".new", default)
  if File.exist?(file) then
    File.rename(file, file + ".old")
  end
  File.rename(file + ".new", file)
  puts "Saved #{file} with overwrites. Original saved to #{file}.old"
  file = "/home/hadoop/conf/hadoop-user-env.sh"
  if File.exist?(file) then
    File.delete(file)
  end
  open(file, 'w') do |f|
    f.puts "#!/bin/bash"
    for arg in heap_list
      f.puts arg
    end
  end
end

class JsonInfoFile

  INFO_DIR = "/mnt/var/lib/info/"

  def initialize(file_type)
    @json = JSON.parse(File.read(File.join(INFO_DIR, file_type + ".json")))
  end

  def [](json_path)
    json = @json
    begin
      path = json_path.split('.')
      visited = []
      for item in path
        if !json.kind_of? Hash then
          raise "#{visited.join('.')} not of type object, got '#{json.inspect}' from #{@json.inspect}"
        end
        visited << item
        json = json[item]
      end
      if json == nil then
        raise "#{visited.join('.')} does not exist"
      end
      return json
    rescue
      puts "Unable to process path '#{json_path}', #{$!}"
      exit -1
    end
  end
end

def warn(msg)
  STDERR.puts "#{Time.now.utc} WARN " + msg
end

def substitute_in(row, fields, instance_role)
  if row.size != fields.size then
    raise RuntimeError, "Incompatible row and field list row=#{row}, fields=#{fields}"
  end
  result = []
  for index in 0 ... row.size do
    if fields[index][:roles].include?(instance_role) then
      result << fields[index][:field].sub('{VAR}', row[index])
    end
  end
  return result
end

HPC_INSTANCE_TYPES = [ "cc1.4xlarge", "cg1.4xlarge" ]

jobflow_info = JsonInfoFile.new("job-flow")
instance_info = JsonInfoFile.new("instance")
if instance_info['isMaster'].to_s == 'true'  then
  instance_type = jobflow_info["masterInstanceType"]
  instance_role = :master
else
  instance_group_id = instance_info['instanceGroupId']
  instance_groups = jobflow_info['instanceGroups']
  index = instance_groups.index { |g| g['instanceGroupId'] == instance_group_id }
  instance_group = instance_groups[index]
  instance_type = instance_group['instanceType']
  instance_role = :slave
end

if HPC_INSTANCE_TYPES.include?(instance_type) then
  warn "This bootstrap action is not supported for the HPC instances (cc1.4xlarge and cg1.4xlarge)"
else
  conf_list = substitute_in(configs[instance_type], conf_fields, instance_role)
  heap_list = substitute_in(heaps[instance_type], heap_fields, instance_role)
  do_overwrites(conf_list, heap_list)
end

RUBYスクリプトのようです。
このスクリプトではインスタンスサイズに応じてネームノード、JobTracker、TaskTracker、データノードに対して、メモリ集中型のヒープサイズをそれぞれの環境変数に設定するように
/home/hadoop/conf/hadoop-user-env.sh
を上書きしています。

そして、hadoop-user-env.shは
/home/hadoop/conf/hadoop-env.sh
から以下のように読み込まれ、ジョブ実行時に実行される仕組みのようです。

#!/bin/bash

export HADOOP_DATANODE_HEAPSIZE="96"
export HADOOP_NAMENODE_HEAPSIZE="192"
export HADOOP_JOBTRACKER_HEAPSIZE="576"
export HADOOP_TASKTRACKER_HEAPSIZE="192"
export HADOOP_OPTS="$HADOOP_OPTS -server"
if [ -e /home/hadoop/conf/hadoop-user-env.sh ] ; then
  . /home/hadoop/conf/hadoop-user-env.sh
fi


これがメモリ集中型の場合
  • HADOOP_JOBTRACKER_HEAPSIZE:512
  • HADOOP_NAMENODE_HEAPSIZE:512
  • HADOOP_TASKTRACKER_HEAPSIZE:256
  • HADOOP_DATANODE_HEAPSIZE:128

のようになるようです。
ネームノードにより多くのメモリが割り当てられるようです。
これで実行すると、新しいメモリ割り当ての設定でジョブが実行されます。



Custom Action



プリセット以外にも、「Custom Action」という自由にスクリプトなどをアップできるオプションもあります。
「Amazon S3 Location」欄に自分でアップしたスクリプトファイルのS3アドレスを入力します。
ここでは、例として
s3://memorycraft-emr/script/heap.sh として以下のファイルアップしておきます。
前述のMemory Intensive Configurationではインスタンスごとに決め打ちのヒープサイズが自動で適用されましたが、このように自分で各ヒープサイズを指定するように hadoop-user-env.shを作るようなスクリプトを書くこともできます。



そして、以下のようにS3アドレスを入力します。


あとは通常どおり実行します。



インスタンスサイズは上げたくないけどOutOfMemoryを回避したいという場合には、まずこのようにBootstrapActionで調整してみるといいと思います。

また、Bootstrap設定はSDKなどのAPI経由のジョブフロー起動時にも行うことが可能です。
  .... 略 .....
   'AmiVersion'       => 'latest',
   'LogUri'           => "$LOG_BUCKET_LOCATION",
   'BootstrapActions' => array(
    array(
     'Name'                  => 'Custom Action',
     'ScriptBootstrapAction' => array(
      'Path' => 's3://memorycraft-emr/script/heap.sh',
     ),
    ),
   ),
   'Steps' => array(
    new \CFStepConfig(array(
  .... 略 .....


メモリ設定以外にも処理前に自由に処理を挿入できるので、カスタム処理を行ったり、実行環境を最適化できるので、いろいろ便利そうです。


以上です。