かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その47 )( Docker Compose でサーバを構築する、Kafka 編14 - Kafka Streams の Window 処理を試してみる )

概要

記事一覧はこちらです。

Kafka Streams の Windowing を参考に(というかほぼコピペです) Window 処理を実装して動作を確認します。

参照したサイト・書籍

  1. Streams DSL - Windowing
    https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#windowing

  2. Spark (Structured) Streaming vs. Kafka Streams
    https://static.rainfocus.com/oracle/oow18/sess/1525892701196001Ps1z/PF/kafka-streams-vs-spark-streaming_1540354303948001JVSB.pdf

  3. user Behavior Analysis with Session Windows and Apache Kafka's Streams API
    https://www.slideshare.net/ConfluentInc/user-behavior-analysis-with-session-windows-and-apache-kafkas-streams-api

  4. KafkaStreams: Getting Window Final Results
    https://stackoverflow.com/questions/54110206/kafkastreams-getting-window-final-results

  5. Kafka Streams failed to delete the state directory - DirectoryNotEmptyException
    https://stackoverflow.com/questions/56282751/kafka-streams-failed-to-delete-the-state-directory-directorynotemptyexception

目次

  1. kafkastreams-window-app サブプロジェクトを作成する
  2. Tumbling time window を試してみる
  3. Hopping time window を試してみる
  4. Session window を試してみる
  5. 最後に

手順

kafkastreams-window-app サブプロジェクトを作成する

Spring Initializr でプロジェクトを作成した後、

f:id:ksby:20191016235502p:plain

build.gradle を Spring Boot + Spring Integration でいろいろ試してみる ( その46 )( Docker Compose でサーバを構築する、Kafka 編13 - 1つのアプリケーション内に複数の Kafka Streams アプリを定義する+KTable を使ってみる ) と同じにします。

今回も1つのプロジェクト内に複数の Kafka Streams アプリを作成するので、kafkastreams-multistreams-app サブプロジェクトから src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp/BaseKafkaStreamsConfig.java を src/main/java/ksbysample/eipapp/kafkastreams/windowapp の下にコピーします。

src/main/resources/application.prperties に以下の内容を記述します。

spring.kafka.streams.application-id=kafkastreams-window-app
spring.kafka.streams.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092
spring.kafka.streams.properties.commit.interval.ms=5000
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.num.stream.threads=9
spring.kafka.streams.replication-factor=3

Tumbling time window を試してみる

Topic1 に入力された文字列毎の件数を 5秒毎に集計して Topic2 に送信する Kafka Streams アプリを作成します。

src/main/java/ksbysample/eipapp/kafkastreams/windowapp の下に TumblingTimeWindowKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.windowapp;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import java.time.Duration;
import java.util.Map;

@Configuration
public class TumblingTimeWindowKafkaStreamsConfig extends BaseKafkaStreamsConfig {

    private static final String APPLICATION_ID = "tumblingtimewindow-streams-app";
    private static final String BEAN_NAME_PREFIX = "tumblingTimeWindow";
    private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration";
    private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean";
    private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp";

    public TumblingTimeWindowKafkaStreamsConfig(KafkaProperties properties) {
        super(properties);
    }

    @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME)
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration());
    }

    @Bean(KSTREAMS_APP_BEAN_NAME)
    public Topology kafkaStreamsApp() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean().getObject();
        KStream<String, String> stream =
                builder.stream("Topic1", Consumed.with(Serdes.String(), Serdes.String()));

        KTable<Windowed<String>, Long> count = stream
                .groupBy((key, value) -> value)
                // Grouped.with(...) でグルーピングするデータの key, value の型を指定できる
                // .groupBy((key, value) -> value, Grouped.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count();

        count.toStream()
                .map((key, value) ->
                        new KeyValue<>(key.key() + "@" + key.window().startTime() + "->" + key.window().endTime()
                                , value))
                .to("Topic2", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }

}

動作確認します。docker-compose up -d で Kafka コンテナを起動してから Topic1、Topic2 を作成した後、

  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic1 --partitions 3 --replication-factor 3 --if-not-exists
  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic2 --partitions 3 --replication-factor 3 --if-not-exists

入出力に使用する kafka-console-producer、kafka-console-consumer コマンドを実行します。

  • kafka-console-producer --broker-list localhost:19092 --topic Topic1
  • kafka-console-consumer --bootstrap-server localhost:19092 --topic Topic2 --value-deserializer org.apache.kafka.common.serialization.LongDeserializer --property print.key=true

Kafka コンテナが世界標準時で動いているので出力されている時間も世界標準時になっていますが、09:11:10~09:11:15 の間に a, a, abc, b を、09:11:15~09:11:20 の間に c を、09:11:20~09:11:25 の間に a, b を入力すると、5秒毎に集計された文字列と件数が Topic2 に送信されています。
※application.properties の spring.kafka.streams.properties.commit.interval.ms=5000 の設定により 5秒毎に出力されています。

f:id:ksby:20191020181224p:plain

f:id:ksby:20191020183604p:plain

Hopping time window を試してみる

Topic1 に入力された文字列毎の件数を window's size を 15秒、advance interval を 5秒で集計して Topic3 に送信する Kafka Streams アプリを作成します。

src/main/java/ksbysample/eipapp/kafkastreams/windowapp の下に HoppingTimeWindowKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.windowapp;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import java.time.Duration;
import java.util.Map;

@Configuration
public class HoppingTimeWindowKafkaStreamsConfig extends BaseKafkaStreamsConfig {

    private static final String APPLICATION_ID = "hoppingtimewindow-streams-app";
    private static final String BEAN_NAME_PREFIX = "hoppingTimeWindow";
    private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration";
    private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean";
    private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp";

    public HoppingTimeWindowKafkaStreamsConfig(KafkaProperties properties) {
        super(properties);
    }

    @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME)
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration());
    }

    @Bean(KSTREAMS_APP_BEAN_NAME)
    public Topology kafkaStreamsApp() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean().getObject();
        KStream<String, String> stream =
                builder.stream("Topic1", Consumed.with(Serdes.String(), Serdes.String()));

        KTable<Windowed<String>, Long> count = stream
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(15)).advanceBy(Duration.ofSeconds(5)))
                .count();

        count.toStream()
                .map((key, value) ->
                        new KeyValue<>(key.key() + "@" + key.window().startTime() + "->" + key.window().endTime()
                                , value))
                .to("Topic3", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }

}

動作確認します。Topic3 を作成した後、

  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic3 --partitions 3 --replication-factor 3 --if-not-exists

入出力に使用する kafka-console-producer、kafka-console-consumer コマンドを実行します。

  • kafka-console-producer --broker-list localhost:19092 --topic Topic1
  • kafka-console-consumer --bootstrap-server localhost:19092 --topic Topic3 --value-deserializer org.apache.kafka.common.serialization.LongDeserializer --property print.key=true

kafka-console-producer から a を1文字入力して 5秒程度待つと kafka-console-consumer に3メッセージ出力されます。
※Hopping time window の場合も application.properties の spring.kafka.streams.properties.commit.interval.ms=5000 の設定により 5秒毎に出力されます。

f:id:ksby:20191020193710p:plain

その後連続して a, a を入力して 5秒程度待つと再び3メッセージが出力されます。

f:id:ksby:20191020193752p:plain

f:id:ksby:20191020194810p:plain

上の例は a しか入力しませんでしたが、a, b, c の3種類の文字列を入力すればそれぞれの文字列毎にカウントされます。

f:id:ksby:20191020195236p:plain

Session window を試してみる

Topic1 に入力された文字列毎の件数を inactivity gap 5秒、grace 0秒の Session Window で集計して Topic4 に送信する Kafka Streams アプリを作成します。 Session Window で集計した結果は application.properties の spring.kafka.streams.properties.commit.interval.ms=5000 による 5秒毎ではなく、Session Window がクローズしてから Topic4 に送信するようにします。

src/main/java/ksbysample/eipapp/kafkastreams/windowapp の下に SessionWindowKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.windowapp;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import java.time.Duration;
import java.util.Map;

import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;

@Configuration
public class SessionWindowKafkaStreamsConfig extends BaseKafkaStreamsConfig {

    private static final String APPLICATION_ID = "sessionwindow-streams-app";
    private static final String BEAN_NAME_PREFIX = "sessionWindow";
    private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration";
    private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean";
    private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp";

    public SessionWindowKafkaStreamsConfig(KafkaProperties properties) {
        super(properties);
    }

    @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME)
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration());
    }

    @Bean(KSTREAMS_APP_BEAN_NAME)
    public Topology kafkaStreamsApp() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean().getObject();
        KStream<String, String> stream =
                builder.stream("Topic1", Consumed.with(Serdes.String(), Serdes.String()));

        KTable<Windowed<String>, Long> count = stream
                .groupBy((key, value) -> value)
                .windowedBy(SessionWindows.with(Duration.ofSeconds(5)).grace(Duration.ZERO))
                .count(Materialized.with(Serdes.String(), Serdes.Long()))
                // .suppress(Suppressed.untilWindowCloses(unbounded())) が記述されると
                // 例えば 15秒以内に a, a, a と入力しただけでは Topic4 にはメッセージは送信されず、
                // 15秒経過した後に同じキーである a のメッセージが来ると1つ前の SessionWindow がクローズされて Topic4 にメッセージが送信される
                .suppress(Suppressed.untilWindowCloses(unbounded()));

        count.toStream()
                .map((key, value) ->
                        new KeyValue<>(key.key() + "@" + key.window().startTime() + "->" + key.window().endTime()
                                , value))
                .to("Topic4", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }

}
  • .suppress(Suppressed.untilWindowCloses(unbounded())); を記述して、Session Window がクローズしてから Topic4 に集計結果が送信されるようにします。
  • .suppress(Suppressed.untilWindowCloses(unbounded())); を記述する場合、count() だと java.lang.ClassCastException: class org.apache.kafka.streams.kstream.Windowed cannot be cast to class java.lang.String というエラーが発生するので .count(Materialized.with(Serdes.String(), Serdes.Long())) のように保存時の Serdes を指定します。

動作確認します。Topic4 を作成した後、

  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic4 --partitions 3 --replication-factor 3 --if-not-exists

入出力に使用する kafka-console-producer、kafka-console-consumer コマンドを実行します。

  • kafka-console-producer --broker-list localhost:19092 --topic Topic1
  • kafka-console-consumer --bootstrap-server localhost:19092 --topic Topic4 --value-deserializer org.apache.kafka.common.serialization.LongDeserializer --property print.key=true

kafka-console-producer から a を1文字入力して 60秒程度待ちましたが、

f:id:ksby:20191021002414p:plain

Topic4 側には何も出力されませんでした。。。 追加で a を入力すると、

f:id:ksby:20191021002519p:plain

今度は1つ前に入力された分が出力されました。5秒経過したら Session Window がクローズして Topic4 に集計結果が送信されるものと思っていたのですが、そうではないようです。試してみた感じでは、

  • Session Window のクローズ判定は、メッセージを受信したタイミングで行われる模様。指定した時間が経過したらクローズされる訳ではない。よって一番最初に a を入力したタイミングでは、Session Window のクローズが判定されていないので Topic4 にもメッセージは送信されない。
  • Session Window のクローズ判定は異なるキーでも行われる。a 入力 --> b 入力 --> a の集計結果が Topic4 に送信される、という動きになる。
  • SessionWindows.with(...) に指定した秒数を経過した後に文字列を入力した時に、その前のクローズされていると思われる Session Window が全て Topic4 に送信される訳ではない(なぜかすぐには送信されない場合があった)。

Session Window はメッセージが頻繁に送信されてくる環境でないとうまく機能しない気がします(Kafka を使っていてメッセージの送信数が少ない環境というのもないような気がしますが)。

最後に

  • Hopping time window の場合は集計するキー1種類につき3メッセージが送信されます。ドキュメントを読めば分かりそうなことですが、1メッセージずつ送信されるつもりでいました。。。
  • Session Window は .suppress(Suppressed.untilWindowCloses(unbounded())) を記述した場合、指定した時間が経過したら Session Window がクローズして次の Topic にメッセージが送信されると思っていましたが、Session Window がクローズされるのは次のメッセージが送信された時(しかも時間が経過したものが 100% クローズされるとは限らない)でした。

履歴

2019/10/22
初版発行。