EMRではクラスタのJavaのメモリ指定ができるのでやってみます。
EMRはジョブフローを起動するときに、Bootstrapアクションを指定出来ます。
AWSコンソールでジョブフローを起動する場合は、ジョブフロー作成のダイアログ中にBootstrapアクションを設定できます。
「BOOTSTRAP ACTIONS」のフェーズで「Configure your Bootstrap Actions」を選択すると以下のような画面が出てきます。
Action Typeにはいくつかあり、監視ツールの「Install Ganglia」や「Memory Intensive Configuration」などのプリセットとカスタムアクションがあります。
「Amazon S3 Location」欄にS3上のアドレスを記載すると、そこに置かれたシェルスクリプトなどがジョブの起動時に実行されるようです。
Memory Intensive Configuration
ここでは、メモリ集中型設定というのをやってみます。 Action Typeから「Memory Intensive Configuration」という項目のを選びます。 「Amazon S3 Location」欄にS3アドレスが表示されます。
このアドレスに該当するスクリプトを取得すると以下のような内容になっています。
$ curl http://elasticmapreduce.s3.amazonaws.com/bootstrap-actions/configurations/latest/memory-intensive #!/usr/bin/ruby ## Copyright 2010-2010 Amazon.com, Inc. or its affiliates. All Rights Reserved. ## Licensed under the Apache License, Version 2.0 (the "Liense"). You may not use this file except in compliance with the License. ## A copy of the License is located at http://aws.Amazon/apache2.0/ ## or in the "license" file accompanying this file. ## This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ## See the License for the specific language governing permissions and limitations under the License. require 'json' require 'hpricot' require 'tempfile' CONFIG_HEADER = "<?xml version=\"1.0\"?>\n<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>" conf_fields = [ { :field => "mapred.child.java.opts={VAR}", :roles => [:slave] }, { :field => "mapred.tasktracker.map.tasks.maximum={VAR}", :roles => [:master, :slave] }, { :field => "mapred.tasktracker.reduce.tasks.maximum={VAR}", :roles => [:master, :slave] } ] configs = { "m1.small" => ["-Xmx512m", "2", "1"], "m1.large" => ["-Xmx1024m", "3", "1"], "m1.xlarge" => ["-Xmx1024m", "8", "3"], "c1.medium" => ["-Xmx512m", "2", "1"], "c1.xlarge" => ["-Xmx512m", "7", "2"], "m2.xlarge" => ["-Xmx3072m", "3", "1"], "m2.2xlarge" => ["-Xmx4096m", "6", "2"], "m2.4xlarge" => ["-Xmx4096m", "14", "4"] } heap_fields = [ { :field => "HADOOP_NAMENODE_HEAPSIZE={VAR}", :roles => [:master, :slave] }, { :field => "HADOOP_JOBTRACKER_HEAPSIZE={VAR}", :roles => [:master, :slave] }, { :field => "HADOOP_TASKTRACKER_HEAPSIZE={VAR}", :roles => [:master, :slave] }, { :field => "HADOOP_DATANODE_HEAPSIZE={VAR}", :roles => [:master, :slave] } ] heaps = { "m1.small" => ["512", "512", "256", "128"], "m1.large" => ["1024", "3072", "512", "512"], "m1.xlarge" => ["3072", "9216", "512", "512"], "c1.medium" => ["512", "768", "256", "128"], "c1.xlarge" => ["1024", "2048", "512", "512"], "m2.xlarge" => ["2048", "4096", "512", "512"], "m2.2xlarge" => ["2048", "8192", "1024", "1024"], "m2.4xlarge" => ["8192", "8192", "1024", "1024"] } def parse_config_file(config_file_path) ret = [] if File.exist?(config_file_path) then doc = open(config_file_path) { |f| Hpricot(f) } (doc/"configuration"/"property").each do |property| val = {:name => (property/"name").inner_html, :value => (property/"value").inner_html } if (property/"final").inner_html != "" then val[:final] = (property/"final").inner_html end ret << val end else puts "#{config_file_path} does not exist, assuming empty configuration" end return ret end def dump_config_file(file_name, config) open(file_name, 'w') do |f| f.puts CONFIG_HEADER f.puts '<configuration>' for entry in config f.print " <property><name>#{entry[:name]}</name><value>#{entry[:value]}</value>" if entry[:final] then f.print "<final>#{entry[:final]}</final>" end f.puts '</property>' end f.puts '</configuration>' end end def merge_config(default, overwrite) for entry in overwrite cells = default.select { |x| x[:name] == entry[:name]} if cells.size == 0 then puts "'#{entry[:name]}': default does not have key, appending value '#{entry[:value]}'" default << entry elsif cells.size == 1 then puts "'#{entry[:name]}': new value '#{entry[:value]}' overwriting '#{cells[0][:value]}'" cells[0].replace(entry) else raise "'#{entry[:name]}': default has #{cells.size} keys" end end end def do_overwrites(conf_list, heap_list) file = "/home/hadoop/conf/mapred-site.xml" default = parse_config_file(file) for arg in conf_list puts "Processing default file #{file} with overwrite #{arg}" key = arg.split('=', 2)[0] value = arg.split('=', 2)[1] overwrite = [{:name => key, :value => value }] merge_config(default,overwrite) end dump_config_file(file + ".new", default) if File.exist?(file) then File.rename(file, file + ".old") end File.rename(file + ".new", file) puts "Saved #{file} with overwrites. Original saved to #{file}.old" file = "/home/hadoop/conf/hadoop-user-env.sh" if File.exist?(file) then File.delete(file) end open(file, 'w') do |f| f.puts "#!/bin/bash" for arg in heap_list f.puts arg end end end class JsonInfoFile INFO_DIR = "/mnt/var/lib/info/" def initialize(file_type) @json = JSON.parse(File.read(File.join(INFO_DIR, file_type + ".json"))) end def [](json_path) json = @json begin path = json_path.split('.') visited = [] for item in path if !json.kind_of? Hash then raise "#{visited.join('.')} not of type object, got '#{json.inspect}' from #{@json.inspect}" end visited << item json = json[item] end if json == nil then raise "#{visited.join('.')} does not exist" end return json rescue puts "Unable to process path '#{json_path}', #{$!}" exit -1 end end end def warn(msg) STDERR.puts "#{Time.now.utc} WARN " + msg end def substitute_in(row, fields, instance_role) if row.size != fields.size then raise RuntimeError, "Incompatible row and field list row=#{row}, fields=#{fields}" end result = [] for index in 0 ... row.size do if fields[index][:roles].include?(instance_role) then result << fields[index][:field].sub('{VAR}', row[index]) end end return result end HPC_INSTANCE_TYPES = [ "cc1.4xlarge", "cg1.4xlarge" ] jobflow_info = JsonInfoFile.new("job-flow") instance_info = JsonInfoFile.new("instance") if instance_info['isMaster'].to_s == 'true' then instance_type = jobflow_info["masterInstanceType"] instance_role = :master else instance_group_id = instance_info['instanceGroupId'] instance_groups = jobflow_info['instanceGroups'] index = instance_groups.index { |g| g['instanceGroupId'] == instance_group_id } instance_group = instance_groups[index] instance_type = instance_group['instanceType'] instance_role = :slave end if HPC_INSTANCE_TYPES.include?(instance_type) then warn "This bootstrap action is not supported for the HPC instances (cc1.4xlarge and cg1.4xlarge)" else conf_list = substitute_in(configs[instance_type], conf_fields, instance_role) heap_list = substitute_in(heaps[instance_type], heap_fields, instance_role) do_overwrites(conf_list, heap_list) end
RUBYスクリプトのようです。
このスクリプトではインスタンスサイズに応じてネームノード、JobTracker、TaskTracker、データノードに対して、メモリ集中型のヒープサイズをそれぞれの環境変数に設定するように
/home/hadoop/conf/hadoop-user-env.sh
を上書きしています。
そして、hadoop-user-env.shは
/home/hadoop/conf/hadoop-env.sh
から以下のように読み込まれ、ジョブ実行時に実行される仕組みのようです。
#!/bin/bash export HADOOP_DATANODE_HEAPSIZE="96" export HADOOP_NAMENODE_HEAPSIZE="192" export HADOOP_JOBTRACKER_HEAPSIZE="576" export HADOOP_TASKTRACKER_HEAPSIZE="192" export HADOOP_OPTS="$HADOOP_OPTS -server" if [ -e /home/hadoop/conf/hadoop-user-env.sh ] ; then . /home/hadoop/conf/hadoop-user-env.sh fi
これがメモリ集中型の場合
- HADOOP_JOBTRACKER_HEAPSIZE:512
- HADOOP_NAMENODE_HEAPSIZE:512
- HADOOP_TASKTRACKER_HEAPSIZE:256
- HADOOP_DATANODE_HEAPSIZE:128
のようになるようです。
ネームノードにより多くのメモリが割り当てられるようです。
これで実行すると、新しいメモリ割り当ての設定でジョブが実行されます。
Custom Action
プリセット以外にも、「Custom Action」という自由にスクリプトなどをアップできるオプションもあります。
「Amazon S3 Location」欄に自分でアップしたスクリプトファイルのS3アドレスを入力します。
ここでは、例として
s3://memorycraft-emr/script/heap.sh として以下のファイルアップしておきます。
前述のMemory Intensive Configurationではインスタンスごとに決め打ちのヒープサイズが自動で適用されましたが、このように自分で各ヒープサイズを指定するように hadoop-user-env.shを作るようなスクリプトを書くこともできます。
そして、以下のようにS3アドレスを入力します。
あとは通常どおり実行します。
インスタンスサイズは上げたくないけどOutOfMemoryを回避したいという場合には、まずこのようにBootstrapActionで調整してみるといいと思います。
また、Bootstrap設定はSDKなどのAPI経由のジョブフロー起動時にも行うことが可能です。
.... 略 ..... 'AmiVersion' => 'latest', 'LogUri' => "$LOG_BUCKET_LOCATION", 'BootstrapActions' => array( array( 'Name' => 'Custom Action', 'ScriptBootstrapAction' => array( 'Path' => 's3://memorycraft-emr/script/heap.sh', ), ), ), 'Steps' => array( new \CFStepConfig(array( .... 略 .....
メモリ設定以外にも処理前に自由に処理を挿入できるので、カスタム処理を行ったり、実行環境を最適化できるので、いろいろ便利そうです。
以上です。