かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その45 )( Docker Compose でサーバを構築する、Kafka 編12 - 簡単な Kafka Streams アプリを作成してみる )

概要

記事一覧はこちらです。

Spring Boot+Spring Integration+Spring for Apache Kafka で簡単な Kafka Streams アプリケーションを作成してみます。

参照したサイト・書籍

  1. 4.2. Kafka Streams Support
    https://docs.spring.io/spring-kafka/reference/html/#streams-kafka-streams

  2. KSQL and Kafka Streams
    https://docs.confluent.io/current/streams-ksql.html

  3. Streams DSL
    https://docs.confluent.io/current/streams/developer-guide/dsl-api.html

  4. confluentinc/kafka-streams-examples
    https://github.com/confluentinc/kafka-streams-examples

  5. Adventures of Using Kafka Streams
    https://engineering.linecorp.com/ja/blog/adventures-of-using-kafka-streams/

  6. Kafka Streams work allocation
    https://medium.com/@andy.bryant/kafka-streams-work-allocation-4f31c24753cc

  7. Data Reprocessing with the Streams API in Kafka: Resetting a Streams Application
    https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

目次

  1. Gradle の Multi-project を作成する
  2. 英小文字→英大文字に変換する Kafka Streams アプリを作成してみる
  3. application.properties に spring.kafka.streams.properties.num.stream.threads を追加して Kafka Streams アプリのスレッド数を増やす
  4. Kafka Streams 用 Bean の戻り値を Topology 型にする+Consumed.withConsumed.with で Topic の key, data の型を明示する

手順

Gradle の Multi-project を作成する

ksbysample-eipapp-kafka プロジェクトに Kafka Streams のサンプルまで入れるとごちゃごちゃしそうなのと、Kafka Streams のアプリは複数に分けたかったので、 Gradle の Multi-project で新規プロジェクトを作成することにします。

  • D:\project-springboot\ksbysample-boot-integration\ の下に ksbysample-eipapp-kafkastreams ディレクトリを新規作成します。
  • D:\project-springboot\ksbysample-boot-integration\ksbysample-eipapp-kafkastreams の下に ksbysample-eipapp-kafka プロジェクトから以下のディレクトリ、ファイルをコピーします。
  • コマンドプロンプトから gradlew wrapper --gradle-version=5.6.2gradlew --versiongradlew wrapper コマンドを実行して Gradle を 5.4.1 → 5.6.2 へバージョンアップします。
  • gradlew init コマンドを実行します。
  • ksbysample-eipapp-kafka プロジェクトから以下のディレクトリ、ファイルをコピーします。
  • Picocli+Spring Boot でコマンドラインアプリケーションを作成してみる から settings.gradle 用のコードをコピーし、ksbysample-eipapp-kafkastreams プロジェクト内の build.gradle があるサブプロジェクトは include 分を記述しなくても自動的に Multi-project に認識されるようにします。

英小文字→英大文字に変換する Kafka Streams アプリを作成してみる

ksbysample-eipapp-kafkastreams プロジェクトの下に kafkastreams-uppercase-app サブプロジェクトを作成します。 Spring Initializr で以下の内容でプロジェクトを作成してから、

f:id:ksby:20191009082806p:plain

build.gradle を以下の内容に変更します。

buildscript {
    ext {
        group "ksbysample.eipapp.ksbysample-eipapp-kafkastreams"
        version "0.0.1-SNAPSHOT"
    }
    repositories {
        mavenCentral()
        gradlePluginPortal()
    }
}

plugins {
    id "org.springframework.boot" version "2.1.9.RELEASE"
    id "io.spring.dependency-management" version "1.0.8.RELEASE"
    id "java"
    id "idea"
}

sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    mavenCentral()
}

dependencyManagement {
    imports {
        mavenBom(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES)
        mavenBom("org.junit:junit-bom:5.5.2")
    }
}

dependencies {
    def kafkaVersion = "2.3.0"

    implementation("org.springframework.boot:spring-boot-starter-integration")
    implementation("org.springframework.integration:spring-integration-kafka:3.2.0.RELEASE")
    implementation("org.apache.kafka:kafka-clients:${kafkaVersion}")
    implementation("org.apache.kafka:kafka-streams:${kafkaVersion}")
    testImplementation("org.springframework.boot:spring-boot-starter-test")

    // for JUnit 5
    testCompile("org.junit.jupiter:junit-jupiter")
    testRuntime("org.junit.platform:junit-platform-launcher")
}

test {
    // for JUnit 5
    useJUnitPlatform()
    testLogging {
        events "STARTED", "PASSED", "FAILED", "SKIPPED"
    }
}
  • Kafka Streams アプリケーションを作成するので dependencies block に以下の3行を追加します。
    • def kafkaVersion = "2.3.0"
    • implementation("org.apache.kafka:kafka-clients:${kafkaVersion}")
    • implementation("org.apache.kafka:kafka-streams:${kafkaVersion}")

変更後、IDEA の Gradle Tool Window の「Reimport All Gradle Projects」ボタンをクリックします。

src/main/java/ksbysample/eipapp/kafkastreams/uppercaseapp の下に KafkaStreamsConfig.java を新規作成した後、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.uppercaseapp;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    private final StreamsBuilder builder;

    public KafkaStreamsConfig(StreamsBuilder builder) {
        this.builder = builder;
    }

    /**
     * Topic1 から取得したメッセージ内の英小文字を英大文字に変換して Topic2 に送信する
     * Kafka Streams アプリケーション
     */
    @Bean
    public KStream<Integer, String> kStream() {
        KStream<Integer, String> stream = builder.stream("Topic1");
        stream
                .mapValues(s -> s.toUpperCase())
                .to("Topic2");
        return stream;
    }

}
  • @EnableKafkaStreams を記述します。記述すると org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration により StreamsBuilderFactoryBean型の defaultKafkaStreamsBuilder bean が生成されます。
  • StreamsBuilder 型のインスタンスprivate final StreamsBuilder builder; を記述してコンストラクタインジェクションを記述しておくと、defaultKafkaStreamsBuilder bean により org.springframework.kafka.config.StreamsBuilderFactoryBean#createInstance メソッドで生成されます。
  • KStream<?, ?> 型を返す Bean を定義します。<?, ?> の1番目は input となる topic の key の型、2番目は data の型です。Bean 内で Streams DSL を使って Kafka Streams の処理を記述します。

src/main/resources/application.properties に以下の内容を記述します。

spring.kafka.streams.application-id=kafkastreams-simple-app
spring.kafka.streams.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$IntegerSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde

動作を確認します。docker-compose downdocker-compose up -d で Kafka クラスタを起動した後、Topic1、Topic2 を作成します。

  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic1 --partitions 3 --replication-factor 3 --if-not-exists
  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic2 --partitions 3 --replication-factor 3 --if-not-exists

f:id:ksby:20191009212820p:plain

kafkastreams-uppercase-app アプリを起動した後、

f:id:ksby:20191009213337p:plain

kafka-console-producer、kafka-console-consumer コマンドで producer、consumer を起動してから producer に文字列を入力すると、consumer に大文字に変換された文字列が表示されました。

  • kafka-console-producer --broker-list localhost:19092 --topic Topic1
  • kafka-console-consumer --bootstrap-server localhost:19092 --topic Topic2

f:id:ksby:20191009213605p:plain

kafka-consumer-groups --bootstrap-server cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092 --describe --group kafkastreams-simple-app --timeout 30000 コマンドを実行すると、GROUP には spring.kafka.streams.application-id に設定した kafkastreams-simple-app が表示されています。また CLIENT-ID は全て kafkastreams-simple-app-d450dff6-4e81-4e75-8ec1-9add076640c6-StreamThread-1-consumer なのでクライアントは1つだけでした。

f:id:ksby:20191009214132p:plain

ここまでの実装で build タスクを実行して kafkastreams-uppercase-app-0.0.1-SNAPSHOT.jar を生成した後、java -jar kafkastreams-uppercase-app-0.0.1-SNAPSHOT.jar を実行すれば、Kafka Streams アプリケーションとして実行されたままになります(コマンドラインで Ctrl+C を押すまで終了しません)。

application.properties に spring.kafka.streams.properties.num.stream.threads を追加して Kafka Streams アプリのスレッド数を増やす

application.properties に spring.kafka.streams.properties.num.stream.threads=3 を記述すると Kafka Streams アプリのスレッド数を増やすことができます。kafka-consumer-groups コマンドを実行すると CLIENT-ID には ...StreamThread-1-consumer...StreamThread-2-consumer...StreamThread-3-consumer と異なるものが表示されます。

f:id:ksby:20191009215151p:plain

Kafka Streams 用 Bean の戻り値を Topology 型にする+Consumed.withConsumed.with で Topic の key, data の型を明示する

ksbysample.eipapp.kafkastreams.uppercaseapp.KafkaStreamsConfig#kStream を変更して Topic1 から入力された文字列の文字数をカウントして Topic2 に渡すようにしてみます。

    @Bean
    public Topology kStream() {
        KStream<Integer, String> stream =
                builder.stream("Topic1", Consumed.with(Serdes.Integer(), Serdes.String()));
        stream
                .mapValues(s -> s.length())
                .to("Topic2", Produced.with(Serdes.Integer(), Serdes.Integer()));
        return builder.build();
    }
  • 戻り値は KStream<?, ?> 型ではなく Topology 型で書くことができます。Topology 型の場合、return builder.build(); にします。
  • input の topic の key, data の型は Consumed.with(...) で、output の topic の key, data の型は Produced.with(...) で記述することができます。

動作確認します。アプリを起動し直してから kafka-console-consumer コマンドを以下のコマンドで起動した後、

  • kafka-console-consumer --bootstrap-server localhost:19092 --topic Topic2 --value-deserializer org.apache.kafka.common.serialization.IntegerDeserializer

producer から文字列を入力すると consumer 側に文字数が表示されます。

f:id:ksby:20191009222312p:plain

履歴

2019/10/09
初版発行。