Spring Boot + Spring Integration でいろいろ試してみる ( その48 )( Docker Compose でサーバを構築する、Kafka 編15 - Kafka Streams で Apache Avro を使用する )
概要
記事一覧はこちらです。
KStream、KTable で Avro で生成したクラスを使用したサンプルを作成してみます。
参照したサイト・書籍
Data Types and Serialization - Available SerDes - Avro
https://docs.confluent.io/current/streams/developer-guide/datatypes.html#avroKafka Stream aggregation with custom object data type
https://stackoverflow.com/questions/53400832/kafka-stream-aggregation-with-custom-object-data-typeCreating a SerDes for Windowed Data on Kafka Streams
https://stackoverflow.com/questions/56135796/creating-a-serdes-for-windowed-data-on-kafka-streams
目次
- kafkastreams-avro-app サブプロジェクトを作成する
- InputData.avsc、Counter.avsc を作成する
- SerdeHelper クラスを作成する
- inputdata-streams-app を作成する
- counter-streams-app を作成する
- winprint-streams-app を作成する
- 動作確認
手順
kafkastreams-avro-app サブプロジェクトを作成する
Spring Initializr でプロジェクトを作成した後、
build.gradle を以下の内容に変更します。
buildscript { ext { group "ksbysample.eipapp.ksbysample-eipapp-kafkastreams" version "0.0.1-SNAPSHOT" } repositories { mavenCentral() gradlePluginPortal() } dependencies { classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.17.0" } } plugins { id "org.springframework.boot" version "2.1.9.RELEASE" id "io.spring.dependency-management" version "1.0.8.RELEASE" id "java" id "idea" } // com.commercehub.gradle.plugin.avro を追加すると src/main/avro, src/test/avro が作成される // はずだが Gradle Multi-project にしていると作成されなかったので、手動で作成すること apply plugin: "com.commercehub.gradle.plugin.avro" sourceCompatibility = JavaVersion.VERSION_11 targetCompatibility = JavaVersion.VERSION_11 idea { module { inheritOutputDirs = false outputDir = file("$buildDir/classes/main/") } } repositories { mavenCentral() maven { url "https://packages.confluent.io/maven/" } } dependencyManagement { imports { mavenBom(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES) mavenBom("org.junit:junit-bom:5.5.2") } } dependencies { def kafkaVersion = "2.3.0" def confluentKafkaVersion = "5.3.0" implementation("org.springframework.boot:spring-boot-starter-integration") implementation("org.springframework.integration:spring-integration-kafka:3.2.0.RELEASE") implementation("org.apache.kafka:kafka-clients:${kafkaVersion}") implementation("org.apache.kafka:kafka-streams:${kafkaVersion}") testImplementation("org.springframework.boot:spring-boot-starter-test") // for JUnit 5 testCompile("org.junit.jupiter:junit-jupiter") testRuntime("org.junit.platform:junit-platform-launcher") // for Apache Avro implementation("org.apache.avro:avro:1.9.0") implementation("io.confluent:kafka-streams-avro-serde:${confluentKafkaVersion}") } avro { fieldVisibility = "PRIVATE" } test { // for JUnit 5 useJUnitPlatform() testLogging { events "STARTED", "PASSED", "FAILED", "SKIPPED" } }
- Avro を使用するので Spring Boot + Spring Integration でいろいろ試してみる ( その46 )( Docker Compose でサーバを構築する、Kafka 編13 - 1つのアプリケーション内に複数の Kafka Streams アプリを定義する+KTable を使ってみる ) のものと比較して以下の記述を追加します。
- buildscript block に
dependencies { classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.17.0" }
を追加します。 apply plugin: "com.commercehub.gradle.plugin.avro"
を追加します。- repositories block に
maven { url "https://packages.confluent.io/maven/" }
を追加します。 - dependencies block に以下の3行を追加します。
def confluentKafkaVersion = "5.3.0"
implementation("org.apache.avro:avro:1.9.0")
implementation("io.confluent:kafka-streams-avro-serde:${confluentKafkaVersion}")
- 書いている時点で 5.3.1 が出ていますが、docker-compose で起動している Kafka にバージョンを合わせて 5.3.0 を使用します。
- buildscript block に
今回も1つのプロジェクト内に複数の Kafka Streams アプリを作成するので、kafkastreams-multistreams-app サブプロジェクトから src/main/java/ksbysample/eipapp/kafkastreams/multistreamsapp/BaseKafkaStreamsConfig.java を src/main/java/ksbysample/eipapp/kafkastreams/avroapp の下にコピーします。
src/main/resources/application.prperties に以下の内容を記述します。
spring.kafka.streams.application-id=kafkastreams-avro-app spring.kafka.streams.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092 spring.kafka.streams.properties.commit.interval.ms=5000 spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde spring.kafka.streams.properties.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde spring.kafka.streams.properties.num.stream.threads=9 spring.kafka.streams.replication-factor=3 spring.kafka.properties.schema.registry.url=http://localhost:8081 spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
- Avro を使用するので以下の2行を追加します。
spring.kafka.properties.schema.registry.url=http://localhost:8081
spring.kafka.properties.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
- この設定を追加している理由は後述します。
最後に Gradle Multi-project にしているためか build.gradle を変更して更新しただけでは src/main, src/test の下に avro ディレクトリが作成されなかったので、手動で avro ディレクトリを作成します(ディレクトリの青と緑の色は作成したら自動で付きました)。
InputData.avsc、Counter.avsc を作成する
src/main/avro の下に入力された文字列とその長さを格納するための InputData と
{ "namespace": "ksbysample.eipapp.kafkastreams.avroapp.avro", "type": "record", "name": "InputData", "doc": "???", "fields" : [ { "name": "str", "type": "string" }, { "name": "length", "type": "int" } ] }
文字列毎の件数を格納するための Counter の2つを定義します。
{ "namespace": "ksbysample.eipapp.kafkastreams.avroapp.avro", "type": "record", "name": "Counter", "doc": "???", "fields" : [ { "name": "count", "type": "long" } ] }
ファイルを作成したら generateAvroJava タスクを実行して java ファイルを生成します。
今回は InputData, Counter を使用して以下の図の inputdata-streams-app、counter-streams-app、winprint-streams-app を実装します。
SerdeHelper クラスを作成する
Avro で生成した InputData、Counter クラス用の Serde、及び Topic3 の key で使用する Windowed<InputData>
用の Serde を提供する helper クラスを作成します。
src/main/java/ksbysample/eipapp/kafkastreams/avroapp の下に SerdeHelper.java を新規作成し、以下の内容を記述します。
package ksbysample.eipapp.kafkastreams.avroapp; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; import ksbysample.eipapp.kafkastreams.avroapp.avro.Counter; import ksbysample.eipapp.kafkastreams.avroapp.avro.InputData; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.TimeWindowedDeserializer; import org.apache.kafka.streams.kstream.TimeWindowedSerializer; import org.apache.kafka.streams.kstream.Windowed; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Configuration; import java.util.Map; @Configuration public class SerdeHelper { public final Serde<InputData> inputDataKeyAvroSerde; public final Serde<InputData> inputDataValueAvroSerde; public final Serde<Counter> counterValueAvroSerde; public final Serde<Windowed<InputData>> windowedInputDataKeySerde; public SerdeHelper(KafkaProperties properties) { Map<String, String> serdeConfig = properties.getProperties(); inputDataKeyAvroSerde = new SpecificAvroSerde<>(); inputDataKeyAvroSerde.configure(serdeConfig, true); inputDataValueAvroSerde = new SpecificAvroSerde<>(); inputDataValueAvroSerde.configure(serdeConfig, false); counterValueAvroSerde = new SpecificAvroSerde<>(); counterValueAvroSerde.configure(serdeConfig, false); windowedInputDataKeySerde = Serdes.serdeFrom( new TimeWindowedSerializer<>(inputDataKeyAvroSerde.serializer()), new TimeWindowedDeserializer<>(inputDataKeyAvroSerde.deserializer())); windowedInputDataKeySerde.configure(serdeConfig, true); } }
inputdata-streams-app を作成する
kafka-console-producer で入力された文字列と文字列長を InputData オブジェクトに変換する inputdata-streams-app を作成します。
src/main/java/ksbysample/eipapp/kafkastreams/avroapp の下に InputDataKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。
package ksbysample.eipapp.kafkastreams.avroapp; import ksbysample.eipapp.kafkastreams.avroapp.avro.InputData; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Produced; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; import java.util.Map; @Configuration public class InputDataKafkaStreamsConfig extends BaseKafkaStreamsConfig { private static final String APPLICATION_ID = "inputdata-streams-app"; private static final String BEAN_NAME_PREFIX = "inputdata"; private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration"; private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean"; private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp"; private final SerdeHelper serdeHelper; public InputDataKafkaStreamsConfig(KafkaProperties properties , SerdeHelper serdeHelper) { super(properties); this.serdeHelper = serdeHelper; } @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME) public KafkaStreamsConfiguration kafkaStreamsConfiguration() { Map<String, Object> streamsProperties = this.properties.buildStreamsProperties(); streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); return new KafkaStreamsConfiguration(streamsProperties); } @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME) public StreamsBuilderFactoryBean streamsBuilderFactoryBean() { return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration()); } @Bean(KSTREAMS_APP_BEAN_NAME) public Topology kafkaStreamsApp() throws Exception { StreamsBuilder builder = streamsBuilderFactoryBean().getObject(); KStream<String, String> stream = builder.stream("Topic1", Consumed.with(Serdes.String(), Serdes.String())); stream .mapValues(value -> new InputData(value, value.length())) .to("Topic2", Produced.with(Serdes.String(), serdeHelper.inputDataValueAvroSerde)); return builder.build(); } }
counter-streams-app を作成する
Topic2 に送信された InputData オブジェクト毎の件数を Window 処理で 5秒毎に集計する counter-streams-app を作成します。
src/main/java/ksbysample/eipapp/kafkastreams/avroapp の下に CounterKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。
package ksbysample.eipapp.kafkastreams.avroapp; import ksbysample.eipapp.kafkastreams.avroapp.avro.Counter; import ksbysample.eipapp.kafkastreams.avroapp.avro.InputData; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.*; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; import java.time.Duration; import java.util.Map; @Configuration public class CounterKafkaStreamsConfig extends BaseKafkaStreamsConfig { private static final String APPLICATION_ID = "counter-streams-app"; private static final String BEAN_NAME_PREFIX = "counter"; private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration"; private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean"; private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp"; private final SerdeHelper serdeHelper; public CounterKafkaStreamsConfig(KafkaProperties properties , SerdeHelper serdeHelper) { super(properties); this.serdeHelper = serdeHelper; } @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME) public KafkaStreamsConfiguration kafkaStreamsConfiguration() { Map<String, Object> streamsProperties = this.properties.buildStreamsProperties(); streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); return new KafkaStreamsConfiguration(streamsProperties); } @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME) public StreamsBuilderFactoryBean streamsBuilderFactoryBean() { return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration()); } @Bean(KSTREAMS_APP_BEAN_NAME) public Topology kafkaStreamsApp() throws Exception { StreamsBuilder builder = streamsBuilderFactoryBean().getObject(); KStream<String, InputData> stream = builder.stream("Topic2", Consumed.with(Serdes.String(), serdeHelper.inputDataValueAvroSerde)); KTable<Windowed<InputData>, Counter> count = stream .groupBy((key, value) -> value , Grouped.with(serdeHelper.inputDataKeyAvroSerde, serdeHelper.inputDataValueAvroSerde)) .windowedBy(TimeWindows.of(Duration.ofSeconds(5))) .aggregate( () -> new Counter(0L) , (aggKey, newValue, aggValue) -> new Counter(aggValue.getCount() + 1) , Materialized.with(serdeHelper.inputDataKeyAvroSerde, serdeHelper.counterValueAvroSerde)); count.toStream() .peek((key, value) -> { System.out.println("☆☆☆" + key.key() + "@" + key.window().startTime() + "->" + key.window().endTime()); }) .to("Topic3" , Produced.with(serdeHelper.windowedInputDataKeySerde, serdeHelper.counterValueAvroSerde)); return builder.build(); } }
winprint-streams-app を作成する
Topic3 に送信された Windowed<InputData>
オブジェクトを表示する winprint-streams-app を作成します。
src/main/java/ksbysample/eipapp/kafkastreams/avroapp の下に WinPrintKafkaStreamsConfig.java を新規作成し、以下の内容を記述します。
package ksbysample.eipapp.kafkastreams.avroapp; import ksbysample.eipapp.kafkastreams.avroapp.avro.Counter; import ksbysample.eipapp.kafkastreams.avroapp.avro.InputData; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Windowed; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.StreamsBuilderFactoryBean; import java.util.Map; @Configuration public class WinPrintKafkaStreamsConfig extends BaseKafkaStreamsConfig { private static final String APPLICATION_ID = "winprint-streams-app"; private static final String BEAN_NAME_PREFIX = "winprint"; private static final String KAFKA_STREAMS_CONFIGURATION_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsConfiguration"; private static final String STREAMS_BUILDER_FACTORY_BEAN_NAME = BEAN_NAME_PREFIX + "StreamsBuilderFactoryBean"; private static final String KSTREAMS_APP_BEAN_NAME = BEAN_NAME_PREFIX + "KafkaStreamsApp"; private final SerdeHelper serdeHelper; public WinPrintKafkaStreamsConfig(KafkaProperties properties , SerdeHelper serdeHelper) { super(properties); this.serdeHelper = serdeHelper; } @Bean(KAFKA_STREAMS_CONFIGURATION_BEAN_NAME) public KafkaStreamsConfiguration kafkaStreamsConfiguration() { Map<String, Object> streamsProperties = this.properties.buildStreamsProperties(); streamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); return new KafkaStreamsConfiguration(streamsProperties); } @Bean(STREAMS_BUILDER_FACTORY_BEAN_NAME) public StreamsBuilderFactoryBean streamsBuilderFactoryBean() { return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration()); } @Bean(KSTREAMS_APP_BEAN_NAME) public Topology kafkaStreamsApp() throws Exception { StreamsBuilder builder = streamsBuilderFactoryBean().getObject(); KStream<Windowed<InputData>, Counter> stream = builder.stream("Topic3" , Consumed.with(serdeHelper.windowedInputDataKeySerde, serdeHelper.counterValueAvroSerde)); stream.foreach((key, value) -> { System.out.println("★★★" + key.key() + "@" + key.window().startTime() + "->" + key.window().endTime() + ":" + value.getCount()); }); return builder.build(); } }
動作確認
docker-compose up -d
で kafka を起動した後、Topic1、Topic2、Topic3 を作成します。
kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic1 --partitions 3 --replication-factor 3 --if-not-exists
kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic2 --partitions 3 --replication-factor 3 --if-not-exists
kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic3 --partitions 3 --replication-factor 3 --if-not-exists
コマンドプロンプトを3つ開き、以下のコマンドを実行します。
kafka-console-producer --broker-list localhost:19092 --topic Topic1
kafka-avro-console-consumer --topic Topic2 --bootstrap-server cp-kafka1:19092 --property schema.registry.url=http://localhost:8081
kafka-avro-console-consumer --topic Topic3 --bootstrap-server cp-kafka1:19092 --property schema.registry.url=http://localhost:8081 --property print.key=true
kafkastreams-avro-app を起動してから kafka-console-producer
から a, a, a, b, b を入力すると、2つ目のコマンドプロンプトには InputData オブジェクトのデータが、3つ目のコマンドプロンプトには InputData オブジェクト毎の件数がセットされた Counter オブジェクトが出力されています。
コンソールで Topic3 に送信された Windowed<InputData>
オブジェクトを確認すると、送信前にセットされていた startTime, endTime の内 startTime は維持されていますが endTime は維持されませでした。Windowed<?>
オブジェクトで endTime を維持したまま別の Topic に送信することは出来ないのでしょうか?(調べたけど分かりませんでした。。。)
履歴
2019/10/31
初版発行。