EMRを実行するときにメモリが足りなくてエラーになる場合があります。
EMRではクラスタのJavaのメモリ指定ができるのでやってみます。
EMRはジョブフローを起動するときに、Bootstrapアクションを指定出来ます。
AWSコンソールでジョブフローを起動する場合は、ジョブフロー作成のダイアログ中にBootstrapアクションを設定できます。
「BOOTSTRAP ACTIONS」のフェーズで「Configure your Bootstrap Actions」を選択すると以下のような画面が出てきます。
Action Typeにはいくつかあり、監視ツールの「Install Ganglia」や「Memory Intensive Configuration」などのプリセットとカスタムアクションがあります。
「Amazon S3 Location」欄にS3上のアドレスを記載すると、そこに置かれたシェルスクリプトなどがジョブの起動時に実行されるようです。
Memory Intensive Configuration
ここでは、メモリ集中型設定というのをやってみます。
Action Typeから「Memory Intensive Configuration」という項目のを選びます。
「Amazon S3 Location」欄にS3アドレスが表示されます。
このアドレスに該当するスクリプトを取得すると以下のような内容になっています。
$ curl http://elasticmapreduce.s3.amazonaws.com/bootstrap-actions/configurations/latest/memory-intensive
#!/usr/bin/ruby
## Copyright 2010-2010 Amazon.com, Inc. or its affiliates. All Rights Reserved.
## Licensed under the Apache License, Version 2.0 (the "Liense"). You may not use this file except in compliance with the License.
## A copy of the License is located at http://aws.Amazon/apache2.0/
## or in the "license" file accompanying this file.
## This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and limitations under the License.
require 'json'
require 'hpricot'
require 'tempfile'
CONFIG_HEADER = "<?xml version=\"1.0\"?>\n<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>"
conf_fields = [
{ :field => "mapred.child.java.opts={VAR}", :roles => [:slave] },
{ :field => "mapred.tasktracker.map.tasks.maximum={VAR}", :roles => [:master, :slave] },
{ :field => "mapred.tasktracker.reduce.tasks.maximum={VAR}", :roles => [:master, :slave] }
]
configs = {
"m1.small" => ["-Xmx512m", "2", "1"],
"m1.large" => ["-Xmx1024m", "3", "1"],
"m1.xlarge" => ["-Xmx1024m", "8", "3"],
"c1.medium" => ["-Xmx512m", "2", "1"],
"c1.xlarge" => ["-Xmx512m", "7", "2"],
"m2.xlarge" => ["-Xmx3072m", "3", "1"],
"m2.2xlarge" => ["-Xmx4096m", "6", "2"],
"m2.4xlarge" => ["-Xmx4096m", "14", "4"]
}
heap_fields = [
{ :field => "HADOOP_NAMENODE_HEAPSIZE={VAR}", :roles => [:master, :slave] },
{ :field => "HADOOP_JOBTRACKER_HEAPSIZE={VAR}", :roles => [:master, :slave] },
{ :field => "HADOOP_TASKTRACKER_HEAPSIZE={VAR}", :roles => [:master, :slave] },
{ :field => "HADOOP_DATANODE_HEAPSIZE={VAR}", :roles => [:master, :slave] }
]
heaps = {
"m1.small" => ["512", "512", "256", "128"],
"m1.large" => ["1024", "3072", "512", "512"],
"m1.xlarge" => ["3072", "9216", "512", "512"],
"c1.medium" => ["512", "768", "256", "128"],
"c1.xlarge" => ["1024", "2048", "512", "512"],
"m2.xlarge" => ["2048", "4096", "512", "512"],
"m2.2xlarge" => ["2048", "8192", "1024", "1024"],
"m2.4xlarge" => ["8192", "8192", "1024", "1024"]
}
def parse_config_file(config_file_path)
ret = []
if File.exist?(config_file_path) then
doc = open(config_file_path) { |f| Hpricot(f) }
(doc/"configuration"/"property").each do |property|
val = {:name => (property/"name").inner_html, :value => (property/"value").inner_html }
if (property/"final").inner_html != "" then
val[:final] = (property/"final").inner_html
end
ret << val
end
else
puts "#{config_file_path} does not exist, assuming empty configuration"
end
return ret
end
def dump_config_file(file_name, config)
open(file_name, 'w') do |f|
f.puts CONFIG_HEADER
f.puts '<configuration>'
for entry in config
f.print " <property><name>#{entry[:name]}</name><value>#{entry[:value]}</value>"
if entry[:final] then
f.print "<final>#{entry[:final]}</final>"
end
f.puts '</property>'
end
f.puts '</configuration>'
end
end
def merge_config(default, overwrite)
for entry in overwrite
cells = default.select { |x| x[:name] == entry[:name]}
if cells.size == 0 then
puts "'#{entry[:name]}': default does not have key, appending value '#{entry[:value]}'"
default << entry
elsif cells.size == 1 then
puts "'#{entry[:name]}': new value '#{entry[:value]}' overwriting '#{cells[0][:value]}'"
cells[0].replace(entry)
else
raise "'#{entry[:name]}': default has #{cells.size} keys"
end
end
end
def do_overwrites(conf_list, heap_list)
file = "/home/hadoop/conf/mapred-site.xml"
default = parse_config_file(file)
for arg in conf_list
puts "Processing default file #{file} with overwrite #{arg}"
key = arg.split('=', 2)[0]
value = arg.split('=', 2)[1]
overwrite = [{:name => key, :value => value }]
merge_config(default,overwrite)
end
dump_config_file(file + ".new", default)
if File.exist?(file) then
File.rename(file, file + ".old")
end
File.rename(file + ".new", file)
puts "Saved #{file} with overwrites. Original saved to #{file}.old"
file = "/home/hadoop/conf/hadoop-user-env.sh"
if File.exist?(file) then
File.delete(file)
end
open(file, 'w') do |f|
f.puts "#!/bin/bash"
for arg in heap_list
f.puts arg
end
end
end
class JsonInfoFile
INFO_DIR = "/mnt/var/lib/info/"
def initialize(file_type)
@json = JSON.parse(File.read(File.join(INFO_DIR, file_type + ".json")))
end
def [](json_path)
json = @json
begin
path = json_path.split('.')
visited = []
for item in path
if !json.kind_of? Hash then
raise "#{visited.join('.')} not of type object, got '#{json.inspect}' from #{@json.inspect}"
end
visited << item
json = json[item]
end
if json == nil then
raise "#{visited.join('.')} does not exist"
end
return json
rescue
puts "Unable to process path '#{json_path}', #{$!}"
exit -1
end
end
end
def warn(msg)
STDERR.puts "#{Time.now.utc} WARN " + msg
end
def substitute_in(row, fields, instance_role)
if row.size != fields.size then
raise RuntimeError, "Incompatible row and field list row=#{row}, fields=#{fields}"
end
result = []
for index in 0 ... row.size do
if fields[index][:roles].include?(instance_role) then
result << fields[index][:field].sub('{VAR}', row[index])
end
end
return result
end
HPC_INSTANCE_TYPES = [ "cc1.4xlarge", "cg1.4xlarge" ]
jobflow_info = JsonInfoFile.new("job-flow")
instance_info = JsonInfoFile.new("instance")
if instance_info['isMaster'].to_s == 'true' then
instance_type = jobflow_info["masterInstanceType"]
instance_role = :master
else
instance_group_id = instance_info['instanceGroupId']
instance_groups = jobflow_info['instanceGroups']
index = instance_groups.index { |g| g['instanceGroupId'] == instance_group_id }
instance_group = instance_groups[index]
instance_type = instance_group['instanceType']
instance_role = :slave
end
if HPC_INSTANCE_TYPES.include?(instance_type) then
warn "This bootstrap action is not supported for the HPC instances (cc1.4xlarge and cg1.4xlarge)"
else
conf_list = substitute_in(configs[instance_type], conf_fields, instance_role)
heap_list = substitute_in(heaps[instance_type], heap_fields, instance_role)
do_overwrites(conf_list, heap_list)
end
RUBYスクリプトのようです。
このスクリプトではインスタンスサイズに応じてネームノード、JobTracker、TaskTracker、データノードに対して、メモリ集中型のヒープサイズをそれぞれの環境変数に設定するように
/home/hadoop/conf/hadoop-user-env.sh
を上書きしています。
そして、hadoop-user-env.shは
/home/hadoop/conf/hadoop-env.sh
から以下のように読み込まれ、ジョブ実行時に実行される仕組みのようです。
#!/bin/bash
export HADOOP_DATANODE_HEAPSIZE="96"
export HADOOP_NAMENODE_HEAPSIZE="192"
export HADOOP_JOBTRACKER_HEAPSIZE="576"
export HADOOP_TASKTRACKER_HEAPSIZE="192"
export HADOOP_OPTS="$HADOOP_OPTS -server"
if [ -e /home/hadoop/conf/hadoop-user-env.sh ] ; then
. /home/hadoop/conf/hadoop-user-env.sh
fi
これがメモリ集中型の場合
- HADOOP_JOBTRACKER_HEAPSIZE:512
- HADOOP_NAMENODE_HEAPSIZE:512
- HADOOP_TASKTRACKER_HEAPSIZE:256
- HADOOP_DATANODE_HEAPSIZE:128
のようになるようです。
ネームノードにより多くのメモリが割り当てられるようです。
これで実行すると、新しいメモリ割り当ての設定でジョブが実行されます。
Custom Action
プリセット以外にも、「Custom Action」という自由にスクリプトなどをアップできるオプションもあります。
「Amazon S3 Location」欄に自分でアップしたスクリプトファイルのS3アドレスを入力します。
ここでは、例として
s3://memorycraft-emr/script/heap.sh として以下のファイルアップしておきます。
前述のMemory Intensive Configurationではインスタンスごとに決め打ちのヒープサイズが自動で適用されましたが、このように自分で各ヒープサイズを指定するように hadoop-user-env.shを作るようなスクリプトを書くこともできます。
そして、以下のようにS3アドレスを入力します。
あとは通常どおり実行します。
インスタンスサイズは上げたくないけどOutOfMemoryを回避したいという場合には、まずこのようにBootstrapActionで調整してみるといいと思います。
また、Bootstrap設定はSDKなどのAPI経由のジョブフロー起動時にも行うことが可能です。
.... 略 .....
'AmiVersion' => 'latest',
'LogUri' => "$LOG_BUCKET_LOCATION",
'BootstrapActions' => array(
array(
'Name' => 'Custom Action',
'ScriptBootstrapAction' => array(
'Path' => 's3://memorycraft-emr/script/heap.sh',
),
),
),
'Steps' => array(
new \CFStepConfig(array(
.... 略 .....
メモリ設定以外にも処理前に自由に処理を挿入できるので、カスタム処理を行ったり、実行環境を最適化できるので、いろいろ便利そうです。
以上です。