Spring Boot + Spring Integration でいろいろ試してみる ( その38 )( Docker Compose でサーバを構築する、Kafka 編5 - broker や zookeeper 停止時の動作を確認する )
概要
記事一覧はこちらです。
今回は kafka の broker や zookeeper のコンテナを停止して、その時の動作を確認してみます。
参照したサイト・書籍
Apache Kafkaって本当に大丈夫?~故障検証のオーバービューと興味深い挙動の紹介~
https://www.slideshare.net/hadoopxnttdata/hadoop-spark-conference-japan-2019nttdataKafka 2.3 Documentation - 3.1 Broker Configs
https://kafka.apache.org/documentation/#brokerconfigsKafka トピック区画 Leader の再割り当て
https://www.ibm.com/support/knowledgecenter/ja/SSCVHB_1.2.0/admin/tnpi_reassign_partitions.html
目次
- 停止時の動作を確認しやすくするために org.apache.kafka.clients.NetworkClient のログを OFF にし、producer, consumer にリトライ処理を追加する
- leader になっている broker を1台だけ停止してみる
- leader が有効な partition が1つだけになるまで broker を停止してみる
- メッセージ送受信中に zookeeper を停止してみる
- kafka-reassign-partitions コマンドで partition に assign する broker を再割り当てする
手順
停止時の動作を確認しやすくするために org.apache.kafka.clients.NetworkClient のログを OFF にし、producer, consumer にリトライ処理を追加する
broker を停止して1つも有効な leader が設定されていない partition が発生すると下の画像のように org.apache.kafka.clients.NetworkClient が大量のログを出力して動作を追えなくなるので、
src/main/resources/application.properties に logging.level.org.apache.kafka.clients.NetworkClient=OFF
を追加してログが出力されないようにします。
spring.kafka.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092 spring.kafka.consumer.group-id=topic1-consumer-group1 logging.level.org.apache.kafka.clients.NetworkClient=OFF
また broker を停止した時にメッセージの送信漏れ、受信漏れを出にくくするために src/main/java/ksbysample/eipapp/kafka/KafkaDslSampleConfig.java を変更してリトライ処理を追加します。
@Slf4j @Configuration public class KafkaDslSampleConfig { .......... @Bean public IntegrationFlow topic1ProducerFlow() { return IntegrationFlows .from(countSupplier() , e -> e.poller(Pollers.fixedDelay(1000))) // メッセージの kafka_topic ヘッダに topic 名をセットすると // kafkaMessageHandler メソッドの第2引数に指定した topic ではなく // kafka_topic ヘッダの topic に送信される // .enrichHeaders(h -> h.header(KafkaHeaders.TOPIC, TOPIC_NAME)) .log(LoggingHandler.Level.WARN) .handle(kafkaMessageHandler(kafkaProducerFactory, TOPIC_NAME) , e -> e.advice(retryAdvice())) .get(); } .......... private KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<Integer, String> createKafkaMessageDrivenChannelAdapter() { return Kafka.messageDrivenChannelAdapter(kafkaConsumerFactory , KafkaMessageDrivenChannelAdapter.ListenerMode.record, TOPIC_NAME) .configureListenerContainer(c -> c.ackMode(ContainerProperties.AckMode.RECORD) .idleEventInterval(100L)) .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel , new RawRecordHeaderErrorMessageStrategy())) .retryTemplate(retryTemplate()) .filterInRetry(true); } /** * リトライ回数は最大3回、リトライ前に 5秒待機する RetryTemplate */ @Bean public RetryTemplate retryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3, singletonMap(Exception.class, true))); FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy(); fixedBackOffPolicy.setBackOffPeriod(5000); retryTemplate.setBackOffPolicy(fixedBackOffPolicy); return retryTemplate; } @Bean public RequestHandlerRetryAdvice retryAdvice() { RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice(); retryAdvice.setRetryTemplate(retryTemplate()); return retryAdvice; } }
- retryTemplate bean、retryAdvice bean の定義を追加します。
- メッセージを送信する topic1ProducerFlow bean で
.handle(kafkaMessageHandler(kafkaProducerFactory, TOPIC_NAME))
→.handle(kafkaMessageHandler(kafkaProducerFactory, TOPIC_NAME), e -> e.advice(retryAdvice()))
に変更します。 - メッセージを受信する bean から呼び出される createKafkaMessageDrivenChannelAdapter メソッドで
.retryTemplate(new RetryTemplate())
→.retryTemplate(retryTemplate())
に変更します。
leader になっている broker を1台だけ停止してみる
docker-compose up -d
で kafka cluster を起動した後、kafkacat で broker が追加されるまで待機してから Topic1 を作成します。
アプリケーションを起動してメッセージの送受信が行われることを確認します。
partition 0 の leader である cp-kafka4 を停止してみます。
各 partition の isrs(in-sync replica)から 4 がなくなり、partition 0 の leader が 4 → 2 に切り替わります。
アプリケーションのログを見ると Exception thrown when sending a message with key='null' and payload='298' to topic Topic1:
というメッセージが1度出力されますが、数字が途切れずにメッセージの送受信が継続します。
leader が有効な partition が1つだけになるまで broker を停止してみる
cp-kafka3 → cp-kafka5 → cp-kafka2 の順で停止して leader が有効な partition が1つだけになるようにしてみます。
cp-kafka5 まで停止すると有効な leader が設定されていない partition が発生しますが、
エラーあるいは調整中と思われるログが出力された後、leader が有効な partition 0, 2 を利用してメッセージの送受信が続きます。数字も途切れていません。
cp-kafka2 まで停止して partition 2 だけ leader が有効な状態にしても、
partition 2 だけでメッセージの送受信が継続されていました(数字も途切れていません)。この状態でもエラーにならずメッセージの送受信が出来るのか。。。
今度は cp-kafka2 ~ cp-kafka5 を再開してみます。
consumer の topic の assign が再度行われますが、しばらくは partition 2 だけで受信し続けます。停止した時と異なり直ぐに partition 0, 1 は使用されませんでした。
しばらくすると partition 0, 1 から再び受信できるようになりました。
Kafka 2.3 Documentation - 3.1 Broker Configs を見てみると leader.imbalance.check.interval.seconds
という設定項目があり default で 300秒で設定されていたので、5分毎にチェックされているようです。
実際に大量のメッセージが送受信されている環境の場合には replica が in-sync の状態になるまで時間がかかったりするのかもしれませんが、broker を停止・起動するだけならばメッセージの送受信は可能な限り継続されるようです。
メッセージ送受信中に zookeeper を停止してみる
docker-compose down
、docker-compose up -d
で kafka cluster を起動し直した後、kafkacat で broker が追加されるまで待機してから Topic1 を作成します。
アプリケーションを実行してメッセージが送受信されていることを確認してから、
zookeeper1~3 を停止してみます。
停止しても特にエラーメッセージは出ずにメッセージの送受信は継続していました。
5分経過してもメッセージの送受信は継続していました。
zookeeper が全て停止している状態で leader にアサインされている broker を1つ停止してみます。kafkacat で現在の状況を確認した後、
partition 1 の leader にアサインされている cp-kafka2 を停止します。
アプリケーションのログを見ると INFO, ERROR ログが出るようになりましたが、メッセージの送受信は継続しています(ただしかなりペースが落ちます)。
kafkacat を実行すると partition 1 の leader は 2 のままでした。zookeeper が全て落ちているので情報が更新されませんが、kafka 側で取得したデータをキャッシュしていて参照はできるようです。
zookeeper を 1 → 2 → 3 の順で起動してみます。
1台だけ起動しても何も変わりませんでしたが、1, 2 の 2台まで起動すると、
partition 1 の leader も更新されて、
メッセージの送受信も正常に行われるようになりました。
kafka-reassign-partitions コマンドで partition に assign する broker を再割り当てする
cp-kafka1~3 だけ起動して Topic1 を作成→アプリケーション起動した後、cp-kafka2, 3 を停止してから cp-kafka4, 5 を起動して、Topic1 の partition の replica に割り当てられている broker を 2, 3 → 4, 5 に変更した時にアプリケーションの動作を確認してみます。
docker-compose down
、docker-compose up -d
で kafka cluster を起動してから cp-kafka4~5 を停止します。
kafkacat で確認すると __confluent.support.metrics
partition の replica に cp-kafka4 が割り当てられているので cp-kafka1~3 に調整します。
プロジェクトのルート直下に reassign-partitions.json というファイルを新規作成した後、以下の内容を記述します。
{ "version": 1, "partitions": [ { "topic": "__confluent.support.metrics", "partition": 0, "replicas": [1, 2, 3] } ] }
docker ps
コマンドで cp-kafka1 の CONTAINER ID を確認してから docker cp reassign-partitions.json <CONTAINER ID>:/tmp
コマンドで reassign-partitions.json を cp-kafka1 の /tmp の下にコピーします。
cp-kafka1 に接続して /tmp の下に移動してから kafka-reassign-partitions --zookeeper cp-zookeeper1:12181 --reassignment-json-file reassign-partitions.json --execute
コマンドを実行します。
kafkacat で確認すると __confluent.support.metrics
partition の replica が 3, 1, 2 になっています。
Topic1 を作成した後(全ての partition の replica には 1~3 が割り当てられています)、
アプリケーションを起動してメッセージの送受信が行われていることを確認してから、
cp-kafka2~3 を停止 → cp-kafka4~5 を起動します。
partition の replica を 1, 4, 5 に変更します。プロジェクトのルート直下に topics-to-move.json というファイルを新規作成した後、以下の内容を記述します。topics には Topic1 だけでなく存在する topic を全て記述します。
{ "version": 1, "topics": [ { "topic": "Topic1" }, { "topic": "__confluent.support.metrics" }, { "topic": "__consumer_offsets" } ] }
topics-to-move.json を docker cp
コマンドで cp-kafka1 の /tmp の下にアップロードし、
cp-kafka1 に接続して /tmp の下に移動した後、kafka-reassign-partitions --zookeeper cp-zookeeper1:12181 --topics-to-move-json-file topics-to-move.json --broker-list "1,4,5" --generate
コマンドを実行します。
Current partition replica assignment
と Proposed partition reassignment configuration
が出力されますので、Proposed partition reassignment configuration
の下に出力されている json をコピーして reassign-partitions.json にペーストします。
docker cp
コマンドで reassign-partitions.json を cp-kafka1 の /tmp の下にコピーしてから、
cp-kafka1 に接続して /tmp の下に移動した後、kafka-reassign-partitions --zookeeper cp-zookeeper1:12181 --reassignment-json-file reassign-partitions.json --execute
コマンドを実行します。
kafkacat を実行すると全ての partition の replica, isr が 1, 4, 5 に変わっています。
アプリケーションのログを見るとメッセージの送受信も継続していました。
現在全ての partition の leader が cp-kafka1 になっているので cp-kafka1 を停止してみます。cp-kafka1 を停止して kafkacat で全ての partition の leader が 4, 5 に変更されたことを確認した後、
アプリケーションのログを見ると、しばらくエラーと調整中と思われるログが出た後メッセージの送受信が正常に送受信されている状態に戻りました。
ただし、この確認は何回か実施してみたのですが、メッセージの送受信が元に戻らなかった時もあったんですよね。。。 今の設定あるいは実装の場合、必ず復旧すると思わない方がよいのかもしれません。
次は cp-schema-registry コンテナを追加して Apache Avro を試してみるつもりです。
履歴
2019/08/19
初版発行。