かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その43 )( Docker Compose でサーバを構築する、Kafka 編10 - consumer の metrics を収集・表示する2 )

概要

記事一覧はこちらです。

前回作成した consumer の metrics を収集・表示する仕組みで、いろいろ条件を変えてどのように表示内容が変わるのかを試してみます。

参照したサイト・書籍

目次

  1. アプリケーションを動かして consumer の metrics を表示する
    1. producer のメッセージ送信間隔 1000 ミリ秒、partition x 3、consumer x 3
    2. producer のメッセージ送信間隔 10 ミリ秒、partition x 3、consumer x 3
    3. producer のメッセージ送信間隔 1 ミリ秒、partition x 3、consumer x 3
    4. producer のメッセージ送信間隔 1 ミリ秒、partition x 6、consumer x 6
    5. producer のメッセージ送信間隔 1 ミリ秒、partition x 9、consumer x 9
  2. 考察

手順

アプリケーションを動かして consumer の metrics を表示する

producer のメッセージ送信間隔 1000 ミリ秒、partition x 3、consumer x 3

以下の手順で動かします。

  • docker-compose downdocker-compose up -d コマンドを実行してコンテナを再起動する。
  • http://localhost:1936/haproxy-cp-schema-registry?stats にアクセスして cp-schema-registry が全て利用可能になるまで待機する。
  • cp-kafka1 に接続してから kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic1 --partitions N --replication-factor 3 --if-not-exists コマンドを実行して partition を作成する。partition 数(N)は試そうとしている条件に応じた値にする。
  • consumer 数を試そうとしている条件に調整してアプリケーションを実行する。また producer、consumer は送受信するメッセージをログに出力しないようにする。
    @Bean
    public IntegrationFlow topic1ProducerFlow() {
        return IntegrationFlows
                .from(countSupplier()
                        , e -> e.poller(Pollers.fixedDelay(1)))
                // メッセージの kafka_topic ヘッダに topic 名をセットすると
                // kafkaMessageHandler メソッドの第2引数に指定した topic ではなく
                // kafka_topic ヘッダの topic に送信される
                // .enrichHeaders(h -> h.header(KafkaHeaders.TOPIC, TOPIC_NAME))
                .<Integer, Counter>transform(p -> Counter.newBuilder()
                        .setCount(p)
                        .setFullName(FULL_NAMES[ThreadLocalRandom.current().nextInt(3)])
                        .build())
//                .log(LoggingHandler.Level.WARN)
                .handle(kafkaMessageHandler(kafkaProducerFactory, TOPIC_NAME)
                        , e -> e.advice(retryAdvice()))
                .get();
    }

    ..........

    @Bean
    public IntegrationFlow topic1Consumer1Flow() {
        return IntegrationFlows
                .from(createKafkaMessageDrivenChannelAdapter())
                .<Counter>handle((p, h) -> {
//                    log.error(String.format("★★★ partition = %s, count = %s, fullName = %s"
//                            , h.get("kafka_receivedPartitionId"), p.getCount(), p.getFullName()));
                    return null;
                })
                .get();
    }
  • Grafana で consumer の metrics を確認する。

まずはこれまでの実装である producer のメッセージ送信間隔を 1000 ミリ秒、partition x 3、consumer x 3 で動かします。

partition を3つ作成してから、

f:id:ksby:20190916133015p:plain

メッセージ送信間隔 1000 ミリ秒でアプリケーションを実行します。

    @Bean
    public IntegrationFlow topic1ProducerFlow() {
        return IntegrationFlows
                .from(countSupplier()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                ..........

10分程経過してから Grafana を見ると以下の状況でした。

  • 秒間のメッセージ数は 155msg/sec 前後。
  • Topic1 のメッセージ数は 1秒1件なので 60.00msg/min 前後と表示されるが、それ以外に __consumer_offsets という Topic でメッセージが送受信されている(96.00前後)。
  • Topic1 のメッセージ数は cp-kafka1~3 で 20件ずつに分散されているが、consumer_offsets のメッセージ数は cp-kafka2 に集中している。また consumer_offsets は in(write) だけで out(read) はない。
  • consumer の lag はサーバ側の metrics でもクライアント側の metrics でも全く検知されていない。

f:id:ksby:20190916133927p:plain f:id:ksby:20190916134036p:plain f:id:ksby:20190916134159p:plain f:id:ksby:20190916134247p:plain f:id:ksby:20190916134342p:plain f:id:ksby:20190916143403p:plain

producer のメッセージ送信間隔 10 ミリ秒、partition x 3、consumer x 3

今度は producer のメッセージ送信間隔を 1/100 の 10ミリ秒にして動かしてみます。

    @Bean
    public IntegrationFlow topic1ProducerFlow() {
        return IntegrationFlows
                .from(countSupplier()
                        , e -> e.poller(Pollers.fixedDelay(10)))
                ..........

10分程経過してから Grafana を見ると以下の状況でした。

  • 秒間のメッセージ数は 6700~6900msg/sec。
  • Topic1 のメッセージ数は 60 * 100 = 6000 までは増えておらず、3000~4000msg/min 程度。メッセージ送信間隔を 1/100 にしてもメッセージの件数は 100倍にはならない模様。
  • __consumer_offsets のメッセージ数は Topic1 とほとんど変わらない。
  • __consumer_offsets のメッセージの送受信が cp-kafka2 だけで行われているのは前回と変わらず。
  • consumer の lag は最初だけ発生したが、それ以降はほぼ発生していない(線が真っ平らではないのでちょっとだけ発生している模様)。

f:id:ksby:20190916143747p:plain f:id:ksby:20190916143839p:plain f:id:ksby:20190916143943p:plain f:id:ksby:20190916144033p:plain f:id:ksby:20190916144132p:plain f:id:ksby:20190916144229p:plain

またアプリケーションの方で Offset commit failed on partition Topic1-2 at offset 2777: The request timed out. というエラーメッセージが3件発生していました(起動して3分くらい経過してから)。

f:id:ksby:20190916144333p:plain

producer のメッセージ送信間隔 1 ミリ秒、partition x 3、consumer x 3

今度は producer のメッセージ送信間隔を 1ミリ秒にして動かしてみます。

    @Bean
    public IntegrationFlow topic1ProducerFlow() {
        return IntegrationFlows
                .from(countSupplier()
                        , e -> e.poller(Pollers.fixedDelay(1)))
                ..........

10分程経過してから Grafana を見ると以下の状況でした。

  • 秒間のメッセージ数は 14000~20000msg/sec(結構ばらつきがある)。
  • Topic1 のメッセージ数は 7000~10000msg/min 程度。メッセージ送信間隔 10 ミリ秒の時と比較すると約2~2.5倍程度。
  • __consumer_offsets のメッセージ数は Topic1 とほとんど変わらない。
  • __consumer_offsets のメッセージの送受信が cp-kafka2 だけで行われているのは前回と変わらず。
  • consumer の lag は 1,000~2,000件程度発生している。「Kafka Consumer Dashboard」の kafka_consumergroup_group_lag(サーバ側の metrics) と kafka_consumer_records_lag_records(クライアント側の metrics)は同じ件数を表示しているような気がしたが、実際に lag が発生するとグラフの表示内容は結構異なる。

f:id:ksby:20190916201944p:plain f:id:ksby:20190916202034p:plain f:id:ksby:20190916202126p:plain f:id:ksby:20190916202213p:plain f:id:ksby:20190916202302p:plain f:id:ksby:20190916202352p:plain

またアプリケーションの方では Offset commit failed on partition Topic1-2 at offset 2522: The request timed out. の ERROR ログが結構出力されていました。

f:id:ksby:20190916202446p:plain

producer のメッセージ送信間隔 1 ミリ秒、partition x 6、consumer x 6

メッセージ送信間隔 1 ミリ秒だと明らかにメッセージの受信が間に合っていなかったのですが、partition と consumer の数を倍の 6 にしたらどうなるか見てみます。

partition を6つ作成してから、

f:id:ksby:20190916204720p:plain

src/main/java/ksbysample/eipapp/kafka/KafkaDslSampleConfig.java に topic1Consumer4Flow~topic1Consumer6Flow Bean を追加します。

    @Bean
    public IntegrationFlow topic1Consumer3Flow() {
        ..........
    }

    @Bean
    public IntegrationFlow topic1Consumer4Flow() {
        return topic1ConsumerFlow();
    }

    @Bean
    public IntegrationFlow topic1Consumer5low() {
        return topic1ConsumerFlow();
    }

    @Bean
    public IntegrationFlow topic1Consumer6Flow() {
        return topic1ConsumerFlow();
    }

    private IntegrationFlow topic1ConsumerFlow() {
        return IntegrationFlows
                .from(createKafkaMessageDrivenChannelAdapter())
                .<Counter>handle((p, h) -> {
                    return null;
                })
                .get();
    }

アプリケーションを実行し、10分程経過してから Grafana を見ると以下の状況でした。

  • consumer の lag が劇的に減った。「Kafka Consumer Dashboard」では kafka_consumergroup_group_lag、kafka_consumergroup_group_lag_seconds には lag が表示されたが、kafka_consumer_records_lag_records、kafka_consumer_records_lag_avg_records、kafka_consumer_records_lag_max_records は 0件のままだった。
  • メッセージの件数 Global avg msg/sec (1m) はそれ程変わらず。
  • 今回は __consumer_offsets のメッセージの送受信は cp-kafka1 で行われていた。

f:id:ksby:20190916210134p:plain f:id:ksby:20190916210227p:plain f:id:ksby:20190916210327p:plain f:id:ksby:20190916210416p:plain f:id:ksby:20190916210511p:plain f:id:ksby:20190916210559p:plain

アプリケーションの ERROR ログも1件しか出ていません。

f:id:ksby:20190916210645p:plain

partition と consumer を倍にするくらいでここまで効果が出るとは思いませんでした。

producer のメッセージ送信間隔 1 ミリ秒、partition x 9、consumer x 9

partition と consumer の数を +3 して 9 まで増やせば lag がなくなるのではないか?と思ったので試してみます。

partition を9つ作成してから、

f:id:ksby:20190916222125p:plain

src/main/java/ksbysample/eipapp/kafka/KafkaDslSampleConfig.java に topic1Consumer7Flow~topic1Consumer9Flow Bean を追加します。

    @Bean
    public IntegrationFlow topic1Consumer7Flow() {
        return topic1ConsumerFlow();
    }
    
    @Bean
    public IntegrationFlow topic1Consumer8Flow() {
        return topic1ConsumerFlow();
    }

    @Bean
    public IntegrationFlow topic1Consumer9Flow() {
        return topic1ConsumerFlow();
    }

アプリケーションを実行し、10分程経過してから Grafana を見ると以下の状況でした。

  • lag は発生しても 1~2件程度になった。
  • msg/sec の件数はほぼ変わらず。

f:id:ksby:20190916223341p:plain f:id:ksby:20190916223447p:plain f:id:ksby:20190916223538p:plain f:id:ksby:20190916223627p:plain f:id:ksby:20190916223725p:plain f:id:ksby:20190916223811p:plain

アプリケーションを見ても ERROR ログは全く発生しませんでした。

f:id:ksby:20190916223858p:plain

考察

  • ローカルPC+Docker の構成でも partition と consumer の数を増やせば lag を劇的に減らせる模様。
  • ローカルPC で Docker コンテナで Kafka クラスタを構築して Spring Integration のアプリケーションからメッセージを送受信した場合、partition と consumer の数を調整すれば 8,000~10,000msg/sec くらい(__consumer_offsets へのメッセージ送信を除く)のパフォーマンスは出せる模様。
  • lag が発生しているか否かの情報は Kafka Lag Exporter から収集した metrics の方が信用できそう。Spring Boot Actuator から収集する kafka_consumer_records_lag_records、kafka_consumer_records_lag_avg_records、kafka_consumer_records_lag_max_records は lag が顕著に発生している状況でなければ 0 のままの印象がある。何か設定が足りないのだろうか。。。
  • __consumer_offsets は Topic1 とほぼ同じ件数のメッセージが送信されるがなぜか broker が1台しか利用されない。分散させる設定があるのか不明。

最後に Kafka を使うなら metricsの監視は必須なんだろうなと思いました。あと限界にチャレンジしてみたいとも思いましたが、負荷試験する方法なんてあるのかな?

履歴

2019/09/17
初版発行。