Spring Boot + Spring Integration でいろいろ試してみる ( その46 )( Docker Compose でサーバを構築する、Kafka 編13 - 1つのアプリケーション内に複数の Kafka Streams アプリを定義する+KTable を使ってみる )
概要
記事一覧はこちらです。
Spring Boot+Spring Integration+Spring Integration Kafka 構成の1つのプロジェクト内に複数の Kafka Streams アプリを実装してみます。
実装してみた感想としては、やっぱり1プロジェクト1アプリの構成の方が作りやすいしスケールアウトもさせやすいですので、複数入れるのは止めた方がいいです。
参照したサイト・書籍
- Data Reprocessing with the Streams API in Kafka: Resetting a Streams Application
https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
目次
- 何を作成するか?
- kafkastreams-multistreams-app サブプロジェクトを作成する
- 1つのプロジェクト内に複数の Kafka Streams アプリを定義するには?
- 共通で使用する TopicUtils クラス、BaseKafkaStreamsConfig クラスを作成する
- LengthAppKafkaStreamsConfig クラスを作成する
- CountByLengthAppKafkaStreamsConfig クラスを作成する
- CountByWordsAppKafkaStreamsConfig クラスを作成する
- application.properties を記述する
- 動作確認
手順
何を作成するか?
- topic は4つ作成する(Topic1、Topic2、Topic3、Topic4)。
- Kafka Streams アプリは3つ作成する。
- lengthStreamsApp
- countByLengthStreamsApp
- countByWordsStreamsApp
- Kafka Streams アプリは全て kafkastreams-multistreams-app プロジェクト内に定義する。1 Kafka Streams アプリ 1 プロジェクトにはしない。
- countByLengthStreamsApp、countByWordsStreamsApp は KTable を使用する。今回は Window 処理はなし。
- Topic1 への入力は kafka-console-producer を使用する。
- Topic2、Topic3、Topic4 のメッセージの確認には kafka-console-consumer を使用する。
※今回は1台のPC上で Kafka Streams アプリを動かてしているから動作しているが、実際には「同じキーは同じパーティションに送信される」という Kafka の仕様を考慮して、Topic2 → Topic3 の間にもう1つ topic を追加して <[文字数]: [文字列>(例 <1: a>) のようなメッセージを送信するか、Topic1 から Topic2-1(<文字数: 文字列> のメッセージを渡す)、Topic2-2(<文字列: 文字数> のメッセージを渡す)に分けてメッセージを送信するようにしないとダメだな。。。
kafkastreams-multistreams-app サブプロジェクトを作成する
Spring Initializr でプロジェクトを作成した後、
build.gradle を以下の内容に変更します(kafkastreams-uppercase-app と同じ内容です)。
buildscript { ext { group "ksbysample.eipapp.ksbysample-eipapp-kafkastreams" version "0.0.1-SNAPSHOT" } repositories { mavenCentral() gradlePluginPortal() } } plugins { id "org.springframework.boot" version "2.1.9.RELEASE" id "io.spring.dependency-management" version "1.0.8.RELEASE" id "java" id "idea" } sourceCompatibility = JavaVersion.VERSION_11 targetCompatibility = JavaVersion.VERSION_11 idea { module { inheritOutputDirs = false outputDir = file("$buildDir/classes/main/") } } repositories { mavenCentral() } dependencyManagement { imports { mavenBom(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES) mavenBom("org.junit:junit-bom:5.5.2") } } dependencies { def kafkaVersion = "2.3.0" implementation("org.springframework.boot:spring-boot-starter-integration") implementation("org.springframework.integration:spring-integration-kafka:3.2.0.RELEASE") implementation("org.apache.kafka:kafka-clients:${kafkaVersion}") implementation("org.apache.kafka:kafka-streams:${kafkaVersion}") testImplementation("org.springframework.boot:spring-boot-starter-test") // for JUnit 5 testCompile("org.junit.jupiter:junit-jupiter") testRuntime("org.junit.platform:junit-platform-launcher") } test { // for JUnit 5 useJUnitPlatform() testLogging { events "STARTED", "PASSED", "FAILED", "SKIPPED" } }
1つのプロジェクト内に複数の Kafka Streams アプリを定義するには?
@EnableKafkaStreams
アノテーションを付与すると以下の処理が行われますが、
- org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration(@Configuration アノテーションが付与されている)が import される。
- KafkaStreamsDefaultConfiguration 内で defaultKafkaStreamsBuilder bean が定義される。
- defaultKafkaStreamsBuilder bean が生成される場合、その前に org.springframework.boot.autoconfigure.kafka.KafkaStreamsAnnotationDrivenConfiguration で defaultKafkaStreamsConfig bean が生成される。
- defaultKafkaStreamsBuilder bean に defaultKafkaStreamsConfig bean が引数で渡されて、それを使って bean が生成される。
- application-id には application.properties に定義した spring.kafka.streams.application-id が使用される。
1つのプロジェクトに複数の Kafka Streams アプリを定義する場合、以下の問題があります。
- Kafka Streams アプリは各アプリ毎に一意の application-id を付ける必要があるが、defaultKafkaStreamsConfig bean をそのまま利用するとそれが出来ない。
- StreamsBuilderFactoryBean は Kafka Streams アプリ毎にインスタンスを分けないとうまく動かない。Topic2 からメッセージを受信する2つの Kafka Streams アプリを作ろうとしてもエラーになる。
そこで以下のように実装します。
- @EnableKafkaStreams アノテーションは使用しない。
- KafkaStreamsConfiguration 型の Bean と StreamsBuilderFactoryBean 型の Bean を各 Kafka Streams アプリ毎に定義する。
- application-id は KafkaStreamsConfiguration 型の Bean 内で設定する。
共通で使用する TopicUtils クラス、BaseKafkaStreamsConfig クラスを作成する
Topic の名前、key, value の型を定義する TopicUtils クラスを作成します。 src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp の下に TopicUtils.java を新規作成し、以下の内容を記述します。
package ksbysample.eipapp.kafkastreams.multistreamsapp; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Produced; public class TopicUtils { public static final String TOPIC1_NAME = "Topic1"; public static final Consumed TOPIC1_CONSUMED = Consumed.with(Serdes.Integer(), Serdes.String()); public static final String TOPIC2_NAME = "Topic2"; public static final Produced TOPIC2_PRODUCED = Produced.with(Serdes.String(), Serdes.Integer()); public static final Consumed TOPIC2_CONSUMED = Consumed.with(Serdes.String(), Serdes.Integer()); public static final String TOPIC3_NAME = "Topic3"; public static final Produced TOPIC3_PRODUCED = Produced.with(Serdes.String(), Serdes.Long()); public static final String TOPIC4_NAME = "Topic4"; public static final Produced TOPIC4_PRODUCED = Produced.with(Serdes.String(), Serdes.Long()); }
各 Kafka Streams アプリのベースとする BaseKafkaStreamsConfig クラスを作成します。 src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp の下に BaseKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。
package ksbysample.eipapp.kafkastreams.multistreamsapp; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; public class BaseKafkaStreamsConfig { protected final KafkaProperties properties; public BaseKafkaStreamsConfig(KafkaProperties properties) { this.properties = properties; } }
KafkaProperties クラスは org.springframework.boot.autoconfigure.kafka.KafkaProperties にあり、org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration で @EnableConfigurationProperties(KafkaProperties.class)
が付与されて Bean が生成されています。
LengthAppKafkaStreamsConfig クラスを作成する
src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp の下に LengthAppKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。
package ksbysample.eipapp.kafkastreams.multistreamsapp; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; import java.util.Map; @Configuration public class LengthAppKafkaStreamsConfig extends BaseKafkaStreamsConfig { private static final String APPLICATION_ID = "length-streams-app"; private static final String BEAN_NAME_PREFIX = "lengthApp"; private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration"; private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean"; private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp"; public LengthAppKafkaStreamsConfig(KafkaProperties properties) { super(properties); } @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME) public KafkaStreamsConfiguration kafkaStreamsConfiguration() { Map<String, Object> streamsProperties = this.properties.buildStreamsProperties(); streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); return new KafkaStreamsConfiguration(streamsProperties); } @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME) public StreamsBuilderFactoryBean streamsBuilderFactoryBean() { return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration()); } @Bean(KSTREAMS_APP_BEAN_NAME) public Topology kafkaStreamsApp() throws Exception { StreamsBuilder builder = streamsBuilderFactoryBean().getObject(); KStream<Integer, String> stream = builder.stream(TopicUtils.TOPIC1_NAME, TopicUtils.TOPIC1_CONSUMED); stream .map((key, value) -> new KeyValue<>(value, value.length())) .to(TopicUtils.TOPIC2_NAME, TopicUtils.TOPIC2_PRODUCED); return builder.build(); } }
- BaseKafkaStreamsConfig クラスを継承します。
- 定数文字列を5つ定義していますが、APPLICATION_ID, BEAN_NAME_PREFIX の2つに作成する Kafka Streams アプリ固有の文字列を記述します。
- kafkaStreamsApp メソッドに作成する Kafka Streams アプリの処理を記述します。
CountByLengthAppKafkaStreamsConfig クラスを作成する
src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp の下に CountByLengthAppKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。
package ksbysample.eipapp.kafkastreams.multistreamsapp; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; import java.util.Map; @Configuration public class CountByLengthAppKafkaStreamsConfig extends BaseKafkaStreamsConfig { private static final String APPLICATION_ID = "countbylength-streams-app"; private static final String BEAN_NAME_PREFIX = "countByLengthApp"; private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration"; private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean"; private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp"; public CountByLengthAppKafkaStreamsConfig(KafkaProperties properties) { super(properties); } @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME) public KafkaStreamsConfiguration kafkaStreamsConfiguration() { Map<String, Object> streamsProperties = this.properties.buildStreamsProperties(); streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); return new KafkaStreamsConfiguration(streamsProperties); } @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME) public StreamsBuilderFactoryBean streamsBuilderFactoryBean() { return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration()); } @Bean(KSTREAMS_APP_BEAN_NAME) public Topology kafkaStreamsApp() throws Exception { StreamsBuilder builder = streamsBuilderFactoryBean().getObject(); KStream<String, Integer> stream = builder.stream(TopicUtils.TOPIC2_NAME, TopicUtils.TOPIC2_CONSUMED); KTable<String, Long> count = stream .groupBy((key, value) -> String.valueOf(value)) .count(); count.toStream() .to(TopicUtils.TOPIC3_NAME, TopicUtils.TOPIC3_PRODUCED); return builder.build(); } }
CountByWordsAppKafkaStreamsConfig クラスを作成する
src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp の下に CountByWordsAppKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。
package ksbysample.eipapp.kafkastreams.multistreamsapp; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; import java.util.Map; @Configuration public class CountByWordsAppKafkaStreamsConfig extends BaseKafkaStreamsConfig { private static final String APPLICATION_ID = "countbywords-streams-app"; private static final String BEAN_NAME_PREFIX = "countByWordsApp"; private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration"; private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean"; private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp"; public CountByWordsAppKafkaStreamsConfig(KafkaProperties properties) { super(properties); } @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME) public KafkaStreamsConfiguration kafkaStreamsConfiguration() { Map<String, Object> streamsProperties = this.properties.buildStreamsProperties(); streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); return new KafkaStreamsConfiguration(streamsProperties); } @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME) public StreamsBuilderFactoryBean streamsBuilderFactoryBean() { return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration()); } @Bean(KSTREAMS_APP_BEAN_NAME) public Topology kafkaStreamsApp() throws Exception { StreamsBuilder builder = streamsBuilderFactoryBean().getObject(); KStream<String, Integer> stream = builder.stream(TopicUtils.TOPIC2_NAME, TopicUtils.TOPIC2_CONSUMED); KTable<String, Long> count = stream .groupBy((key, value) -> key) .count(); count.toStream() .to(TopicUtils.TOPIC4_NAME, TopicUtils.TOPIC4_PRODUCED); return builder.build(); } }
application.properties を記述する
spring.kafka.streams.application-id=kafkastreams-multistreams-app spring.kafka.streams.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092 spring.kafka.streams.properties.commit.interval.ms=5000 spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde spring.kafka.streams.properties.num.stream.threads=9 spring.kafka.streams.replication-factor=3
- KTable を使用している Kafka Streams アプリが topic にメッセージを送信する間隔を 5 秒毎にします。commit.interval.ms に 5000(ミリ秒)を設定します。
- default.key.serde、default.value.serde はどちらも Serdes$StringSerde を設定します。Kafka Streams アプリでは全て
Produced.with(...)
、Consumed.with(...)
で topic の key, value の型を指定しているのですが、default.key.serde に Serdes$IntegerSerde を指定するとjava.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Integer .....
のエラーが出ます。default であって型を指定したら無視されると思っていたのですが、どうもそうではないようです。。。 - 動作確認では各 topic の partition 数を 3つで作成します。Kafka Streams アプリを3つ作成していますので、各 Kafka Streams アプリに3スレッドずつ割り当てられるよう num.stream.threads に 3 x 3 = 9 を設定します。
- KTable を使用する Kafka Streams アプリは Stateful になり、Kafka Cluster 側に状態を保存するための Topic を作成します。その Topic がレプリケーションするよう replication-factor に 3 を設定します。
動作確認
docker-compose down
、docker-compose up -d
でコンテナを起動し直した後、Topic1~4 を作成します。
kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic1 --partitions 3 --replication-factor 3 --if-not-exists
kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic2 --partitions 3 --replication-factor 3 --if-not-exists
kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic3 --partitions 3 --replication-factor 3 --if-not-exists
kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic4 --partitions 3 --replication-factor 3 --if-not-exists
kafkastreams-multistreams-app プロジェクトを起動した後、
kafka-console-producer コマンドを実行して 5秒以内に a, b, ab, a と入力すると、想定した動作になります。
- Topic2 には key が入力された文字、value が文字数のメッセージが送信されてきます。
- Topic3 には 3文字が 1回、1文字が 3回の 2件のメッセージが送信されてきます。
- Topic4 には a が 2回、abc が 1回、b が 1回の 3件のメッセージが送信されています。
また上の動作確認後に kafkacat コマンドで作成されている topic を見ると、各 Kafka Streams アプリ毎に ~-changelog
、~-repartition
という topic が作成されます。
(.....途中は省略.....)
- countbylength-streams-app-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog
- countbylength-streams-app-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition
- countbywords-streams-app-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog
- countbywords-streams-app-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition
ローカルPC の方にも D:\tmp\kafka-streams\
というディレクトリが作成されて、その下に Kafka Streams アプリに設定した application-id のフォルダが作成されます(設定が state.dir = /tmp/kafka-streams
になっており D ドライブでアプリが動いているので D:\tmp\kafka-streams\
が作成されるようです)。
partition と ローカルPC のディレクトリ内のデータは1セットになっているので、どちらか片方だけ削除してももう片方からデータがコピーされて復活します。最初 D:\tmp\kafka-streams\
の下に状態が保持されていることが分からなくて、コンテナを再起動しても countByLength、countByWords の Kafka Streams アプリで以前カウントした件数が維持されていて、原因が分からなくて結構困りました。。。
Data Reprocessing with the Streams API in Kafka: Resetting a Streams Application の中に Local State Stores
の説明がありました。
履歴
2019/10/14
初版発行。