2013年12月4日水曜日

Kinesisってなんじゃ?(Java実装編)

前回からの続きです。

このストリームをつかってイベント処理を行うアプリケーションを実装します。
Kinesisを扱うには、現時点でJavaのSDKとKinesisのクライアントライブラリ(KCL)が利用可能です。

今回のサンプルとなるシステムですが、以下のように定義してみました。
  • Producerはユーザーの出発地点(0, 0)からの到達位置(x, y)を日時ごとにJSONとしてストリームに入力していきます。
  • ConsumerAではストリームからデータを受け取り、ログとしてS3のバケットに保存していきます。
  • ConsumerBでは同じくストリームからデータを受け取り、ユーザーごとの総移動距離をDynamoDBにインクリメント保存していきます。

これを、限定公開用にKinesis対応されたJavaのSDKとKinesisのクライアントライブラリ(KCL)を使って実装してみます。SDKにはサンプルコードなどが付属しているため、それを参考に実装してみます。
また、KCLはシャードの管理のために裏側でConsumerごとにDynamoDBテーブルを使用します。

JDKの1.7とantがインストールされていることが前提です。


Producer


以下のツリー図の通り、SDKのサンプルコードの一部として他のサンプルと同じようにMemoKinesisというサンプルをつくりました。

/usr/local/src/aws-java-sdk-1.6.4/
├── lib
│   ├── aws-java-sdk-1.6.4.jar
│   ├── aws-java-sdk-1.6.4-javadoc.jar
│   ├── aws-java-sdk-1.6.4-sources.jar
│   ├── aws-java-sdk-flow-build-tools-1.6.4.jar
│   ├── commons-lang-2.6.jar
│   ├── joda-time-2.3-dist.tar.gz
│   ├── joda-time-2.3.jar
│   └── jsonic-1.3.0.jar
├── samples
│   └── MemoKinesis
│       ├── AwsCredentials.properties
│       ├── build.xml
│       ├── Hoge.java
│       └── MemorycraftKinesisProducer.java
└── third-party

配下のbuild.xmlは他サンプルと同様に、../../lib, ../../third-partyにクラスパスを通して、コンパイル→実行するようなターゲットになっているため、クラス名だけ変更してそのまま利用しています。
AwsCredentials.propertiesには自分のアカウントのキー情報を入力してあります。
また、このコードで必要なライブラリは../../libディレクトリに追加してあります(青字)。

また、コードは以下の通りです。

MemorycraftKinesisProducer.java

Hoge.java

Producerでは、testuser_[A-Z]の26人のユーザーIDと、日時、ランダムなx,y をJSON化してストリームに投入しつづけます。
AmazonKinesisClientを初期化し、PutRecordRequestを使ってputRecordします。
実行すると以下のように投入され続けていくことがわかります。
[root@ip-10-154-154-57 MemoKinesis]# ant
Buildfile: /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis/build.xml

run:
    [javac] /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis/build.xml:12: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
    [javac] Compiling 1 source file to /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis
    [javac] warning: Supported source version 'RELEASE_6' from annotation processor 'com.amazonaws.eclipse.simpleworkflow.asynchrony.annotationprocessor.AsynchronyDeciderAnnotationProcessor' less than -source '1.7'
    [javac] 1 warning
     [java] Dec 03, 2013 10:08:23 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:22","user_id":"testuser_F","x":"66.41701160878385","y":"85.85361518485006"}
     [java] Dec 03, 2013 10:08:23 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:23","user_id":"testuser_D","x":"64.19880893804586","y":"26.65513499028829"}
     [java] Dec 03, 2013 10:08:24 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:23","user_id":"testuser_T","x":"59.34696505060783","y":"85.54404394674596"}
     [java] Dec 03, 2013 10:08:24 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:24","user_id":"testuser_N","x":"76.26154448594934","y":"67.74843698818461"}
     [java] Dec 03, 2013 10:08:24 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:24","user_id":"testuser_Z","x":"29.457789127237753","y":"18.48254643961956"}
     [java] Dec 03, 2013 10:08:24 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:24","user_id":"testuser_Z","x":"9.91729666668798","y":"18.193714091252332"}
     [java] Dec 03, 2013 10:08:24 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:24","user_id":"testuser_M","x":"98.54843645906183","y":"83.25335740427954"}
     [java] Dec 03, 2013 10:08:24 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:24","user_id":"testuser_F","x":"22.2596852199308","y":"39.49631977033009"}
     [java] Dec 03, 2013 10:08:25 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:24","user_id":"testuser_W","x":"35.34360225385589","y":"7.982969346289337"}
.......



Consumer A


次はConsumerです。これも同じように他のサンプルコードとほぼ同じ構成で実装しました。

/usr/local/src/aws-java-sdk-1.6.4/
├── lib
│   ├── aws-java-sdk-1.6.4.jar
│   ├── aws-java-sdk-1.6.4-javadoc.jar
│   ├── aws-java-sdk-1.6.4-sources.jar
│   ├── aws-java-sdk-flow-build-tools-1.6.4.jar
│   ├── commons-lang-2.6.jar
│   ├── jsonic-1.3.0.jar
│   └── KinesisClientLibrary.jar
├── samples
│   └── MemoKinesis
│       ├── AwsCredentials.properties
│       ├── build.xml
│       ├── ConfigKeys.java
│       ├── MemorycraftKinesisLoggingConsumer.java
│       ├── MemorycraftKinesisLoggingProcessorFactory.java
│       └── MemorycraftKinesisLoggingProcessor.java
└── third-party

Consumer Aのインスタンス群では、ストリームに流れているデータを抽出して、順次S3にログとして保存していきます。
ConsumerではKCLを使用します。

構成としては、MemorycraftKinesisLoggingConsumerからIRecordProcessorFactoryを実装したMemorycraftKinesisLoggingProcessorFactoryを初期化してワーカーとして実行します。
実際の処理はIRecordProcessorを実装したMemorycraftKinesisLoggingProcessorのprocessRecordsメソッド内部に記述します。


MemorycraftKinesisLoggingConsumer.java

MemorycraftKinesisLoggingProcessorFactory.java

MemorycraftKinesisLoggingProcessor.java

実行してみます。
[root@ip-10-118-97-158 MemoKinesis]# ant
Buildfile: /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis/build.xml

run:
    [javac] /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis/build.xml:12: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
    [javac] Compiling 4 source files to /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis
    [javac] warning: Supported source version 'RELEASE_6' from annotation processor 'com.amazonaws.eclipse.simpleworkflow.asynchrony.annotationprocessor.AsynchronyDeciderAnnotationProcessor' less than -source '1.7'
    [javac] 1 warning
..........
     [java] INFO: Successfully published 3 datums.
     [java] Dec 03, 2013 11:58:06 AM com.amazonaws.services.kinesis.metrics.impl.DefaultCWMetricsPublisher publishMetrics
     [java] INFO: Successfully published 18 datums.
     [java] Dec 03, 2013 11:58:14 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$WorkerLog info
     [java] INFO: Current stream shard assignments: shardId-000000000001, shardId-000000000000
     [java] Dec 03, 2013 11:58:14 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$WorkerLog info
     [java] INFO: Sleeping ...
     [java] Dec 03, 2013 11:58:16 AM com.amazonaws.services.kinesis.metrics.impl.DefaultCWMetricsPublisher publishMetrics
     [java] INFO: Successfully published 20 datums.
     [java] Dec 03, 2013 11:58:16 AM com.amazonaws.services.kinesis.metrics.impl.DefaultCWMetricsPublisher publishMetrics
     [java] INFO: Successfully published 3 datums.
     [java] Dec 03, 2013 11:58:18 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO: process record [21269323576357670914947437401207537664]
     [java] Dec 03, 2013 11:58:18 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO:  saved [json]={"datetime":"2013-12-03 08:41:05","user_id":"testuser_G","x":"54.504733903095726","y":"27.966324048617963"}
     [java] Dec 03, 2013 11:58:18 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO: process record [21269323576357720993054772957520920576]
     [java] Dec 03, 2013 11:58:19 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO:  saved [json]={"datetime":"2013-12-03 08:41:05","user_id":"testuser_W","x":"12.256118907591883","y":"63.004577908019634"}
     [java] Dec 03, 2013 11:58:19 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO: process record [21269323576357794417499720660453949440]
     [java] Dec 03, 2013 11:58:19 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO:  saved [json]={"datetime":"2013-12-03 08:41:05","user_id":"testuser_O","x":"69.81831906353719","y":"31.571006624423724"}
     [java] Dec 03, 2013 11:58:19 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO: process record [21269323576357840331291814692712415232]
     [java] Dec 03, 2013 11:58:19 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO:  saved [json]={"datetime":"2013-12-03 08:41:04","user_id":"testuser_F","x":"19.924814432111905","y":"41.65256488223227"}
..........


うまく拾えているようです。
S3のバケットを見てみます。

おお、次から次へとファイルが保存されていくのがわかります。
そのうちの一つをダウンロードして、中身を見てみます。
{"datetime":"2013-12-03 08:44:17","user_id":"testuser_H","x":"79.49199108301805","y":"2.4829017202529724"}
中身もうまく入っているようです。


Consumer B



次はConsumerBです。

/usr/local/src/aws-java-sdk-1.6.4/
├── lib
│   ├── aws-java-sdk-1.6.4.jar
│   ├── aws-java-sdk-1.6.4-javadoc.jar
│   ├── aws-java-sdk-1.6.4-sources.jar
│   ├── aws-java-sdk-flow-build-tools-1.6.4.jar
│   ├── commons-lang-2.6.jar
│   ├── jsonic-1.3.0.jar
│   └── KinesisClientLibrary.jar
├── samples
│   └── MemoKinesis
│       ├── AwsCredentials.properties
│       ├── build.xml
│       ├── ConfigKeys.java
│       ├── Hoge.java
│       ├── MemorycraftKinesisDistanceConsumer.java
│       ├── MemorycraftKinesisDistanceProcessorFactory.java
│       └── MemorycraftKinesisDistanceProcessor.java
└── third-party


Consumer Bのインスタンス群では、ユーザーIDをハッシュキーにしたDynamoDBのレコードにXとYから距離を出してインクリメントしていき総距離を加算更新します。

クラスとしてはMemorycraftKinesisDistanceProcessorを中心に実装します。

MemorycraftKinesisDistanceConsumer.java

MemorycraftKinesisDistanceProcessorFactory.java

MemorycraftKinesisDistanceProcessor.java

実行してみます。

[root@ip-10-60-155-21 MemoKinesis]# ant
Buildfile: /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis/build.xml

run:
    [javac] /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis/build.xml:12: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
     [java] Dec 03, 2013 3:22:17 PM MemorycraftKinesisDistanceConsumer configure
     [java] INFO: Using workerId: ip-10-60-155-21.ec2.internal:584216f1-8d4d-429c-ba99-8af365345e17
     [java] Dec 03, 2013 3:22:17 PM MemorycraftKinesisDistanceConsumer configure
.......
     [java] INFO: Successfully published 18 datums.
     [java] Dec 03, 2013 3:23:18 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$WorkerLog info
     [java] INFO: Current stream shard assignments: shardId-000000000001, shardId-000000000000
     [java] Dec 03, 2013 3:23:18 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$WorkerLog info
     [java] INFO: Sleeping ...
     [java] Dec 03, 2013 3:23:18 PM MemorycraftKinesisDistanceProcessor process
     [java] INFO: process record [21269323576357670914947437401207537664]
     [java] Dec 03, 2013 3:23:19 PM MemorycraftKinesisDistanceProcessor process
     [java] INFO:  saved [user_id:testuser_G, distance:61.260764757221075]
     [java] Dec 03, 2013 3:23:19 PM MemorycraftKinesisDistanceProcessor process
     [java] INFO: process record [21269323576357720993054772957520920576]
     [java] Dec 03, 2013 3:23:19 PM MemorycraftKinesisDistanceProcessor process
     [java] INFO:  saved [user_id:testuser_W, distance:64.18558473711015]
     [java] Dec 03, 2013 3:23:19 PM MemorycraftKinesisDistanceProcessor process
     [java] INFO: process record [21269323576357794417499720660453949440]
     [java] Dec 03, 2013 3:23:19 PM MemorycraftKinesisDistanceProcessor process
     [java] INFO:  saved [user_id:testuser_O, distance:76.62457919060493]
.....


それではDynamoDBを見てみます。
ちゃんとユーザーごとに集計されているようです。


続々加算されていきます。



チェックポイント



KinesisのコンシューマーでKCLを使うとコンシューマーごとにチェックポイントというマーカーを打つことができます。
IRecordProcessor.checkpointメソッド内で、checkpointer.checkpoint()を呼び出すと、ストリームのシーケンスのなかでどこまで読み込んだという記録が保存され、読み取りを中断したり障害が起こった後に、プロセスを再稼働すると、そのチェックポイントから読み取りが再開されます。
チェックポイントを呼び出すタイミングはユーザーに委ねられていて、今回のサンプルでは1分に設定しています。

   private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
    public void processRecords(List records, IRecordProcessorCheckpointer checkpointer) {

        process(records);

        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(checkpointer);
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }

    }


わかったこと


  • 定期的ではなく常にデータ処理を処理し続けることができる仕組みと、そのためのスケーラビリティを持っている。
  • ユーザーが書くコードは、最低限であればそんなに多くない。
  • バッチではタイムラグがありすぎる!という場合にはうってつけのサービス。
  • Consumer同士の処理順を気にしないでいい。処理順をつけたい場合は、そのConsumer内で行うか、出力先に別のストリームを指定しそちらで次の処理をするとか
  • DynamoDBと同様、ストリームのキャパシティの見積もりは必要です。でもシャード数を指定するだけなので変更自体は簡単
  • 周辺のインスタンス、特にConsumerはバッチと同様インスタンスの負荷をみてロールごとにAutoScalingGroupを作る必要あり
  • Consumerからデータの移動先となるDynamoDBやRedshiftその他のエンドポイントのキャパシティ管理も同様に注意が必要


以上です。