Wednesday, December 4, 2013

What's Kinesis, anyway? (Java implementation)

This continues from the previous post.

This time I'll implement applications that use this stream to process events.
At the moment, you can work with Kinesis through the Java SDK and the Kinesis Client Library (KCL).

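I defined the sample system for this post as follows:
  • The Producer keeps putting JSON records into the stream, each with a timestamp and the position (x, y) a user has reached from the starting point (0, 0).
  • Consumer A receives data from the stream and saves it as logs in an S3 bucket.
  • Consumer B also receives data from the stream and keeps adding to each user's total distance traveled in DynamoDB.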

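I'll build this with the Kinesis-enabled Java SDK released for the limited preview and the KCL. The SDK ships with sample code, so I used that as a reference.
Note that, behind the scenes, the KCL uses a separate DynamoDB table for each consumer application to manage shards (leases and checkpoints).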
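To make the per-consumer table concrete, here is a toy in-memory model. It assumes, as in the KCL releases I have seen, that the lease table is keyed by shard ID and named after the application name (DEFAULT_APP_NAME in the consumer code later in this post). The class, method, and field names are hypothetical, and the real table holds more columns. Because Consumer A and Consumer B use different application names, each keeps its own checkpoint per shard, which is why both of them receive every record.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model, NOT the KCL's real schema: one lease table per
// application name, keyed by shard ID, holding owner and last checkpoint.
class LeaseTables {
    static final class Lease {
        String owner;      // workerId currently processing the shard
        String checkpoint; // last checkpointed sequence number, or null
    }

    // applicationName -> (shardId -> lease)
    private final Map<String, Map<String, Lease>> tables = new HashMap<>();

    Lease lease(String applicationName, String shardId) {
        Map<String, Lease> table = tables.get(applicationName);
        if (table == null) {
            table = new HashMap<>();
            tables.put(applicationName, table);
        }
        Lease lease = table.get(shardId);
        if (lease == null) {
            lease = new Lease();
            table.put(shardId, lease);
        }
        return lease;
    }

    void claim(String applicationName, String shardId, String workerId) {
        lease(applicationName, shardId).owner = workerId;
    }

    void checkpoint(String applicationName, String shardId, String sequenceNumber) {
        lease(applicationName, shardId).checkpoint = sequenceNumber;
    }

    String checkpointOf(String applicationName, String shardId) {
        return lease(applicationName, shardId).checkpoint;
    }

    public static void main(String[] args) {
        LeaseTables t = new LeaseTables();
        String shard = "shardId-000000000000";
        // Same shard, two applications, two independent positions
        t.checkpoint("MemorycraftKinesisLoggingConsumer", shard, "300");
        t.checkpoint("MemorycraftKinesisDistanceConsumer", shard, "100");
        System.out.println(t.checkpointOf("MemorycraftKinesisLoggingConsumer", shard));
        System.out.println(t.checkpointOf("MemorycraftKinesisDistanceConsumer", shard));
    }
}
```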

This assumes JDK 1.7 and Ant are installed.


Producer


As the tree below shows, I created a sample called MemoKinesis among the SDK's sample code, laid out the same way as the other samples.

/usr/local/src/aws-java-sdk-1.6.4/
├── lib
│   ├── aws-java-sdk-1.6.4.jar
│   ├── aws-java-sdk-1.6.4-javadoc.jar
│   ├── aws-java-sdk-1.6.4-sources.jar
│   ├── aws-java-sdk-flow-build-tools-1.6.4.jar
│   ├── commons-lang-2.6.jar
│   ├── joda-time-2.3-dist.tar.gz
│   ├── joda-time-2.3.jar
│   └── jsonic-1.3.0.jar
├── samples
│   └── MemoKinesis
│       ├── AwsCredentials.properties
│       ├── build.xml
│       ├── Hoge.java
│       └── MemorycraftKinesisProducer.java
└── third-party

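Like the other samples, the build.xml in this directory has a target that puts ../../lib and ../../third-party on the classpath, then compiles and runs; I reused it as-is and only changed the class name.
AwsCredentials.properties contains my account's key information.
The libraries this code needs were also added to the ../../lib directory.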

The code is as follows.

MemorycraftKinesisProducer.java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Locale;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.lang.RandomStringUtils;
import net.arnx.jsonic.JSON;
import org.joda.time.DateTime;
import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordRequest;

public class MemorycraftKinesisProducer {
    private static final Log LOG = LogFactory.getLog(MemorycraftKinesisProducer.class);
    private static final String myStreamName = "memostream";
    // Encode explicitly as UTF-8 so the bytes match the consumers' UTF-8 decoder
    // (String.getBytes() with no argument uses the platform default charset)
    private static final Charset UTF8 = Charset.forName("UTF-8");
    static AmazonKinesis kinesis;

    private static void init() throws Exception {
        // Reads AwsCredentials.properties from the classpath
        kinesis = new AmazonKinesisClient(new ClasspathPropertiesFileCredentialsProvider());
    }

    public static void main(String[] args) throws Exception {
        init();
        while (true) {
            DateTime dt = new DateTime();
            // Random partition key, so records are spread across the shards
            String key = RandomStringUtils.randomAlphanumeric(10);
            Hoge hoge = new Hoge();
            // One of 26 users: testuser_A .. testuser_Z
            hoge.user_id = "testuser_" + RandomStringUtils.randomAlphabetic(1).toUpperCase();
            hoge.datetime = dt.toString("yyyy-MM-dd HH:mm:ss", Locale.JAPAN);
            // Random position in [0, 100)
            hoge.x = "" + Math.random() * 100;
            hoge.y = "" + Math.random() * 100;
            String json = JSON.encode(hoge);
            PutRecordRequest putRecordRequest = new PutRecordRequest();
            putRecordRequest.setStreamName(myStreamName);
            putRecordRequest.setData(ByteBuffer.wrap(json.getBytes(UTF8)));
            putRecordRequest.setPartitionKey(key);
            kinesis.putRecord(putRecordRequest);
            LOG.info(json);
        }
    }
}

Hoge.java
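public class Hoge {
    public String user_id;  // testuser_A .. testuser_Z
    public String datetime; // "yyyy-MM-dd HH:mm:ss"
    public String x;        // x coordinate reached from the origin (0, 0)
    public String y;        // y coordinate reached from the origin (0, 0)
}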

The Producer keeps putting JSON into the stream containing one of 26 user IDs (testuser_[A-Z]), a timestamp, and random x and y values.
It initializes an AmazonKinesisClient and calls putRecord with a PutRecordRequest.
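A note on the random partition key: Kinesis computes the MD5 hash of the partition key, treats it as an unsigned 128-bit integer, and writes the record to the shard whose hash key range contains that value. The sketch below reproduces that routing with the JDK only, under the assumption that the stream's shards split the hash key space into equal ranges (roughly what a newly created stream looks like; the real ranges come from DescribeStream). `PartitionKeyRouting`, `hashKey`, and `shardIndex` are illustrative names.

```java
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch of partition key -> shard routing. ASSUMPTION: the shards divide the
// 128-bit hash key space into equal, contiguous ranges.
class PartitionKeyRouting {
    // 2^128: size of the MD5-based hash key space
    static final BigInteger KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    // MD5 of the UTF-8 partition key, read as an unsigned 128-bit integer
    static BigInteger hashKey(String partitionKey) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(partitionKey.getBytes(Charset.forName("UTF-8")));
            return new BigInteger(1, digest); // signum 1: treat the bytes as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is available on every JVM", e);
        }
    }

    // Index of the equal-width range that contains the key's hash
    static int shardIndex(String partitionKey, int shardCount) {
        BigInteger rangeSize = KEY_SPACE.divide(BigInteger.valueOf(shardCount));
        int index = hashKey(partitionKey).divide(rangeSize).intValue();
        // The last range absorbs the remainder when 2^128 isn't divisible by shardCount
        return Math.min(index, shardCount - 1);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"a", "b", "testuser_A"}) {
            System.out.println(key + " -> shard " + shardIndex(key, 2));
        }
    }
}
```

Because the key is random rather than `user_id`, one user's records end up on both shards (the consumer logs later show shardId-000000000000 and shardId-000000000001), so there is no per-user ordering across them. Using `hoge.user_id` as the partition key would keep each user on one shard, though with only 26 keys the load may spread less evenly.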
Running it, you can see records being put continuously:
[root@ip-10-154-154-57 MemoKinesis]# ant
Buildfile: /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis/build.xml

run:
    [javac] /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis/build.xml:12: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
    [javac] Compiling 1 source file to /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis
    [javac] warning: Supported source version 'RELEASE_6' from annotation processor 'com.amazonaws.eclipse.simpleworkflow.asynchrony.annotationprocessor.AsynchronyDeciderAnnotationProcessor' less than -source '1.7'
    [javac] 1 warning
     [java] Dec 03, 2013 10:08:23 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:22","user_id":"testuser_F","x":"66.41701160878385","y":"85.85361518485006"}
     [java] Dec 03, 2013 10:08:23 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:23","user_id":"testuser_D","x":"64.19880893804586","y":"26.65513499028829"}
     [java] Dec 03, 2013 10:08:24 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:23","user_id":"testuser_T","x":"59.34696505060783","y":"85.54404394674596"}
     [java] Dec 03, 2013 10:08:24 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:24","user_id":"testuser_N","x":"76.26154448594934","y":"67.74843698818461"}
     [java] Dec 03, 2013 10:08:24 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:24","user_id":"testuser_Z","x":"29.457789127237753","y":"18.48254643961956"}
     [java] Dec 03, 2013 10:08:24 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:24","user_id":"testuser_Z","x":"9.91729666668798","y":"18.193714091252332"}
     [java] Dec 03, 2013 10:08:24 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:24","user_id":"testuser_M","x":"98.54843645906183","y":"83.25335740427954"}
     [java] Dec 03, 2013 10:08:24 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:24","user_id":"testuser_F","x":"22.2596852199308","y":"39.49631977033009"}
     [java] Dec 03, 2013 10:08:25 AM MemorycraftKinesisProducer main
     [java] INFO: {"datetime":"2013-12-03 10:08:24","user_id":"testuser_W","x":"35.34360225385589","y":"7.982969346289337"}
.......
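Each record in the log above is a small flat JSON object of roughly 100 bytes. For readers without JSONIC and Joda-Time, the following sketch builds the same shape with only the JDK. It is a stand-in for `JSON.encode(hoge)`, not what the Producer actually runs, and it concatenates strings by hand, which is only safe because none of the values can contain quotes or backslashes. `PayloadSketch`, `userId`, and `toJson` are made-up names.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.Random;

// JDK-only stand-in for the Producer's payload (the real code uses JSONIC).
// Hand-built JSON: valid only because the values never need escaping.
class PayloadSketch {
    // testuser_A .. testuser_Z (26 users)
    static String userId(Random rnd) {
        return "testuser_" + (char) ('A' + rnd.nextInt(26));
    }

    // Keys in the same order as the log output; x and y are JSON strings,
    // matching Hoge's String fields ("" + double gives Double.toString)
    static String toJson(String datetime, String userId, double x, double y) {
        return "{\"datetime\":\"" + datetime + "\",\"user_id\":\"" + userId
                + "\",\"x\":\"" + x + "\",\"y\":\"" + y + "\"}";
    }

    public static void main(String[] args) {
        Random rnd = new Random();
        String now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.JAPAN).format(new Date());
        System.out.println(toJson(now, userId(rnd), rnd.nextDouble() * 100, rnd.nextDouble() * 100));
    }
}
```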



Consumer A


Next is the Consumer. This one also follows almost the same structure as the other sample code.

/usr/local/src/aws-java-sdk-1.6.4/
├── lib
│   ├── aws-java-sdk-1.6.4.jar
│   ├── aws-java-sdk-1.6.4-javadoc.jar
│   ├── aws-java-sdk-1.6.4-sources.jar
│   ├── aws-java-sdk-flow-build-tools-1.6.4.jar
│   ├── commons-lang-2.6.jar
│   ├── jsonic-1.3.0.jar
│   └── KinesisClientLibrary.jar
├── samples
│   └── MemoKinesis
│       ├── AwsCredentials.properties
│       ├── build.xml
│       ├── ConfigKeys.java
│       ├── MemorycraftKinesisLoggingConsumer.java
│       ├── MemorycraftKinesisLoggingProcessorFactory.java
│       └── MemorycraftKinesisLoggingProcessor.java
└── third-party

Consumer A's instances pull the data flowing through the stream and save each record to S3 as a log.
The Consumers use the KCL.

Structurally, MemorycraftKinesisLoggingConsumer creates MemorycraftKinesisLoggingProcessorFactory, which implements IRecordProcessorFactory, and runs it as a Worker.
The actual processing goes in the processRecords method of MemorycraftKinesisLoggingProcessor, which implements IRecordProcessor.
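The relationship between the factory, the processors, and the Worker can be sketched without the KCL. The interfaces below are simplified, hypothetical stand-ins for IRecordProcessorFactory and IRecordProcessor (records are plain strings and there is no checkpointer). Only the shape is carried over from the KCL: the worker asks the factory for one processor per shard it works on, calls initialize with the shard ID once, and then hands that processor each batch read from the shard.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for the KCL interfaces (not its real API)
interface RecordProcessor {
    void initialize(String shardId);
    void processRecords(List<String> records);
}

interface RecordProcessorFactory {
    RecordProcessor createProcessor();
}

// Toy worker: one processor per shard, created lazily via the factory
class ToyWorker {
    private final RecordProcessorFactory factory;
    private final Map<String, RecordProcessor> processors = new LinkedHashMap<>();

    ToyWorker(RecordProcessorFactory factory) {
        this.factory = factory;
    }

    void deliver(String shardId, List<String> batch) {
        RecordProcessor p = processors.get(shardId);
        if (p == null) {
            p = factory.createProcessor();
            p.initialize(shardId); // called once per shard
            processors.put(shardId, p);
        }
        p.processRecords(batch);
    }

    RecordProcessor processorFor(String shardId) {
        return processors.get(shardId);
    }

    int processorCount() {
        return processors.size();
    }
}

// Example processor that just remembers what it saw, tagged with its shard
class CountingProcessor implements RecordProcessor {
    final List<String> seen = new ArrayList<>();
    private String shardId;

    @Override
    public void initialize(String shardId) {
        this.shardId = shardId;
    }

    @Override
    public void processRecords(List<String> records) {
        for (String r : records) {
            seen.add(shardId + ":" + r);
        }
    }
}

class ToyWorkerDemo {
    public static void main(String[] args) {
        ToyWorker w = new ToyWorker(new RecordProcessorFactory() {
            @Override
            public RecordProcessor createProcessor() {
                return new CountingProcessor();
            }
        });
        w.deliver("shardId-000000000000", Arrays.asList("r1", "r2"));
        w.deliver("shardId-000000000001", Arrays.asList("r3"));
        System.out.println(w.processorCount() + " processors");
    }
}
```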


MemorycraftKinesisLoggingConsumer.java
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Properties;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

/**
 * Sample Kinesis Application.
 */
public final class MemorycraftKinesisLoggingConsumer {
    private static final String DEFAULT_APP_NAME = "MemorycraftKinesisLoggingConsumer";
    private static final String DEFAULT_STREAM_NAME = "memostream";
    private static final String DEFAULT_KINESIS_ENDPOINT = "https://kinesis.us-east-1.amazonaws.com";
    // Initial position in the stream when the application starts up for the first time.
    // Position can be one of LATEST (most recent data) or TRIM_HORIZON (oldest available data)
    private static final InitialPositionInStream DEFAULT_INITIAL_POSITION = InitialPositionInStream.TRIM_HORIZON;
    private static String applicationName = DEFAULT_APP_NAME;
    private static String streamName = DEFAULT_STREAM_NAME;
    private static String kinesisEndpoint = DEFAULT_KINESIS_ENDPOINT;
    private static InitialPositionInStream initialPositionInStream = DEFAULT_INITIAL_POSITION;
    private static KinesisClientLibConfiguration kinesisClientLibConfiguration;
    private static final Log LOG = LogFactory.getLog(MemorycraftKinesisLoggingConsumer.class);

    /**
     *
     */
    private MemorycraftKinesisLoggingConsumer() {
        super();
    }

    /**
     * @param args Property file with config overrides (e.g. application name, stream name)
     * @throws IOException Thrown if we can't read properties from the specified properties file
     */
    public static void main(String[] args) throws IOException {
        String propertiesFile = null;
        if (args.length > 1) {
            System.err.println("Usage: java " + MemorycraftKinesisLoggingConsumer.class.getName() + " <propertiesFile>");
            System.exit(1);
        } else if (args.length == 1) {
            propertiesFile = args[0];
        }
        configure(propertiesFile);
        System.out.println("Starting " + applicationName);
        LOG.info("Running " + applicationName + " to process stream " + streamName);
        IRecordProcessorFactory recordProcessorFactory = new MemorycraftKinesisLoggingProcessorFactory();
        Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);
        int exitCode = 0;
        try {
            worker.run();
        } catch (Throwable t) {
            LOG.error("Caught throwable while processing data.", t);
            exitCode = 1;
        }
        System.exit(exitCode);
    }

    private static void configure(String propertiesFile) throws IOException {
        if (propertiesFile != null) {
            loadProperties(propertiesFile);
        }
        // ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
        java.security.Security.setProperty("networkaddress.cache.ttl" , "60");
        String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
        LOG.info("Using workerId: " + workerId);
        // Get credentials from IMDS. If unsuccessful, get them from the classpath.
        AWSCredentialsProvider credentialsProvider = null;
        try {
            credentialsProvider = new InstanceProfileCredentialsProvider();
            // Verify we can fetch credentials from the provider
            credentialsProvider.getCredentials();
            LOG.info("Obtained credentials from the IMDS.");
        } catch (AmazonClientException e) {
            LOG.info("Unable to obtain credentials from the IMDS, trying classpath properties", e);
            credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
            // Verify we can fetch credentials from the provider
            credentialsProvider.getCredentials();
            LOG.info("Obtained credentials from the properties file.");
        }
        LOG.info("Using credentials with access key id: " + credentialsProvider.getCredentials().getAWSAccessKeyId());
        kinesisClientLibConfiguration = new KinesisClientLibConfiguration(applicationName, streamName, kinesisEndpoint,
                initialPositionInStream, credentialsProvider, workerId);
    }

    /**
     * @param propertiesFile
     * @throws IOException Thrown when we run into issues reading properties
     */
    private static void loadProperties(String propertiesFile) throws IOException {
        FileInputStream inputStream = new FileInputStream(propertiesFile);
        Properties properties = new Properties();
        try {
            properties.load(inputStream);
        } finally {
            inputStream.close();
        }
        String appNameOverride = properties.getProperty(ConfigKeys.APPLICATION_NAME_KEY);
        if (appNameOverride != null) {
            applicationName = appNameOverride;
        }
        LOG.info("Using application name " + applicationName);
        String streamNameOverride = properties.getProperty(ConfigKeys.STREAM_NAME_KEY);
        if (streamNameOverride != null) {
            streamName = streamNameOverride;
        }
        LOG.info("Using stream name " + streamName);
        String kinesisEndpointOverride = properties.getProperty(ConfigKeys.KINESIS_ENDPOINT_KEY);
        if (kinesisEndpointOverride != null) {
            kinesisEndpoint = kinesisEndpointOverride;
        }
        String initialPositionOverride = properties.getProperty(ConfigKeys.INITIAL_POSITION_IN_STREAM_KEY);
        if (initialPositionOverride != null) {
            initialPositionInStream = InitialPositionInStream.valueOf(initialPositionOverride);
        }
        LOG.info("Using initial position " + initialPositionInStream.toString() + " (if a checkpoint is not found).");
    }
}

MemorycraftKinesisLoggingProcessorFactory.java
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;

/**
 * Used to create new record processors.
 */
public class MemorycraftKinesisLoggingProcessorFactory implements IRecordProcessorFactory {
    /**
     * Constructor.
     */
    public MemorycraftKinesisLoggingProcessorFactory() {
        super();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public IRecordProcessor createProcessor() {
        return new MemorycraftKinesisLoggingProcessor();
    }
}

MemorycraftKinesisLoggingProcessor.java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.lang.RandomStringUtils;
import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectMetadata;

public class MemorycraftKinesisLoggingProcessor implements IRecordProcessor {
    private static final Log LOG = LogFactory.getLog(MemorycraftKinesisLoggingProcessor.class);
    private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
    private String kinesisShardId;
    private AmazonS3 s3;
    private long nextCheckpointTimeInMillis; // 0 at first, so the first batch checkpoints
    private final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();

    /**
     * Constructor.
     */
    public MemorycraftKinesisLoggingProcessor() {
        super();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void initialize(String shardId) {
        this.kinesisShardId = shardId;
        this.s3 = new AmazonS3Client(new ClasspathPropertiesFileCredentialsProvider());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        process(records);
        // Checkpoint at most once per CHECKPOINT_INTERVAL_MILLIS
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(checkpointer);
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }
    }

    /** Saves each record to S3 as its own object. Failed records are logged and skipped (no retries).
     * @param records
     */
    private void process(List<Record> records) {
        for (Record record : records) {
            InputStream in = null;
            try {
                String seqno = record.getSequenceNumber();
                LOG.info("process record [" + seqno + "]");
                String json = decoder.decode(record.getData()).toString();
                byte[] bytes = json.getBytes("UTF-8");
                in = new ByteArrayInputStream(bytes);
                ObjectMetadata metadata = new ObjectMetadata();
                metadata.setContentLength(bytes.length);
                // Random 32-character object name under log/
                String filename = RandomStringUtils.randomAlphanumeric(32);
                s3.putObject("memorycraft-kinesis", "log/" + filename, in, metadata);
                LOG.info(" saved [json]=" + json);
            } catch (CharacterCodingException e) {
                LOG.error("Record is not valid UTF-8, skipping", e);
            } catch (Throwable t) {
                LOG.error("Failed to save record to S3, skipping", t);
            } finally {
                // in is still null if decoding failed before the stream was created
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException e) {
                        LOG.error("close error", e);
                    }
                }
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        // Checkpoint only when the shard has been fully processed
        if (reason == ShutdownReason.TERMINATE) {
            checkpoint(checkpointer);
        }
    }

    /** Records the current position in the shard (no retries).
     * @param checkpointer
     */
    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
        try {
            LOG.info("CHECKPOINT !!!!!");
            checkpointer.checkpoint();
        } catch (ShutdownException se) {
            LOG.info("Processor was shut down; skipping checkpoint", se);
        } catch (ThrottlingException e) {
            LOG.error("Checkpoint throttled; will try again at the next interval", e);
        } catch (InvalidStateException e) {
            LOG.error("Cannot checkpoint: invalid state (check the KCL's DynamoDB table)", e);
        }
    }
}
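The decode step in process() relies on the Producer having encoded the JSON as UTF-8. The sketch below isolates both sides with the JDK only (`RecordDecoding`, `encode`, and `decodeOrNull` are illustrative names). Two details are worth knowing: `CharsetDecoder.decode(ByteBuffer)` resets the decoder before each call, so reusing one decoder per processor is fine, but it also advances the buffer's position, so decoding the same buffer a second time sees no bytes (use `duplicate()` if you need them twice). A decoder from `newDecoder()` also reports malformed input by throwing CharacterCodingException, which is the case the processor's catch block handles.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

// JDK-only sketch of the Producer's encode and the processor's decode.
class RecordDecoding {
    private static final Charset UTF8 = Charset.forName("UTF-8");
    // One decoder per processor instance, as in the sample
    private final CharsetDecoder decoder = UTF8.newDecoder();

    // Producer side: the bytes handed to PutRecordRequest.setData(...)
    static ByteBuffer encode(String json) {
        return ByteBuffer.wrap(json.getBytes(UTF8));
    }

    // Consumer side: null for bytes that are not valid UTF-8 (the sample logs
    // and skips those). On success the buffer is consumed: remaining() == 0.
    String decodeOrNull(ByteBuffer data) {
        try {
            return decoder.decode(data).toString();
        } catch (CharacterCodingException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        RecordDecoding d = new RecordDecoding();
        ByteBuffer buf = encode("{\"user_id\":\"testuser_A\"}");
        // Decoding a duplicate leaves the original buffer's position untouched
        System.out.println(d.decodeOrNull(buf.duplicate()) + " remaining=" + buf.remaining());
        System.out.println(d.decodeOrNull(buf) + " remaining=" + buf.remaining());
    }
}
```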

Let's run it.
[root@ip-10-118-97-158 MemoKinesis]# ant
Buildfile: /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis/build.xml

run:
    [javac] /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis/build.xml:12: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
    [javac] Compiling 4 source files to /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis
    [javac] warning: Supported source version 'RELEASE_6' from annotation processor 'com.amazonaws.eclipse.simpleworkflow.asynchrony.annotationprocessor.AsynchronyDeciderAnnotationProcessor' less than -source '1.7'
    [javac] 1 warning
..........
     [java] INFO: Successfully published 3 datums.
     [java] Dec 03, 2013 11:58:06 AM com.amazonaws.services.kinesis.metrics.impl.DefaultCWMetricsPublisher publishMetrics
     [java] INFO: Successfully published 18 datums.
     [java] Dec 03, 2013 11:58:14 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$WorkerLog info
     [java] INFO: Current stream shard assignments: shardId-000000000001, shardId-000000000000
     [java] Dec 03, 2013 11:58:14 AM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$WorkerLog info
     [java] INFO: Sleeping ...
     [java] Dec 03, 2013 11:58:16 AM com.amazonaws.services.kinesis.metrics.impl.DefaultCWMetricsPublisher publishMetrics
     [java] INFO: Successfully published 20 datums.
     [java] Dec 03, 2013 11:58:16 AM com.amazonaws.services.kinesis.metrics.impl.DefaultCWMetricsPublisher publishMetrics
     [java] INFO: Successfully published 3 datums.
     [java] Dec 03, 2013 11:58:18 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO: process record [21269323576357670914947437401207537664]
     [java] Dec 03, 2013 11:58:18 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO:  saved [json]={"datetime":"2013-12-03 08:41:05","user_id":"testuser_G","x":"54.504733903095726","y":"27.966324048617963"}
     [java] Dec 03, 2013 11:58:18 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO: process record [21269323576357720993054772957520920576]
     [java] Dec 03, 2013 11:58:19 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO:  saved [json]={"datetime":"2013-12-03 08:41:05","user_id":"testuser_W","x":"12.256118907591883","y":"63.004577908019634"}
     [java] Dec 03, 2013 11:58:19 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO: process record [21269323576357794417499720660453949440]
     [java] Dec 03, 2013 11:58:19 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO:  saved [json]={"datetime":"2013-12-03 08:41:05","user_id":"testuser_O","x":"69.81831906353719","y":"31.571006624423724"}
     [java] Dec 03, 2013 11:58:19 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO: process record [21269323576357840331291814692712415232]
     [java] Dec 03, 2013 11:58:19 AM MemorycraftKinesisLoggingProcessor process
     [java] INFO:  saved [json]={"datetime":"2013-12-03 08:41:04","user_id":"testuser_F","x":"19.924814432111905","y":"41.65256488223227"}
..........


It looks like records are being picked up correctly.
Let's look at the S3 bucket.

Oh nice, files are being saved one after another.
Let's download one of them and look at its contents.
{"datetime":"2013-12-03 08:44:17","user_id":"testuser_H","x":"79.49199108301805","y":"2.4829017202529724"}
The contents look right too.


Consumer B



Next is Consumer B.

/usr/local/src/aws-java-sdk-1.6.4/
├── lib
│   ├── aws-java-sdk-1.6.4.jar
│   ├── aws-java-sdk-1.6.4-javadoc.jar
│   ├── aws-java-sdk-1.6.4-sources.jar
│   ├── aws-java-sdk-flow-build-tools-1.6.4.jar
│   ├── commons-lang-2.6.jar
│   ├── jsonic-1.3.0.jar
│   └── KinesisClientLibrary.jar
├── samples
│   └── MemoKinesis
│       ├── AwsCredentials.properties
│       ├── build.xml
│       ├── ConfigKeys.java
│       ├── Hoge.java
│       ├── MemorycraftKinesisDistanceConsumer.java
│       ├── MemorycraftKinesisDistanceProcessorFactory.java
│       └── MemorycraftKinesisDistanceProcessor.java
└── third-party


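Consumer B's instances compute a distance from X and Y (the distance from the origin (0, 0) to (x, y)) and add it to a DynamoDB item whose hash key is the user ID, so each item holds that user's running total distance.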
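Per record, the processor adds the straight-line distance from (0, 0) to (x, y), that is, sqrt(x^2 + y^2), and DynamoDB's ADD action on a number attribute that does not exist yet starts from 0. The in-memory sketch below mimics that update with a HashMap in place of the table (`DistanceTotals` is a hypothetical name), which makes the arithmetic easy to check: a 3-4-5 point adds 5.0.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for the DynamoDB "distance" attribute (hypothetical class)
class DistanceTotals {
    // user_id -> running total
    private final Map<String, Double> totals = new HashMap<>();

    // Same formula as the processor: distance from (0, 0) to (x, y)
    static double distanceFromOrigin(double x, double y) {
        return Math.sqrt(Math.pow(x, 2) + Math.pow(y, 2));
    }

    // Like AttributeAction.ADD on a number: a missing total counts as 0
    void add(String userId, double x, double y) {
        Double current = totals.get(userId);
        totals.put(userId, (current == null ? 0.0 : current) + distanceFromOrigin(x, y));
    }

    double total(String userId) {
        Double t = totals.get(userId);
        return t == null ? 0.0 : t;
    }

    public static void main(String[] args) {
        DistanceTotals totals = new DistanceTotals();
        totals.add("testuser_A", 3, 4);
        totals.add("testuser_A", 6, 8);
        System.out.println(totals.total("testuser_A"));
    }
}
```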

In terms of classes, the work is centered on MemorycraftKinesisDistanceProcessor.

MemorycraftKinesisDistanceConsumer.java
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Properties;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;

/**
 * Sample Kinesis Application.
 */
public final class MemorycraftKinesisDistanceConsumer {
    private static final String DEFAULT_APP_NAME = "MemorycraftKinesisDistanceConsumer";
    private static final String DEFAULT_STREAM_NAME = "memostream";
    private static final String DEFAULT_KINESIS_ENDPOINT = "https://kinesis.us-east-1.amazonaws.com";
    // Initial position in the stream when the application starts up for the first time.
    // Position can be one of LATEST (most recent data) or TRIM_HORIZON (oldest available data)
    private static final InitialPositionInStream DEFAULT_INITIAL_POSITION = InitialPositionInStream.TRIM_HORIZON;
    private static String applicationName = DEFAULT_APP_NAME;
    private static String streamName = DEFAULT_STREAM_NAME;
    private static String kinesisEndpoint = DEFAULT_KINESIS_ENDPOINT;
    private static InitialPositionInStream initialPositionInStream = DEFAULT_INITIAL_POSITION;
    private static KinesisClientLibConfiguration kinesisClientLibConfiguration;
    private static final Log LOG = LogFactory.getLog(MemorycraftKinesisDistanceConsumer.class);

    /**
     *
     */
    private MemorycraftKinesisDistanceConsumer() {
        super();
    }

    /**
     * @param args Property file with config overrides (e.g. application name, stream name)
     * @throws IOException Thrown if we can't read properties from the specified properties file
     */
    public static void main(String[] args) throws IOException {
        String propertiesFile = null;
        if (args.length > 1) {
            System.err.println("Usage: java " + MemorycraftKinesisDistanceConsumer.class.getName() + " <propertiesFile>");
            System.exit(1);
        } else if (args.length == 1) {
            propertiesFile = args[0];
        }
        configure(propertiesFile);
        System.out.println("Starting " + applicationName);
        LOG.info("Running " + applicationName + " to process stream " + streamName);
        IRecordProcessorFactory recordProcessorFactory = new MemorycraftKinesisDistanceProcessorFactory();
        Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration);
        int exitCode = 0;
        try {
            worker.run();
        } catch (Throwable t) {
            LOG.error("Caught throwable while processing data.", t);
            exitCode = 1;
        }
        System.exit(exitCode);
    }

    private static void configure(String propertiesFile) throws IOException {
        if (propertiesFile != null) {
            loadProperties(propertiesFile);
        }
        // ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints).
        java.security.Security.setProperty("networkaddress.cache.ttl" , "60");
        String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
        LOG.info("Using workerId: " + workerId);
        // Get credentials from IMDS. If unsuccessful, get them from the classpath.
        AWSCredentialsProvider credentialsProvider = null;
        try {
            credentialsProvider = new InstanceProfileCredentialsProvider();
            // Verify we can fetch credentials from the provider
            credentialsProvider.getCredentials();
            LOG.info("Obtained credentials from the IMDS.");
        } catch (AmazonClientException e) {
            LOG.info("Unable to obtain credentials from the IMDS, trying classpath properties", e);
            credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();
            // Verify we can fetch credentials from the provider
            credentialsProvider.getCredentials();
            LOG.info("Obtained credentials from the properties file.");
        }
        LOG.info("Using credentials with access key id: " + credentialsProvider.getCredentials().getAWSAccessKeyId());
        kinesisClientLibConfiguration = new KinesisClientLibConfiguration(applicationName, streamName, kinesisEndpoint,
                initialPositionInStream, credentialsProvider, workerId);
    }

    /**
     * @param propertiesFile
     * @throws IOException Thrown when we run into issues reading properties
     */
    private static void loadProperties(String propertiesFile) throws IOException {
        FileInputStream inputStream = new FileInputStream(propertiesFile);
        Properties properties = new Properties();
        try {
            properties.load(inputStream);
        } finally {
            inputStream.close();
        }
        String appNameOverride = properties.getProperty(ConfigKeys.APPLICATION_NAME_KEY);
        if (appNameOverride != null) {
            applicationName = appNameOverride;
        }
        LOG.info("Using application name " + applicationName);
        String streamNameOverride = properties.getProperty(ConfigKeys.STREAM_NAME_KEY);
        if (streamNameOverride != null) {
            streamName = streamNameOverride;
        }
        LOG.info("Using stream name " + streamName);
        String kinesisEndpointOverride = properties.getProperty(ConfigKeys.KINESIS_ENDPOINT_KEY);
        if (kinesisEndpointOverride != null) {
            kinesisEndpoint = kinesisEndpointOverride;
        }
        String initialPositionOverride = properties.getProperty(ConfigKeys.INITIAL_POSITION_IN_STREAM_KEY);
        if (initialPositionOverride != null) {
            initialPositionInStream = InitialPositionInStream.valueOf(initialPositionOverride);
        }
        LOG.info("Using initial position " + initialPositionInStream.toString() + " (if a checkpoint is not found).");
    }
}

MemorycraftKinesisDistanceProcessorFactory.java
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;

/**
 * Used to create new record processors.
 */
public class MemorycraftKinesisDistanceProcessorFactory implements IRecordProcessorFactory {
    /**
     * Constructor.
     */
    public MemorycraftKinesisDistanceProcessorFactory() {
        super();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public IRecordProcessor createProcessor() {
        return new MemorycraftKinesisDistanceProcessor();
    }
}

MemorycraftKinesisDistanceProcessor.java
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import net.arnx.jsonic.JSON;
import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.services.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.dynamodb.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodb.model.AttributeAction;
import com.amazonaws.services.dynamodb.model.AttributeValue;
import com.amazonaws.services.dynamodb.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodb.model.Key;
import com.amazonaws.services.dynamodb.model.UpdateItemRequest;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;

public class MemorycraftKinesisDistanceProcessor implements IRecordProcessor {
    private static final Log LOG = LogFactory.getLog(MemorycraftKinesisDistanceProcessor.class);
    private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
    private String kinesisShardId;
    private AmazonDynamoDB dynamodb;
    private long nextCheckpointTimeInMillis; // 0 at first, so the first batch checkpoints
    private final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();

    /**
     * Constructor.
     */
    public MemorycraftKinesisDistanceProcessor() {
        super();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void initialize(String shardId) {
        this.kinesisShardId = shardId;
        // Uses the older com.amazonaws.services.dynamodb API included in SDK 1.6.4
        dynamodb = new AmazonDynamoDBClient(new ClasspathPropertiesFileCredentialsProvider());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        process(records);
        // Checkpoint at most once per CHECKPOINT_INTERVAL_MILLIS
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(checkpointer);
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }
    }

    /** Adds each record's distance to the user's total. Failed records are logged and skipped (no retries).
     * @param records
     */
    private void process(List<Record> records) {
        for (Record record : records) {
            try {
                String seqno = record.getSequenceNumber();
                LOG.info("process record [" + seqno + "]");
                String data = decoder.decode(record.getData()).toString();
                Hoge hoge = JSON.decode(data, Hoge.class);
                // Hash key = user ID
                Key key = new Key(new AttributeValue(hoge.user_id));
                // Distance from the origin (0, 0) to (x, y)
                String value = "" + Math.sqrt(Math.pow(Double.parseDouble(hoge.x), 2) + Math.pow(Double.parseDouble(hoge.y), 2));
                Map<String, AttributeValueUpdate> item = new HashMap<String, AttributeValueUpdate>();
                // ADD increments the numeric "distance" attribute (from 0 if it doesn't exist yet)
                item.put("distance", new AttributeValueUpdate()
                        .withAction(AttributeAction.ADD)
                        .withValue(new AttributeValue().withN(value)));
                dynamodb.updateItem(new UpdateItemRequest("memorycraft-kinesis", key, item));
                LOG.info(" saved [user_id:" + hoge.user_id + ", distance:" + value + "]");
            } catch (CharacterCodingException e) {
                LOG.error("Record is not valid UTF-8, skipping", e);
            } catch (Throwable t) {
                LOG.error("Failed to update DynamoDB, skipping", t);
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        // Checkpoint only when the shard has been fully processed
        if (reason == ShutdownReason.TERMINATE) {
            checkpoint(checkpointer);
        }
    }

    /** Records the current position in the shard (no retries).
     * @param checkpointer
     */
    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
        try {
            LOG.info("CHECKPOINT !!!!!");
            checkpointer.checkpoint();
        } catch (ShutdownException se) {
            LOG.info("Processor was shut down; skipping checkpoint", se);
        } catch (ThrottlingException e) {
            LOG.error("Checkpoint throttled; will try again at the next interval", e);
        } catch (InvalidStateException e) {
            LOG.error("Cannot checkpoint: invalid state (check the KCL's DynamoDB table)", e);
        }
    }
}

Let's run it.

[root@ip-10-60-155-21 MemoKinesis]# ant
Buildfile: /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis/build.xml

run:
    [javac] /usr/local/src/aws-java-sdk-1.6.4/samples/MemoKinesis/build.xml:12: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
     [java] Dec 03, 2013 3:22:17 PM MemorycraftKinesisDistanceConsumer configure
     [java] INFO: Using workerId: ip-10-60-155-21.ec2.internal:584216f1-8d4d-429c-ba99-8af365345e17
     [java] Dec 03, 2013 3:22:17 PM MemorycraftKinesisDistanceConsumer configure
.......
     [java] INFO: Successfully published 18 datums.
     [java] Dec 03, 2013 3:23:18 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$WorkerLog info
     [java] INFO: Current stream shard assignments: shardId-000000000001, shardId-000000000000
     [java] Dec 03, 2013 3:23:18 PM com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker$WorkerLog info
     [java] INFO: Sleeping ...
     [java] Dec 03, 2013 3:23:18 PM MemorycraftKinesisDistanceProcessor process
     [java] INFO: process record [21269323576357670914947437401207537664]
     [java] Dec 03, 2013 3:23:19 PM MemorycraftKinesisDistanceProcessor process
     [java] INFO:  saved [user_id:testuser_G, distance:61.260764757221075]
     [java] Dec 03, 2013 3:23:19 PM MemorycraftKinesisDistanceProcessor process
     [java] INFO: process record [21269323576357720993054772957520920576]
     [java] Dec 03, 2013 3:23:19 PM MemorycraftKinesisDistanceProcessor process
     [java] INFO:  saved [user_id:testuser_W, distance:64.18558473711015]
     [java] Dec 03, 2013 3:23:19 PM MemorycraftKinesisDistanceProcessor process
     [java] INFO: process record [21269323576357794417499720660453949440]
     [java] Dec 03, 2013 3:23:19 PM MemorycraftKinesisDistanceProcessor process
     [java] INFO:  saved [user_id:testuser_O, distance:76.62457919060493]
.....


Now let's look at DynamoDB.
The totals are being aggregated per user as expected.


And they keep going up.



Checkpoints



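When a Kinesis consumer uses the KCL, each consumer application can set a marker called a checkpoint.
In the sample processors' private checkpoint method, calling checkpointer.checkpoint() saves how far the consumer has read in the stream's sequence. If reading is stopped or a failure occurs, restarting the process resumes reading from that checkpoint.
When to checkpoint is up to you; in this sample, processRecords checkpoints at most once a minute (60,000 ms), and shutdown also checkpoints when the reason is TERMINATE.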

    private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;

    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        process(records);
        // Checkpoint at most once per minute
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(checkpointer);
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }
    }
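The time-based check in processRecords can be pulled out into a small class that takes the clock as a parameter, which makes its behavior easy to test. This is a refactoring sketch, not code from the sample (`CheckpointGate` and `shouldCheckpoint` are made-up names). It keeps the sample's semantics: the next-checkpoint time starts at 0, so the first batch always triggers a checkpoint, and after that at most one checkpoint happens per interval. One consequence worth keeping in mind: after a restart, the KCL resumes from the last checkpoint, so records processed after it (up to about a minute's worth here) are delivered again. Consumer A would then write duplicate S3 objects, and Consumer B would ADD those distances a second time.

```java
// Refactoring sketch of the sample's checkpoint timing, with the clock injected
class CheckpointGate {
    private final long intervalMillis;
    // Starts at 0, like the sample's field, so the first batch always checkpoints
    private long nextCheckpointTimeInMillis;

    CheckpointGate(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // True when the caller should checkpoint now; also schedules the next one
    boolean shouldCheckpoint(long nowMillis) {
        if (nowMillis > nextCheckpointTimeInMillis) {
            nextCheckpointTimeInMillis = nowMillis + intervalMillis;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CheckpointGate gate = new CheckpointGate(60000L);
        long[] batchTimes = {1000L, 30000L, 61000L, 61001L};
        for (long t : batchTimes) {
            System.out.println(t + " -> " + gate.shouldCheckpoint(t));
        }
    }
}
```

In processRecords, the if statement would then read `if (gate.shouldCheckpoint(System.currentTimeMillis())) { checkpoint(checkpointer); }`. The only behavioral difference from the sample is that the next deadline is computed from the time before checkpoint() runs rather than after.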


What I learned


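  • It provides a way to process data continuously rather than periodically, plus the scalability to do so.
  • The code you have to write, at a minimum, is not that much.
  • A great fit when batch processing has too much lag.
  • Consumers don't need to care about processing order relative to each other. If you need ordering, handle it inside that Consumer, or have it write to another stream and do the next step there.
  • As with DynamoDB, you need to estimate the stream's capacity. But since you only specify the number of shards, changing it is easy.
  • For the surrounding instances, especially Consumers, you need to watch instance load and create an Auto Scaling group per role, just as with batch processing.
  • Likewise, watch the capacity of DynamoDB, Redshift, and any other endpoints the Consumers send data to.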
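For the capacity bullet, a rough shard count can be worked out from the per-shard limits. The figures in this sketch are assumptions based on the Kinesis documentation (writes: 1 MB/s and 1,000 records/s per shard; reads: 2 MB/s per shard, shared by every application reading it; 1 MB taken as 1,048,576 bytes), so check the current limits before relying on it. `ShardEstimate` and `shardsNeeded` are made-up names. Note the read side: in this post two consumer applications (A and B) each read every record, so read traffic is twice the write traffic.

```java
// Rough shard-count estimate. ASSUMPTION: the per-shard limits below follow the
// Kinesis documentation; verify the current values before using this.
class ShardEstimate {
    static final double WRITE_BYTES_PER_SEC = 1024 * 1024;    // 1 MB/s in per shard
    static final double WRITE_RECORDS_PER_SEC = 1000;         // 1,000 records/s in per shard
    static final double READ_BYTES_PER_SEC = 2 * 1024 * 1024; // 2 MB/s out, shared by all apps

    static int shardsNeeded(double recordsPerSec, double avgRecordBytes, int consumerApps) {
        double inBytes = recordsPerSec * avgRecordBytes;
        double outBytes = inBytes * consumerApps; // every app reads every record
        int byWriteBytes = (int) Math.ceil(inBytes / WRITE_BYTES_PER_SEC);
        int byWriteRecords = (int) Math.ceil(recordsPerSec / WRITE_RECORDS_PER_SEC);
        int byReadBytes = (int) Math.ceil(outBytes / READ_BYTES_PER_SEC);
        // At least one shard, otherwise the tightest of the three limits
        return Math.max(1, Math.max(byWriteBytes, Math.max(byWriteRecords, byReadBytes)));
    }

    public static void main(String[] args) {
        // Roughly this post's records (~110 bytes) with Consumers A and B
        System.out.println(shardsNeeded(5000, 110, 2));
    }
}
```

For example, 5,000 records/s of about 110 bytes with two consumer applications needs 5 shards, driven by the record-count limit rather than by bytes.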


That's all.