かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その46 )( Docker Compose でサーバを構築する、Kafka 編13 - 1つのアプリケーション内に複数の Kafka Streams アプリを定義する+KTable を使ってみる )

概要

記事一覧はこちらです。

Spring Boot+Spring Integration+Spring Integration Kafka 構成の1つのプロジェクト内に複数の Kafka Streams アプリを実装してみます。

実装してみた感想としては、やっぱり1プロジェクト1アプリの構成の方が作りやすいしスケールアウトもさせやすいですので、複数入れるのは止めた方がいいです。

参照したサイト・書籍

  1. Data Reprocessing with the Streams API in Kafka: Resetting a Streams Application
    https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

目次

  1. 何を作成するか?
  2. kafkastreams-multistreams-app サブプロジェクトを作成する
  3. 1つのプロジェクト内に複数の Kafka Streams アプリを定義するには?
  4. 共通で使用する TopicUtils クラス、BaseKafkaStreamsConfig クラスを作成する
  5. LengthAppKafkaStreamsConfig クラスを作成する
  6. CountByLengthAppKafkaStreamsConfig クラスを作成する
  7. CountByWordsAppKafkaStreamsConfig クラスを作成する
  8. application.properties を記述する
  9. 動作確認

手順

何を作成するか?

f:id:ksby:20191014124503p:plain

  • topic は4つ作成する(Topic1、Topic2、Topic3、Topic4)。
  • Kafka Streams アプリは3つ作成する。
    • lengthStreamsApp
    • countByLengthStreamsApp
    • countByWordsStreamsApp
  • Kafka Streams アプリは全て kafkastreams-multistreams-app プロジェクト内に定義する。1 Kafka Streams アプリ 1 プロジェクトにはしない。
  • countByLengthStreamsApp、countByWordsStreamsApp は KTable を使用する。今回は Window 処理はなし。
  • Topic1 への入力は kafka-console-producer を使用する。
  • Topic2、Topic3、Topic4 のメッセージの確認には kafka-console-consumer を使用する。

※今回は1台のPC上で Kafka Streams アプリを動かてしているから動作しているが、実際には「同じキーは同じパーティションに送信される」という Kafka の仕様を考慮して、Topic2 → Topic3 の間にもう1つ topic を追加して <[文字数]: [文字列>(例 <1: a>) のようなメッセージを送信するか、Topic1 から Topic2-1(<文字数: 文字列> のメッセージを渡す)、Topic2-2(<文字列: 文字数> のメッセージを渡す)に分けてメッセージを送信するようにしないとダメだな。。。

kafkastreams-multistreams-app サブプロジェクトを作成する

Spring Initializr でプロジェクトを作成した後、

f:id:ksby:20191014130010p:plain

build.gradle を以下の内容に変更します(kafkastreams-uppercase-app と同じ内容です)。

buildscript {
    ext {
        group "ksbysample.eipapp.ksbysample-eipapp-kafkastreams"
        version "0.0.1-SNAPSHOT"
    }
    repositories {
        mavenCentral()
        gradlePluginPortal()
    }
}

plugins {
    id "org.springframework.boot" version "2.1.9.RELEASE"
    id "io.spring.dependency-management" version "1.0.8.RELEASE"
    id "java"
    id "idea"
}

sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    mavenCentral()
}

dependencyManagement {
    imports {
        mavenBom(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES)
        mavenBom("org.junit:junit-bom:5.5.2")
    }
}

dependencies {
    def kafkaVersion = "2.3.0"

    implementation("org.springframework.boot:spring-boot-starter-integration")
    implementation("org.springframework.integration:spring-integration-kafka:3.2.0.RELEASE")
    implementation("org.apache.kafka:kafka-clients:${kafkaVersion}")
    implementation("org.apache.kafka:kafka-streams:${kafkaVersion}")
    testImplementation("org.springframework.boot:spring-boot-starter-test")

    // for JUnit 5
    testCompile("org.junit.jupiter:junit-jupiter")
    testRuntime("org.junit.platform:junit-platform-launcher")
}

test {
    // for JUnit 5
    useJUnitPlatform()
    testLogging {
        events "STARTED", "PASSED", "FAILED", "SKIPPED"
    }
}

1つのプロジェクト内に複数の Kafka Streams アプリを定義するには?

@EnableKafkaStreams アノテーションを付与すると以下の処理が行われますが、

  • org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration(@Configuration アノテーションが付与されている)が import される。
  • KafkaStreamsDefaultConfiguration 内で defaultKafkaStreamsBuilder bean が定義される。
  • defaultKafkaStreamsBuilder bean が生成される場合、その前に org.springframework.boot.autoconfigure.kafka.KafkaStreamsAnnotationDrivenConfiguration で defaultKafkaStreamsConfig bean が生成される。
  • defaultKafkaStreamsBuilder bean に defaultKafkaStreamsConfig bean が引数で渡されて、それを使って bean が生成される。
  • application-id には application.properties に定義した spring.kafka.streams.application-id が使用される。

1つのプロジェクトに複数の Kafka Streams アプリを定義する場合、以下の問題があります。

  • Kafka Streams アプリは各アプリ毎に一意の application-id を付ける必要があるが、defaultKafkaStreamsConfig bean をそのまま利用するとそれが出来ない。
  • StreamsBuilderFactoryBean は Kafka Streams アプリ毎にインスタンスを分けないとうまく動かない。Topic2 からメッセージを受信する2つの Kafka Streams アプリを作ろうとしてもエラーになる。

そこで以下のように実装します。

  • @EnableKafkaStreams アノテーションは使用しない。
  • KafkaStreamsConfiguration 型の Bean と StreamsBuilderFactoryBean 型の Bean を各 Kafka Streams アプリ毎に定義する。
  • application-id は KafkaStreamsConfiguration 型の Bean 内で設定する。

共通で使用する TopicUtils クラス、BaseKafkaStreamsConfig クラスを作成する

Topic の名前、key, value の型を定義する TopicUtils クラスを作成します。 src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp の下に TopicUtils.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.multistreamsapp;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class TopicUtils {

    public static final String TOPIC1_NAME = "Topic1";
    public static final Consumed TOPIC1_CONSUMED = Consumed.with(Serdes.Integer(), Serdes.String());

    public static final String TOPIC2_NAME = "Topic2";
    public static final Produced TOPIC2_PRODUCED = Produced.with(Serdes.String(), Serdes.Integer());
    public static final Consumed TOPIC2_CONSUMED = Consumed.with(Serdes.String(), Serdes.Integer());

    public static final String TOPIC3_NAME = "Topic3";
    public static final Produced TOPIC3_PRODUCED = Produced.with(Serdes.String(), Serdes.Long());

    public static final String TOPIC4_NAME = "Topic4";
    public static final Produced TOPIC4_PRODUCED = Produced.with(Serdes.String(), Serdes.Long());

}

各 Kafka Streams アプリのベースとする BaseKafkaStreamsConfig クラスを作成します。 src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp の下に BaseKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.multistreamsapp;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;

public class BaseKafkaStreamsConfig {

    protected final KafkaProperties properties;

    public BaseKafkaStreamsConfig(KafkaProperties properties) {
        this.properties = properties;
    }

}

KafkaProperties クラスは org.springframework.boot.autoconfigure.kafka.KafkaProperties にあり、org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration で @EnableConfigurationProperties(KafkaProperties.class) が付与されて Bean が生成されています。

LengthAppKafkaStreamsConfig クラスを作成する

src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp の下に LengthAppKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.multistreamsapp;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import java.util.Map;

@Configuration
public class LengthAppKafkaStreamsConfig extends BaseKafkaStreamsConfig {

    private static final String APPLICATION_ID = "length-streams-app";
    private static final String BEAN_NAME_PREFIX = "lengthApp";
    private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration";
    private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean";
    private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp";

    public LengthAppKafkaStreamsConfig(KafkaProperties properties) {
        super(properties);
    }

    @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME)
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration());
    }

    @Bean(KSTREAMS_APP_BEAN_NAME)
    public Topology kafkaStreamsApp() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean().getObject();
        KStream<Integer, String> stream = builder.stream(TopicUtils.TOPIC1_NAME, TopicUtils.TOPIC1_CONSUMED);

        stream
                .map((key, value) -> new KeyValue<>(value, value.length()))
                .to(TopicUtils.TOPIC2_NAME, TopicUtils.TOPIC2_PRODUCED);

        return builder.build();
    }

}
  • BaseKafkaStreamsConfig クラスを継承します。
  • 定数文字列を5つ定義していますが、APPLICATION_ID, BEAN_NAME_PREFIX の2つに作成する Kafka Streams アプリ固有の文字列を記述します。
  • kafkaStreamsApp メソッドに作成する Kafka Streams アプリの処理を記述します。

CountByLengthAppKafkaStreamsConfig クラスを作成する

src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp の下に CountByLengthAppKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.multistreamsapp;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import java.util.Map;

@Configuration
public class CountByLengthAppKafkaStreamsConfig extends BaseKafkaStreamsConfig {

    private static final String APPLICATION_ID = "countbylength-streams-app";
    private static final String BEAN_NAME_PREFIX = "countByLengthApp";
    private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration";
    private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean";
    private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp";

    public CountByLengthAppKafkaStreamsConfig(KafkaProperties properties) {
        super(properties);
    }

    @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME)
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration());
    }

    @Bean(KSTREAMS_APP_BEAN_NAME)
    public Topology kafkaStreamsApp() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean().getObject();
        KStream<String, Integer> stream =
                builder.stream(TopicUtils.TOPIC2_NAME, TopicUtils.TOPIC2_CONSUMED);

        KTable<String, Long> count = stream
                .groupBy((key, value) -> String.valueOf(value))
                .count();

        count.toStream()
                .to(TopicUtils.TOPIC3_NAME, TopicUtils.TOPIC3_PRODUCED);

        return builder.build();
    }

}

CountByWordsAppKafkaStreamsConfig クラスを作成する

src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp の下に CountByWordsAppKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.multistreamsapp;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import java.util.Map;

@Configuration
public class CountByWordsAppKafkaStreamsConfig extends BaseKafkaStreamsConfig {

    private static final String APPLICATION_ID = "countbywords-streams-app";
    private static final String BEAN_NAME_PREFIX = "countByWordsApp";
    private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration";
    private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean";
    private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp";

    public CountByWordsAppKafkaStreamsConfig(KafkaProperties properties) {
        super(properties);
    }

    @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME)
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration());
    }

    @Bean(KSTREAMS_APP_BEAN_NAME)
    public Topology kafkaStreamsApp() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean().getObject();
        KStream<String, Integer> stream =
                builder.stream(TopicUtils.TOPIC2_NAME, TopicUtils.TOPIC2_CONSUMED);

        KTable<String, Long> count = stream
                .groupBy((key, value) -> key)
                .count();

        count.toStream()
                .to(TopicUtils.TOPIC4_NAME, TopicUtils.TOPIC4_PRODUCED);

        return builder.build();
    }

}

application.properties を記述する

spring.kafka.streams.application-id=kafkastreams-multistreams-app
spring.kafka.streams.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092
spring.kafka.streams.properties.commit.interval.ms=5000
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.num.stream.threads=9
spring.kafka.streams.replication-factor=3
  • KTable を使用している Kafka Streams アプリが topic にメッセージを送信する間隔を 5 秒毎にします。commit.interval.ms に 5000(ミリ秒)を設定します。
  • default.key.serde、default.value.serde はどちらも Serdes$StringSerde を設定します。Kafka Streams アプリでは全て Produced.with(...)Consumed.with(...) で topic の key, value の型を指定しているのですが、default.key.serde に Serdes$IntegerSerde を指定すると java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Integer ..... のエラーが出ます。default であって型を指定したら無視されると思っていたのですが、どうもそうではないようです。。。
  • 動作確認では各 topic の partition 数を 3つで作成します。Kafka Streams アプリを3つ作成していますので、各 Kafka Streams アプリに3スレッドずつ割り当てられるよう num.stream.threads に 3 x 3 = 9 を設定します。
  • KTable を使用する Kafka Streams アプリは Stateful になり、Kafka Cluster 側に状態を保存するための Topic を作成します。その Topic がレプリケーションするよう replication-factor に 3 を設定します。

動作確認

docker-compose downdocker-compose up -d でコンテナを起動し直した後、Topic1~4 を作成します。

  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic1 --partitions 3 --replication-factor 3 --if-not-exists
  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic2 --partitions 3 --replication-factor 3 --if-not-exists
  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic3 --partitions 3 --replication-factor 3 --if-not-exists
  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic4 --partitions 3 --replication-factor 3 --if-not-exists

f:id:ksby:20191014165823p:plain

kafkastreams-multistreams-app プロジェクトを起動した後、

f:id:ksby:20191014170137p:plain

kafka-console-producer コマンドを実行して 5秒以内に a, b, ab, a と入力すると、想定した動作になります。

f:id:ksby:20191014170925p:plain

  • Topic2 には key が入力された文字、value が文字数のメッセージが送信されてきます。
  • Topic3 には 3文字が 1回、1文字が 3回の 2件のメッセージが送信されてきます。
  • Topic4 には a が 2回、abc が 1回、b が 1回の 3件のメッセージが送信されています。

また上の動作確認後に kafkacat コマンドで作成されている topic を見ると、各 Kafka Streams アプリ毎に ~-changelog~-repartition という topic が作成されます。

f:id:ksby:20191014171444p:plain (.....途中は省略.....) f:id:ksby:20191014171546p:plain

  • countbylength-streams-app-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog
  • countbylength-streams-app-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition
  • countbywords-streams-app-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog
  • countbywords-streams-app-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition

ローカルPC の方にも D:\tmp\kafka-streams\ というディレクトリが作成されて、その下に Kafka Streams アプリに設定した application-id のフォルダが作成されます(設定が state.dir = /tmp/kafka-streams になっており D ドライブでアプリが動いているので D:\tmp\kafka-streams\ が作成されるようです)。

f:id:ksby:20191014172009p:plain

partition と ローカルPC のディレクトリ内のデータは1セットになっているので、どちらか片方だけ削除してももう片方からデータがコピーされて復活します。最初 D:\tmp\kafka-streams\ の下に状態が保持されていることが分からなくて、コンテナを再起動しても countByLength、countByWords の Kafka Streams アプリで以前カウントした件数が維持されていて、原因が分からなくて結構困りました。。。

Data Reprocessing with the Streams API in Kafka: Resetting a Streams Application の中に Local State Stores の説明がありました。

履歴

2019/10/14
初版発行。