かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その36 )( Docker Compose でサーバを構築する、Kafka 編3 - Spring Integration DSL で producer, consumer を実装する )

概要

記事一覧はこちらです。

今回は Spring Integration DSL で Kafka の producer, consumer を実装します。

参照したサイト・書籍

  1. spring-projects/spring-integration-kafka
    https://github.com/spring-projects/spring-integration-kafka

  2. spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java
    https://github.com/spring-projects/spring-integration-kafka/blob/b27f8b3eead708d5309a881aa76ac9a244ee5098/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java

  3. Spring for Apache Kafka - 5. Spring Integration
    https://docs.spring.io/spring-kafka/reference/html/#spring-integration

目次

  1. Spring Integration DSL で Kafka の producer, consumer を実装する
  2. consumer の数を 1 → 3 に変更する
  3. メッセージの数値の 1桁目が 0~5 なら partition 0、6~8 なら partition 1、それ以外なら partition 2 へ送信する

手順

Spring Integration DSL で Kafka の producer, consumer を実装する

まずは 1 producer, 1 consumer のサンプルを以下の仕様で実装します。

  • Topic1 という名前の topic を 3 partition and 1 replicas で作成する。
  • 送信するメッセージの serializer, deserializer はデフォルトのもの(key / value どちらも StringSerializer/StringDeserializer、org.springframework.boot.autoconfigure.kafka.KafkaProperties 参照)を使用する。
  • producer からは 1, 2, 3, ... と 1 から始まり +1 ずつ増やした数字の文字列を value にセットした message を1秒に1件送信する。key には何もセットしない。送信時に partition は指定しない。
  • consumer は1つ作成する。受信はメッセージがあれば連続して受信するが、ない場合には 100ms 待機する。

Jackson の ObjectMapper が必要になるので build.gradle に implementation("org.springframework.boot:spring-boot-starter-json") を追加します。

dependencies {
    def lombokVersion = "1.18.8"

    implementation("org.springframework.boot:spring-boot-starter-integration")
    implementation("org.springframework.boot:spring-boot-starter-json") {
        exclude group: "org.springframework", module: "spring-web"
    }
    implementation("org.springframework.integration:spring-integration-kafka:3.1.4.RELEASE")
    testImplementation("org.springframework.boot:spring-boot-starter-test")

    ..........
}

src/main/java/ksbysample/eipapp/kafka の下に KafkaDslSampleConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafka;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.kafka.dsl.KafkaProducerMessageHandlerSpec;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.retry.support.RetryTemplate;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

@Slf4j
@Configuration
public class KafkaDslSampleConfig {

    private static final String TOPIC_NAME = "Topic1";

    // kafkaProducerFactory bean, kafkaConsumerFactory bean は
    // org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
    // で生成されている
    // 
    // 設定は application.properties の spring.kafka.producer.~ で行う
    private final ProducerFactory<Integer, String> kafkaProducerFactory;
    // 設定は application.properties の spring.kafka.consumer.~ で行う
    private final ConsumerFactory<Integer, String> kafkaConsumerFactory;

    private final MessageChannel errorChannel;

    private AtomicInteger count = new AtomicInteger(0);

    public KafkaDslSampleConfig(ProducerFactory<Integer, String> kafkaProducerFactory
            , ConsumerFactory<Integer, String> kafkaConsumerFactory
            , MessageChannel errorChannel) {
        this.kafkaProducerFactory = kafkaProducerFactory;
        this.kafkaConsumerFactory = kafkaConsumerFactory;
        this.errorChannel = errorChannel;
    }

    @Bean
    public Supplier<String> countSupplier() {
        return () -> String.valueOf(this.count.addAndGet(1));
    }

    @Bean
    public IntegrationFlow topic1ProducerFlow() {
        return IntegrationFlows
                .from(countSupplier()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // メッセージの kafka_topic ヘッダに topic 名をセットすると
                // kafkaMessageHandler メソッドの第2引数に指定した topic ではなく
                // kafka_topic ヘッダの topic に送信される
                // .enrichHeaders(h -> h.header(KafkaHeaders.TOPIC, TOPIC_NAME))
                .log(LoggingHandler.Level.WARN)
                .handle(kafkaMessageHandler(kafkaProducerFactory, TOPIC_NAME))
                .get();
    }

    @Bean
    public DefaultKafkaHeaderMapper mapper() {
        return new DefaultKafkaHeaderMapper();
    }

    private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
            ProducerFactory<Integer, String> producerFactory, String topic) {
        return Kafka
                .outboundChannelAdapter(producerFactory)
                .sync(true)
                // kafka_messageKey ヘッダにセットされている値を key にする場合には以下のように書く
                // .messageKey(m -> m
                //         .getHeaders()
                //         .get(KafkaHeaders.MESSAGE_KEY))
                .headerMapper(mapper())
                // メッセージの header に "kafka_topic" があれば、そこにセットされている topic へ、
                // なければ第2引数 topic で渡された topic へ送信する
                .topicExpression("headers[kafka_topic] ?: '" + topic + "'");
    }

    @Bean
    public IntegrationFlow topic1ConsumerFlow() {
        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(kafkaConsumerFactory
                        , KafkaMessageDrivenChannelAdapter.ListenerMode.record, TOPIC_NAME)
                        .configureListenerContainer(c ->
                                c.ackMode(ContainerProperties.AckMode.RECORD)
                                        .idleEventInterval(100L)
                                        .id("topic1ConsumerContainer"))
                        .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel
                                , new RawRecordHeaderErrorMessageStrategy()))
                        .retryTemplate(new RetryTemplate())
                        .filterInRetry(true))
                .handle((p, h) -> {
                    log.error("★★★ " + p);
                    return null;
                })
                .get();
    }

}

動作確認します。docker-compose up -d で Kafka+zookeeper のコンテナを起動した後、kafka-topics --zookeeper cp-zookeeper:2181 --create --topic Topic1 --partitions 3 --replication-factor 1 --if-not-exists コマンドで Topic1 を作成します。

f:id:ksby:20190725225230p:plain

アプリケーションを実行してみると、

f:id:ksby:20190725225812p:plain

  • 上のキャプチャには表示されていませんが、起動直後に INFO ログで ConsumerConfig values:ProducerConfig values: と consumer, producer の設定一覧が表示されていました。
  • producer は 1, 2, 3, ... と 1 から順にインクリメントしたメッセージを送信しています(黄色の WARN ログ)。
  • consumer は group に join して partition がアサインされるまでメッセージを受信しません。また今回は初回接続で最初のメッセージから受信するようには設定していないので、partition がアサインされた後に送信された分(6~)から受信しています(赤色の ERROR ログ)。

consumer の数を 1 → 3 に変更する

src/main/java/ksbysample/eipapp/kafka/KafkaDslSampleConfig.java を以下のように変更します。

    @Bean
    public IntegrationFlow topic1Consumer1Flow() {
        return IntegrationFlows
                .from(createKafkaMessageDrivenChannelAdapter())
                .handle((p, h) -> {
                    log.error(String.format("★★★ partition = %s, value = %s", h.get("kafka_receivedPartitionId"), p));
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow topic1Consumer2Flow() {
        return IntegrationFlows
                .from(createKafkaMessageDrivenChannelAdapter())
                .handle((p, h) -> {
                    log.error(String.format("●●● partition = %s, value = %s", h.get("kafka_receivedPartitionId"), p));
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow topic1Consumer3Flow() {
        return IntegrationFlows
                .from(createKafkaMessageDrivenChannelAdapter())
                .handle((p, h) -> {
                    log.error(String.format("▲▲▲ partition = %s, value = %s", h.get("kafka_receivedPartitionId"), p));
                    return null;
                })
                .get();
    }

    private KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<Integer, String>
    createKafkaMessageDrivenChannelAdapter() {
        return Kafka.messageDrivenChannelAdapter(kafkaConsumerFactory
                , KafkaMessageDrivenChannelAdapter.ListenerMode.record, TOPIC_NAME)
                .configureListenerContainer(c ->
                        c.ackMode(ContainerProperties.AckMode.RECORD)
                                .idleEventInterval(100L))
                .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel
                        , new RawRecordHeaderErrorMessageStrategy()))
                .retryTemplate(new RetryTemplate())
                .filterInRetry(true);
    }
  • topic1ConsumerFlow メソッドの from(...) の中に記述していた Kafka.messageDrivenChannelAdapter(...)~.filterInRetry(true); の部分を抽出して private メソッド createKafkaMessageDrivenChannelAdapter を作成します。
  • topic1ConsumerFlow メソッドのメソッド名を topic1Consumer1Flow に変更し、コピーして topic1Consumer2Flow, topic1Consumer3Flow メソッドを作成します。

Kafka+zookeeper のコンテナを再起動して topic を作成した後、アプリケーションを実行すると、

f:id:ksby:20190729003711p:plain

  • consumer が3つほぼ同時に作成されて、consumer 1つに付き partition 1つが assing されます。assign された後からメッセージを受信します。
  • producer から送信されたメッセージは partition の 0 → 2 → 1 の順に格納されています。メッセージに key がセットされていないのでラウンドロビンで partition に格納されているようです。

メッセージの数値の 1桁目が 0~5 なら partition 0、6~8 なら partition 1、9 なら partition 2 へ送信する

src/main/java/ksbysample/eipapp/kafka/KafkaDslSampleConfig.java を以下のように変更します。

    private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
            ProducerFactory<Integer, String> producerFactory, String topic) {
        return Kafka
                .outboundChannelAdapter(producerFactory)
                .sync(true)
                // kafka_messageKey ヘッダにセットされている値を key にする場合には以下のように書く
                // .messageKey(m -> m
                //         .getHeaders()
                //         .get(KafkaHeaders.MESSAGE_KEY))
                .headerMapper(mapper())
                // メッセージの header に "kafka_topic" があれば、そこにセットされている topic へ、
                // なければ第2引数 topic で渡された topic へ送信する
                .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
                .partitionId(m -> {
                    int payload = Integer.valueOf((String) m.getPayload());
                    int remainder  = payload % 10;
                    if ((0 <= remainder ) && (remainder  <= 5)) {
                        return 0;
                    } else if ((6 <= remainder ) && (remainder  <= 8)) {
                        return 1;
                    } else {
                        return 2;
                    }
                })
                ;
    }

    @Bean
    public IntegrationFlow topic1Consumer1Flow() {
        return IntegrationFlows
                .from(createKafkaMessageDrivenChannelAdapter(0))
                .handle((p, h) -> {
                    log.error(String.format("★★★ partition = %s, value = %s", h.get("kafka_receivedPartitionId"), p));
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow topic1Consumer2Flow() {
        return IntegrationFlows
                .from(createKafkaMessageDrivenChannelAdapter(1))
                .handle((p, h) -> {
                    log.error(String.format("●●● partition = %s, value = %s", h.get("kafka_receivedPartitionId"), p));
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow topic1Consumer3Flow() {
        return IntegrationFlows
                .from(createKafkaMessageDrivenChannelAdapter(2))
                .handle((p, h) -> {
                    log.error(String.format("▲▲▲ partition = %s, value = %s", h.get("kafka_receivedPartitionId"), p));
                    return null;
                })
                .get();
    }

    private KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<Integer, String>
    createKafkaMessageDrivenChannelAdapter(int partition) {
        return Kafka.messageDrivenChannelAdapter(kafkaConsumerFactory
                , KafkaMessageDrivenChannelAdapter.ListenerMode.record, new TopicPartitionInitialOffset(TOPIC_NAME, partition))
                .configureListenerContainer(c ->
                        c.ackMode(ContainerProperties.AckMode.RECORD)
                                .idleEventInterval(100L))
                .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel
                        , new RawRecordHeaderErrorMessageStrategy()))
                .retryTemplate(new RetryTemplate())
                .filterInRetry(true);
    }
  • kafkaMessageHandler メソッドで .partitionId(...) を追加して、メッセージの payload の数値の 1桁目を見て partitionId を指定するようにします。
  • createKafkaMessageDrivenChannelAdapter メソッドに int partition の引数を追加し、Kafka.messageDrivenChannelAdapter(...) の第3引数を TOPIC_NAMEnew TopicPartitionInitialOffset(TOPIC_NAME, partition) に変更して、引数で渡された parition にメッセージを送信するようにします。
  • topic1Consumer1Flow ~ topic1Consumer3Flow メソッド内の createKafkaMessageDrivenChannelAdapter(...) を呼び出しているところで引数に 0 ~ 2 の paritionId を指定します。

Kafka+zookeeper のコンテナを再起動して topic を作成した後アプリケーションを実行すると、メッセージの数値の 1桁目が 0~5 なら topic1Consumer1Flow(★★★)、6~8 なら topic1Consumer2Flow(●●●)、9 なら topic1Consumer3Flow(▲▲▲)で受信されていることが確認できます。

f:id:ksby:20190811111801p:plain

履歴

2019/08/11
初版発行。