Spring Boot + Spring Integration でいろいろ試してみる ( その36 )( Docker Compose でサーバを構築する、Kafka 編3 - Spring Integration DSL で producer, consumer を実装する )
概要
記事一覧はこちらです。
今回は Spring Integration DSL で Kafka の producer, consumer を実装します。
参照したサイト・書籍
spring-projects/spring-integration-kafka
https://github.com/spring-projects/spring-integration-kafkaspring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java
https://github.com/spring-projects/spring-integration-kafka/blob/b27f8b3eead708d5309a881aa76ac9a244ee5098/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.javaSpring for Apache Kafka - 5. Spring Integration
https://docs.spring.io/spring-kafka/reference/html/#spring-integration
目次
- Spring Integration DSL で Kafka の producer, consumer を実装する
- consumer の数を 1 → 3 に変更する
- メッセージの数値の 1桁目が 0~5 なら partition 0、6~8 なら partition 1、それ以外なら partition 2 へ送信する
手順
Spring Integration DSL で Kafka の producer, consumer を実装する
まずは 1 producer, 1 consumer のサンプルを以下の仕様で実装します。
Topic1
という名前の topic を 3 partition and 1 replicas で作成する。- 送信するメッセージの serializer, deserializer はデフォルトのもの(key / value どちらも StringSerializer/StringDeserializer、org.springframework.boot.autoconfigure.kafka.KafkaProperties 参照)を使用する。
- producer からは 1, 2, 3, ... と 1 から始まり +1 ずつ増やした数字の文字列を value にセットした message を1秒に1件送信する。key には何もセットしない。送信時に partition は指定しない。
- consumer は1つ作成する。受信はメッセージがあれば連続して受信するが、ない場合には 100ms 待機する。
Jackson の ObjectMapper が必要になるので build.gradle に implementation("org.springframework.boot:spring-boot-starter-json")
を追加します。
dependencies { def lombokVersion = "1.18.8" implementation("org.springframework.boot:spring-boot-starter-integration") implementation("org.springframework.boot:spring-boot-starter-json") { exclude group: "org.springframework", module: "spring-web" } implementation("org.springframework.integration:spring-integration-kafka:3.1.4.RELEASE") testImplementation("org.springframework.boot:spring-boot-starter-test") .......... }
src/main/java/ksbysample/eipapp/kafka の下に KafkaDslSampleConfig.java を新規作成し、以下の内容を記述します。
package ksbysample.eipapp.kafka; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.Pollers; import org.springframework.integration.handler.LoggingHandler; import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer; import org.springframework.integration.kafka.dsl.Kafka; import org.springframework.integration.kafka.dsl.KafkaProducerMessageHandlerSpec; import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter; import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.support.DefaultKafkaHeaderMapper; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.MessageChannel; import org.springframework.retry.support.RetryTemplate; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @Slf4j @Configuration public class KafkaDslSampleConfig { private static final String TOPIC_NAME = "Topic1"; // kafkaProducerFactory bean, kafkaConsumerFactory bean は // org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration // で生成されている // // 設定は application.properties の spring.kafka.producer.~ で行う private final ProducerFactory<Integer, String> kafkaProducerFactory; // 設定は application.properties の spring.kafka.consumer.~ で行う private final ConsumerFactory<Integer, String> kafkaConsumerFactory; private final MessageChannel errorChannel; private AtomicInteger count = new AtomicInteger(0); public KafkaDslSampleConfig(ProducerFactory<Integer, String> kafkaProducerFactory , ConsumerFactory<Integer, String> kafkaConsumerFactory , MessageChannel errorChannel) { this.kafkaProducerFactory = kafkaProducerFactory; this.kafkaConsumerFactory = kafkaConsumerFactory; this.errorChannel = errorChannel; } @Bean public Supplier<String> countSupplier() { return () -> String.valueOf(this.count.addAndGet(1)); } @Bean public IntegrationFlow topic1ProducerFlow() { return IntegrationFlows .from(countSupplier() , e -> e.poller(Pollers.fixedDelay(1000))) // メッセージの kafka_topic ヘッダに topic 名をセットすると // kafkaMessageHandler メソッドの第2引数に指定した topic ではなく // kafka_topic ヘッダの topic に送信される // .enrichHeaders(h -> h.header(KafkaHeaders.TOPIC, TOPIC_NAME)) .log(LoggingHandler.Level.WARN) .handle(kafkaMessageHandler(kafkaProducerFactory, TOPIC_NAME)) .get(); } @Bean public DefaultKafkaHeaderMapper mapper() { return new DefaultKafkaHeaderMapper(); } private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler( ProducerFactory<Integer, String> producerFactory, String topic) { return Kafka .outboundChannelAdapter(producerFactory) .sync(true) // kafka_messageKey ヘッダにセットされている値を key にする場合には以下のように書く // .messageKey(m -> m // .getHeaders() // .get(KafkaHeaders.MESSAGE_KEY)) .headerMapper(mapper()) // メッセージの header に "kafka_topic" があれば、そこにセットされている topic へ、 // なければ第2引数 topic で渡された topic へ送信する .topicExpression("headers[kafka_topic] ?: '" + topic + "'"); } @Bean public IntegrationFlow topic1ConsumerFlow() { return IntegrationFlows .from(Kafka.messageDrivenChannelAdapter(kafkaConsumerFactory , KafkaMessageDrivenChannelAdapter.ListenerMode.record, TOPIC_NAME) .configureListenerContainer(c -> c.ackMode(ContainerProperties.AckMode.RECORD) .idleEventInterval(100L) .id("topic1ConsumerContainer")) .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel , new RawRecordHeaderErrorMessageStrategy())) .retryTemplate(new RetryTemplate()) .filterInRetry(true)) .handle((p, h) -> { log.error("★★★ " + p); return null; }) .get(); } }
動作確認します。docker-compose up -d
で Kafka+zookeeper のコンテナを起動した後、kafka-topics --zookeeper cp-zookeeper:2181 --create --topic Topic1 --partitions 3 --replication-factor 1 --if-not-exists
コマンドで Topic1 を作成します。
アプリケーションを実行してみると、
- 上のキャプチャには表示されていませんが、起動直後に INFO ログで
ConsumerConfig values:
、ProducerConfig values:
と consumer, producer の設定一覧が表示されていました。 - producer は 1, 2, 3, ... と 1 から順にインクリメントしたメッセージを送信しています(黄色の WARN ログ)。
- consumer は group に join して partition がアサインされるまでメッセージを受信しません。また今回は初回接続で最初のメッセージから受信するようには設定していないので、partition がアサインされた後に送信された分(6~)から受信しています(赤色の ERROR ログ)。
consumer の数を 1 → 3 に変更する
src/main/java/ksbysample/eipapp/kafka/KafkaDslSampleConfig.java を以下のように変更します。
@Bean public IntegrationFlow topic1Consumer1Flow() { return IntegrationFlows .from(createKafkaMessageDrivenChannelAdapter()) .handle((p, h) -> { log.error(String.format("★★★ partition = %s, value = %s", h.get("kafka_receivedPartitionId"), p)); return null; }) .get(); } @Bean public IntegrationFlow topic1Consumer2Flow() { return IntegrationFlows .from(createKafkaMessageDrivenChannelAdapter()) .handle((p, h) -> { log.error(String.format("●●● partition = %s, value = %s", h.get("kafka_receivedPartitionId"), p)); return null; }) .get(); } @Bean public IntegrationFlow topic1Consumer3Flow() { return IntegrationFlows .from(createKafkaMessageDrivenChannelAdapter()) .handle((p, h) -> { log.error(String.format("▲▲▲ partition = %s, value = %s", h.get("kafka_receivedPartitionId"), p)); return null; }) .get(); } private KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<Integer, String> createKafkaMessageDrivenChannelAdapter() { return Kafka.messageDrivenChannelAdapter(kafkaConsumerFactory , KafkaMessageDrivenChannelAdapter.ListenerMode.record, TOPIC_NAME) .configureListenerContainer(c -> c.ackMode(ContainerProperties.AckMode.RECORD) .idleEventInterval(100L)) .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel , new RawRecordHeaderErrorMessageStrategy())) .retryTemplate(new RetryTemplate()) .filterInRetry(true); }
- topic1ConsumerFlow メソッドの
from(...)
の中に記述していたKafka.messageDrivenChannelAdapter(...)~.filterInRetry(true);
の部分を抽出して private メソッド createKafkaMessageDrivenChannelAdapter を作成します。 - topic1ConsumerFlow メソッドのメソッド名を topic1Consumer1Flow に変更し、コピーして topic1Consumer2Flow, topic1Consumer3Flow メソッドを作成します。
Kafka+zookeeper のコンテナを再起動して topic を作成した後、アプリケーションを実行すると、
- consumer が3つほぼ同時に作成されて、consumer 1つに付き partition 1つが assing されます。assign された後からメッセージを受信します。
- producer から送信されたメッセージは partition の 0 → 2 → 1 の順に格納されています。メッセージに key がセットされていないのでラウンドロビンで partition に格納されているようです。
メッセージの数値の 1桁目が 0~5 なら partition 0、6~8 なら partition 1、9 なら partition 2 へ送信する
src/main/java/ksbysample/eipapp/kafka/KafkaDslSampleConfig.java を以下のように変更します。
private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler( ProducerFactory<Integer, String> producerFactory, String topic) { return Kafka .outboundChannelAdapter(producerFactory) .sync(true) // kafka_messageKey ヘッダにセットされている値を key にする場合には以下のように書く // .messageKey(m -> m // .getHeaders() // .get(KafkaHeaders.MESSAGE_KEY)) .headerMapper(mapper()) // メッセージの header に "kafka_topic" があれば、そこにセットされている topic へ、 // なければ第2引数 topic で渡された topic へ送信する .topicExpression("headers[kafka_topic] ?: '" + topic + "'") .partitionId(m -> { int payload = Integer.valueOf((String) m.getPayload()); int remainder = payload % 10; if ((0 <= remainder ) && (remainder <= 5)) { return 0; } else if ((6 <= remainder ) && (remainder <= 8)) { return 1; } else { return 2; } }) ; } @Bean public IntegrationFlow topic1Consumer1Flow() { return IntegrationFlows .from(createKafkaMessageDrivenChannelAdapter(0)) .handle((p, h) -> { log.error(String.format("★★★ partition = %s, value = %s", h.get("kafka_receivedPartitionId"), p)); return null; }) .get(); } @Bean public IntegrationFlow topic1Consumer2Flow() { return IntegrationFlows .from(createKafkaMessageDrivenChannelAdapter(1)) .handle((p, h) -> { log.error(String.format("●●● partition = %s, value = %s", h.get("kafka_receivedPartitionId"), p)); return null; }) .get(); } @Bean public IntegrationFlow topic1Consumer3Flow() { return IntegrationFlows .from(createKafkaMessageDrivenChannelAdapter(2)) .handle((p, h) -> { log.error(String.format("▲▲▲ partition = %s, value = %s", h.get("kafka_receivedPartitionId"), p)); return null; }) .get(); } private KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<Integer, String> createKafkaMessageDrivenChannelAdapter(int partition) { return Kafka.messageDrivenChannelAdapter(kafkaConsumerFactory , KafkaMessageDrivenChannelAdapter.ListenerMode.record, new TopicPartitionInitialOffset(TOPIC_NAME, partition)) .configureListenerContainer(c -> c.ackMode(ContainerProperties.AckMode.RECORD) .idleEventInterval(100L)) .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel , new RawRecordHeaderErrorMessageStrategy())) .retryTemplate(new RetryTemplate()) .filterInRetry(true); }
- kafkaMessageHandler メソッドで
.partitionId(...)
を追加して、メッセージの payload の数値の 1桁目を見て partitionId を指定するようにします。 - createKafkaMessageDrivenChannelAdapter メソッドに
int partition
の引数を追加し、Kafka.messageDrivenChannelAdapter(...)
の第3引数をTOPIC_NAME
→new TopicPartitionInitialOffset(TOPIC_NAME, partition)
に変更して、引数で渡された parition にメッセージを送信するようにします。 - topic1Consumer1Flow ~ topic1Consumer3Flow メソッド内の
createKafkaMessageDrivenChannelAdapter(...)
を呼び出しているところで引数に 0 ~ 2 の paritionId を指定します。
Kafka+zookeeper のコンテナを再起動して topic を作成した後アプリケーションを実行すると、メッセージの数値の 1桁目が 0~5 なら topic1Consumer1Flow(★★★)、6~8 なら topic1Consumer2Flow(●●●)、9 なら topic1Consumer3Flow(▲▲▲)で受信されていることが確認できます。
履歴
2019/08/11
初版発行。