Spring Boot + Spring Integration でいろいろ試してみる ( その45 )( Docker Compose でサーバを構築する、Kafka 編12 - 簡単な Kafka Streams アプリを作成してみる )
概要
記事一覧はこちらです。
Spring Boot+Spring Integration+Spring for Apache Kafka で簡単な Kafka Streams アプリケーションを作成してみます。
参照したサイト・書籍
4.2. Kafka Streams Support
https://docs.spring.io/spring-kafka/reference/html/#streams-kafka-streamsKSQL and Kafka Streams
https://docs.confluent.io/current/streams-ksql.htmlStreams DSL
https://docs.confluent.io/current/streams/developer-guide/dsl-api.html- KStream、KTable の関係は以下の URL の図が分かりやすい。 https://docs.confluent.io/current/_images/streams-stateful_operations.png
confluentinc/kafka-streams-examples
https://github.com/confluentinc/kafka-streams-examplesAdventures of Using Kafka Streams
https://engineering.linecorp.com/ja/blog/adventures-of-using-kafka-streams/Kafka Streams work allocation
https://medium.com/@andy.bryant/kafka-streams-work-allocation-4f31c24753ccData Reprocessing with the Streams API in Kafka: Resetting a Streams Application
https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
目次
- Gradle の Multi-project を作成する
- 英小文字→英大文字に変換する Kafka Streams アプリを作成してみる
- application.properties に spring.kafka.streams.properties.num.stream.threads を追加して Kafka Streams アプリのスレッド数を増やす
- Kafka Streams 用 Bean の戻り値を Topology 型にする+
Consumed.with
、Consumed.with
で Topic の key, data の型を明示する
手順
Gradle の Multi-project を作成する
ksbysample-eipapp-kafka プロジェクトに Kafka Streams のサンプルまで入れるとごちゃごちゃしそうなのと、Kafka Streams のアプリは複数に分けたかったので、 Gradle の Multi-project で新規プロジェクトを作成することにします。
- D:\project-springboot\ksbysample-boot-integration\ の下に ksbysample-eipapp-kafkastreams ディレクトリを新規作成します。
- D:\project-springboot\ksbysample-boot-integration\ksbysample-eipapp-kafkastreams の下に ksbysample-eipapp-kafka プロジェクトから以下のディレクトリ、ファイルをコピーします。
- gradle ディレクトリ
- gradlew
- gradlew.bat
- コマンドプロンプトから
gradlew wrapper --gradle-version=5.6.2
、gradlew --version
、gradlew wrapper
コマンドを実行して Gradle を 5.4.1 → 5.6.2 へバージョンアップします。 gradlew init
コマンドを実行します。- ksbysample-eipapp-kafka プロジェクトから以下のディレクトリ、ファイルをコピーします。
- docker ディレクトリ
- .gitignore
- docker-compose.yml
- Picocli+Spring Boot でコマンドラインアプリケーションを作成してみる から settings.gradle 用のコードをコピーし、ksbysample-eipapp-kafkastreams プロジェクト内の build.gradle があるサブプロジェクトは include 分を記述しなくても自動的に Multi-project に認識されるようにします。
英小文字→英大文字に変換する Kafka Streams アプリを作成してみる
ksbysample-eipapp-kafkastreams プロジェクトの下に kafkastreams-uppercase-app サブプロジェクトを作成します。 Spring Initializr で以下の内容でプロジェクトを作成してから、
build.gradle を以下の内容に変更します。
buildscript { ext { group "ksbysample.eipapp.ksbysample-eipapp-kafkastreams" version "0.0.1-SNAPSHOT" } repositories { mavenCentral() gradlePluginPortal() } } plugins { id "org.springframework.boot" version "2.1.9.RELEASE" id "io.spring.dependency-management" version "1.0.8.RELEASE" id "java" id "idea" } sourceCompatibility = JavaVersion.VERSION_11 targetCompatibility = JavaVersion.VERSION_11 idea { module { inheritOutputDirs = false outputDir = file("$buildDir/classes/main/") } } repositories { mavenCentral() } dependencyManagement { imports { mavenBom(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES) mavenBom("org.junit:junit-bom:5.5.2") } } dependencies { def kafkaVersion = "2.3.0" implementation("org.springframework.boot:spring-boot-starter-integration") implementation("org.springframework.integration:spring-integration-kafka:3.2.0.RELEASE") implementation("org.apache.kafka:kafka-clients:${kafkaVersion}") implementation("org.apache.kafka:kafka-streams:${kafkaVersion}") testImplementation("org.springframework.boot:spring-boot-starter-test") // for JUnit 5 testCompile("org.junit.jupiter:junit-jupiter") testRuntime("org.junit.platform:junit-platform-launcher") } test { // for JUnit 5 useJUnitPlatform() testLogging { events "STARTED", "PASSED", "FAILED", "SKIPPED" } }
- Kafka Streams アプリケーションを作成するので dependencies block に以下の3行を追加します。
def kafkaVersion = "2.3.0"
implementation("org.apache.kafka:kafka-clients:${kafkaVersion}")
implementation("org.apache.kafka:kafka-streams:${kafkaVersion}")
変更後、IDEA の Gradle Tool Window の「Reimport All Gradle Projects」ボタンをクリックします。
src/main/java/ksbysample/eipapp/kafkastreams/uppercaseapp の下に KafkaStreamsConfig.java を新規作成した後、以下の内容を記述します。
package ksbysample.eipapp.kafkastreams.uppercaseapp; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafkaStreams; @Configuration @EnableKafkaStreams public class KafkaStreamsConfig { private final StreamsBuilder builder; public KafkaStreamsConfig(StreamsBuilder builder) { this.builder = builder; } /** * Topic1 から取得したメッセージ内の英小文字を英大文字に変換して Topic2 に送信する * Kafka Streams アプリケーション */ @Bean public KStream<Integer, String> kStream() { KStream<Integer, String> stream = builder.stream("Topic1"); stream .mapValues(s -> s.toUpperCase()) .to("Topic2"); return stream; } }
@EnableKafkaStreams
を記述します。記述すると org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration により StreamsBuilderFactoryBean型の defaultKafkaStreamsBuilder bean が生成されます。- StreamsBuilder 型のインスタンスは
private final StreamsBuilder builder;
を記述してコンストラクタインジェクションを記述しておくと、defaultKafkaStreamsBuilder bean により org.springframework.kafka.config.StreamsBuilderFactoryBean#createInstance メソッドで生成されます。 KStream<?, ?>
型を返す Bean を定義します。<?, ?>
の1番目は input となる topic の key の型、2番目は data の型です。Bean 内で Streams DSL を使って Kafka Streams の処理を記述します。
src/main/resources/application.properties に以下の内容を記述します。
spring.kafka.streams.application-id=kafkastreams-simple-app spring.kafka.streams.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092 spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$IntegerSerde spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
- application-id は https://docs.confluent.io/current/streams/developer-guide/config-streams.html#application-id 参照。今回はアプリケーション名のみでバージョン番号は記載しません。input の topic に対して consumer として動作する時に spring.kafka.consumer.group-id と同じ役割を担います。
動作を確認します。docker-compose down
、docker-compose up -d
で Kafka クラスタを起動した後、Topic1、Topic2 を作成します。
kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic1 --partitions 3 --replication-factor 3 --if-not-exists
kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic2 --partitions 3 --replication-factor 3 --if-not-exists
kafkastreams-uppercase-app アプリを起動した後、
kafka-console-producer、kafka-console-consumer コマンドで producer、consumer を起動してから producer に文字列を入力すると、consumer に大文字に変換された文字列が表示されました。
kafka-console-producer --broker-list localhost:19092 --topic Topic1
kafka-console-consumer --bootstrap-server localhost:19092 --topic Topic2
kafka-consumer-groups --bootstrap-server cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092 --describe --group kafkastreams-simple-app --timeout 30000
コマンドを実行すると、GROUP には spring.kafka.streams.application-id に設定した kafkastreams-simple-app が表示されています。また CLIENT-ID は全て kafkastreams-simple-app-d450dff6-4e81-4e75-8ec1-9add076640c6-StreamThread-1-consumer
なのでクライアントは1つだけでした。
ここまでの実装で build タスクを実行して kafkastreams-uppercase-app-0.0.1-SNAPSHOT.jar を生成した後、java -jar kafkastreams-uppercase-app-0.0.1-SNAPSHOT.jar
を実行すれば、Kafka Streams アプリケーションとして実行されたままになります(コマンドラインで Ctrl+C を押すまで終了しません)。
application.properties に spring.kafka.streams.properties.num.stream.threads を追加して Kafka Streams アプリのスレッド数を増やす
application.properties に spring.kafka.streams.properties.num.stream.threads=3
を記述すると Kafka Streams アプリのスレッド数を増やすことができます。kafka-consumer-groups コマンドを実行すると CLIENT-ID には ...StreamThread-1-consumer
、...StreamThread-2-consumer
、...StreamThread-3-consumer
と異なるものが表示されます。
Kafka Streams 用 Bean の戻り値を Topology 型にする+Consumed.with
、Consumed.with
で Topic の key, data の型を明示する
ksbysample.eipapp.kafkastreams.uppercaseapp.KafkaStreamsConfig#kStream を変更して Topic1 から入力された文字列の文字数をカウントして Topic2 に渡すようにしてみます。
@Bean public Topology kStream() { KStream<Integer, String> stream = builder.stream("Topic1", Consumed.with(Serdes.Integer(), Serdes.String())); stream .mapValues(s -> s.length()) .to("Topic2", Produced.with(Serdes.Integer(), Serdes.Integer())); return builder.build(); }
- 戻り値は
KStream<?, ?>
型ではなくTopology
型で書くことができます。Topology
型の場合、return builder.build();
にします。 - input の topic の key, data の型は
Consumed.with(...)
で、output の topic の key, data の型はProduced.with(...)
で記述することができます。
動作確認します。アプリを起動し直してから kafka-console-consumer コマンドを以下のコマンドで起動した後、
kafka-console-consumer --bootstrap-server localhost:19092 --topic Topic2 --value-deserializer org.apache.kafka.common.serialization.IntegerDeserializer
producer から文字列を入力すると consumer 側に文字数が表示されます。
履歴
2019/10/09
初版発行。