2013年12月11日水曜日

AutoScalingってなんじゃ?(AutoScalingがAWSマネージメントコンソールで管理可能に!)

ついにAWSの管理コンソールからAutoScalingが管理可能になりました。

早速触ってみます。

EC2のコンソールの左ペインの最下部にAutoScaling(以下AS)の項目が追加されています。
選択してみると、左側に概要が表示されるので、「Create Auto Scaling Group」をクリックします。




Launch Configuration の作成



既存のLaunchConfiguration(以下LC)を使うか、新規にLCを作るかを聞かれるので、今回は新規LCを選びます。


すると、LC作成ウィザードに切り替わり、インスタンスの起動にまつわる設定が始まります。


AMI


インスタンスタイプ




詳細設定


ここで、LC名を入力します。
それ以外はインスタンス起動の詳細設定と同じです。




ストレージ





セキュリティグループ





確認画面






キーの選択




ここでLCで設定する鍵を選択して「Create launch configuration」をクリックすると、LCの作成は終了で、自動的にASの作成に移ります。



Auto Scaling Groupの作成



基本設定


AS作成画面では、先に新規作成したLCまたは既存から選択したLCがセットされ、AS名やサイズ、起動場所(VPCやゾーン)、バランサのヘルスチェックなどを入力します。




スケーリングポリシー


次にスケーリングポリシーを設定します。
アラームや増減の限界値や単位を設定します。



「Add new alarm」でその場でアラーム設定が可能です。




通知


通知の設定をします。ASのイベントとそれが発生した時の通知先を設定します。




確認画面


最後に確認画面で内容をチェックし、「Create auto scaling group」をクリックして設定を終えます。





Auto Scaling Groupの確認



設定の変更


一覧画面に作成したばかりのASが表示されます。
また、ASを選択すると、下部ペインの「Details」タブなどで詳細が確認でき、
「Edit」で設定の変更が可能です。

ここで、DesiredCapacityの変更もできるので、min=1, max=5, desired=1にして、1つインスタンスを起動してみます。




すると、「ScalingHistory」タブでスケーリングイベントの履歴が確認できます。
失敗イベントはその理由も記述されているので簡単なデバッグも行えます。




インスタンス一覧を見てみます。



正常に起動しているようです。


いままでASはコンソールからはまったく見えませんでしたが、これで視認性がよくなりました。
これからもコマンドライン等で設定を行うという人も、状態の確認などはコンソールを使うと便利かもしれません。


以上です。



2013年12月9日月曜日

CDPってなんじゃ?(Kinesisでstream chain)

この記事は「CDP Advent Calendar 2013」12/9用の記事です。

 前回前々回でKinesisを触りました。

資料などみていく中で、ストリームの終端になりえるものに、Redshift, S3, EMR, DynamoDBなどの他に、別のKinesisストリームもありえるということを知りました。

処理の内容が単純、高速であったり、データ数が少ない場合には1つのConsumerで事足りると思いますが、あまりにも大量であったり、処理の幾つかに非常に時間の掛かるものがあったり、複合的な処理が必要な場合には、複数のストリームを使うと全体の効率が上がる場合があるかと思います。

これは、CDPQueuing Chainパターンの派生的なものになるかと思います。
そこで、別のKinesisストリームに流すにはどういう場合が有効かを考えてみました。

Stream Chianパターン


・統計的な処理を連続して行う場合、Producerが投入したデータ単位と、1つ目の処理が終わったデータ単位が異なる場合があります。その場合も1つ目が終わったときに別のストリームを使います。



・処理全体の中で、負荷の高い部分と低い部分を分けて別に処理を行うことで、コストの最適化を行うことも考えられます。




・共通処理もあるが、データの種類によって個別の処理が必要という時も別のストリームを使える場合があります。




・おまけ:JettyをProducerにして、3つストリームのチェーンを使うと、ジェットストリームアタックになります。





最後のが言いたかっただけでした。
以上です。



2013年12月4日水曜日

Kinesisってなんじゃ?(Java実装編)

前回からの続きです。

このストリームをつかってイベント処理を行うアプリケーションを実装します。
Kinesisを扱うには、現時点でJavaのSDKとKinesisのクライアントライブラリ(KCL)が利用可能です。

今回のサンプルとなるシステムですが、以下のように定義してみました。
  • Producerはユーザーの出発地点(0, 0)からの到達位置(x, y)を日時ごとにJSONとしてストリームに入力していきます。
  • ConsumerAではストリームからデータを受け取り、ログとしてS3のバケットに保存していきます。
  • ConsumerBでは同じくストリームからデータを受け取り、ユーザーごとの総移動距離をDynamoDBにインクリメント保存していきます。

これを、限定公開用にKinesis対応されたJavaのSDKとKinesisのクライアントライブラリ(KCL)を使って実装してみます。SDKにはサンプルコードなどが付属しているため、それを参考に実装してみます。
また、KCLはシャードの管理のために裏側でConsumerごとにDynamoDBテーブルを使用します。

JDKの1.7とantがインストールされていることが前提です。


Producer


以下のツリー図の通り、SDKのサンプルコードの一部として他のサンプルと同じようにMemoKinesisというサンプルをつくりました。

/usr/local/src/aws-java-sdk-1.6.4/
├── lib
│   ├── aws-java-sdk-1.6.4.jar
│   ├── aws-java-sdk-1.6.4-javadoc.jar
│   ├── aws-java-sdk-1.6.4-sources.jar
│   ├── aws-java-sdk-flow-build-tools-1.6.4.jar
│   ├── commons-lang-2.6.jar
│   ├── joda-time-2.3-dist.tar.gz
│   ├── joda-time-2.3.jar
│   └── jsonic-1.3.0.jar
├── samples
│   └── MemoKinesis
│       ├── AwsCredentials.properties
│       ├── build.xml
│       ├── Hoge.java
│       └── MemorycraftKinesisProducer.java
└── third-party

配下のbuild.xmlは他サンプルと同様に、../../lib, ../../third-partyにクラスパスを通して、コンパイル→実行するようなターゲットになっているため、クラス名だけ変更してそのまま利用しています。
AwsCredentials.propertiesには自分のアカウントのキー情報を入力してあります。
また、このコードで必要なライブラリは../../libディレクトリに追加してあります(青字)。

また、コードは以下の通りです。

MemorycraftKinesisProducer.java
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.nio.ByteBuffer;
import java.util.Locale;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.lang.RandomStringUtils;
import net.arnx.jsonic.JSON;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.DateTimeZone;
import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.CreateStreamRequest;
import com.amazonaws.services.kinesis.model.DeleteStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.ListStreamsRequest;
import com.amazonaws.services.kinesis.model.ListStreamsResult;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;
public class MemorycraftKinesisProducer{
private static final Log LOG = LogFactory.getLog(MemorycraftKinesisProducer.class);
private static final String myStreamName = "memostream";
static AmazonKinesis kinesis;
private static void init() throws Exception {
kinesis = new AmazonKinesisClient(new ClasspathPropertiesFileCredentialsProvider());
}
public static void main(String[] args) throws Exception {
init();
while(true) {
DateTime dt = new DateTime();
String key = RandomStringUtils.randomAlphanumeric(10);
Hoge hoge = new Hoge();
hoge.user_id = "testuser_" + RandomStringUtils.randomAlphabetic(1).toUpperCase();
hoge.datetime = dt.toString("yyyy-MM-dd HH:mm:ss", Locale.JAPAN);
hoge.x = ""+Math.random()*100;
hoge.y = ""+Math.random()*100;
String json = JSON.encode(hoge);
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(myStreamName);
putRecordRequest.setData(ByteBuffer.wrap(json.getBytes()));
putRecordRequest.setPartitionKey(key);
PutRecordResult putRecordResult = kinesis.putRecord(putRecordRequest);
LOG.info(json);
}
}
}

Hoge.java
public class Hoge {
public String user_id;
public String datetime;
public String x;
public String y;
}
view raw Hoge.java hosted with ❤ by GitHub

Producerでは、testuser_[A-Z]の26人のユーザーIDと、日時、ランダムなx,y をJSON化してストリームに投入しつづけます。
AmazonKinesisClientを初期化し、PutRecordRequestを使ってputRecordします。
実行すると以下のように投入され続けていくことがわかります。
[root@ip-10-154-154-57 MemoKinesis]# ant
Buildfile: /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis/build.xml

run:
    [javac] /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis/build.xml:12: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
    [javac] Compiling 1 source file to /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis
    [javac] warning: Supported source version 'RELEASE_6' from annotation processor 'com.amazonaws.eclipse.simpleworkflow.asynchrony.annotationprocessor.AsynchronyDeciderAnnotationProcessor' less than -source '1.7'
    [javac] 1 warning
     [java] Dec 03, 2013 10:08:23 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:22","user_id":"testuser_F","x":"66.41701160878385","y":"85.85361518485006"}
     [java] Dec 03, 2013 10:08:23 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:23","user_id":"testuser_D","x":"64.19880893804586","y":"26.65513499028829"}
     [java] Dec 03, 2013 10:08:24 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:23","user_id":"testuser_T","x":"59.34696505060783","y":"85.54404394674596"}
     [java] Dec 03, 2013 10:08:24 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:24","user_id":"testuser_N","x":"76.26154448594934","y":"67.74843698818461"}
     [java] Dec 03, 2013 10:08:24 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:24","user_id":"testuser_Z","x":"29.457789127237753","y":"18.48254643961956"}
     [java] Dec 03, 2013 10:08:24 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:24","user_id":"testuser_Z","x":"9.91729666668798","y":"18.193714091252332"}
     [java] Dec 03, 2013 10:08:24 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:24","user_id":"testuser_M","x":"98.54843645906183","y":"83.25335740427954"}
     [java] Dec 03, 2013 10:08:24 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:24","user_id":"testuser_F","x":"22.2596852199308","y":"39.49631977033009"}
     [java] Dec 03, 2013 10:08:25 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:24","user_id":"testuser_W","x":"35.34360225385589","y":"7.982969346289337"}
.......



Consumer A


次はConsumerです。これも同じように他のサンプルコードとほぼ同じ構成で実装しました。

/usr/local/src/aws-java-sdk-1.6.4/
├── lib
│   ├── aws-java-sdk-1.6.4.jar
│   ├── aws-java-sdk-1.6.4-javadoc.jar
│   ├── aws-java-sdk-1.6.4-sources.jar
│   ├── aws-java-sdk-flow-build-tools-1.6.4.jar
│   ├── commons-lang-2.6.jar
│   ├── jsonic-1.3.0.jar
│   └── KinesisClientLibrary.jar
├── samples
│   └── MemoKinesis
│       ├── AwsCredentials.properties
│       ├── build.xml
│       ├── ConfigKeys.java
│       ├── MemorycraftKinesisLoggingConsumer.java
│       ├── MemorycraftKinesisLoggingProcessorFactory.java
│       └── MemorycraftKinesisLoggingProcessor.java
└── third-party

Consumer Aのインスタンス群では、ストリームに流れているデータを抽出して、順次S3にログとして保存していきます。
ConsumerではKCLを使用します。

構成としては、MemorycraftKinesisLoggingConsumerからIRecordProcessorFactoryを実装したMemorycraftKinesisLoggingProcessorFactoryを初期化してワーカーとして実行します。
実際の処理はIRecordProcessorを実装したMemorycraftKinesisLoggingProcessorのprocessRecordsメソッド内部に記述します。


MemorycraftKinesisLoggingConsumer.java
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Properties;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
/**
* Sample Kinesis Application.
*/
public final class MemorycraftKinesisLoggingConsumer {
private static final String DEFAULT_APP_NAME = "MemorycraftKinesisLoggingConsumer";
private static final String DEFAULT_STREAM_NAME = "memostream";
private static final String DEFAULT_KINESIS_ENDPOINT = "https://kinesis.us-east-1.amazonaws.com";
// Initial position in the stream when the application starts up for the first time.
// Position can be one of LATEST (most recent data) or TRIM_HORIZON (oldest available data)
private static final InitialPositionInStream DEFAULT_INITIAL_POSITION = InitialPositionInStream.TRIM_HORIZON;
private static String applicationName = DEFAULT_APP_NAME;
private static String streamName = DEFAULT_STREAM_NAME;
private static String kinesisEndpoint = DEFAULT_KINESIS_ENDPOINT;
private static InitialPositionInStream initialPositionInStream = DEFAULT_INITIAL_POSITION;
private static KinesisClientLibConfiguration kinesisClientLibConfiguration;
private static final Log LOG = LogFactory.getLog(MemorycraftKinesisLoggingConsumer.class);
/**
*
*/
private MemorycraftKinesisLoggingConsumer() {
super();
}
/**
* @param args Property file with config overrides (e.g. application name, stream name)
* @throws IOException Thrown if we can't read properties from the specified properties file
*/
public static void main(String[] args) throws IOException {
String propertiesFile = null;
if (args.length > 1) {
System.err.println("Usage: java " + MemorycraftKinesisLoggingConsumer.class.getName() + " <propertiesFile>");
System.exit(1);
} else if (args.length == 1) {
propertiesFile = args[0];
}
configure(propertiesFile);
System.out.println("Starting " + applicationName);
LOG.info("Running " + applicationName + " to process stream " + streamName);
IRecordProcessorFactory recordProcessorFactory = new MemorycraftKinesisLoggingProcessorFactory();
Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);
int exitCode = 0;
try {
worker.run();
} catch (Throwable t) {
LOG.error("Caught throwable while processing data.", t);
exitCode = 1;
}
System.exit(exitCode);
}
private static void configure(String propertiesFile) throws IOException {
if (propertiesFile != null) {
loadProperties(propertiesFile);
}
// ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
java.security.Security.setProperty("networkaddress.cache.ttl" , "60");
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
LOG.info("Using workerId: " + workerId);
// Get credentials from IMDS. If unsuccessful, get them from the classpath.
AWSCredentialsProvider credentialsProvider = null;
try {
credentialsProvider = new InstanceProfileCredentialsProvider();
// Verify we can fetch credentials from the provider
credentialsProvider.getCredentials();
LOG.info("Obtained credentials from the IMDS.");
} catch (AmazonClientException e) {
LOG.info("Unable to obtain credentials from the IMDS, trying classpath properties", e);
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
// Verify we can fetch credentials from the provider
credentialsProvider.getCredentials();
LOG.info("Obtained credentials from the properties file.");
}
LOG.info("Using credentials with access key id: " + credentialsProvider.getCredentials().getAWSAccessKeyId());
kinesisClientLibConfiguration = new KinesisClientLibConfiguration(applicationName, streamName, kinesisEndpoint,
initialPositionInStream, credentialsProvider, workerId);
}
/**
* @param propertiesFile
* @throws IOException Thrown when we run into issues reading properties
*/
private static void loadProperties(String propertiesFile) throws IOException {
FileInputStream inputStream = new FileInputStream(propertiesFile);
Properties properties = new Properties();
try {
properties.load(inputStream);
} finally {
inputStream.close();
}
String appNameOverride = properties.getProperty(ConfigKeys.APPLICATION_NAME_KEY);
if (appNameOverride != null) {
applicationName = appNameOverride;
}
LOG.info("Using application name " + applicationName);
String streamNameOverride = properties.getProperty(ConfigKeys.STREAM_NAME_KEY);
if (streamNameOverride != null) {
streamName = streamNameOverride;
}
LOG.info("Using stream name " + streamName);
String kinesisEndpointOverride = properties.getProperty(ConfigKeys.KINESIS_ENDPOINT_KEY);
if (kinesisEndpointOverride != null) {
kinesisEndpoint = kinesisEndpointOverride;
}
String initialPositionOverride = properties.getProperty(ConfigKeys.INITIAL_POSITION_IN_STREAM_KEY);
if (initialPositionOverride != null) {
initialPositionInStream = InitialPositionInStream.valueOf(initialPositionOverride);
}
LOG.info("Using initial position " + initialPositionInStream.toString() + " (if a checkpoint is not found).");
}
}

MemorycraftKinesisLoggingProcessorFactory.java
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
/**
* Used to create new record processors.
*/
public class MemorycraftKinesisLoggingProcessorFactory implements IRecordProcessorFactory {
/**
* Constructor.
*/
public MemorycraftKinesisLoggingProcessorFactory() {
super();
}
/**
* {@inheritDoc}
*/
@Override
public IRecordProcessor createProcessor() {
return new MemorycraftKinesisLoggingProcessor();
}
}

MemorycraftKinesisLoggingProcessor.java
import java.io.InputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.lang.RandomStringUtils;
import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
public class MemorycraftKinesisLoggingProcessor implements IRecordProcessor {
private static final Log LOG = LogFactory.getLog(MemorycraftKinesisLoggingProcessor.class);
private String kinesisShardId;
private AmazonS3 s3;
private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
private long nextCheckpointTimeInMillis;
private final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
/**
* Constructor.
*/
public MemorycraftKinesisLoggingProcessor() {
super();
}
/**
* {@inheritDoc}
*/
@Override
public void initialize(String shardId) {
this.kinesisShardId = shardId;
this.s3 = new AmazonS3Client(new ClasspathPropertiesFileCredentialsProvider());
}
/**
* {@inheritDoc}
*/
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
process(records);
if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
checkpoint(checkpointer);
nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
}
}
/** Process records performing retries as needed. Skip "poison pill" records.
* @param records
*/
private void process(List<Record> records) {
for (Record record : records) {
InputStream in = null;
try{
String seqno = record.getSequenceNumber();
LOG.info("process record ["+seqno+"]");
String json = decoder.decode(record.getData()).toString();
in = new ByteArrayInputStream(json.getBytes("UTF-8"));
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(json.getBytes("UTF-8").length);
String filename = RandomStringUtils.randomAlphanumeric(32);
PutObjectResult putObjectResult = s3.putObject("memorycraft-kinesis", "log/"+filename, in, metadata);
LOG.info(" saved [json]=" + json);
} catch (CharacterCodingException e) {
//do somothing
} catch (Throwable t) {
//do somothing
} finally {
try{
in.close();
}
catch(IOException e){
LOG.error("close error");
}
}
}
}
/**
* {@inheritDoc}
*/
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
if (reason == ShutdownReason.TERMINATE) {
checkpoint(checkpointer);
}
}
/** Checkpoint with retries.
* @param checkpointer
*/
private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
try{
LOG.info("CHECKPOINT !!!!!");
checkpointer.checkpoint();
} catch (ShutdownException se) {
//do something
} catch (ThrottlingException e) {
//do something
} catch (InvalidStateException e) {
//do something
}
}
}

実行してみます。
[root@ip-10-118-97-158 MemoKinesis]# ant
Buildfile: /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis/build.xml

run:
    [javac] /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis/build.xml:12: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
    [javac] Compiling 4 source files to /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis
    [javac] warning: Supported source version 'RELEASE_6' from annotation processor 'com.amazonaws.eclipse.simpleworkflow.asynchrony.annotationprocessor.AsynchronyDeciderAnnotationProcessor' less than -source '1.7'
    [javac] 1 warning
..........
     [java] INFO: Successfully published 3 datums.
     [java] Dec 03, 2013 11:58:06 AM com.amazonaws.services.kinesis.metrics.impl.DefaultCWMetricsPublisher publishMetrics
     [java] INFO: Successfully published 18 datums.
     [java] Dec 03, 2013 11:58:14 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$WorkerLog info
     [java] INFO: Current stream shard assignments: shardId-000000000001, shardId-000000000000
     [java] Dec 03, 2013 11:58:14 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$WorkerLog info
     [java] INFO: Sleeping ...
     [java] Dec 03, 2013 11:58:16 AM com.amazonaws.services.kinesis.metrics.impl.DefaultCWMetricsPublisher publishMetrics
     [java] INFO: Successfully published 20 datums.
     [java] Dec 03, 2013 11:58:16 AM com.amazonaws.services.kinesis.metrics.impl.DefaultCWMetricsPublisher publishMetrics
     [java] INFO: Successfully published 3 datums.
     [java] Dec 03, 2013 11:58:18 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO: process record [21269323576357670914947437401207537664]
     [java] Dec 03, 2013 11:58:18 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO:  saved [json]={"datetime":"2013-12-03 08:41:05","user_id":"testuser_G","x":"54.504733903095726","y":"27.966324048617963"}
     [java] Dec 03, 2013 11:58:18 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO: process record [21269323576357720993054772957520920576]
     [java] Dec 03, 2013 11:58:19 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO:  saved [json]={"datetime":"2013-12-03 08:41:05","user_id":"testuser_W","x":"12.256118907591883","y":"63.004577908019634"}
     [java] Dec 03, 2013 11:58:19 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO: process record [21269323576357794417499720660453949440]
     [java] Dec 03, 2013 11:58:19 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO:  saved [json]={"datetime":"2013-12-03 08:41:05","user_id":"testuser_O","x":"69.81831906353719","y":"31.571006624423724"}
     [java] Dec 03, 2013 11:58:19 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO: process record [21269323576357840331291814692712415232]
     [java] Dec 03, 2013 11:58:19 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO:  saved [json]={"datetime":"2013-12-03 08:41:04","user_id":"testuser_F","x":"19.924814432111905","y":"41.65256488223227"}
..........


うまく拾えているようです。
S3のバケットを見てみます。

おお、次から次へとファイルが保存されていくのがわかります。
そのうちの一つをダウンロードして、中身を見てみます。
{"datetime":"2013-12-03 08:44:17","user_id":"testuser_H","x":"79.49199108301805","y":"2.4829017202529724"}
中身もうまく入っているようです。


Consumer B



次はConsumerBです。

/usr/local/src/aws-java-sdk-1.6.4/
├── lib
│   ├── aws-java-sdk-1.6.4.jar
│   ├── aws-java-sdk-1.6.4-javadoc.jar
│   ├── aws-java-sdk-1.6.4-sources.jar
│   ├── aws-java-sdk-flow-build-tools-1.6.4.jar
│   ├── commons-lang-2.6.jar
│   ├── jsonic-1.3.0.jar
│   └── KinesisClientLibrary.jar
├── samples
│   └── MemoKinesis
│       ├── AwsCredentials.properties
│       ├── build.xml
│       ├── ConfigKeys.java
│       ├── Hoge.java
│       ├── MemorycraftKinesisDistanceConsumer.java
│       ├── MemorycraftKinesisDistanceProcessorFactory.java
│       └── MemorycraftKinesisDistanceProcessor.java
└── third-party


Consumer Bのインスタンス群では、ユーザーIDをハッシュキーにしたDynamoDBのレコードにXとYから距離を出してインクリメントしていき総距離を加算更新します。

クラスとしてはMemorycraftKinesisDistanceProcessorを中心に実装します。

MemorycraftKinesisDistanceConsumer.java
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Properties;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
/**
* Sample Kinesis Application.
*/
public final class MemorycraftKinesisDistanceConsumer {
private static final String DEFAULT_APP_NAME = "MemorycraftKinesisDistanceConsumer";
private static final String DEFAULT_STREAM_NAME = "memostream";
private static final String DEFAULT_KINESIS_ENDPOINT = "https://kinesis.us-east-1.amazonaws.com";
// Initial position in the stream when the application starts up for the first time.
// Position can be one of LATEST (most recent data) or TRIM_HORIZON (oldest available data)
private static final InitialPositionInStream DEFAULT_INITIAL_POSITION = InitialPositionInStream.TRIM_HORIZON;
private static String applicationName = DEFAULT_APP_NAME;
private static String streamName = DEFAULT_STREAM_NAME;
private static String kinesisEndpoint = DEFAULT_KINESIS_ENDPOINT;
private static InitialPositionInStream initialPositionInStream = DEFAULT_INITIAL_POSITION;
private static KinesisClientLibConfiguration kinesisClientLibConfiguration;
private static final Log LOG = LogFactory.getLog(MemorycraftKinesisDistanceConsumer.class);
/**
*
*/
private MemorycraftKinesisDistanceConsumer() {
super();
}
/**
* @param args Property file with config overrides (e.g. application name, stream name)
* @throws IOException Thrown if we can't read properties from the specified properties file
*/
public static void main(String[] args) throws IOException {
String propertiesFile = null;
if (args.length > 1) {
System.err.println("Usage: java " + MemorycraftKinesisDistanceConsumer.class.getName() + " <propertiesFile>");
System.exit(1);
} else if (args.length == 1) {
propertiesFile = args[0];
}
configure(propertiesFile);
System.out.println("Starting " + applicationName);
LOG.info("Running " + applicationName + " to process stream " + streamName);
IRecordProcessorFactory recordProcessorFactory = new MemorycraftKinesisDistanceProcessorFactory();
Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);
int exitCode = 0;
try {
worker.run();
} catch (Throwable t) {
LOG.error("Caught throwable while processing data.", t);
exitCode = 1;
}
System.exit(exitCode);
}
private static void configure(String propertiesFile) throws IOException {
if (propertiesFile != null) {
loadProperties(propertiesFile);
}
// ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
java.security.Security.setProperty("networkaddress.cache.ttl" , "60");
String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
LOG.info("Using workerId: " + workerId);
// Get credentials from IMDS. If unsuccessful, get them from the classpath.
AWSCredentialsProvider credentialsProvider = null;
try {
credentialsProvider = new InstanceProfileCredentialsProvider();
// Verify we can fetch credentials from the provider
credentialsProvider.getCredentials();
LOG.info("Obtained credentials from the IMDS.");
} catch (AmazonClientException e) {
LOG.info("Unable to obtain credentials from the IMDS, trying classpath properties", e);
credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
// Verify we can fetch credentials from the provider
credentialsProvider.getCredentials();
LOG.info("Obtained credentials from the properties file.");
}
LOG.info("Using credentials with access key id: " + credentialsProvider.getCredentials().getAWSAccessKeyId());
kinesisClientLibConfiguration = new KinesisClientLibConfiguration(applicationName, streamName, kinesisEndpoint,
initialPositionInStream, credentialsProvider, workerId);
}
/**
* @param propertiesFile
* @throws IOException Thrown when we run into issues reading properties
*/
private static void loadProperties(String propertiesFile) throws IOException {
FileInputStream inputStream = new FileInputStream(propertiesFile);
Properties properties = new Properties();
try {
properties.load(inputStream);
} finally {
inputStream.close();
}
String appNameOverride = properties.getProperty(ConfigKeys.APPLICATION_NAME_KEY);
if (appNameOverride != null) {
applicationName = appNameOverride;
}
LOG.info("Using application name " + applicationName);
String streamNameOverride = properties.getProperty(ConfigKeys.STREAM_NAME_KEY);
if (streamNameOverride != null) {
streamName = streamNameOverride;
}
LOG.info("Using stream name " + streamName);
String kinesisEndpointOverride = properties.getProperty(ConfigKeys.KINESIS_ENDPOINT_KEY);
if (kinesisEndpointOverride != null) {
kinesisEndpoint = kinesisEndpointOverride;
}
String initialPositionOverride = properties.getProperty(ConfigKeys.INITIAL_POSITION_IN_STREAM_KEY);
if (initialPositionOverride != null) {
initialPositionInStream = InitialPositionInStream.valueOf(initialPositionOverride);
}
LOG.info("Using initial position " + initialPositionInStream.toString() + " (if a checkpoint is not found).");
}
}

MemorycraftKinesisDistanceProcessorFactory.java
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
/**
* Used to create new record processors.
*/
public class MemorycraftKinesisDistanceProcessorFactory implements IRecordProcessorFactory {
/**
* Constructor.
*/
public MemorycraftKinesisDistanceProcessorFactory() {
super();
}
/**
* {@inheritDoc}
*/
@Override
public IRecordProcessor createProcessor() {
return new MemorycraftKinesisDistanceProcessor();
}
}

MemorycraftKinesisDistanceProcessor.java
import java.io.InputStream;
import java.io.ByteArrayInputStream;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharacterCodingException;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
import java.lang.reflect.Field;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import net.arnx.jsonic.JSON;
import com.amazonaws.services.dynamodb.model.Key;
import com.amazonaws.services.dynamodb.model.AttributeAction;
import com.amazonaws.services.dynamodb.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodb.model.AttributeValue;
import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.dynamodb.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodb.model.PutItemRequest;
import com.amazonaws.services.dynamodb.model.PutItemResult;
import com.amazonaws.services.dynamodb.model.UpdateItemRequest;
import com.amazonaws.services.dynamodb.model.UpdateItemResult;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
public class MemorycraftKinesisDistanceProcessor implements IRecordProcessor {
private static final Log LOG = LogFactory.getLog(MemorycraftKinesisDistanceProcessor.class);
private String kinesisShardId;
private AmazonDynamoDB dynamodb;
private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
private long nextCheckpointTimeInMillis;
private final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
/**
* Constructor.
*/
public MemorycraftKinesisDistanceProcessor() {
super();
}
/**
* {@inheritDoc}
*/
@Override
public void initialize(String shardId) {
this.kinesisShardId = shardId;
dynamodb = new AmazonDynamoDBClient(new ClasspathPropertiesFileCredentialsProvider());
}
/**
* {@inheritDoc}
*/
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
process(records);
if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
checkpoint(checkpointer);
nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
}
}
/** Process records performing retries as needed. Skip "poison pill" records.
* @param records
*/
private void process(List<Record> records) {
for (Record record : records) {
try{
String seqno = record.getSequenceNumber();
LOG.info("process record ["+seqno+"]");
String data = decoder.decode(record.getData()).toString();
Hoge hoge = JSON.decode(data, Hoge.class);
Key key = new Key(new AttributeValue(hoge.user_id));
String value = "" + Math.sqrt(Math.pow(Double.parseDouble(hoge.x), 2) + Math.pow(Double.parseDouble(hoge.y), 2));
Map<String, AttributeValueUpdate> item = new HashMap<String, AttributeValueUpdate>();
item.put("distance", new AttributeValueUpdate().withAction(AttributeAction.ADD).withValue(new AttributeValue().withN(value)));
UpdateItemResult updateItemResult = dynamodb.updateItem(new UpdateItemRequest("memorycraft-kinesis", key, item));
LOG.info(" saved [user_id:" + hoge.user_id+", distance:"+value+"]");
} catch (CharacterCodingException e) {
//do somothing
} catch (Throwable t) {
//do somothing
}
}
}
/**
* {@inheritDoc}
*/
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
if (reason == ShutdownReason.TERMINATE) {
checkpoint(checkpointer);
}
}
/** Checkpoint with retries.
* @param checkpointer
*/
private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
try{
LOG.info("CHECKPOINT !!!!!");
checkpointer.checkpoint();
} catch (ShutdownException se) {
//do something
} catch (ThrottlingException e) {
//do something
} catch (InvalidStateException e) {
//do something
}
}
}

実行してみます。

[root@ip-10-60-155-21 MemoKinesis]# ant
Buildfile: /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis/build.xml

run:
    [javac] /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis/build.xml:12: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
     [java] Dec 03, 2013 3:22:17 PM MemorycraftKinesisDistanceConsumer configure
     [java] INFO: Using workerId: ip-10-60-155-21.ec2.internal:584216f1-8d4d-429c-ba99-8af365345e17
     [java] Dec 03, 2013 3:22:17 PM MemorycraftKinesisDistanceConsumer configure
.......
     [java] INFO: Successfully published 18 datums.
     [java] Dec 03, 2013 3:23:18 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$WorkerLog info
     [java] INFO: Current stream shard assignments: shardId-000000000001, shardId-000000000000
     [java] Dec 03, 2013 3:23:18 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$WorkerLog info
     [java] INFO: Sleeping ...
     [java] Dec 03, 2013 3:23:18 PM MemorycraftKinesisDistanceProcessor process
     [java] INFO: process record [21269323576357670914947437401207537664]
     [java] Dec 03, 2013 3:23:19 PM MemorycraftKinesisDistanceProcessor process
     [java] INFO:  saved [user_id:testuser_G, distance:61.260764757221075]
     [java] Dec 03, 2013 3:23:19 PM MemorycraftKinesisDistanceProcessor process
     [java] INFO: process record [21269323576357720993054772957520920576]
     [java] Dec 03, 2013 3:23:19 PM MemorycraftKinesisDistanceProcessor process
     [java] INFO:  saved [user_id:testuser_W, distance:64.18558473711015]
     [java] Dec 03, 2013 3:23:19 PM MemorycraftKinesisDistanceProcessor process
     [java] INFO: process record [21269323576357794417499720660453949440]
     [java] Dec 03, 2013 3:23:19 PM MemorycraftKinesisDistanceProcessor process
     [java] INFO:  saved [user_id:testuser_O, distance:76.62457919060493]
.....


それではDynamoDBを見てみます。
ちゃんとユーザーごとに集計されているようです。


続々加算されていきます。



チェックポイント



KinesisのコンシューマーでKCLを使うとコンシューマーごとにチェックポイントというマーカーを打つことができます。
IRecordProcessor.checkpointメソッド内で、checkpointer.checkpoint()を呼び出すと、ストリームのシーケンスのなかでどこまで読み込んだという記録が保存され、読み取りを中断したり障害が起こった後に、プロセスを再稼働すると、そのチェックポイントから読み取りが再開されます。
チェックポイントを呼び出すタイミングはユーザーに委ねられていて、今回のサンプルでは1分に設定しています。

   private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
    public void processRecords(List records, IRecordProcessorCheckpointer checkpointer) {

        process(records);

        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(checkpointer);
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }

    }


わかったこと


  • 定期的ではなく常にデータ処理を処理し続けることができる仕組みと、そのためのスケーラビリティを持っている。
  • ユーザーが書くコードは、最低限であればそんなに多くない。
  • バッチではタイムラグがありすぎる!という場合にはうってつけのサービス。
  • Consumer同士の処理順を気にしないでいい。処理順をつけたい場合は、そのConsumer内で行うか、出力先に別のストリームを指定しそちらで次の処理をするとか
  • DynamoDBと同様、ストリームのキャパシティの見積もりは必要です。でもシャード数を指定するだけなので変更自体は簡単
  • 周辺のインスタンス、特にConsumerはバッチと同様インスタンスの負荷をみてロールごとにAutoScalingGroupを作る必要あり
  • Consumerからデータの移動先となるDynamoDBやRedshiftその他のエンドポイントのキャパシティ管理も同様に注意が必要


以上です。

Kinesisってなんじゃ?(ストリーム作成編)

AWSの新サービスKinesisが限定プレビュー公開されています。
私のところでも見れるようになったので、触ってみました。

Kinesisとは


Kinesisは大量なデータのリアルタイムイベント処理をサポートするサービスです。
ユーザーは「ストリーム」を作り、そこにデータを流し、受け取り、処理を行います。

データを流す、受け取る、処理を行う、という部分はユーザーがプログラムで実装します。
Kinesisに接続するプログラムには大きく分けてProducerとConsumerという2つの立場があります。
  • Producerはストリームにデータを入力します。Producerは場合によってWEBまたはAPPサーバーのようなエンドポイントであったり、既存データのフェッチプログラムとして実装されます。
  • Consumerはストリームからデータを受け取り処理します。Consumerは処理内容ごとにAutoScalingグループなどによってクラスタ化されることが多くなります。

ストリームを流れるデータは順序付けされたシーケンスとして扱われ、「シャード」に分散されます。
また、Kinesisではシャードの単位でスループットが決定され、ユーザーは入出力の量に合わせてシャード数を設定できます。


ストリームの作成


それでは、まずストリームを作ってみます。
限定公開なので、コンソールのメニューには現れませんが、申請が通っている場合は、以下のURLでアクセスできます。

https://console.aws.amazon.com/kinesis/



「Create Stream」をクリックして、ストリームを作成します。
ストリーム名とストリーム数を入力します。
ストリーム数の目安がわからない場合は、DynamoDBと同じように、データサイズや書込回数、Consumer数を設定することで自動的に最適なシャード数を適用できます。




「Create」をクリックすると、作成中(CREATING)のストリームが一覧に表示されます。
Statusが「ACTIVE」になると利用可能です。



ストリーム名をクリックすると、ストリームのメトリクスが表示されます。
これをもとに、SDKなどでシャード数の変更などを行うことができます。



料金



  • シャード:1シャードあたり1時間0.015ドル
  • リクエスト:100万PUTで0.028ドル
  • データ転送(入力):無料 
  • データ転送(出力):EC2で処理する分には無料

これとは別に、実際の処理を行うEC2のインスタンス料金が掛かります。
ストリームの料金ではなく、接続するEC2の料金で回収するモデルのようです。


次回は、ストリームを使った実装をやってみます。
以上です。