かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その38 )( Docker Compose でサーバを構築する、Kafka 編5 - broker や zookeeper 停止時の動作を確認する )

概要

記事一覧はこちらです。

今回は kafka の broker や zookeeper のコンテナを停止して、その時の動作を確認してみます。

参照したサイト・書籍

  1. Apache Kafkaって本当に大丈夫?~故障検証のオーバービューと興味深い挙動の紹介~
    https://www.slideshare.net/hadoopxnttdata/hadoop-spark-conference-japan-2019nttdata

  2. Kafka 2.3 Documentation - 3.1 Broker Configs
    https://kafka.apache.org/documentation/#brokerconfigs

  3. Kafka トピック区画 Leader の再割り当て
    https://www.ibm.com/support/knowledgecenter/ja/SSCVHB_1.2.0/admin/tnpi_reassign_partitions.html

目次

  1. 停止時の動作を確認しやすくするために org.apache.kafka.clients.NetworkClient のログを OFF にし、producer, consumer にリトライ処理を追加する
  2. leader になっている broker を1台だけ停止してみる
  3. leader が有効な partition が1つだけになるまで broker を停止してみる
  4. メッセージ送受信中に zookeeper を停止してみる
  5. kafka-reassign-partitions コマンドで partition に assign する broker を再割り当てする

手順

停止時の動作を確認しやすくするために org.apache.kafka.clients.NetworkClient のログを OFF にし、producer, consumer にリトライ処理を追加する

broker を停止して1つも有効な leader が設定されていない partition が発生すると下の画像のように org.apache.kafka.clients.NetworkClient が大量のログを出力して動作を追えなくなるので、

f:id:ksby:20190817181251p:plain

src/main/resources/application.properties に logging.level.org.apache.kafka.clients.NetworkClient=OFF を追加してログが出力されないようにします。

spring.kafka.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092
spring.kafka.consumer.group-id=topic1-consumer-group1

logging.level.org.apache.kafka.clients.NetworkClient=OFF

また broker を停止した時にメッセージの送信漏れ、受信漏れを出にくくするために src/main/java/ksbysample/eipapp/kafka/KafkaDslSampleConfig.java を変更してリトライ処理を追加します。

@Slf4j
@Configuration
public class KafkaDslSampleConfig {

    ..........

    @Bean
    public IntegrationFlow topic1ProducerFlow() {
        return IntegrationFlows
                .from(countSupplier()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // メッセージの kafka_topic ヘッダに topic 名をセットすると
                // kafkaMessageHandler メソッドの第2引数に指定した topic ではなく
                // kafka_topic ヘッダの topic に送信される
                // .enrichHeaders(h -> h.header(KafkaHeaders.TOPIC, TOPIC_NAME))
                .log(LoggingHandler.Level.WARN)
                .handle(kafkaMessageHandler(kafkaProducerFactory, TOPIC_NAME)
                        , e -> e.advice(retryAdvice()))
                .get();
    }

    ..........

    private KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<Integer, String>
    createKafkaMessageDrivenChannelAdapter() {
        return Kafka.messageDrivenChannelAdapter(kafkaConsumerFactory
                , KafkaMessageDrivenChannelAdapter.ListenerMode.record, TOPIC_NAME)
                .configureListenerContainer(c ->
                        c.ackMode(ContainerProperties.AckMode.RECORD)
                                .idleEventInterval(100L))
                .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel
                        , new RawRecordHeaderErrorMessageStrategy()))
                .retryTemplate(retryTemplate())
                .filterInRetry(true);
    }

    /**
     * リトライ回数は最大3回、リトライ前に 5秒待機する RetryTemplate
     */
    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3, singletonMap(Exception.class, true)));
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(5000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
        return retryTemplate;
    }

    @Bean
    public RequestHandlerRetryAdvice retryAdvice() {
        RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
        retryAdvice.setRetryTemplate(retryTemplate());
        return retryAdvice;
    }

}
  • retryTemplate bean、retryAdvice bean の定義を追加します。
  • メッセージを送信する topic1ProducerFlow bean で .handle(kafkaMessageHandler(kafkaProducerFactory, TOPIC_NAME)).handle(kafkaMessageHandler(kafkaProducerFactory, TOPIC_NAME), e -> e.advice(retryAdvice())) に変更します。
  • メッセージを受信する bean から呼び出される createKafkaMessageDrivenChannelAdapter メソッドで .retryTemplate(new RetryTemplate()).retryTemplate(retryTemplate()) に変更します。

leader になっている broker を1台だけ停止してみる

docker-compose up -d で kafka cluster を起動した後、kafkacat で broker が追加されるまで待機してから Topic1 を作成します。

f:id:ksby:20190817183027p:plain f:id:ksby:20190817183207p:plain

アプリケーションを起動してメッセージの送受信が行われることを確認します。

f:id:ksby:20190817183417p:plain

partition 0 の leader である cp-kafka4 を停止してみます。

f:id:ksby:20190817183735p:plainf:id:ksby:20190817183858p:plain

各 partition の isrs(in-sync replica)から 4 がなくなり、partition 0 の leader が 4 → 2 に切り替わります。

f:id:ksby:20190817184101p:plain

アプリケーションのログを見ると Exception thrown when sending a message with key='null' and payload='298' to topic Topic1: というメッセージが1度出力されますが、数字が途切れずにメッセージの送受信が継続します。

f:id:ksby:20190817184254p:plain

leader が有効な partition が1つだけになるまで broker を停止してみる

cp-kafka3 → cp-kafka5 → cp-kafka2 の順で停止して leader が有効な partition が1つだけになるようにしてみます。

cp-kafka5 まで停止すると有効な leader が設定されていない partition が発生しますが、

f:id:ksby:20190817185010p:plain

エラーあるいは調整中と思われるログが出力された後、leader が有効な partition 0, 2 を利用してメッセージの送受信が続きます。数字も途切れていません。

f:id:ksby:20190817185211p:plain f:id:ksby:20190817185314p:plain

cp-kafka2 まで停止して partition 2 だけ leader が有効な状態にしても、

f:id:ksby:20190817185558p:plain f:id:ksby:20190817185746p:plain

partition 2 だけでメッセージの送受信が継続されていました(数字も途切れていません)。この状態でもエラーにならずメッセージの送受信が出来るのか。。。

f:id:ksby:20190817190012p:plain f:id:ksby:20190817190115p:plain

今度は cp-kafka2 ~ cp-kafka5 を再開してみます。

f:id:ksby:20190817190540p:plainf:id:ksby:20190817190639p:plain

f:id:ksby:20190817190801p:plain

consumer の topic の assign が再度行われますが、しばらくは partition 2 だけで受信し続けます。停止した時と異なり直ぐに partition 0, 1 は使用されませんでした。

f:id:ksby:20190817191001p:plain

しばらくすると partition 0, 1 から再び受信できるようになりました。

f:id:ksby:20190817191300p:plain

Kafka 2.3 Documentation - 3.1 Broker Configs を見てみると leader.imbalance.check.interval.seconds という設定項目があり default で 300秒で設定されていたので、5分毎にチェックされているようです。

実際に大量のメッセージが送受信されている環境の場合には replica が in-sync の状態になるまで時間がかかったりするのかもしれませんが、broker を停止・起動するだけならばメッセージの送受信は可能な限り継続されるようです。

メッセージ送受信中に zookeeper を停止してみる

docker-compose downdocker-compose up -d で kafka cluster を起動し直した後、kafkacat で broker が追加されるまで待機してから Topic1 を作成します。

アプリケーションを実行してメッセージが送受信されていることを確認してから、

f:id:ksby:20190818194309p:plain

zookeeper1~3 を停止してみます。

f:id:ksby:20190818194423p:plainf:id:ksby:20190818194527p:plain

停止しても特にエラーメッセージは出ずにメッセージの送受信は継続していました。

f:id:ksby:20190818194642p:plain

5分経過してもメッセージの送受信は継続していました。

f:id:ksby:20190818195141p:plain

zookeeper が全て停止している状態で leader にアサインされている broker を1つ停止してみます。kafkacat で現在の状況を確認した後、

f:id:ksby:20190818195407p:plain

partition 1 の leader にアサインされている cp-kafka2 を停止します。

f:id:ksby:20190818195637p:plain

アプリケーションのログを見ると INFO, ERROR ログが出るようになりましたが、メッセージの送受信は継続しています(ただしかなりペースが落ちます)。

f:id:ksby:20190818195900p:plain

kafkacat を実行すると partition 1 の leader は 2 のままでした。zookeeper が全て落ちているので情報が更新されませんが、kafka 側で取得したデータをキャッシュしていて参照はできるようです。

f:id:ksby:20190818200207p:plain

zookeeper を 1 → 2 → 3 の順で起動してみます。

1台だけ起動しても何も変わりませんでしたが、1, 2 の 2台まで起動すると、

f:id:ksby:20190818200620p:plain

partition 1 の leader も更新されて、

f:id:ksby:20190818200735p:plain

メッセージの送受信も正常に行われるようになりました。

f:id:ksby:20190818200912p:plain

kafka-reassign-partitions コマンドで partition に assign する broker を再割り当てする

cp-kafka1~3 だけ起動して Topic1 を作成→アプリケーション起動した後、cp-kafka2, 3 を停止してから cp-kafka4, 5 を起動して、Topic1 の partition の replica に割り当てられている broker を 2, 3 → 4, 5 に変更した時にアプリケーションの動作を確認してみます。

docker-compose downdocker-compose up -d で kafka cluster を起動してから cp-kafka4~5 を停止します。

f:id:ksby:20190819003732p:plain f:id:ksby:20190819003824p:plain

kafkacat で確認すると __confluent.support.metrics partition の replica に cp-kafka4 が割り当てられているので cp-kafka1~3 に調整します。

プロジェクトのルート直下に reassign-partitions.json というファイルを新規作成した後、以下の内容を記述します。

{
  "version": 1,
  "partitions": [
   {
      "topic": "__confluent.support.metrics",
      "partition": 0,
      "replicas": [1, 2, 3]
    }
  ]
}

docker ps コマンドで cp-kafka1 の CONTAINER ID を確認してから docker cp reassign-partitions.json <CONTAINER ID>:/tmp コマンドで reassign-partitions.json を cp-kafka1 の /tmp の下にコピーします。

f:id:ksby:20190819004606p:plain

cp-kafka1 に接続して /tmp の下に移動してから kafka-reassign-partitions --zookeeper cp-zookeeper1:12181 --reassignment-json-file reassign-partitions.json --execute コマンドを実行します。

f:id:ksby:20190819004721p:plain

kafkacat で確認すると __confluent.support.metrics partition の replica が 3, 1, 2 になっています。

f:id:ksby:20190819005637p:plain

Topic1 を作成した後(全ての partition の replica には 1~3 が割り当てられています)、

f:id:ksby:20190819005758p:plain

アプリケーションを起動してメッセージの送受信が行われていることを確認してから、

f:id:ksby:20190819005944p:plain

cp-kafka2~3 を停止 → cp-kafka4~5 を起動します。

f:id:ksby:20190819010158p:plain f:id:ksby:20190819011206p:plain

partition の replica を 1, 4, 5 に変更します。プロジェクトのルート直下に topics-to-move.json というファイルを新規作成した後、以下の内容を記述します。topics には Topic1 だけでなく存在する topic を全て記述します。

{
  "version": 1,
  "topics": [
    { "topic": "Topic1" },
    { "topic": "__confluent.support.metrics" },
    { "topic": "__consumer_offsets" }
  ]
}

topics-to-move.jsondocker cp コマンドで cp-kafka1 の /tmp の下にアップロードし、

f:id:ksby:20190819010616p:plain

cp-kafka1 に接続して /tmp の下に移動した後、kafka-reassign-partitions --zookeeper cp-zookeeper1:12181 --topics-to-move-json-file topics-to-move.json --broker-list "1,4,5" --generate コマンドを実行します。

f:id:ksby:20190819010814p:plain f:id:ksby:20190819010941p:plain

Current partition replica assignmentProposed partition reassignment configuration が出力されますので、Proposed partition reassignment configuration の下に出力されている json をコピーして reassign-partitions.json にペーストします。

docker cp コマンドで reassign-partitions.json を cp-kafka1 の /tmp の下にコピーしてから、

f:id:ksby:20190819011628p:plain

cp-kafka1 に接続して /tmp の下に移動した後、kafka-reassign-partitions --zookeeper cp-zookeeper1:12181 --reassignment-json-file reassign-partitions.json --execute コマンドを実行します。

f:id:ksby:20190819011810p:plain f:id:ksby:20190819011910p:plain

kafkacat を実行すると全ての partition の replica, isr が 1, 4, 5 に変わっています。

f:id:ksby:20190819012055p:plain

アプリケーションのログを見るとメッセージの送受信も継続していました。

f:id:ksby:20190819012241p:plain

現在全ての partition の leader が cp-kafka1 になっているので cp-kafka1 を停止してみます。cp-kafka1 を停止して kafkacat で全ての partition の leader が 4, 5 に変更されたことを確認した後、

f:id:ksby:20190819012414p:plain f:id:ksby:20190819012604p:plain

アプリケーションのログを見ると、しばらくエラーと調整中と思われるログが出た後メッセージの送受信が正常に送受信されている状態に戻りました。

f:id:ksby:20190819012810p:plain f:id:ksby:20190819012904p:plain f:id:ksby:20190819013010p:plain

ただし、この確認は何回か実施してみたのですが、メッセージの送受信が元に戻らなかった時もあったんですよね。。。 今の設定あるいは実装の場合、必ず復旧すると思わない方がよいのかもしれません。

次は cp-schema-registry コンテナを追加して Apache Avro を試してみるつもりです。

履歴

2019/08/19
初版発行。