かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その48 )( Docker Compose でサーバを構築する、Kafka 編15 - Kafka Streams で Apache Avro を使用する )

概要

記事一覧はこちらです。

KStream、KTable で Avro で生成したクラスを使用したサンプルを作成してみます。

参照したサイト・書籍

  1. Data Types and Serialization - Available SerDes - Avro
    https://docs.confluent.io/current/streams/developer-guide/datatypes.html#avro

  2. Kafka Stream aggregation with custom object data type
    https://stackoverflow.com/questions/53400832/kafka-stream-aggregation-with-custom-object-data-type

  3. Creating a SerDes for Windowed Data on Kafka Streams
    https://stackoverflow.com/questions/56135796/creating-a-serdes-for-windowed-data-on-kafka-streams

目次

  1. kafkastreams-avro-app サブプロジェクトを作成する
  2. InputData.avsc、Counter.avsc を作成する
  3. SerdeHelper クラスを作成する
  4. inputdata-streams-app を作成する
  5. counter-streams-app を作成する
  6. winprint-streams-app を作成する
  7. 動作確認

手順

kafkastreams-avro-app サブプロジェクトを作成する

Spring Initializr でプロジェクトを作成した後、

f:id:ksby:20191024001503p:plain

build.gradle を以下の内容に変更します。

buildscript {
    ext {
        group "ksbysample.eipapp.ksbysample-eipapp-kafkastreams"
        version "0.0.1-SNAPSHOT"
    }
    repositories {
        mavenCentral()
        gradlePluginPortal()
    }
    dependencies {
        classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.17.0"
    }
}

plugins {
    id "org.springframework.boot" version "2.1.9.RELEASE"
    id "io.spring.dependency-management" version "1.0.8.RELEASE"
    id "java"
    id "idea"
}

// com.commercehub.gradle.plugin.avro を追加すると src/main/avro, src/test/avro が作成される
// はずだが Gradle Multi-project にしていると作成されなかったので、手動で作成すること
apply plugin: "com.commercehub.gradle.plugin.avro"

sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    mavenCentral()
    maven {
        url "https://packages.confluent.io/maven/"
    }
}

dependencyManagement {
    imports {
        mavenBom(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES)
        mavenBom("org.junit:junit-bom:5.5.2")
    }
}

dependencies {
    def kafkaVersion = "2.3.0"
    def confluentKafkaVersion = "5.3.0"

    implementation("org.springframework.boot:spring-boot-starter-integration")
    implementation("org.springframework.integration:spring-integration-kafka:3.2.0.RELEASE")
    implementation("org.apache.kafka:kafka-clients:${kafkaVersion}")
    implementation("org.apache.kafka:kafka-streams:${kafkaVersion}")
    testImplementation("org.springframework.boot:spring-boot-starter-test")

    // for JUnit 5
    testCompile("org.junit.jupiter:junit-jupiter")
    testRuntime("org.junit.platform:junit-platform-launcher")

    // for Apache Avro
    implementation("org.apache.avro:avro:1.9.0")
    implementation("io.confluent:kafka-streams-avro-serde:${confluentKafkaVersion}")
}

avro {
    fieldVisibility = "PRIVATE"
}

test {
    // for JUnit 5
    useJUnitPlatform()
    testLogging {
        events "STARTED", "PASSED", "FAILED", "SKIPPED"
    }
}

今回も1つのプロジェクト内に複数の Kafka Streams アプリを作成するので、kafkastreams-multistreams-app サブプロジェクトから src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp/BaseKafkaStreamsConfig.java を src/main/java/ksbysample/eipapp/kafkastreams/avroapp の下にコピーします。

src/main/resources/application.prperties に以下の内容を記述します。

spring.kafka.streams.application-id=kafkastreams-avro-app
spring.kafka.streams.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092
spring.kafka.streams.properties.commit.interval.ms=5000
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.num.stream.threads=9
spring.kafka.streams.replication-factor=3

spring.kafka.properties.schema.registry.url=http://localhost:8081
spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
  • Avro を使用するので以下の2行を追加します。
    • spring.kafka.properties.schema.registry.url=http://localhost:8081
    • spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
      • この設定を追加している理由は後述します。

最後に Gradle Multi-project にしているためか build.gradle を変更して更新しただけでは src/main, src/test の下に avro ディレクトリが作成されなかったので、手動で avro ディレクトリを作成します(ディレクトリの青と緑の色は作成したら自動で付きました)。

f:id:ksby:20191024004314p:plainf:id:ksby:20191024004612p:plain

InputData.avsc、Counter.avsc を作成する

src/main/avro の下に入力された文字列とその長さを格納するための InputData と

{
    "namespace": "ksbysample.eipapp.kafkastreams.avroapp.avro",
    "type": "record",
    "name": "InputData",
    "doc": "???",
    "fields" : [
        { "name": "str", "type": "string" },
        { "name": "length", "type": "int" }
    ]
}

文字列毎の件数を格納するための Counter の2つを定義します。

{
    "namespace": "ksbysample.eipapp.kafkastreams.avroapp.avro",
    "type": "record",
    "name": "Counter",
    "doc": "???",
    "fields" : [
        { "name": "count", "type": "long" }
    ]
}

ファイルを作成したら generateAvroJava タスクを実行して java ファイルを生成します。

f:id:ksby:20191027121239p:plain f:id:ksby:20191027121411p:plain f:id:ksby:20191027121537p:plain

今回は InputData, Counter を使用して以下の図の inputdata-streams-app、counter-streams-app、winprint-streams-app を実装します。

f:id:ksby:20191029011115p:plain

SerdeHelper クラスを作成する

Avro で生成した InputData、Counter クラス用の Serde、及び Topic3 の key で使用する Windowed<InputData> 用の Serde を提供する helper クラスを作成します。

src/main/java/ksbysample/eipapp/kafkastreams/avroapp の下に SerdeHelper.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.avroapp;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import ksbysample.eipapp.kafkastreams.avroapp.avro.Counter;
import ksbysample.eipapp.kafkastreams.avroapp.avro.InputData;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedSerializer;
import org.apache.kafka.streams.kstream.Windowed;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

@Configuration
public class SerdeHelper {

    public final Serde<InputData> inputDataKeyAvroSerde;

    public final Serde<InputData> inputDataValueAvroSerde;

    public final Serde<Counter> counterValueAvroSerde;

    public final Serde<Windowed<InputData>> windowedInputDataKeySerde;

    public SerdeHelper(KafkaProperties properties) {
        Map<String, String> serdeConfig = properties.getProperties();

        inputDataKeyAvroSerde = new SpecificAvroSerde<>();
        inputDataKeyAvroSerde.configure(serdeConfig, true);

        inputDataValueAvroSerde = new SpecificAvroSerde<>();
        inputDataValueAvroSerde.configure(serdeConfig, false);

        counterValueAvroSerde = new SpecificAvroSerde<>();
        counterValueAvroSerde.configure(serdeConfig, false);

        windowedInputDataKeySerde = Serdes.serdeFrom(
                new TimeWindowedSerializer<>(inputDataKeyAvroSerde.serializer()),
                new TimeWindowedDeserializer<>(inputDataKeyAvroSerde.deserializer()));
        windowedInputDataKeySerde.configure(serdeConfig, true);
    }

}

inputdata-streams-app を作成する

kafka-console-producer で入力された文字列と文字列長を InputData オブジェクトに変換する inputdata-streams-app を作成します。

src/main/java/ksbysample/eipapp/kafkastreams/avroapp の下に InputDataKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.avroapp;

import ksbysample.eipapp.kafkastreams.avroapp.avro.InputData;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import java.util.Map;

@Configuration
public class InputDataKafkaStreamsConfig extends BaseKafkaStreamsConfig {

    private static final String APPLICATION_ID = "inputdata-streams-app";
    private static final String BEAN_NAME_PREFIX = "inputdata";
    private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration";
    private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean";
    private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp";

    private final SerdeHelper serdeHelper;

    public InputDataKafkaStreamsConfig(KafkaProperties properties
            , SerdeHelper serdeHelper) {
        super(properties);
        this.serdeHelper = serdeHelper;
    }

    @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME)
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration());
    }

    @Bean(KSTREAMS_APP_BEAN_NAME)
    public Topology kafkaStreamsApp() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean().getObject();
        KStream<String, String> stream =
                builder.stream("Topic1", Consumed.with(Serdes.String(), Serdes.String()));

        stream
                .mapValues(value -> new InputData(value, value.length()))
                .to("Topic2", Produced.with(Serdes.String(), serdeHelper.inputDataValueAvroSerde));

        return builder.build();
    }

}

counter-streams-app を作成する

Topic2 に送信された InputData オブジェクト毎の件数を Window 処理で 5秒毎に集計する counter-streams-app を作成します。

src/main/java/ksbysample/eipapp/kafkastreams/avroapp の下に CounterKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.avroapp;

import ksbysample.eipapp.kafkastreams.avroapp.avro.Counter;
import ksbysample.eipapp.kafkastreams.avroapp.avro.InputData;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import java.time.Duration;
import java.util.Map;

@Configuration
public class CounterKafkaStreamsConfig extends BaseKafkaStreamsConfig {

    private static final String APPLICATION_ID = "counter-streams-app";
    private static final String BEAN_NAME_PREFIX = "counter";
    private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration";
    private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean";
    private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp";

    private final SerdeHelper serdeHelper;

    public CounterKafkaStreamsConfig(KafkaProperties properties
            , SerdeHelper serdeHelper) {
        super(properties);
        this.serdeHelper = serdeHelper;
    }

    @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME)
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration());
    }

    @Bean(KSTREAMS_APP_BEAN_NAME)
    public Topology kafkaStreamsApp() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean().getObject();
        KStream<String, InputData> stream =
                builder.stream("Topic2", Consumed.with(Serdes.String(), serdeHelper.inputDataValueAvroSerde));

        KTable<Windowed<InputData>, Counter> count = stream
                .groupBy((key, value) -> value
                        , Grouped.with(serdeHelper.inputDataKeyAvroSerde, serdeHelper.inputDataValueAvroSerde))
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .aggregate(
                        () -> new Counter(0L)
                        , (aggKey, newValue, aggValue) -> new Counter(aggValue.getCount() + 1)
                        , Materialized.with(serdeHelper.inputDataKeyAvroSerde, serdeHelper.counterValueAvroSerde));

        count.toStream()
                .peek((key, value) -> {
                    System.out.println("☆☆☆" + key.key() + "@" + key.window().startTime() + "->" + key.window().endTime());
                })
                .to("Topic3"
                        , Produced.with(serdeHelper.windowedInputDataKeySerde, serdeHelper.counterValueAvroSerde));

        return builder.build();
    }

}

winprint-streams-app を作成する

Topic3 に送信された Windowed<InputData> オブジェクトを表示する winprint-streams-app を作成します。

src/main/java/ksbysample/eipapp/kafkastreams/avroapp の下に WinPrintKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafkastreams.avroapp;

import ksbysample.eipapp.kafkastreams.avroapp.avro.Counter;
import ksbysample.eipapp.kafkastreams.avroapp.avro.InputData;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Windowed;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;

import java.util.Map;

@Configuration
public class WinPrintKafkaStreamsConfig extends BaseKafkaStreamsConfig {

    private static final String APPLICATION_ID = "winprint-streams-app";
    private static final String BEAN_NAME_PREFIX = "winprint";
    private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration";
    private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean";
    private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp";

    private final SerdeHelper serdeHelper;

    public WinPrintKafkaStreamsConfig(KafkaProperties properties
            , SerdeHelper serdeHelper) {
        super(properties);
        this.serdeHelper = serdeHelper;
    }

    @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfiguration() {
        Map<String, Object> streamsProperties = this.properties.buildStreamsProperties();
        streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        return new KafkaStreamsConfiguration(streamsProperties);
    }

    @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME)
    public StreamsBuilderFactoryBean streamsBuilderFactoryBean() {
        return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration());
    }

    @Bean(KSTREAMS_APP_BEAN_NAME)
    public Topology kafkaStreamsApp() throws Exception {
        StreamsBuilder builder = streamsBuilderFactoryBean().getObject();
        KStream<Windowed<InputData>, Counter> stream =
                builder.stream("Topic3"
                        , Consumed.with(serdeHelper.windowedInputDataKeySerde, serdeHelper.counterValueAvroSerde));

        stream.foreach((key, value) -> {
            System.out.println("★★★" + key.key() + "@" + key.window().startTime() + "->" + key.window().endTime()
                    + ":" + value.getCount());
        });

        return builder.build();
    }

}

動作確認

docker-compose up -d で kafka を起動した後、Topic1、Topic2、Topic3 を作成します。

  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic1 --partitions 3 --replication-factor 3 --if-not-exists
  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic2 --partitions 3 --replication-factor 3 --if-not-exists
  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic3 --partitions 3 --replication-factor 3 --if-not-exists

コマンドプロンプトを3つ開き、以下のコマンドを実行します。

  • kafka-console-producer --broker-list localhost:19092 --topic Topic1
  • kafka-avro-console-consumer --topic Topic2 --bootstrap-server cp-kafka1:19092 --property schema.registry.url=http://localhost:8081
  • kafka-avro-console-consumer --topic Topic3 --bootstrap-server cp-kafka1:19092 --property schema.registry.url=http://localhost:8081 --property print.key=true

kafkastreams-avro-app を起動してから kafka-console-producer から a, a, a, b, b を入力すると、2つ目のコマンドプロンプトには InputData オブジェクトのデータが、3つ目のコマンドプロンプトには InputData オブジェクト毎の件数がセットされた Counter オブジェクトが出力されています。

f:id:ksby:20191031001613p:plain

コンソールで Topic3 に送信された Windowed<InputData> オブジェクトを確認すると、送信前にセットされていた startTime, endTime の内 startTime は維持されていますが endTime は維持されませでした。Windowed<?> オブジェクトで endTime を維持したまま別の Topic に送信することは出来ないのでしょうか?(調べたけど分かりませんでした。。。)

f:id:ksby:20191031002414p:plain

履歴

2019/10/31
初版発行。