かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

IntelliJ IDEA を 2019.2.4 → 2019.3.1 へ、Git for Windows を 2.24.0(2) → 2.24.1.2 へバージョンアップ

IntelliJ IDEA を 2019.2.4 → 2019.3.1 へバージョンアップする

IntelliJ IDEA の 2019.3.1 がリリースされているのでバージョンアップします。

※ksbysample-webapp-lending プロジェクトを開いた状態でバージョンアップしています。

  1. IntelliJ IDEA のメインメニューから「Help」-「Check for Updates...」を選択します。

  2. IDE and Plugin Updates」ダイアログが表示されます。左下に「Update and Restart」ボタンが表示されていますので、「Update and Restart」ボタンをクリックします。

    f:id:ksby:20191220215424p:plain

  3. Plugin の update も表示されました。このまま「Update and Restart」ボタンをクリックします。

    f:id:ksby:20191220215513p:plain

  4. Patch がダウンロードされて IntelliJ IDEA が再起動します。

  5. メジャーバージョンアップなので起動時に「Import IntelliJ IDEA Settings From...」ダイアログが表示されます。「Previous version」を選択した状態で「OK」ボタンをクリックします。

    f:id:ksby:20191220220446p:plain

  6. IntelliJ IDEA が起動する時に「Plugin 'Spock Framework Enhancements' failed to initialize and will be disabled. Please restart IntelliJ IDEA.」のダイアログが表示されました。「OK」ボタンをクリックします。

    f:id:ksby:20191220220703p:plain

  7. IntelliJ IDEA は終了して自動では起動しないので、idea64.exe を実行して起動します。起動すると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。

    f:id:ksby:20191220221251p:plain

  8. IntelliJ IDEA のメインメニューから「Help」-「About」を選択し、2019.3.1 へバージョンアップされていることを確認します。

  9. Gradle Tool Window に Nothing to show のメッセージだけが表示されている状態でした。左上にある「Reimport All Gradle Projects」ボタンをクリックして更新します。import 完了までかなり時間がかかります。

  10. 更新可能な Plugin があるというダイアログが画面右下に表示されたので、再度 IntelliJ IDEA のメインメニューから「Help」-「Check for Updates...」を選択します。

  11. IDE and Plugin Updates」ダイアログが表示されますので「Update」ボタンをクリックします。

    f:id:ksby:20191220222353p:plain

  12. Plugin がインストールされます。画面右下に Restart のダイアログが表示されますので、リンクをクリックして IntelliJ IDEA を再起動します。

  13. IntelliJ IDEA が起動すると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。

  14. メインメニューから「File」-「Settings...」を選択して「Settings」ダイアログを開き、「Spock Framework Enhancements」を Uninstall します。

    f:id:ksby:20191220223139p:plain

  15. clean タスク実行 → Rebuild Project 実行 → build タスクを実行して、"BUILD SUCCESSFUL" のメッセージが出力されることを確認します。。。と思いましたが、Rebuild Project 実行を行うと src/main の下に generated ディレクトリが作成されるようになりました。しかもこのフォルダは clean タスクを実行しても消えません。

    f:id:ksby:20191220230646p:plain

    以下の点を変更します。

    • .gitignore に以下の2行を追加します。
      • src/main/generated
      • src/test/generated_tests
    • build.gradle に clean { doLast { ... } } の記述を追加し、clean タスク実行時に src/main/generated、src/test/generated_tests を削除するようにします。
      • rootProject.file("src/main/generated").deleteDir()
      • rootProject.file("src/test/generated_tests").deleteDir()
  16. clean タスク実行 → Rebuild Project 実行 → build タスクを実行して、"BUILD SUCCESSFUL" のメッセージが出力されることを確認します。

    f:id:ksby:20191220224449p:plain

  17. Project Tool Window で src/test/groovy/ksbysample、src/test/java/ksbysample でコンテキストメニューを表示して「Run 'Tests in 'ksbysample'' with Coverage」を選択し、テストが全て成功することを確認します。

    f:id:ksby:20191220225510p:plain f:id:ksby:20191220225937p:plain

  18. 最後に C:\Users\root.IntelliJIdea2019.2 を削除します。

Git for Windows を 2.24.0(2) → 2.24.1.2 へバージョンアップする

Git for Windows の 2.24.1.2 がリリースされていたのでバージョンアップします。今回は以下の記事が出ていて、バージョンアップ必須のようです。

「Git」に複数の脆弱性Windowsユーザーはとくに注意
https://forest.watch.impress.co.jp/docs/news/1223826.html

  1. https://gitforwindows.org/ の「Download」ボタンをクリックして Git-2.24.1.2-64-bit.exe をダウンロードします。

  2. Git-2.24.1.2-64-bit.exe を実行します。

  3. 「Git 2.24.1.2 Setup」ダイアログが表示されます。[Next >]ボタンをクリックします。

  4. 「Select Components」画面が表示されます。「Git LFS(Large File Support)」だけチェックした状態で [Next >]ボタンをクリックします。

  5. 「Choosing the default editor used by Git」画面が表示されます。「Use Vim (the ubiquitous text editor) as Git's default editor」が選択された状態で [Next >]ボタンをクリックします。

  6. 「Adjusting your PATH environment」画面が表示されます。中央の「Git from the command line and also from 3rd-party software」が選択されていることを確認後、[Next >]ボタンをクリックします。

  7. 「Choosing HTTPS transport backend」画面が表示されます。「Use the OpenSSL library」が選択されていることを確認後、[Next >]ボタンをクリックします。

  8. 「Configuring the line ending conversions」画面が表示されます。一番上の「Checkout Windows-style, commit Unix-style line endings」が選択されていることを確認した後、[Next >]ボタンをクリックします。

  9. 「Configuring the terminal emulator to use with Git Bash」画面が表示されます。「Use Windows'default console window」が選択されていることを確認した後、[Next >]ボタンをクリックします。

  10. 「Configuring extra options」画面が表示されます。「Enable file system caching」だけがチェックされていることを確認した後、[Next >]ボタンをクリックします。

  11. 「Configuring experimental options」画面が表示されます。何もチェックせずに [Install]ボタンをクリックします。

  12. インストールが完了すると「Completing the Git Setup Wizard」のメッセージが表示された画面が表示されます。中央の「View Release Notes」のチェックを外した後、「Finish」ボタンをクリックしてインストーラーを終了します。

  13. コマンドプロンプトを起動して git --version を実行し、git のバージョンが git version 2.24.1.windows.2 になっていることを確認します。

    f:id:ksby:20191221005440p:plain

  14. git-cmd.exe を起動して日本語の表示・入力が問題ないかを確認します。

    f:id:ksby:20191221005609p:plain

  15. 特に問題はないようですので、2.24.1.2 で作業を進めたいと思います。

AdoptOpenJDK を 11.0.4+11.2 → 11.0.5+10 へ、IntelliJ IDEA を 2019.2.3 → 2019.2.4 へ、Git for Windows を 2.23.0 → 2.24.0(2) へバージョンアップ

AdoptOpenJDK を 11.0.4+11.2 → 11.0.5+10 へバージョンアップする

※ksbysample-webapp-lending プロジェクトを開いた状態でバージョンアップしています。

  1. https://adoptopenjdk.net/?variant=openjdk11&jvmVariant=hotspot を見ると 11.0.5+10 がダウンロードできるようになっていましたので、11.0.5+10 へバージョンアップします。

    f:id:ksby:20191109173040p:plain

  2. OpenJDK11U-jdk_x64_windows_hotspot_11.0.5_10.msi をダウンロードして D:\Java\jdk-11.0.5+10 へインストールした後、環境変数 JAVA_HOME のパスを D:\Java\jdk-11.0.5+10 へ変更します。

    コマンドプロンプトから java -version を実行し、11.0.5 に変更されていることを確認します。

    f:id:ksby:20191109173957p:plain

  3. IntelliJ IDEA を再起動した後、プロジェクトで使用する JDK を 11.0.5+10 へ変更します。

  4. 開いているプロジェクトを閉じて「Welcome to IntelliJ IDEA」ダイアログを表示します。

  5. ダイアログ下部の「Configure」-「Structure for New Projects」を選択します。

    f:id:ksby:20191109174457p:plain

  6. 「Project Structure for New Projects」ダイアログが表示されます。画面左側で「Project Settings」-「Project」を選択後、画面右側の「Project SDK」の「New...」ボタンをクリックし、表示されるメニューから「JDK」を選択します。

    f:id:ksby:20191109174807p:plain

  7. 「Select Home Directory for JDK」ダイアログが表示されます。D:\Java\jdk-11.0.5+10 を選択した後、「OK」ボタンをクリックします。

    f:id:ksby:20191109175046p:plain

  8. 「Default Project Structure」ダイアログに戻るので、今度は「Project SDK」の「Edit」ボタンをクリックします。

    f:id:ksby:20191109175228p:plain

  9. 画面左側で「Platform Settings」-「SDKs」が選択された状態になるので、画面右上の入力フィールドで "11" → "11.0.5+10" へ変更します。

    f:id:ksby:20191109175513p:plain

  10. 次に中央のリストから「11.0.4+11」を選択した後、リストの上の「-」ボタンをクリックして削除します。

    f:id:ksby:20191109175704p:plain

  11. 「OK」ボタンをクリックして「Project Structure for New Projects」ダイアログを閉じます。

  12. 「Welcome to IntelliJ IDEA」ダイアログに戻ったら、ksbysample-webapp-lending プロジェクトを開きます。

  13. IntelliJ IDEA のメイン画面が開いたら、メニューから「File」-「Project Structure...」を選択します。

  14. 「Project Structure」ダイアログが表示されます。以下の画像の状態になっているので、

    f:id:ksby:20191109175943p:plain

    「Project SDK」を選択し直します。「Project SDK」を「11.0.5+10」に変更すると「Project language level」も自動で「SDK default (11 - Local variable syntax for lambda param」が選択されました。

    f:id:ksby:20191109180141p:plain

  15. 「OK」ボタンをクリックして「Project Structure」ダイアログを閉じます。

  16. メイン画面に戻ると画面右下に「Indexing...」の表示が出るので、終了するまで待ちます。

  17. Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして更新します。

  18. clean タスク実行 → Rebuild Project 実行 → build タスクを実行して、"BUILD SUCCESSFUL" のメッセージが出力されることを確認します。

    f:id:ksby:20191109181509p:plain

  19. Project Tool Window で src/test/groovy/ksbysample、src/test/java/ksbysample でコンテキストメニューを表示して「Run 'Tests in 'ksbysample'' with Coverage」を選択し、テストが全て成功することを確認します。

    f:id:ksby:20191109182124p:plain f:id:ksby:20191109182559p:plain

  20. 特に問題は発生しませんでした。11.0.5+10 で開発を進めます。

IntelliJ IDEA を 2019.2.3 → 2019.2.4 へバージョンアップする

IntelliJ IDEA の 2019.2.4 がリリースされているのでバージョンアップします。

※ksbysample-webapp-lending プロジェクトを開いた状態でバージョンアップしています。

  1. IntelliJ IDEA のメインメニューから「Help」-「Check for Updates...」を選択します。

  2. IDE and Plugin Updates」ダイアログが表示されます。左下に「Update and Restart」ボタンが表示されていますので、「Update and Restart」ボタンをクリックします。

    f:id:ksby:20191109183424p:plain

  3. Plugin の update も表示されました。このまま「Update and Restart」ボタンをクリックします。

    f:id:ksby:20191109183525p:plain

  4. Patch がダウンロードされて IntelliJ IDEA が再起動します。

  5. IntelliJ IDEA が起動すると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。

    f:id:ksby:20191109183932p:plain

  6. IntelliJ IDEA のメインメニューから「Help」-「About」を選択し、2019.2.4 へバージョンアップされていることを確認します。

  7. Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして更新します。

  8. 更新可能な Plugin があるというダイアログが画面右下に表示されたので、再度 IntelliJ IDEA のメインメニューから「Help」-「Check for Updates...」を選択します。

  9. IDE and Plugin Updates」ダイアログが表示されますので「Update」ボタンをクリックします。

    f:id:ksby:20191109184148p:plain

  10. Plugin がインストールされます。画面右下に Restart のダイアログが表示されますので、リンクをクリックして IntelliJ IDEA を再起動します。

  11. IntelliJ IDEA が起動すると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。

  12. clean タスク実行 → Rebuild Project 実行 → build タスクを実行して、"BUILD SUCCESSFUL" のメッセージが出力されることを確認します。

    f:id:ksby:20191109185528p:plain

  13. Project Tool Window で src/test/groovy/ksbysample、src/test/java/ksbysample でコンテキストメニューを表示して「Run 'Tests in 'ksbysample'' with Coverage」を選択し、テストが全て成功することを確認します。

    f:id:ksby:20191109185922p:plain f:id:ksby:20191109192738p:plain

Git for Windows を 2.23.0 → 2.24.0(2) へバージョンアップする

Git for Windows の 2.24.0(2) がリリースされていたのでバージョンアップします。

  1. https://gitforwindows.org/ の「Download」ボタンをクリックして Git-2.24.0.2-64-bit.exe をダウンロードします。

  2. Git-2.24.0.2-64-bit.exe を実行します。

  3. 「Git 2.24.0.2 Setup」ダイアログが表示されます。[Next >]ボタンをクリックします。

  4. 「Select Components」画面が表示されます。「Git LFS(Large File Support)」だけチェックした状態で [Next >]ボタンをクリックします。

  5. 「Choosing the default editor used by Git」画面が表示されます。「Use Vim (the ubiquitous text editor) as Git's default editor」が選択された状態で [Next >]ボタンをクリックします。

  6. 「Adjusting your PATH environment」画面が表示されます。中央の「Git from the command line and also from 3rd-party software」が選択されていることを確認後、[Next >]ボタンをクリックします。

  7. 「Choosing HTTPS transport backend」画面が表示されます。「Use the OpenSSL library」が選択されていることを確認後、[Next >]ボタンをクリックします。

  8. 「Configuring the line ending conversions」画面が表示されます。一番上の「Checkout Windows-style, commit Unix-style line endings」が選択されていることを確認した後、[Next >]ボタンをクリックします。

  9. 「Configuring the terminal emulator to use with Git Bash」画面が表示されます。「Use Windows'default console window」が選択されていることを確認した後、[Next >]ボタンをクリックします。

  10. 「Configuring extra options」画面が表示されます。「Enable file system caching」だけがチェックされていることを確認した後、[Next >]ボタンをクリックします。

  11. 「Configuring experimental options」画面が表示されます。何もチェックせずに [Install]ボタンをクリックします。

  12. インストールが完了すると「Completing the Git Setup Wizard」のメッセージが表示された画面が表示されます。中央の「View Release Notes」のチェックを外した後、「Finish」ボタンをクリックしてインストーラーを終了します。

  13. コマンドプロンプトを起動して git --version を実行し、git のバージョンが git version 2.24.0.windows.2 になっていることを確認します。

    f:id:ksby:20191109193614p:plain

  14. git-cmd.exe を起動して日本語の表示・入力が問題ないかを確認します。

    f:id:ksby:20191109193947p:plain

  15. 特に問題はないようですので、2.24.0(2) で作業を進めたいと思います。

Fork で global に設定した username 以外で push する

記事一覧はこちらです。

最近 Git の GUI Client に Fork を使用していますが、global に設定した username とは別の username で push したいことが出来たので、その方法を調べてみました。

clone したレポジトリのディレクトリへ移動した後、以下のコマンドを入力すると別のアカウントで push できるようになります。

git config --local user.name "アカウント"
git config --local user.email "メールアドレス"
git config --local credential.namespace git.username

※上記の最後のコマンドの git.usernameusername の部分は git config --local user.name コマンドで指定したアカウント名に置換すること。

一番最初だけ username と password を入力するダイアログが表示されます(GitHub で Two-factor authentication を設定している場合には「Two-factor authentication」のダイアログも表示されます)。

f:id:ksby:20191104230826p:plainf:id:ksby:20191104230925p:plain

入力された認証情報は Windows の資格情報マネージャーに登録されます。資格情報マネージャーに登録されたこの情報を削除すれば push 時に再びダイアログが表示されるようになります。

f:id:ksby:20191104231544p:plain

Two-factor authentication を設定しているのにこの後は username, password も Two-factor authentication の数字6桁も一切聞かれなくなるんですよね。どういう仕組みなのでしょうか。。。?

また毎回 username, password を入力したい場合には、以下のコマンドを実行すれば push の都度ダイアログが表示されるようになります。

git config --local credential.helper ""

今回の記事では以下のサイトを参考にしました。

Spring Boot + Spring Integration でいろいろ試してみる ( その48 )( Docker Compose でサーバを構築する、Kafka 編15 - Kafka Streams で Apache Avro を使用する )

概要

記事一覧はこちらです。

KStream、KTable で Avro で生成したクラスを使用したサンプルを作成してみます。

参照したサイト・書籍

  1. Data Types and Serialization - Available SerDes - Avro
    https://docs.confluent.io/current/streams/developer-guide/datatypes.html#avro

  2. Kafka Stream aggregation with custom object data type
    https://stackoverflow.com/questions/53400832/kafka-stream-aggregation-with-custom-object-data-type

  3. Creating a SerDes for Windowed Data on Kafka Streams
    https://stackoverflow.com/questions/56135796/creating-a-serdes-for-windowed-data-on-kafka-streams

目次

  1. kafkastreams-avro-app サブプロジェクトを作成する
  2. InputData.avsc、Counter.avsc を作成する
  3. SerdeHelper クラスを作成する
  4. inputdata-streams-app を作成する
  5. counter-streams-app を作成する
  6. winprint-streams-app を作成する
  7. 動作確認

手順

kafkastreams-avro-app サブプロジェクトを作成する

Spring Initializr でプロジェクトを作成した後、

f:id:ksby:20191024001503p:plain

build.gradle を以下の内容に変更します。

buildscript {
    ext {
        group "ksbysample.eipapp.ksbysample-eipapp-kafkastreams"
        version "0.0.1-SNAPSHOT"
    }
    repositories {
        mavenCentral()
        gradlePluginPortal()
    }
    dependencies {
        classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.17.0"
    }
}

plugins {
    id "org.springframework.boot" version "2.1.9.RELEASE"
    id "io.spring.dependency-management" version "1.0.8.RELEASE"
    id "java"
    id "idea"
}

// com.commercehub.gradle.plugin.avro を追加すると src/main/avro, src/test/avro が作成される
// はずだが Gradle Multi-project にしていると作成されなかったので、手動で作成すること
apply plugin: "com.commercehub.gradle.plugin.avro"

sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    mavenCentral()
    maven {
        url "https://packages.confluent.io/maven/"
    }
}

dependencyManagement {
    imports {
        mavenBom(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES)
        mavenBom("org.junit:junit-bom:5.5.2")
    }
}

dependencies {
    def kafkaVersion = "2.3.0"
    def confluentKafkaVersion = "5.3.0"

    implementation("org.springframework.boot:spring-boot-starter-integration")
    implementation("org.springframework.integration:spring-integration-kafka:3.2.0.RELEASE")
    implementation("org.apache.kafka:kafka-clients:${kafkaVersion}")
    implementation("org.apache.kafka:kafka-streams:${kafkaVersion}")
    testImplementation("org.springframework.boot:spring-boot-starter-test")

    // for JUnit 5
    testCompile("org.junit.jupiter:junit-jupiter")
    testRuntime("org.junit.platform:junit-platform-launcher")

    // for Apache Avro
    implementation("org.apache.avro:avro:1.9.0")
    implementation("io.confluent:kafka-streams-avro-serde:${confluentKafkaVersion}")
}

avro {
    fieldVisibility = "PRIVATE"
}

test {
    // for JUnit 5
    useJUnitPlatform()
    testLogging {
        events "STARTED", "PASSED", "FAILED", "SKIPPED"
    }
}

今回も1つのプロジェクト内に複数の Kafka Streams アプリを作成するので、kafkastreams-multistreams-app サブプロジェクトから src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp/BaseKafkaStreamsConfig.java を src/main/java/ksbysample/eipapp/kafkastreams/avroapp の下にコピーします。

src/main/resources/application.prperties に以下の内容を記述します。

spring.kafka.streams.application-id=kafkastreams-avro-app
spring.kafka.streams.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092
spring.kafka.streams.properties.commit.interval.ms=5000
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.num.stream.threads=9
spring.kafka.streams.replication-factor=3

spring.kafka.properties.schema.registry.url=http://localhost:8081
spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
  • Avro を使用するので以下の2行を追加します。
    • spring.kafka.properties.schema.registry.url=http://localhost:8081
    • spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
      • この設定を追加している理由は後述します。

最後に Gradle Multi-project にしているためか build.gradle を変更して更新しただけでは src/main, src/test の下に avro ディレクトリが作成されなかったので、手動で avro ディレクトリを作成します(ディレクトリの青と緑の色は作成したら自動で付きました)。

f:id:ksby:20191024004314p:plainf:id:ksby:20191024004612p:plain

InputData.avsc、Counter.avsc を作成する

src/main/avro の下に入力された文字列とその長さを格納するための InputData と

{
    "namespace": "ksbysample.eipapp.kafkastreams.avroapp.avro",
    "type": "record",
    "name": "InputData",
    "doc": "???",
    "fields" : [
        { "name": "str", "type": "string" },
        { "name": "length", "type": "int" }
    ]
}

文字列毎の件数を格納するための Counter の2つを定義します。

{
    "namespace": "ksbysample.eipapp.kafkastreams.avroapp.avro",
    "type": "record",
    "name": "Counter",
    "doc": "???",
    "fields" : [
        { "name": "count", "type": "long" }
    ]
}

ファイルを作成したら generateAvroJava タスクを実行して java ファイルを生成します。

f:id:ksby:20191027121239p:plain f:id:ksby:20191027121411p:plain f:id:ksby:20191027121537p:plain

今回は InputData, Counter を使用して以下の図の inputdata-streams-app、counter-streams-app、winprint-streams-app を実装します。

f:id:ksby:20191029011115p:plain

SerdeHelper クラスを作成する

Avro で生成した InputData、Counter クラス用の Serde、及び Topic3 の key で使用する Windowed<InputData> 用の Serde を提供する helper クラスを作成します。

src/main/java/ksbysample/eipapp/kafkastreams/avroapp の下に SerdeHelper.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.avroapp;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import ksbysample.eipapp.kafkastreams.avroapp.avro.Counter;
import ksbysample.eipapp.kafkastreams.avroapp.avro.InputData;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.kstream.Windowed;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

@Configuration
public class SerdeHelper {

    public final Serde<InputData> inputDataKeyAvroSerde;

    public final Serde<InputData> inputDataValueAvroSerde;

    public final Serde<Counter> counterValueAvroSerde;

    public final Serde<Windowed<InputData>> windowedInputDataKeySerde;

    public SerdeHelper(KafkaProperties properties) {
        Map<String, String> serdeConfig = properties.getProperties();

        inputDataKeyAvroSerde = new SpecificAvroSerde<>();
        inputDataKeyAvroSerde.configure(serdeConfig, true);

        inputDataValueAvroSerde = new SpecificAvroSerde<>();
        inputDataValueAvroSerde.configure(serdeConfig, false);

        counterValueAvroSerde = new SpecificAvroSerde<>();
        counterValueAvroSerde.configure(serdeConfig, false);

        windowedInputDataKeySerde = Serdes.serdeFrom(
                new TimeWindowedSerializer<>(inputDataKeyAvroSerde.serializer()),
                new TimeWindowedDeserializer<>(inputDataKeyAvroSerde.deserializer()));
        windowedInputDataKeySerde.configure(serdeConfig, true);
    }

}

inputdata-streams-app を作成する

kafka-console-producer で入力された文字列と文字列長を InputData オブジェクトに変換する inputdata-streams-app を作成します。

src/main/java/ksbysample/eipapp/kafkastreams/avroapp の下に InputDataKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.avroapp;

import ksbysample.eipapp.kafkastreams.avroapp.avro.InputData;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import java.util.Map;

@Configuration
public class InputDataKafkaStreamsConfig extends BaseKafkaStreamsConfig {

    private static final String APPLICATION_ID = "inputdata-streams-app";
    private static final String BEAN_NAME_PREFIX = "inputdata";
    private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration";
    private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean";
    private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp";

    private final SerdeHelper serdeHelper;

    public InputDataKafkaStreamsConfig(KafkaProperties properties
            , SerdeHelper serdeHelper) {
        super(properties);
        this.serdeHelper = serdeHelper;
    }

    @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME)
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration());
    }

    @Bean(KSTREAMS_APP_BEAN_NAME)
    public Topology kafkaStreamsApp() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean().getObject();
        KStream<String, String> stream =
                builder.stream("Topic1", Consumed.with(Serdes.String(), Serdes.String()));

        stream
                .mapValues(value -> new InputData(value, value.length()))
                .to("Topic2", Produced.with(Serdes.String(), serdeHelper.inputDataValueAvroSerde));

        return builder.build();
    }

}

counter-streams-app を作成する

Topic2 に送信された InputData オブジェクト毎の件数を Window 処理で 5秒毎に集計する counter-streams-app を作成します。

src/main/java/ksbysample/eipapp/kafkastreams/avroapp の下に CounterKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.avroapp;

import ksbysample.eipapp.kafkastreams.avroapp.avro.Counter;
import ksbysample.eipapp.kafkastreams.avroapp.avro.InputData;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import java.time.Duration;
import java.util.Map;

@Configuration
public class CounterKafkaStreamsConfig extends BaseKafkaStreamsConfig {

    private static final String APPLICATION_ID = "counter-streams-app";
    private static final String BEAN_NAME_PREFIX = "counter";
    private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration";
    private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean";
    private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp";

    private final SerdeHelper serdeHelper;

    public CounterKafkaStreamsConfig(KafkaProperties properties
            , SerdeHelper serdeHelper) {
        super(properties);
        this.serdeHelper = serdeHelper;
    }

    @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME)
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration());
    }

    @Bean(KSTREAMS_APP_BEAN_NAME)
    public Topology kafkaStreamsApp() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean().getObject();
        KStream<String, InputData> stream =
                builder.stream("Topic2", Consumed.with(Serdes.String(), serdeHelper.inputDataValueAvroSerde));

        KTable<Windowed<InputData>, Counter> count = stream
                .groupBy((key, value) -> value
                        , Grouped.with(serdeHelper.inputDataKeyAvroSerde, serdeHelper.inputDataValueAvroSerde))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .aggregate(
                        () -> new Counter(0L)
                        , (aggKey, newValue, aggValue) -> new Counter(aggValue.getCount() + 1)
                        , Materialized.with(serdeHelper.inputDataKeyAvroSerde, serdeHelper.counterValueAvroSerde));

        count.toStream()
                .peek((key, value) -> {
                    System.out.println("☆☆☆" + key.key() + "@" + key.window().startTime() + "->" + key.window().endTime());
                })
                .to("Topic3"
                        , Produced.with(serdeHelper.windowedInputDataKeySerde, serdeHelper.counterValueAvroSerde));

        return builder.build();
    }

}

winprint-streams-app を作成する

Topic3 に送信された Windowed<InputData> オブジェクトを表示する winprint-streams-app を作成します。

src/main/java/ksbysample/eipapp/kafkastreams/avroapp の下に WinPrintKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.avroapp;

import ksbysample.eipapp.kafkastreams.avroapp.avro.Counter;
import ksbysample.eipapp.kafkastreams.avroapp.avro.InputData;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Windowed;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import java.util.Map;

@Configuration
public class WinPrintKafkaStreamsConfig extends BaseKafkaStreamsConfig {

    private static final String APPLICATION_ID = "winprint-streams-app";
    private static final String BEAN_NAME_PREFIX = "winprint";
    private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration";
    private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean";
    private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp";

    private final SerdeHelper serdeHelper;

    public WinPrintKafkaStreamsConfig(KafkaProperties properties
            , SerdeHelper serdeHelper) {
        super(properties);
        this.serdeHelper = serdeHelper;
    }

    @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME)
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration());
    }

    @Bean(KSTREAMS_APP_BEAN_NAME)
    public Topology kafkaStreamsApp() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean().getObject();
        KStream<Windowed<InputData>, Counter> stream =
                builder.stream("Topic3"
                        , Consumed.with(serdeHelper.windowedInputDataKeySerde, serdeHelper.counterValueAvroSerde));

        stream.foreach((key, value) -> {
            System.out.println("★★★" + key.key() + "@" + key.window().startTime() + "->" + key.window().endTime()
                    + ":" + value.getCount());
        });

        return builder.build();
    }

}

動作確認

docker-compose up -d で kafka を起動した後、Topic1、Topic2、Topic3 を作成します。

  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic1 --partitions 3 --replication-factor 3 --if-not-exists
  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic2 --partitions 3 --replication-factor 3 --if-not-exists
  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic3 --partitions 3 --replication-factor 3 --if-not-exists

コマンドプロンプトを3つ開き、以下のコマンドを実行します。

  • kafka-console-producer --broker-list localhost:19092 --topic Topic1
  • kafka-avro-console-consumer --topic Topic2 --bootstrap-server cp-kafka1:19092 --property schema.registry.url=http://localhost:8081
  • kafka-avro-console-consumer --topic Topic3 --bootstrap-server cp-kafka1:19092 --property schema.registry.url=http://localhost:8081 --property print.key=true

kafkastreams-avro-app を起動してから kafka-console-producer から a, a, a, b, b を入力すると、2つ目のコマンドプロンプトには InputData オブジェクトのデータが、3つ目のコマンドプロンプトには InputData オブジェクト毎の件数がセットされた Counter オブジェクトが出力されています。

f:id:ksby:20191031001613p:plain

コンソールで Topic3 に送信された Windowed<InputData> オブジェクトを確認すると、送信前にセットされていた startTime, endTime の内 startTime は維持されていますが endTime は維持されませでした。Windowed<?> オブジェクトで endTime を維持したまま別の Topic に送信することは出来ないのでしょうか?(調べたけど分かりませんでした。。。)

f:id:ksby:20191031002414p:plain

履歴

2019/10/31
初版発行。

Spring Boot + Spring Integration でいろいろ試してみる ( その47 )( Docker Compose でサーバを構築する、Kafka 編14 - Kafka Streams の Window 処理を試してみる )

概要

記事一覧はこちらです。

Kafka Streams の Windowing を参考に(というかほぼコピペです) Window 処理を実装して動作を確認します。

参照したサイト・書籍

  1. Streams DSL - Windowing
    https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#windowing

  2. Spark (Structured) Streaming vs. Kafka Streams
    https://static.rainfocus.com/oracle/oow18/sess/1525892701196001Ps1z/PF/kafka-streams-vs-spark-streaming_1540354303948001JVSB.pdf

  3. user Behavior Analysis with Session Windows and Apache Kafka's Streams API
    https://www.slideshare.net/ConfluentInc/user-behavior-analysis-with-session-windows-and-apache-kafkas-streams-api

  4. KafkaStreams: Getting Window Final Results
    https://stackoverflow.com/questions/54110206/kafkastreams-getting-window-final-results

  5. Kafka Streams failed to delete the state directory - DirectoryNotEmptyException
    https://stackoverflow.com/questions/56282751/kafka-streams-failed-to-delete-the-state-directory-directorynotemptyexception

目次

  1. kafkastreams-window-app サブプロジェクトを作成する
  2. Tumbling time window を試してみる
  3. Hopping time window を試してみる
  4. Session window を試してみる
  5. 最後に

手順

kafkastreams-window-app サブプロジェクトを作成する

Spring Initializr でプロジェクトを作成した後、

f:id:ksby:20191016235502p:plain

build.gradle を Spring Boot + Spring Integration でいろいろ試してみる ( その46 )( Docker Compose でサーバを構築する、Kafka 編13 - 1つのアプリケーション内に複数の Kafka Streams アプリを定義する+KTable を使ってみる ) と同じにします。

今回も1つのプロジェクト内に複数の Kafka Streams アプリを作成するので、kafkastreams-multistreams-app サブプロジェクトから src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp/BaseKafkaStreamsConfig.java を src/main/java/ksbysample/eipapp/kafkastreams/windowapp の下にコピーします。

src/main/resources/application.prperties に以下の内容を記述します。

spring.kafka.streams.application-id=kafkastreams-window-app
spring.kafka.streams.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092
spring.kafka.streams.properties.commit.interval.ms=5000
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.num.stream.threads=9
spring.kafka.streams.replication-factor=3

Tumbling time window を試してみる

Topic1 に入力された文字列毎の件数を 5秒毎に集計して Topic2 に送信する Kafka Streams アプリを作成します。

src/main/java/ksbysample/eipapp/kafkastreams/windowapp の下に TumblingTimeWindowKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.windowapp;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import java.time.Duration;
import java.util.Map;

@Configuration
public class TumblingTimeWindowKafkaStreamsConfig extends BaseKafkaStreamsConfig {

    private static final String APPLICATION_ID = "tumblingtimewindow-streams-app";
    private static final String BEAN_NAME_PREFIX = "tumblingTimeWindow";
    private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration";
    private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean";
    private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp";

    public TumblingTimeWindowKafkaStreamsConfig(KafkaProperties properties) {
        super(properties);
    }

    @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME)
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration());
    }

    @Bean(KSTREAMS_APP_BEAN_NAME)
    public Topology kafkaStreamsApp() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean().getObject();
        KStream<String, String> stream =
                builder.stream("Topic1", Consumed.with(Serdes.String(), Serdes.String()));

        KTable<Windowed<String>, Long> count = stream
                .groupBy((key, value) -> value)
                // Grouped.with(...) でグルーピングするデータの key, value の型を指定できる
                // .groupBy((key, value) -> value, Grouped.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count();

        count.toStream()
                .map((key, value) ->
                        new KeyValue<>(key.key() + "@" + key.window().startTime() + "->" + key.window().endTime()
                                , value))
                .to("Topic2", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }

}

動作確認します。docker-compose up -d で Kafka コンテナを起動してから Topic1、Topic2 を作成した後、

  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic1 --partitions 3 --replication-factor 3 --if-not-exists
  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic2 --partitions 3 --replication-factor 3 --if-not-exists

入出力に使用する kafka-console-producer、kafka-console-consumer コマンドを実行します。

  • kafka-console-producer --broker-list localhost:19092 --topic Topic1
  • kafka-console-consumer --bootstrap-server localhost:19092 --topic Topic2 --value-deserializer org.apache.kafka.common.serialization.LongDeserializer --property print.key=true

Kafka コンテナが世界標準時で動いているので出力されている時間も世界標準時になっていますが、09:11:10~09:11:15 の間に a, a, abc, b を、09:11:15~09:11:20 の間に c を、09:11:20~09:11:25 の間に a, b を入力すると、5秒毎に集計された文字列と件数が Topic2 に送信されています。
※application.properties の spring.kafka.streams.properties.commit.interval.ms=5000 の設定により 5秒毎に出力されています。

f:id:ksby:20191020181224p:plain

f:id:ksby:20191020183604p:plain

Hopping time window を試してみる

Topic1 に入力された文字列毎の件数を window's size を 15秒、advance interval を 5秒で集計して Topic3 に送信する Kafka Streams アプリを作成します。

src/main/java/ksbysample/eipapp/kafkastreams/windowapp の下に HoppingTimeWindowKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.windowapp;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import java.time.Duration;
import java.util.Map;

@Configuration
public class HoppingTimeWindowKafkaStreamsConfig extends BaseKafkaStreamsConfig {

    private static final String APPLICATION_ID = "hoppingtimewindow-streams-app";
    private static final String BEAN_NAME_PREFIX = "hoppingTimeWindow";
    private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration";
    private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean";
    private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp";

    public HoppingTimeWindowKafkaStreamsConfig(KafkaProperties properties) {
        super(properties);
    }

    @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME)
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration());
    }

    @Bean(KSTREAMS_APP_BEAN_NAME)
    public Topology kafkaStreamsApp() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean().getObject();
        KStream<String, String> stream =
                builder.stream("Topic1", Consumed.with(Serdes.String(), Serdes.String()));

        KTable<Windowed<String>, Long> count = stream
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(15)).advanceBy(Duration.ofSeconds(5)))
                .count();

        count.toStream()
                .map((key, value) ->
                        new KeyValue<>(key.key() + "@" + key.window().startTime() + "->" + key.window().endTime()
                                , value))
                .to("Topic3", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }

}

動作確認します。Topic3 を作成した後、

  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic3 --partitions 3 --replication-factor 3 --if-not-exists

入出力に使用する kafka-console-producer、kafka-console-consumer コマンドを実行します。

  • kafka-console-producer --broker-list localhost:19092 --topic Topic1
  • kafka-console-consumer --bootstrap-server localhost:19092 --topic Topic3 --value-deserializer org.apache.kafka.common.serialization.LongDeserializer --property print.key=true

kafka-console-producer から a を1文字入力して 5秒程度待つと kafka-console-consumer に3メッセージ出力されます。
※Hopping time window の場合も application.properties の spring.kafka.streams.properties.commit.interval.ms=5000 の設定により 5秒毎に出力されます。

f:id:ksby:20191020193710p:plain

その後連続して a, a を入力して 5秒程度待つと再び3メッセージが出力されます。

f:id:ksby:20191020193752p:plain

f:id:ksby:20191020194810p:plain

上の例は a しか入力しませんでしたが、a, b, c の3種類の文字列を入力すればそれぞれの文字列毎にカウントされます。

f:id:ksby:20191020195236p:plain

Session window を試してみる

Topic1 に入力された文字列毎の件数を inactivity gap 5秒、grace 0秒の Session Window で集計して Topic4 に送信する Kafka Streams アプリを作成します。 Session Window で集計した結果は application.properties の spring.kafka.streams.properties.commit.interval.ms=5000 による 5秒毎ではなく、Session Window がクローズしてから Topic4 に送信するようにします。

src/main/java/ksbysample/eipapp/kafkastreams/windowapp の下に SessionWindowKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.windowapp;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import java.time.Duration;
import java.util.Map;

import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;

@Configuration
public class SessionWindowKafkaStreamsConfig extends BaseKafkaStreamsConfig {

    private static final String APPLICATION_ID = "sessionwindow-streams-app";
    private static final String BEAN_NAME_PREFIX = "sessionWindow";
    private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration";
    private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean";
    private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp";

    public SessionWindowKafkaStreamsConfig(KafkaProperties properties) {
        super(properties);
    }

    @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME)
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration());
    }

    @Bean(KSTREAMS_APP_BEAN_NAME)
    public Topology kafkaStreamsApp() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean().getObject();
        KStream<String, String> stream =
                builder.stream("Topic1", Consumed.with(Serdes.String(), Serdes.String()));

        KTable<Windowed<String>, Long> count = stream
                .groupBy((key, value) -> value)
                .windowedBy(SessionWindows.with(Duration.ofSeconds(5)).grace(Duration.ZERO))
                .count(Materialized.with(Serdes.String(), Serdes.Long()))
                // .suppress(Suppressed.untilWindowCloses(unbounded())) が記述されると
                // 例えば 15秒以内に a, a, a と入力しただけでは Topic4 にはメッセージは送信されず、
                // 15秒経過した後に同じキーである a のメッセージが来ると1つ前の SessionWindow がクローズされて Topic4 にメッセージが送信される
                .suppress(Suppressed.untilWindowCloses(unbounded()));

        count.toStream()
                .map((key, value) ->
                        new KeyValue<>(key.key() + "@" + key.window().startTime() + "->" + key.window().endTime()
                                , value))
                .to("Topic4", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }

}
  • .suppress(Suppressed.untilWindowCloses(unbounded())); を記述して、Session Window がクローズしてから Topic4 に集計結果が送信されるようにします。
  • .suppress(Suppressed.untilWindowCloses(unbounded())); を記述する場合、count() だと java.lang.ClassCastException: class org.apache.kafka.streams.kstream.Windowed cannot be cast to class java.lang.String というエラーが発生するので .count(Materialized.with(Serdes.String(), Serdes.Long())) のように保存時の Serdes を指定します。

動作確認します。Topic4 を作成した後、

  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic4 --partitions 3 --replication-factor 3 --if-not-exists

入出力に使用する kafka-console-producer、kafka-console-consumer コマンドを実行します。

  • kafka-console-producer --broker-list localhost:19092 --topic Topic1
  • kafka-console-consumer --bootstrap-server localhost:19092 --topic Topic4 --value-deserializer org.apache.kafka.common.serialization.LongDeserializer --property print.key=true

kafka-console-producer から a を1文字入力して 60秒程度待ちましたが、

f:id:ksby:20191021002414p:plain

Topic4 側には何も出力されませんでした。。。 追加で a を入力すると、

f:id:ksby:20191021002519p:plain

今度は1つ前に入力された分が出力されました。5秒経過したら Session Window がクローズして Topic4 に集計結果が送信されるものと思っていたのですが、そうではないようです。試してみた感じでは、

  • Session Window のクローズ判定は、メッセージを受信したタイミングで行われる模様。指定した時間が経過したらクローズされる訳ではない。よって一番最初に a を入力したタイミングでは、Session Window のクローズが判定されていないので Topic4 にもメッセージは送信されない。
  • Session Window のクローズ判定は異なるキーでも行われる。a 入力 --> b 入力 --> a の集計結果が Topic4 に送信される、という動きになる。
  • SessionWindows.with(...) に指定した秒数を経過した後に文字列を入力した時に、その前のクローズされていると思われる Session Window が全て Topic4 に送信される訳ではない(なぜかすぐには送信されない場合があった)。

Session Window はメッセージが頻繁に送信されてくる環境でないとうまく機能しない気がします(Kafka を使っていてメッセージの送信数が少ない環境というのもないような気がしますが)。

最後に

  • Hopping time window の場合は集計するキー1種類につき3メッセージが送信されます。ドキュメントを読めば分かりそうなことですが、1メッセージずつ送信されるつもりでいました。。。
  • Session Window は .suppress(Suppressed.untilWindowCloses(unbounded())) を記述した場合、指定した時間が経過したら Session Window がクローズして次の Topic にメッセージが送信されると思っていましたが、Session Window がクローズされるのは次のメッセージが送信された時(しかも時間が経過したものが 100% クローズされるとは限らない)でした。

履歴

2019/10/22
初版発行。

Spring Boot + Spring Integration でいろいろ試してみる ( その46 )( Docker Compose でサーバを構築する、Kafka 編13 - 1つのアプリケーション内に複数の Kafka Streams アプリを定義する+KTable を使ってみる )

概要

記事一覧はこちらです。

Spring Boot+Spring Integration+Spring Integration Kafka 構成の1つのプロジェクト内に複数の Kafka Streams アプリを実装してみます。

実装してみた感想としては、やっぱり1プロジェクト1アプリの構成の方が作りやすいしスケールアウトもさせやすいですので、複数入れるのは止めた方がいいです。

参照したサイト・書籍

  1. Data Reprocessing with the Streams API in Kafka: Resetting a Streams Application
    https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

目次

  1. 何を作成するか?
  2. kafkastreams-multistreams-app サブプロジェクトを作成する
  3. 1つのプロジェクト内に複数の Kafka Streams アプリを定義するには?
  4. 共通で使用する TopicUtils クラス、BaseKafkaStreamsConfig クラスを作成する
  5. LengthAppKafkaStreamsConfig クラスを作成する
  6. CountByLengthAppKafkaStreamsConfig クラスを作成する
  7. CountByWordsAppKafkaStreamsConfig クラスを作成する
  8. application.properties を記述する
  9. 動作確認

手順

何を作成するか?

f:id:ksby:20191014124503p:plain

  • topic は4つ作成する(Topic1、Topic2、Topic3、Topic4)。
  • Kafka Streams アプリは3つ作成する。
    • lengthStreamsApp
    • countByLengthStreamsApp
    • countByWordsStreamsApp
  • Kafka Streams アプリは全て kafkastreams-multistreams-app プロジェクト内に定義する。1 Kafka Streams アプリ 1 プロジェクトにはしない。
  • countByLengthStreamsApp、countByWordsStreamsApp は KTable を使用する。今回は Window 処理はなし。
  • Topic1 への入力は kafka-console-producer を使用する。
  • Topic2、Topic3、Topic4 のメッセージの確認には kafka-console-consumer を使用する。

※今回は1台のPC上で Kafka Streams アプリを動かてしているから動作しているが、実際には「同じキーは同じパーティションに送信される」という Kafka の仕様を考慮して、Topic2 → Topic3 の間にもう1つ topic を追加して <[文字数]: [文字列>(例 <1: a>) のようなメッセージを送信するか、Topic1 から Topic2-1(<文字数: 文字列> のメッセージを渡す)、Topic2-2(<文字列: 文字数> のメッセージを渡す)に分けてメッセージを送信するようにしないとダメだな。。。

kafkastreams-multistreams-app サブプロジェクトを作成する

Spring Initializr でプロジェクトを作成した後、

f:id:ksby:20191014130010p:plain

build.gradle を以下の内容に変更します(kafkastreams-uppercase-app と同じ内容です)。

buildscript {
    ext {
        group "ksbysample.eipapp.ksbysample-eipapp-kafkastreams"
        version "0.0.1-SNAPSHOT"
    }
    repositories {
        mavenCentral()
        gradlePluginPortal()
    }
}

plugins {
    id "org.springframework.boot" version "2.1.9.RELEASE"
    id "io.spring.dependency-management" version "1.0.8.RELEASE"
    id "java"
    id "idea"
}

sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    mavenCentral()
}

dependencyManagement {
    imports {
        mavenBom(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES)
        mavenBom("org.junit:junit-bom:5.5.2")
    }
}

dependencies {
    def kafkaVersion = "2.3.0"

    implementation("org.springframework.boot:spring-boot-starter-integration")
    implementation("org.springframework.integration:spring-integration-kafka:3.2.0.RELEASE")
    implementation("org.apache.kafka:kafka-clients:${kafkaVersion}")
    implementation("org.apache.kafka:kafka-streams:${kafkaVersion}")
    testImplementation("org.springframework.boot:spring-boot-starter-test")

    // for JUnit 5
    testCompile("org.junit.jupiter:junit-jupiter")
    testRuntime("org.junit.platform:junit-platform-launcher")
}

test {
    // for JUnit 5
    useJUnitPlatform()
    testLogging {
        events "STARTED", "PASSED", "FAILED", "SKIPPED"
    }
}

1つのプロジェクト内に複数の Kafka Streams アプリを定義するには?

@EnableKafkaStreams アノテーションを付与すると以下の処理が行われますが、

  • org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration(@Configuration アノテーションが付与されている)が import される。
  • KafkaStreamsDefaultConfiguration 内で defaultKafkaStreamsBuilder bean が定義される。
  • defaultKafkaStreamsBuilder bean が生成される場合、その前に org.springframework.boot.autoconfigure.kafka.KafkaStreamsAnnotationDrivenConfiguration で defaultKafkaStreamsConfig bean が生成される。
  • defaultKafkaStreamsBuilder bean に defaultKafkaStreamsConfig bean が引数で渡されて、それを使って bean が生成される。
  • application-id には application.properties に定義した spring.kafka.streams.application-id が使用される。

1つのプロジェクトに複数の Kafka Streams アプリを定義する場合、以下の問題があります。

  • Kafka Streams アプリは各アプリ毎に一意の application-id を付ける必要があるが、defaultKafkaStreamsConfig bean をそのまま利用するとそれが出来ない。
  • StreamsBuilderFactoryBean は Kafka Streams アプリ毎にインスタンスを分けないとうまく動かない。Topic2 からメッセージを受信する2つの Kafka Streams アプリを作ろうとしてもエラーになる。

そこで以下のように実装します。

  • @EnableKafkaStreams アノテーションは使用しない。
  • KafkaStreamsConfiguration 型の Bean と StreamsBuilderFactoryBean 型の Bean を各 Kafka Streams アプリ毎に定義する。
  • application-id は KafkaStreamsConfiguration 型の Bean 内で設定する。

共通で使用する TopicUtils クラス、BaseKafkaStreamsConfig クラスを作成する

Topic の名前、key, value の型を定義する TopicUtils クラスを作成します。 src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp の下に TopicUtils.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.multistreamsapp;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class TopicUtils {

    public static final String TOPIC1_NAME = "Topic1";
    public static final Consumed TOPIC1_CONSUMED = Consumed.with(Serdes.Integer(), Serdes.String());

    public static final String TOPIC2_NAME = "Topic2";
    public static final Produced TOPIC2_PRODUCED = Produced.with(Serdes.String(), Serdes.Integer());
    public static final Consumed TOPIC2_CONSUMED = Consumed.with(Serdes.String(), Serdes.Integer());

    public static final String TOPIC3_NAME = "Topic3";
    public static final Produced TOPIC3_PRODUCED = Produced.with(Serdes.String(), Serdes.Long());

    public static final String TOPIC4_NAME = "Topic4";
    public static final Produced TOPIC4_PRODUCED = Produced.with(Serdes.String(), Serdes.Long());

}

各 Kafka Streams アプリのベースとする BaseKafkaStreamsConfig クラスを作成します。 src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp の下に BaseKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.multistreamsapp;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;

public class BaseKafkaStreamsConfig {

    protected final KafkaProperties properties;

    public BaseKafkaStreamsConfig(KafkaProperties properties) {
        this.properties = properties;
    }

}

KafkaProperties クラスは org.springframework.boot.autoconfigure.kafka.KafkaProperties にあり、org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration で @EnableConfigurationProperties(KafkaProperties.class) が付与されて Bean が生成されています。

LengthAppKafkaStreamsConfig クラスを作成する

src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp の下に LengthAppKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.multistreamsapp;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import java.util.Map;

@Configuration
public class LengthAppKafkaStreamsConfig extends BaseKafkaStreamsConfig {

    private static final String APPLICATION_ID = "length-streams-app";
    private static final String BEAN_NAME_PREFIX = "lengthApp";
    private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration";
    private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean";
    private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp";

    public LengthAppKafkaStreamsConfig(KafkaProperties properties) {
        super(properties);
    }

    @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME)
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration());
    }

    @Bean(KSTREAMS_APP_BEAN_NAME)
    public Topology kafkaStreamsApp() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean().getObject();
        KStream<Integer, String> stream = builder.stream(TopicUtils.TOPIC1_NAME, TopicUtils.TOPIC1_CONSUMED);

        stream
                .map((key, value) -> new KeyValue<>(value, value.length()))
                .to(TopicUtils.TOPIC2_NAME, TopicUtils.TOPIC2_PRODUCED);

        return builder.build();
    }

}
  • BaseKafkaStreamsConfig クラスを継承します。
  • 定数文字列を5つ定義していますが、APPLICATION_ID, BEAN_NAME_PREFIX の2つに作成する Kafka Streams アプリ固有の文字列を記述します。
  • kafkaStreamsApp メソッドに作成する Kafka Streams アプリの処理を記述します。

CountByLengthAppKafkaStreamsConfig クラスを作成する

src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp の下に CountByLengthAppKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.multistreamsapp;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import java.util.Map;

@Configuration
public class CountByLengthAppKafkaStreamsConfig extends BaseKafkaStreamsConfig {

    private static final String APPLICATION_ID = "countbylength-streams-app";
    private static final String BEAN_NAME_PREFIX = "countByLengthApp";
    private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration";
    private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean";
    private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp";

    public CountByLengthAppKafkaStreamsConfig(KafkaProperties properties) {
        super(properties);
    }

    @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME)
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration());
    }

    @Bean(KSTREAMS_APP_BEAN_NAME)
    public Topology kafkaStreamsApp() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean().getObject();
        KStream<String, Integer> stream =
                builder.stream(TopicUtils.TOPIC2_NAME, TopicUtils.TOPIC2_CONSUMED);

        KTable<String, Long> count = stream
                .groupBy((key, value) -> String.valueOf(value))
                .count();

        count.toStream()
                .to(TopicUtils.TOPIC3_NAME, TopicUtils.TOPIC3_PRODUCED);

        return builder.build();
    }

}

CountByWordsAppKafkaStreamsConfig クラスを作成する

src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp の下に CountByWordsAppKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.multistreamsapp;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import java.util.Map;

@Configuration
public class CountByWordsAppKafkaStreamsConfig extends BaseKafkaStreamsConfig {

    private static final String APPLICATION_ID = "countbywords-streams-app";
    private static final String BEAN_NAME_PREFIX = "countByWordsApp";
    private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration";
    private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean";
    private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp";

    public CountByWordsAppKafkaStreamsConfig(KafkaProperties properties) {
        super(properties);
    }

    @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME)
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration());
    }

    @Bean(KSTREAMS_APP_BEAN_NAME)
    public Topology kafkaStreamsApp() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean().getObject();
        KStream<String, Integer> stream =
                builder.stream(TopicUtils.TOPIC2_NAME, TopicUtils.TOPIC2_CONSUMED);

        KTable<String, Long> count = stream
                .groupBy((key, value) -> key)
                .count();

        count.toStream()
                .to(TopicUtils.TOPIC4_NAME, TopicUtils.TOPIC4_PRODUCED);

        return builder.build();
    }

}

application.properties を記述する

spring.kafka.streams.application-id=kafkastreams-multistreams-app
spring.kafka.streams.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092
spring.kafka.streams.properties.commit.interval.ms=5000
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.num.stream.threads=9
spring.kafka.streams.replication-factor=3
  • KTable を使用している Kafka Streams アプリが topic にメッセージを送信する間隔を 5 秒毎にします。commit.interval.ms に 5000(ミリ秒)を設定します。
  • default.key.serde、default.value.serde はどちらも Serdes$StringSerde を設定します。Kafka Streams アプリでは全て Produced.with(...)Consumed.with(...) で topic の key, value の型を指定しているのですが、default.key.serde に Serdes$IntegerSerde を指定すると java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Integer ..... のエラーが出ます。default であって型を指定したら無視されると思っていたのですが、どうもそうではないようです。。。
  • 動作確認では各 topic の partition 数を 3つで作成します。Kafka Streams アプリを3つ作成していますので、各 Kafka Streams アプリに3スレッドずつ割り当てられるよう num.stream.threads に 3 x 3 = 9 を設定します。
  • KTable を使用する Kafka Streams アプリは Stateful になり、Kafka Cluster 側に状態を保存するための Topic を作成します。その Topic がレプリケーションするよう replication-factor に 3 を設定します。

動作確認

docker-compose downdocker-compose up -d でコンテナを起動し直した後、Topic1~4 を作成します。

  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic1 --partitions 3 --replication-factor 3 --if-not-exists
  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic2 --partitions 3 --replication-factor 3 --if-not-exists
  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic3 --partitions 3 --replication-factor 3 --if-not-exists
  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic4 --partitions 3 --replication-factor 3 --if-not-exists

f:id:ksby:20191014165823p:plain

kafkastreams-multistreams-app プロジェクトを起動した後、

f:id:ksby:20191014170137p:plain

kafka-console-producer コマンドを実行して 5秒以内に a, b, ab, a と入力すると、想定した動作になります。

f:id:ksby:20191014170925p:plain

  • Topic2 には key が入力された文字、value が文字数のメッセージが送信されてきます。
  • Topic3 には 3文字が 1回、1文字が 3回の 2件のメッセージが送信されてきます。
  • Topic4 には a が 2回、abc が 1回、b が 1回の 3件のメッセージが送信されています。

また上の動作確認後に kafkacat コマンドで作成されている topic を見ると、各 Kafka Streams アプリ毎に ~-changelog~-repartition という topic が作成されます。

f:id:ksby:20191014171444p:plain (.....途中は省略.....) f:id:ksby:20191014171546p:plain

  • countbylength-streams-app-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog
  • countbylength-streams-app-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition
  • countbywords-streams-app-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog
  • countbywords-streams-app-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition

ローカルPC の方にも D:\tmp\kafka-streams\ というディレクトリが作成されて、その下に Kafka Streams アプリに設定した application-id のフォルダが作成されます(設定が state.dir = /tmp/kafka-streams になっており D ドライブでアプリが動いているので D:\tmp\kafka-streams\ が作成されるようです)。

f:id:ksby:20191014172009p:plain

partition と ローカルPC のディレクトリ内のデータは1セットになっているので、どちらか片方だけ削除してももう片方からデータがコピーされて復活します。最初 D:\tmp\kafka-streams\ の下に状態が保持されていることが分からなくて、コンテナを再起動しても countByLength、countByWords の Kafka Streams アプリで以前カウントした件数が維持されていて、原因が分からなくて結構困りました。。。

Data Reprocessing with the Streams API in Kafka: Resetting a Streams Application の中に Local State Stores の説明がありました。

履歴

2019/10/14
初版発行。

IntelliJ IDEA を 2019.2.2 → 2019.2.3 へバージョンアップ

IntelliJ IDEA を 2019.2.2 → 2019.2.3 へバージョンアップする

IntelliJ IDEA の 2019.2.3 がリリースされているのでバージョンアップします。

※ksbysample-webapp-lending プロジェクトを開いた状態でバージョンアップしています。

  1. IntelliJ IDEA のメインメニューから「Help」-「Check for Updates...」を選択します。

  2. IDE and Plugin Updates」ダイアログが表示されます。左下に「Update and Restart」ボタンが表示されていますので、「Update and Restart」ボタンをクリックします。

    f:id:ksby:20191012095526p:plain

  3. Plugin の update も表示されました。このまま「Update and Restart」ボタンをクリックします。

    f:id:ksby:20191012095613p:plain

  4. Patch がダウンロードされて IntelliJ IDEA が再起動します。

  5. IntelliJ IDEA が起動すると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。

    f:id:ksby:20191012100413p:plain

  6. IntelliJ IDEA のメインメニューから「Help」-「About」を選択し、2019.2.3 へバージョンアップされていることを確認します。

  7. Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして更新します。

  8. 更新可能な Plugin があるというダイアログが画面右下に表示されたので、再度 IntelliJ IDEA のメインメニューから「Help」-「Check for Updates...」を選択します。

  9. IDE and Plugin Updates」ダイアログが表示されますので「Update」ボタンをクリックします。

    f:id:ksby:20191012100741p:plain

  10. Plugin がインストールされます。画面右下に Restart のダイアログが表示されますので、リンクをクリックして IntelliJ IDEA を再起動します。

  11. IntelliJ IDEA が起動すると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。

  12. clean タスク実行 → Rebuild Project 実行 → build タスクを実行して、"BUILD SUCCESSFUL" のメッセージが出力されることを確認します。

    f:id:ksby:20191012102309p:plain

  13. Project Tool Window で src/test/groovy/ksbysample、src/test/java/ksbysample でコンテキストメニューを表示して「Run 'Tests in 'ksbysample'' with Coverage」を選択し、テストが全て成功することを確認します。

    f:id:ksby:20191012103258p:plain f:id:ksby:20191012104217p:plain