かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その38 )( Docker Compose でサーバを構築する、Kafka 編5 - broker や zookeeper 停止時の動作を確認する )

概要

記事一覧はこちらです。

今回は kafka の broker や zookeeper のコンテナを停止して、その時の動作を確認してみます。

参照したサイト・書籍

  1. Apache Kafkaって本当に大丈夫?~故障検証のオーバービューと興味深い挙動の紹介~
    https://www.slideshare.net/hadoopxnttdata/hadoop-spark-conference-japan-2019nttdata

  2. Kafka 2.3 Documentation - 3.1 Broker Configs
    https://kafka.apache.org/documentation/#brokerconfigs

  3. Kafka トピック区画 Leader の再割り当て
    https://www.ibm.com/support/knowledgecenter/ja/SSCVHB_1.2.0/admin/tnpi_reassign_partitions.html

目次

  1. 停止時の動作を確認しやすくするために org.apache.kafka.clients.NetworkClient のログを OFF にし、producer, consumer にリトライ処理を追加する
  2. leader になっている broker を1台だけ停止してみる
  3. leader が有効な partition が1つだけになるまで broker を停止してみる
  4. メッセージ送受信中に zookeeper を停止してみる
  5. kafka-reassign-partitions コマンドで partition に assign する broker を再割り当てする

手順

停止時の動作を確認しやすくするために org.apache.kafka.clients.NetworkClient のログを OFF にし、producer, consumer にリトライ処理を追加する

broker を停止して1つも有効な leader が設定されていない partition が発生すると下の画像のように org.apache.kafka.clients.NetworkClient が大量のログを出力して動作を追えなくなるので、

f:id:ksby:20190817181251p:plain

src/main/resources/application.properties に logging.level.org.apache.kafka.clients.NetworkClient=OFF を追加してログが出力されないようにします。

spring.kafka.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092
spring.kafka.consumer.group-id=topic1-consumer-group1

logging.level.org.apache.kafka.clients.NetworkClient=OFF

また broker を停止した時にメッセージの送信漏れ、受信漏れを出にくくするために src/main/java/ksbysample/eipapp/kafka/KafkaDslSampleConfig.java を変更してリトライ処理を追加します。

@Slf4j
@Configuration
public class KafkaDslSampleConfig {

    ..........

    @Bean
    public IntegrationFlow topic1ProducerFlow() {
        return IntegrationFlows
                .from(countSupplier()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // メッセージの kafka_topic ヘッダに topic 名をセットすると
                // kafkaMessageHandler メソッドの第2引数に指定した topic ではなく
                // kafka_topic ヘッダの topic に送信される
                // .enrichHeaders(h -> h.header(KafkaHeaders.TOPIC, TOPIC_NAME))
                .log(LoggingHandler.Level.WARN)
                .handle(kafkaMessageHandler(kafkaProducerFactory, TOPIC_NAME)
                        , e -> e.advice(retryAdvice()))
                .get();
    }

    ..........

    private KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<Integer, String>
    createKafkaMessageDrivenChannelAdapter() {
        return Kafka.messageDrivenChannelAdapter(kafkaConsumerFactory
                , KafkaMessageDrivenChannelAdapter.ListenerMode.record, TOPIC_NAME)
                .configureListenerContainer(c ->
                        c.ackMode(ContainerProperties.AckMode.RECORD)
                                .idleEventInterval(100L))
                .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel
                        , new RawRecordHeaderErrorMessageStrategy()))
                .retryTemplate(retryTemplate())
                .filterInRetry(true);
    }

    /**
     * リトライ回数は最大3回、リトライ前に 5秒待機する RetryTemplate
     */
    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3, singletonMap(Exception.class, true)));
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(5000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
        return retryTemplate;
    }

    @Bean
    public RequestHandlerRetryAdvice retryAdvice() {
        RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
        retryAdvice.setRetryTemplate(retryTemplate());
        return retryAdvice;
    }

}
  • retryTemplate bean、retryAdvice bean の定義を追加します。
  • メッセージを送信する topic1ProducerFlow bean で .handle(kafkaMessageHandler(kafkaProducerFactory, TOPIC_NAME)).handle(kafkaMessageHandler(kafkaProducerFactory, TOPIC_NAME), e -> e.advice(retryAdvice())) に変更します。
  • メッセージを受信する bean から呼び出される createKafkaMessageDrivenChannelAdapter メソッドで .retryTemplate(new RetryTemplate()).retryTemplate(retryTemplate()) に変更します。

leader になっている broker を1台だけ停止してみる

docker-compose up -d で kafka cluster を起動した後、kafkacat で broker が追加されるまで待機してから Topic1 を作成します。

f:id:ksby:20190817183027p:plain f:id:ksby:20190817183207p:plain

アプリケーションを起動してメッセージの送受信が行われることを確認します。

f:id:ksby:20190817183417p:plain

partition 0 の leader である cp-kafka4 を停止してみます。

f:id:ksby:20190817183735p:plainf:id:ksby:20190817183858p:plain

各 partition の isrs(in-sync replica)から 4 がなくなり、partition 0 の leader が 4 → 2 に切り替わります。

f:id:ksby:20190817184101p:plain

アプリケーションのログを見ると Exception thrown when sending a message with key='null' and payload='298' to topic Topic1: というメッセージが1度出力されますが、数字が途切れずにメッセージの送受信が継続します。

f:id:ksby:20190817184254p:plain

leader が有効な partition が1つだけになるまで broker を停止してみる

cp-kafka3 → cp-kafka5 → cp-kafka2 の順で停止して leader が有効な partition が1つだけになるようにしてみます。

cp-kafka5 まで停止すると有効な leader が設定されていない partition が発生しますが、

f:id:ksby:20190817185010p:plain

エラーあるいは調整中と思われるログが出力された後、leader が有効な partition 0, 2 を利用してメッセージの送受信が続きます。数字も途切れていません。

f:id:ksby:20190817185211p:plain f:id:ksby:20190817185314p:plain

cp-kafka2 まで停止して partition 2 だけ leader が有効な状態にしても、

f:id:ksby:20190817185558p:plain f:id:ksby:20190817185746p:plain

partition 2 だけでメッセージの送受信が継続されていました(数字も途切れていません)。この状態でもエラーにならずメッセージの送受信が出来るのか。。。

f:id:ksby:20190817190012p:plain f:id:ksby:20190817190115p:plain

今度は cp-kafka2 ~ cp-kafka5 を再開してみます。

f:id:ksby:20190817190540p:plainf:id:ksby:20190817190639p:plain

f:id:ksby:20190817190801p:plain

consumer の topic の assign が再度行われますが、しばらくは partition 2 だけで受信し続けます。停止した時と異なり直ぐに partition 0, 1 は使用されませんでした。

f:id:ksby:20190817191001p:plain

しばらくすると partition 0, 1 から再び受信できるようになりました。

f:id:ksby:20190817191300p:plain

Kafka 2.3 Documentation - 3.1 Broker Configs を見てみると leader.imbalance.check.interval.seconds という設定項目があり default で 300秒で設定されていたので、5分毎にチェックされているようです。

実際に大量のメッセージが送受信されている環境の場合には replica が in-sync の状態になるまで時間がかかったりするのかもしれませんが、broker を停止・起動するだけならばメッセージの送受信は可能な限り継続されるようです。

メッセージ送受信中に zookeeper を停止してみる

docker-compose downdocker-compose up -d で kafka cluster を起動し直した後、kafkacat で broker が追加されるまで待機してから Topic1 を作成します。

アプリケーションを実行してメッセージが送受信されていることを確認してから、

f:id:ksby:20190818194309p:plain

zookeeper1~3 を停止してみます。

f:id:ksby:20190818194423p:plainf:id:ksby:20190818194527p:plain

停止しても特にエラーメッセージは出ずにメッセージの送受信は継続していました。

f:id:ksby:20190818194642p:plain

5分経過してもメッセージの送受信は継続していました。

f:id:ksby:20190818195141p:plain

zookeeper が全て停止している状態で leader にアサインされている broker を1つ停止してみます。kafkacat で現在の状況を確認した後、

f:id:ksby:20190818195407p:plain

partition 1 の leader にアサインされている cp-kafka2 を停止します。

f:id:ksby:20190818195637p:plain

アプリケーションのログを見ると INFO, ERROR ログが出るようになりましたが、メッセージの送受信は継続しています(ただしかなりペースが落ちます)。

f:id:ksby:20190818195900p:plain

kafkacat を実行すると partition 1 の leader は 2 のままでした。zookeeper が全て落ちているので情報が更新されませんが、kafka 側で取得したデータをキャッシュしていて参照はできるようです。

f:id:ksby:20190818200207p:plain

zookeeper を 1 → 2 → 3 の順で起動してみます。

1台だけ起動しても何も変わりませんでしたが、1, 2 の 2台まで起動すると、

f:id:ksby:20190818200620p:plain

partition 1 の leader も更新されて、

f:id:ksby:20190818200735p:plain

メッセージの送受信も正常に行われるようになりました。

f:id:ksby:20190818200912p:plain

kafka-reassign-partitions コマンドで partition に assign する broker を再割り当てする

cp-kafka1~3 だけ起動して Topic1 を作成→アプリケーション起動した後、cp-kafka2, 3 を停止してから cp-kafka4, 5 を起動して、Topic1 の partition の replica に割り当てられている broker を 2, 3 → 4, 5 に変更した時にアプリケーションの動作を確認してみます。

docker-compose downdocker-compose up -d で kafka cluster を起動してから cp-kafka4~5 を停止します。

f:id:ksby:20190819003732p:plain f:id:ksby:20190819003824p:plain

kafkacat で確認すると __confluent.support.metrics partition の replica に cp-kafka4 が割り当てられているので cp-kafka1~3 に調整します。

プロジェクトのルート直下に reassign-partitions.json というファイルを新規作成した後、以下の内容を記述します。

{
  "version": 1,
  "partitions": [
   {
      "topic": "__confluent.support.metrics",
      "partition": 0,
      "replicas": [1, 2, 3]
    }
  ]
}

docker ps コマンドで cp-kafka1 の CONTAINER ID を確認してから docker cp reassign-partitions.json <CONTAINER ID>:/tmp コマンドで reassign-partitions.json を cp-kafka1 の /tmp の下にコピーします。

f:id:ksby:20190819004606p:plain

cp-kafka1 に接続して /tmp の下に移動してから kafka-reassign-partitions --zookeeper cp-zookeeper1:12181 --reassignment-json-file reassign-partitions.json --execute コマンドを実行します。

f:id:ksby:20190819004721p:plain

kafkacat で確認すると __confluent.support.metrics partition の replica が 3, 1, 2 になっています。

f:id:ksby:20190819005637p:plain

Topic1 を作成した後(全ての partition の replica には 1~3 が割り当てられています)、

f:id:ksby:20190819005758p:plain

アプリケーションを起動してメッセージの送受信が行われていることを確認してから、

f:id:ksby:20190819005944p:plain

cp-kafka2~3 を停止 → cp-kafka4~5 を起動します。

f:id:ksby:20190819010158p:plain f:id:ksby:20190819011206p:plain

partition の replica を 1, 4, 5 に変更します。プロジェクトのルート直下に topics-to-move.json というファイルを新規作成した後、以下の内容を記述します。topics には Topic1 だけでなく存在する topic を全て記述します。

{
  "version": 1,
  "topics": [
    { "topic": "Topic1" },
    { "topic": "__confluent.support.metrics" },
    { "topic": "__consumer_offsets" }
  ]
}

topics-to-move.jsondocker cp コマンドで cp-kafka1 の /tmp の下にアップロードし、

f:id:ksby:20190819010616p:plain

cp-kafka1 に接続して /tmp の下に移動した後、kafka-reassign-partitions --zookeeper cp-zookeeper1:12181 --topics-to-move-json-file topics-to-move.json --broker-list "1,4,5" --generate コマンドを実行します。

f:id:ksby:20190819010814p:plain f:id:ksby:20190819010941p:plain

Current partition replica assignmentProposed partition reassignment configuration が出力されますので、Proposed partition reassignment configuration の下に出力されている json をコピーして reassign-partitions.json にペーストします。

docker cp コマンドで reassign-partitions.json を cp-kafka1 の /tmp の下にコピーしてから、

f:id:ksby:20190819011628p:plain

cp-kafka1 に接続して /tmp の下に移動した後、kafka-reassign-partitions --zookeeper cp-zookeeper1:12181 --reassignment-json-file reassign-partitions.json --execute コマンドを実行します。

f:id:ksby:20190819011810p:plain f:id:ksby:20190819011910p:plain

kafkacat を実行すると全ての partition の replica, isr が 1, 4, 5 に変わっています。

f:id:ksby:20190819012055p:plain

アプリケーションのログを見るとメッセージの送受信も継続していました。

f:id:ksby:20190819012241p:plain

現在全ての partition の leader が cp-kafka1 になっているので cp-kafka1 を停止してみます。cp-kafka1 を停止して kafkacat で全ての partition の leader が 4, 5 に変更されたことを確認した後、

f:id:ksby:20190819012414p:plain f:id:ksby:20190819012604p:plain

アプリケーションのログを見ると、しばらくエラーと調整中と思われるログが出た後メッセージの送受信が正常に送受信されている状態に戻りました。

f:id:ksby:20190819012810p:plain f:id:ksby:20190819012904p:plain f:id:ksby:20190819013010p:plain

ただし、この確認は何回か実施してみたのですが、メッセージの送受信が元に戻らなかった時もあったんですよね。。。 今の設定あるいは実装の場合、必ず復旧すると思わない方がよいのかもしれません。

次は cp-schema-registry コンテナを追加して Apache Avro を試してみるつもりです。

履歴

2019/08/19
初版発行。

Spring Boot + Spring Integration でいろいろ試してみる ( その37 )( Docker Compose でサーバを構築する、Kafka 編4 - zookeeper と kafka を cluster 構成に変更する )

概要

記事一覧はこちらです。

zookeeper, kafka をどちらも単体サーバで構築していましたが、zookeeper x 3、kafka x 5 の cluster 構成に変更してみます。Topic1 も 3 partition and 3 replicas で作成します。

参照したサイト・書籍

  1. cp-docker-images/examples/kafka-cluster/docker-compose.yml
    https://github.com/confluentinc/cp-docker-images/blob/5.3.0-post/examples/kafka-cluster/docker-compose.yml

  2. Building Apache Kafka cluster using docker-compose and VirtualBox
    https://better-coding.com/building-apache-kafka-cluster-using-docker-compose-and-virtualbox/

  3. confluentinc/cp-kafkacat
    https://hub.docker.com/r/confluentinc/cp-kafkacat

  4. Kafka __consumer_offsets topic
    https://medium.com/@felipedutratine/kafka-consumer-offsets-topic-3d5483cda4a6

目次

  1. docker-compose.yml を変更する
  2. docker-compose up -d で起動し、kafkacat で broker が追加されたことを確認する
  3. Topic1 を 3 partition and 3 replicas で作成する
  4. cp-kafka2 で kafka-console-producer を、cp-kafka3 で kafka-console-consumer を実行しメッセージを送受信してみる
  5. Spring Integration DSL のアプリケーションでメッセージの送受信ができるようにする

手順

docker-compose.yml を変更する

docker-compose.yml を以下のように変更します。

version: '3'

services:
  ..........

  # zookeeper cluster
  cp-zookeeper1:
    image: confluentinc/cp-zookeeper:5.3.0
    container_name: cp-zookeeper1
    ports:
      - "12181:12181"
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 12181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: cp-zookeeper1:12888:13888;cp-zookeeper2:22888:23888;cp-zookeeper3:32888:33888
  cp-zookeeper2:
    image: confluentinc/cp-zookeeper:5.3.0
    container_name: cp-zookeeper2
    ports:
      - "22181:22181"
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 22181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: cp-zookeeper1:12888:13888;cp-zookeeper2:22888:23888;cp-zookeeper3:32888:33888
  cp-zookeeper3:
    image: confluentinc/cp-zookeeper:5.3.0
    container_name: cp-zookeeper3
    ports:
      - "32181:32181"
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      ZOOKEEPER_SERVERS: cp-zookeeper1:12888:13888;cp-zookeeper2:22888:23888;cp-zookeeper3:32888:33888

  ..........

  # Kafka cluster
  # docker exec -it cp-kafka1 /bin/bash
  # docker exec -it cp-kafka2 /bin/bash
  # docker exec -it cp-kafka3 /bin/bash
  # topic の作成・一覧取得・詳細表示
  # kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic1 --partitions 3 --replication-factor 3 --if-not-exists
  # kafka-topics --zookeeper cp-zookeeper1:12181 --list
  # kafka-topics --zookeeper cp-zookeeper1:12181 --topic Topic1 --describe
  # kafka-console-producer --broker-list localhost:19092 --topic Topic1
  # kafka-console-consumer --bootstrap-server localhost:19092 --topic Topic1
  cp-kafka1:
    image: confluentinc/cp-kafka:5.3.0
    container_name: cp-kafka1
    ports:
      - "19092:19092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper1:12181,cp-zookeeper2:22181,cp-zookeeper3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka1:19092
    depends_on:
      - cp-zookeeper1
      - cp-zookeeper2
      - cp-zookeeper3
  cp-kafka2:
    image: confluentinc/cp-kafka:5.3.0
    container_name: cp-kafka2
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper1:12181,cp-zookeeper2:22181,cp-zookeeper3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka2:29092
    depends_on:
      - cp-zookeeper1
      - cp-zookeeper2
      - cp-zookeeper3
  cp-kafka3:
    image: confluentinc/cp-kafka:5.3.0
    container_name: cp-kafka3
    ports:
      - "39092:39092"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper1:12181,cp-zookeeper2:22181,cp-zookeeper3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka3:39092
    depends_on:
      - cp-zookeeper1
      - cp-zookeeper2
      - cp-zookeeper3
  cp-kafka4:
    image: confluentinc/cp-kafka:5.3.0
    container_name: cp-kafka4
    ports:
      - "49092:49092"
    environment:
      KAFKA_BROKER_ID: 4
      KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper1:12181,cp-zookeeper2:22181,cp-zookeeper3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka4:49092
    depends_on:
      - cp-zookeeper1
      - cp-zookeeper2
      - cp-zookeeper3
  cp-kafka5:
    image: confluentinc/cp-kafka:5.3.0
    container_name: cp-kafka5
    ports:
      - "59092:59092"
    environment:
      KAFKA_BROKER_ID: 5
      KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper1:12181,cp-zookeeper2:22181,cp-zookeeper3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://cp-kafka5:59092
    depends_on:
      - cp-zookeeper1
      - cp-zookeeper2
      - cp-zookeeper3
  • confluentinc/cp-zookeeper, confluentinc/cp-kafka の最新バージョンが 5.3.0 になっていたので、このバージョンを使用します。
  • zookeeper の cluster 構成は以下のように設定します。
    • コンテナ名は cp-zookeeper1, cp-zookeeper2, cp-zookeeper3 と末尾に連番の数字(1~3)を付けた名前にします。
    • ポート番号は単体サーバの時は 2181 を使用しましたが、cluster 構成では cp-zookeeper1(12181番), cp-zookeeper2(22181番), cp-zookeeper3(32181番)を使用します。
    • ZOOKEEPER_SERVER_ID には各コンテナで一意の数字になるよう 1~3 の値を設定します。
    • ZOOKEEPER_INIT_LIMIT, ZOOKEEPER_SYNC_LIMIT, ZOOKEEPER_SERVERS は cp-docker-images/examples/kafka-cluster/docker-compose.yml を参考に設定します。
  • kafka の cluster 構成は以下のように設定します。
    • コンテナ名は cp-kafka1, cp-kafka2, cp-kafka3, cp-kafka4, cp-kafka5 と末尾に連番の数字(1~5)を付けた名前にします。
    • ポート番号は単体サーバの時は 9092 を使用しましたが、cluster 構成では cp-kafka1(19092番), cp-kafka2(29092番), cp-kafka3(39092番), cp-kafka4(49092番), cp-kafka5(59092番)を使用します。
    • KAFKA_BROKER_ID には各コンテナで一意の数字になるよう 1~5 の値を設定します。
    • KAFKA_ZOOKEEPER_CONNECT には cluster 構成に変更した cp-zookeeper コンテナ名+ポート番号をカンマ区切りで列挙します。
    • KAFKA_ADVERTISED_LISTENERS には PLAINTEXT://<cp-kafka コンテナ名>:<コンテナで使用するポート番号> を記述します。
    • KAFKA_LISTENER_SECURITY_PROTOCOL_MAP、KAFKA_INTER_BROKER_LISTENER_NAME、KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR の設定は入れません。
    • depends_on に cp-zookeeper コンテナ名を列挙します。

docker-compose up -d で起動し、kafkacat で broker が追加されたことを確認する

kafka cluster は起動しても broker が追加されるまで少し時間がかかります。broker が追加されたことを確認できるよう confluentinc/cp-kafkacat コンテナを利用して kafkacat コマンドを使用できるようにします。

kafka cluster を起動する前に docker run --rm --name cp-kafkacat confluentinc/cp-kafkacat:5.3.0 コマンドを実行して cp-kafkacat の docker image をダウンロードしておきます。

f:id:ksby:20190813161520p:plain ※kafkacat の usage が表示されます。

kafka cluster を起動します。docker-compose up -d コマンドを実行した後、

f:id:ksby:20190813162354p:plain

docker run --rm --name cp-kafkacat --tty --network ksbysample-eipapp-kafka_default confluentinc/cp-kafkacat:5.3.0 kafkacat -b cp-kafka1:19092 -L コマンドを実行して broker が追加されるまで待ちます。最初のうちは ERROR: Failed to acquire metadata: Local: Broker transport failure のメッセージが表示されますが、しばらくすると broker が表示されるようになります。topics の情報まで表示されるようになったら完了です。

f:id:ksby:20190813162545p:plain f:id:ksby:20190813162731p:plain

Topic1 を 3 partition and 3 replicas で作成する

cp-kafka1 コンテナに接続した後、以下のコマンドを実行して Topic1 を作成・確認します。

  • kafka-topics --zookeeper cp-zookeeper1:12181 --create --topic Topic1 --partitions 3 --replication-factor 3 --if-not-exists
  • kafka-topics --zookeeper cp-zookeeper1:12181 --topic Topic1 --describe

f:id:ksby:20190813173008p:plain ※topic の各 partition の一番右側に表示されている isrs は in-sync replica(isr)+複数形の s です。

docker run --rm --name cp-kafkacat --tty --network ksbysample-eipapp-kafka_default confluentinc/cp-kafkacat:5.3.0 kafkacat -b cp-kafka1:19092 -L コマンドを実行すると、こちらでも追加された Topic1 の情報が表示されます。

f:id:ksby:20190813173220p:plain

cp-kafka2 で kafka-console-producer を、cp-kafka3 で kafka-console-consumer を実行しメッセージを送受信してみる

cp-kafka2 コンテナに接続して kafka-console-producer --broker-list localhost:29092 --topic Topic1 コマンドを実行し、cp-kafka3 コンテナに接続して kafka-console-consumer --bootstrap-server localhost:39092 --topic Topic1 コマンドを実行して、メッセージの送受信ができることを確認してみます。

f:id:ksby:20190813202218p:plain

別の kafka に接続していますが、問題なくメッセージの送受信が出来ました。

またメッセージを送受信した後で docker run --rm --name cp-kafkacat --tty --network ksbysample-eipapp-kafka_default confluentinc/cp-kafkacat:5.3.0 kafkacat -b cp-kafka1:19092 -L コマンドを実行すると topic "__consumer_offsets" with 50 partitions というものが作成されていました(しかもレプリケーションされています)。

f:id:ksby:20190813203130p:plain f:id:ksby:20190813203231p:plain

Spring Integration DSL のアプリケーションでメッセージの送受信ができるようにする

※Spring Integration DSL のアプリケーションは 前回 の内容から「メッセージの数値の 1桁目が 0~5 なら partition 0、6~8 なら partition 1、9 なら partition 2 へ送信する」の変更だけ元に戻しました。

ホスト名が localhost だと Docker Compose で起動した kafka cluster と通信ができないため、コンテナ名で通信できるよう以下の設定を hosts ファイルに記述します。

127.0.0.1   cp-kafka1 cp-kafka2 cp-kafka3 cp-kafka4 cp-kafka5

src/main/resources/application.properties を以下のように変更します。

spring.kafka.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092
spring.kafka.consumer.group-id=topic1-consumer-group1
  • spring.kafka.consumer.bootstrap-servers=localhost:9092 を削除し、spring.kafka.bootstrap-servers=cp-kafka1:19092,cp-kafka2:29092,cp-kafka3:39092,cp-kafka4:49092,cp-kafka5:59092 を追加します。producer、consumer で同じサーバを指定する場合には spring.kafka.bootstrap-servers で指定すればよいことに気づきました。。。

アプリケーションを起動すると最初に consumer が受信する topic の partition が調整された後、以前と同様にメッセージを送受信するようになりました。

f:id:ksby:20190814231810p:plain f:id:ksby:20190814231914p:plain

次回は broker(kafka サーバ)を停止した時にどのような動きをするのか見てみます。

履歴

2019/08/16
初版発行。

IntelliJ IDEA 2019.1.4 にバージョンアップしたら Source Code Pro フォントが表示されなくなったので、別途ダウンロード&インストールする

概要

記事一覧はこちらです。

IntelliJ IDEA を 2019.1.4 にバージョンアップしたところ、画面に表示されるフォントが少し見にくくなりました。フォントが変更された?と思って Settings ダイアログを確認したところ、これまで使用していた Source Code Pro がフォントの選択肢として表示されていませんでした。

f:id:ksby:20190811160326p:plain

Source Code Pro は見やすかったので adobe-fonts/source-code-pro からダウンロード& Windows の font ディレクトリにインストールして使用できるようにします。

参照したサイト・書籍

  1. adobe-fonts/source-code-pro
    https://github.com/adobe-fonts/source-code-pro

  2. Rider Source Code Pro font disappeared after upgrading
    https://intellij-support.jetbrains.com/hc/en-us/community/posts/360004316339--Rider-Source-Code-Pro-font-disappeared-after-upgrading

目次

  1. Source Code Pro をダウンロードして Windows の font ディレクトリにインストールする
  2. IntelliJ IDEA で Source Code Pro を使用するよう設定する

手順

Source Code Pro をダウンロードして Windows の font ディレクトリにインストールする

Source Code Pro のファイルをダウンロードする作業用ディレクトリとして D:\tmp ディレクトリを新規作成します。

コマンドプロンプトを起動して、カレントディレクトリを D:\tmp に変更した後、npm install git://github.com/adobe-fonts/source-code-pro.git#release を実行します。

f:id:ksby:20190811161545p:plain

D:\tmp の下に node_modules ディレクトリが作成されており、

f:id:ksby:20190811161704p:plain

D:\tmp\node_modules\source-code-pro\TTF\ に移動するとインストールする ttf ファイル一式があります。

f:id:ksby:20190811161912p:plain

ttf ファイル一式を C:\Windows\Fonts の下へコピーします。

f:id:ksby:20190811162327p:plain

IntelliJ IDEA で Source Code Pro を使用するよう設定する

IntelliJ IDEA を再起動した後、「Settings」ダイアログを開いて「Editor」-「Color Scheme」-「Color Scheme Font」を選択します。フォントの選択肢に「Source Code Pro」が表示されていますので、選択した後「OK」ボタンをクリックしてダイアログを閉じます。

f:id:ksby:20190811162938p:plain

履歴

2019/08/12
初版発行。

AdoptOpenJDK を 11.0.3+7 → 11.0.4+11.2 へ、IntelliJ IDEA を 2019.1.3 → 2019.1.4 へ、Git for Windows を 2.21.1 → 2.22.0 へバージョンアップ

docker-compose.yml から mail-server、rainloop を docker-compose.mail.yml へ分離する

バージョンアップ前に build が正常終了することを確認しようとしたところ、GreenMail のメールサーバ(localhost:25)が起動できなくなっていました。

f:id:ksby:20190811122851p:plain

以前は Docker で mail-server(0.0.0.0:25)を起動していても GreenMail のメールサーバ(localhost:25)も起動できていたので気にしていなかったのですが(というよりは同じ 25番ポートを使用しているのになぜ起動できるのか不思議でしたが)、おそらく Docker Desktop をバージョンアップして起動できなくなったものと思われます。テストの時には mail-server を起動しないよう docker-compose.yml へ分離します。

プロジェクトのルート直下に docker-compose.mail.yml を新規作成し、以下の内容を記述します。

# docker-compose -f docker-compose.mail.yml up -d
# docker-compose -f docker-compose.mail.yml down
version: '3'

services:
  # docker-mailserver
  # https://hub.docker.com/r/tvial/docker-mailserver/
  # https://github.com/tomav/docker-mailserver
  #
  # 起動した docker-mailserver のコンテナ(mail-server) にアクセスする場合には以下のコマンドを実行する
  # docker exec -it mail-server /bin/sh
  #
  # アカウントのパスワードを SHA512 で作成する場合には、コンテナにアクセスして以下のようにコマンドを実行して生成する
  # doveadm pw -s SHA512-CRYPT -u [メールアドレス(例:tanaka@mail.example.com)] -p [パスワード]
  #
  mail-server:
    image: tvial/docker-mailserver:${MAILSERVER_VERSION}
    container_name: mail-server
    hostname: mail
    domainname: example.com
    ports:
      - "25:25"
      - "143:143"
    volumes:
      - ./docker/mail-server/config/:/tmp/docker-mailserver/
    environment:
      - TZ=Asia/Tokyo
      # debug したい場合には以下の行のコメントアウトを解除する
      # - DMS_DEBUG=1
      - ENABLE_SPAMASSASSIN=0
      - ENABLE_CLAMAV=0
      - ENABLE_FETCHMAIL=0
      - ENABLE_FAIL2BAN=0
      - ENABLE_POSTGREY=0
      - ENABLE_POP3=0
    cap_add:
      - NET_ADMIN
      - SYS_PTRACE
    restart: always

  # Webmail クライアント
  rainloop:
    image: hardware/rainloop
    container_name: rainloop
    ports:
      - "8888:8888"
    # TZ=Asia/Tokyo を設定してみたが日本時間に変わらなかったのでコメントアウトしておく
    # environment:
    #   - TZ=Asia/Tokyo
    volumes:
      - ./docker/rainloop/data/:/rainloop/data

networks:
  default:
    external:
      name: ksbysample-webapp-lending_default

docker-compose.yml から mail-server, rainloop の記述を削除します。

docker-compose up -d だけ実行後テストを実行すると、今度は build タスクが正常終了しました。

f:id:ksby:20190811124839p:plain

AdoptOpenJDK を 11.0.3+7 → 11.0.4+11.2 へバージョンアップする

※ksbysample-webapp-lending プロジェクトを開いた状態でバージョンアップしています。

  1. https://adoptopenjdk.net/?variant=openjdk11&jvmVariant=hotspot を見ると 11.0.4+11.2 がダウンロードできるようになっていましたので、11.0.4+11.2 へバージョンアップします。

    f:id:ksby:20190811113842p:plain

  2. OpenJDK11U-jdk_x64_windows_hotspot_11.0.4_11.zip をダウンロードして D:\Java\jdk-11.0.4+11 へインストールした後、環境変数 JAVA_HOME のパスを D:\Java\jdk-11.0.4+11 へ変更します。

    コマンドプロンプトから java -version を実行し、11.0.4 に変更されていることを確認します。

    f:id:ksby:20190811131144p:plain

  3. IntelliJ IDEA を再起動した後、プロジェクトで使用する JDK を 11.0.4+11.2 へ変更します。

  4. 開いているプロジェクトを閉じて「Welcome to IntelliJ IDEA」ダイアログを表示します。

  5. ダイアログ下部の「Configure」-「Structure for New Projects」を選択します。

    f:id:ksby:20190811133125p:plain

  6. 「Default Project Structure」ダイアログが表示されます。画面左側で「Project Settings」-「Project」を選択後、画面右側の「Project SDK」の「New...」ボタンをクリックし、表示されるメニューから「JDK」を選択します。

    f:id:ksby:20190811133336p:plain

  7. 「Select Home Directory for JDK」ダイアログが表示されます。D:\Java\jdk-11.0.3+7 を選択した後、「OK」ボタンをクリックします。

    f:id:ksby:20190811133459p:plain

  8. 「Default Project Structure」ダイアログに戻るので、今度は「Project SDK」の「Edit」ボタンをクリックします。

    f:id:ksby:20190811133654p:plain

  9. 画面左側で「Platform Settings」-「SDKs」が選択された状態になるので、画面右上の入力フィールドで "11" → "11.0.4+11" へ変更します。

    f:id:ksby:20190811133856p:plain

  10. 次に中央のリストから「11.0.3+7」を選択した後、リストの上の「-」ボタンをクリックして削除します。

    f:id:ksby:20190811134024p:plain

  11. 「OK」ボタンをクリックして「Default Project Structure」ダイアログを閉じます。

  12. 「Welcome to IntelliJ IDEA」ダイアログに戻ったら、ksbysample-webapp-lending プロジェクトを開きます。

  13. IntelliJ IDEA のメイン画面が開いたら、メニューから「File」-「Project Structure...」を選択します。

  14. 「Project Structure」ダイアログが表示されます。以下の画像の状態になっているので、

    f:id:ksby:20190811134219p:plain

    「Project SDK」を選択し直します。「Project SDK」を「11.0.4+11」に変更すると「Project language level」も自動で「SDK default (11 - Local variable syntax for lambda pa」が選択されました。

    f:id:ksby:20190811134402p:plain

  15. 「OK」ボタンをクリックして「Project Structure」ダイアログを閉じます。

  16. メイン画面に戻ると画面右下に「Indexing...」の表示が出るので、終了するまで待ちます。

  17. Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして更新します。

  18. clean タスク実行 → Rebuild Project 実行 → build タスクを実行して、"BUILD SUCCESSFUL" のメッセージが出力されることを確認します。

    f:id:ksby:20190811135443p:plain

  19. Project Tool Window で src/test/groovy/ksbysample、src/test/java/ksbysample でコンテキストメニューを表示して「Run 'Tests in 'ksbysample'' with Coverage」を選択し、テストが全て成功することを確認します。

    f:id:ksby:20190811140034p:plain f:id:ksby:20190811140347p:plain

  20. 特に問題は発生しませんでした。11.0.4+11 で開発を進めます。

IntelliJ IDEA を 2019.1.3 → 2019.1.4 へバージョンアップする

IntelliJ IDEA の 2019.1.4 がリリースされているのでバージョンアップします。

※ksbysample-webapp-lending プロジェクトを開いた状態でバージョンアップしています。

  1. IntelliJ IDEA のメインメニューから「Help」-「Check for Updates...」を選択します。

  2. IDE and Plugin Updates」ダイアログが表示されます。左下に「Update and Restart」ボタンが表示されていますので、「Update and Restart」ボタンをクリックします。

    f:id:ksby:20190811141205p:plain

  3. Plugin の update も表示されました。このまま「Update and Restart」ボタンをクリックします。

    f:id:ksby:20190811141249p:plain

  4. Patch がダウンロードされて IntelliJ IDEA が再起動します。

  5. IntelliJ IDEA が起動すると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。

    f:id:ksby:20190811142136p:plain

  6. IntelliJ IDEA のメインメニューから「Help」-「About」を選択し、2019.1.4 へバージョンアップされていることを確認します。

  7. Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして更新します。

  8. clean タスク実行 → Rebuild Project 実行 → build タスクを実行して、"BUILD SUCCESSFUL" のメッセージが出力されることを確認します。

    f:id:ksby:20190811143500p:plain

  9. Project Tool Window で src/test/groovy/ksbysample、src/test/java/ksbysample でコンテキストメニューを表示して「Run 'Tests in 'ksbysample'' with Coverage」を選択し、テストが全て成功することを確認します。

    f:id:ksby:20190811144230p:plain f:id:ksby:20190811144539p:plain

Git for Windows を 2.21.1 → 2.22.0 へバージョンアップする

Git for Windows の 2.22.0 がリリースされていたのでバージョンアップします。

  1. https://gitforwindows.org/ の「Download」ボタンをクリックして Git-2.22.0-64-bit.exe をダウンロードします。

  2. Git-2.22.0-64-bit.exe を実行します。

  3. 「Git 2.22.0 Setup」ダイアログが表示されます。[Next >]ボタンをクリックします。

  4. 「Select Components」画面が表示されます。「Git LFS(Large File Support)」だけチェックした状態で [Next >]ボタンをクリックします。

  5. 「Choosing the default editor used by Git」画面が表示されます。「Use Vim (the ubiquitous text editor) as Git's default editor」が選択された状態で [Next >]ボタンをクリックします。

  6. 「Adjusting your PATH environment」画面が表示されます。中央の「Git from the command line and also from 3rd-party software」が選択されていることを確認後、[Next >]ボタンをクリックします。

  7. 「Choosing HTTPS transport backend」画面が表示されます。「Use the OpenSSL library」が選択されていることを確認後、[Next >]ボタンをクリックします。

  8. 「Configuring the line ending conversions」画面が表示されます。一番上の「Checkout Windows-style, commit Unix-style line endings」が選択されていることを確認した後、[Next >]ボタンをクリックします。

  9. 「Configuring the terminal emulator to use with Git Bash」画面が表示されます。「Use Windows'default console window」が選択されていることを確認した後、[Next >]ボタンをクリックします。

  10. 「Configuring extra options」画面が表示されます。「Enable file system caching」だけがチェックされていることを確認した後、[Next >]ボタンをクリックします。

  11. 「Configuring experimental options」画面が表示されます。何もチェックせずに [Install]ボタンをクリックします。

  12. インストールが完了すると「Completing the Git Setup Wizard」のメッセージが表示された画面が表示されます。中央の「View Release Notes」のチェックを外した後、「Finish」ボタンをクリックしてインストーラーを終了します。

  13. コマンドプロンプトを起動して git --version を実行し、git のバージョンが git version 2.22.0.windows.1 になっていることを確認します。

    f:id:ksby:20190811153956p:plain

  14. git-cmd.exe を起動して日本語の表示・入力が問題ないかを確認します。

    f:id:ksby:20190811154116p:plain

  15. 特に問題はないようですので、2.22.0 で作業を進めたいと思います。

Spring Boot + Spring Integration でいろいろ試してみる ( その36 )( Docker Compose でサーバを構築する、Kafka 編3 - Spring Integration DSL で producer, consumer を実装する )

概要

記事一覧はこちらです。

今回は Spring Integration DSL で Kafka の producer, consumer を実装します。

参照したサイト・書籍

  1. spring-projects/spring-integration-kafka
    https://github.com/spring-projects/spring-integration-kafka

  2. spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java
    https://github.com/spring-projects/spring-integration-kafka/blob/b27f8b3eead708d5309a881aa76ac9a244ee5098/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java

  3. Spring for Apache Kafka - 5. Spring Integration
    https://docs.spring.io/spring-kafka/reference/html/#spring-integration

目次

  1. Spring Integration DSL で Kafka の producer, consumer を実装する
  2. consumer の数を 1 → 3 に変更する
  3. メッセージの数値の 1桁目が 0~5 なら partition 0、6~8 なら partition 1、それ以外なら partition 2 へ送信する

手順

Spring Integration DSL で Kafka の producer, consumer を実装する

まずは 1 producer, 1 consumer のサンプルを以下の仕様で実装します。

  • Topic1 という名前の topic を 3 partition and 1 replicas で作成する。
  • 送信するメッセージの serializer, deserializer はデフォルトのもの(key / value どちらも StringSerializer/StringDeserializer、org.springframework.boot.autoconfigure.kafka.KafkaProperties 参照)を使用する。
  • producer からは 1, 2, 3, ... と 1 から始まり +1 ずつ増やした数字の文字列を value にセットした message を1秒に1件送信する。key には何もセットしない。送信時に partition は指定しない。
  • consumer は1つ作成する。受信はメッセージがあれば連続して受信するが、ない場合には 100ms 待機する。

Jackson の ObjectMapper が必要になるので build.gradle に implementation("org.springframework.boot:spring-boot-starter-json") を追加します。

dependencies {
    def lombokVersion = "1.18.8"

    implementation("org.springframework.boot:spring-boot-starter-integration")
    implementation("org.springframework.boot:spring-boot-starter-json") {
        exclude group: "org.springframework", module: "spring-web"
    }
    implementation("org.springframework.integration:spring-integration-kafka:3.1.4.RELEASE")
    testImplementation("org.springframework.boot:spring-boot-starter-test")

    ..........
}

src/main/java/ksbysample/eipapp/kafka の下に KafkaDslSampleConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.kafka;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.integration.kafka.dsl.KafkaProducerMessageHandlerSpec;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.retry.support.RetryTemplate;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

@Slf4j
@Configuration
public class KafkaDslSampleConfig {

    private static final String TOPIC_NAME = "Topic1";

    // kafkaProducerFactory bean, kafkaConsumerFactory bean は
    // org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
    // で生成されている
    // 
    // 設定は application.properties の spring.kafka.producer.~ で行う
    private final ProducerFactory<Integer, String> kafkaProducerFactory;
    // 設定は application.properties の spring.kafka.consumer.~ で行う
    private final ConsumerFactory<Integer, String> kafkaConsumerFactory;

    private final MessageChannel errorChannel;

    private AtomicInteger count = new AtomicInteger(0);

    public KafkaDslSampleConfig(ProducerFactory<Integer, String> kafkaProducerFactory
            , ConsumerFactory<Integer, String> kafkaConsumerFactory
            , MessageChannel errorChannel) {
        this.kafkaProducerFactory = kafkaProducerFactory;
        this.kafkaConsumerFactory = kafkaConsumerFactory;
        this.errorChannel = errorChannel;
    }

    @Bean
    public Supplier<String> countSupplier() {
        return () -> String.valueOf(this.count.addAndGet(1));
    }

    @Bean
    public IntegrationFlow topic1ProducerFlow() {
        return IntegrationFlows
                .from(countSupplier()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // メッセージの kafka_topic ヘッダに topic 名をセットすると
                // kafkaMessageHandler メソッドの第2引数に指定した topic ではなく
                // kafka_topic ヘッダの topic に送信される
                // .enrichHeaders(h -> h.header(KafkaHeaders.TOPIC, TOPIC_NAME))
                .log(LoggingHandler.Level.WARN)
                .handle(kafkaMessageHandler(kafkaProducerFactory, TOPIC_NAME))
                .get();
    }

    @Bean
    public DefaultKafkaHeaderMapper mapper() {
        return new DefaultKafkaHeaderMapper();
    }

    private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
            ProducerFactory<Integer, String> producerFactory, String topic) {
        return Kafka
                .outboundChannelAdapter(producerFactory)
                .sync(true)
                // kafka_messageKey ヘッダにセットされている値を key にする場合には以下のように書く
                // .messageKey(m -> m
                //         .getHeaders()
                //         .get(KafkaHeaders.MESSAGE_KEY))
                .headerMapper(mapper())
                // メッセージの header に "kafka_topic" があれば、そこにセットされている topic へ、
                // なければ第2引数 topic で渡された topic へ送信する
                .topicExpression("headers[kafka_topic] ?: '" + topic + "'");
    }

    @Bean
    public IntegrationFlow topic1ConsumerFlow() {
        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(kafkaConsumerFactory
                        , KafkaMessageDrivenChannelAdapter.ListenerMode.record, TOPIC_NAME)
                        .configureListenerContainer(c ->
                                c.ackMode(ContainerProperties.AckMode.RECORD)
                                        .idleEventInterval(100L)
                                        .id("topic1ConsumerContainer"))
                        .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel
                                , new RawRecordHeaderErrorMessageStrategy()))
                        .retryTemplate(new RetryTemplate())
                        .filterInRetry(true))
                .handle((p, h) -> {
                    log.error("★★★ " + p);
                    return null;
                })
                .get();
    }

}

動作確認します。docker-compose up -d で Kafka+zookeeper のコンテナを起動した後、kafka-topics --zookeeper cp-zookeeper:2181 --create --topic Topic1 --partitions 3 --replication-factor 1 --if-not-exists コマンドで Topic1 を作成します。

f:id:ksby:20190725225230p:plain

アプリケーションを実行してみると、

f:id:ksby:20190725225812p:plain

  • 上のキャプチャには表示されていませんが、起動直後に INFO ログで ConsumerConfig values:ProducerConfig values: と consumer, producer の設定一覧が表示されていました。
  • producer は 1, 2, 3, ... と 1 から順にインクリメントしたメッセージを送信しています(黄色の WARN ログ)。
  • consumer は group に join して partition がアサインされるまでメッセージを受信しません。また今回は初回接続で最初のメッセージから受信するようには設定していないので、partition がアサインされた後に送信された分(6~)から受信しています(赤色の ERROR ログ)。

consumer の数を 1 → 3 に変更する

src/main/java/ksbysample/eipapp/kafka/KafkaDslSampleConfig.java を以下のように変更します。

    @Bean
    public IntegrationFlow topic1Consumer1Flow() {
        return IntegrationFlows
                .from(createKafkaMessageDrivenChannelAdapter())
                .handle((p, h) -> {
                    log.error(String.format("★★★ partition = %s, value = %s", h.get("kafka_receivedPartitionId"), p));
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow topic1Consumer2Flow() {
        return IntegrationFlows
                .from(createKafkaMessageDrivenChannelAdapter())
                .handle((p, h) -> {
                    log.error(String.format("●●● partition = %s, value = %s", h.get("kafka_receivedPartitionId"), p));
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow topic1Consumer3Flow() {
        return IntegrationFlows
                .from(createKafkaMessageDrivenChannelAdapter())
                .handle((p, h) -> {
                    log.error(String.format("▲▲▲ partition = %s, value = %s", h.get("kafka_receivedPartitionId"), p));
                    return null;
                })
                .get();
    }

    private KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<Integer, String>
    createKafkaMessageDrivenChannelAdapter() {
        return Kafka.messageDrivenChannelAdapter(kafkaConsumerFactory
                , KafkaMessageDrivenChannelAdapter.ListenerMode.record, TOPIC_NAME)
                .configureListenerContainer(c ->
                        c.ackMode(ContainerProperties.AckMode.RECORD)
                                .idleEventInterval(100L))
                .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel
                        , new RawRecordHeaderErrorMessageStrategy()))
                .retryTemplate(new RetryTemplate())
                .filterInRetry(true);
    }
  • topic1ConsumerFlow メソッドの from(...) の中に記述していた Kafka.messageDrivenChannelAdapter(...)~.filterInRetry(true); の部分を抽出して private メソッド createKafkaMessageDrivenChannelAdapter を作成します。
  • topic1ConsumerFlow メソッドのメソッド名を topic1Consumer1Flow に変更し、コピーして topic1Consumer2Flow, topic1Consumer3Flow メソッドを作成します。

Kafka+zookeeper のコンテナを再起動して topic を作成した後、アプリケーションを実行すると、

f:id:ksby:20190729003711p:plain

  • consumer が3つほぼ同時に作成されて、consumer 1つに付き partition 1つが assing されます。assign された後からメッセージを受信します。
  • producer から送信されたメッセージは partition の 0 → 2 → 1 の順に格納されています。メッセージに key がセットされていないのでラウンドロビンで partition に格納されているようです。

メッセージの数値の 1桁目が 0~5 なら partition 0、6~8 なら partition 1、9 なら partition 2 へ送信する

src/main/java/ksbysample/eipapp/kafka/KafkaDslSampleConfig.java を以下のように変更します。

    private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
            ProducerFactory<Integer, String> producerFactory, String topic) {
        return Kafka
                .outboundChannelAdapter(producerFactory)
                .sync(true)
                // kafka_messageKey ヘッダにセットされている値を key にする場合には以下のように書く
                // .messageKey(m -> m
                //         .getHeaders()
                //         .get(KafkaHeaders.MESSAGE_KEY))
                .headerMapper(mapper())
                // メッセージの header に "kafka_topic" があれば、そこにセットされている topic へ、
                // なければ第2引数 topic で渡された topic へ送信する
                .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
                .partitionId(m -> {
                    int payload = Integer.valueOf((String) m.getPayload());
                    int remainder  = payload % 10;
                    if ((0 <= remainder ) && (remainder  <= 5)) {
                        return 0;
                    } else if ((6 <= remainder ) && (remainder  <= 8)) {
                        return 1;
                    } else {
                        return 2;
                    }
                })
                ;
    }

    @Bean
    public IntegrationFlow topic1Consumer1Flow() {
        return IntegrationFlows
                .from(createKafkaMessageDrivenChannelAdapter(0))
                .handle((p, h) -> {
                    log.error(String.format("★★★ partition = %s, value = %s", h.get("kafka_receivedPartitionId"), p));
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow topic1Consumer2Flow() {
        return IntegrationFlows
                .from(createKafkaMessageDrivenChannelAdapter(1))
                .handle((p, h) -> {
                    log.error(String.format("●●● partition = %s, value = %s", h.get("kafka_receivedPartitionId"), p));
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow topic1Consumer3Flow() {
        return IntegrationFlows
                .from(createKafkaMessageDrivenChannelAdapter(2))
                .handle((p, h) -> {
                    log.error(String.format("▲▲▲ partition = %s, value = %s", h.get("kafka_receivedPartitionId"), p));
                    return null;
                })
                .get();
    }

    private KafkaMessageDrivenChannelAdapterSpec.KafkaMessageDrivenChannelAdapterListenerContainerSpec<Integer, String>
    createKafkaMessageDrivenChannelAdapter(int partition) {
        return Kafka.messageDrivenChannelAdapter(kafkaConsumerFactory
                , KafkaMessageDrivenChannelAdapter.ListenerMode.record, new TopicPartitionInitialOffset(TOPIC_NAME, partition))
                .configureListenerContainer(c ->
                        c.ackMode(ContainerProperties.AckMode.RECORD)
                                .idleEventInterval(100L))
                .recoveryCallback(new ErrorMessageSendingRecoverer(errorChannel
                        , new RawRecordHeaderErrorMessageStrategy()))
                .retryTemplate(new RetryTemplate())
                .filterInRetry(true);
    }
  • kafkaMessageHandler メソッドで .partitionId(...) を追加して、メッセージの payload の数値の 1桁目を見て partitionId を指定するようにします。
  • createKafkaMessageDrivenChannelAdapter メソッドに int partition の引数を追加し、Kafka.messageDrivenChannelAdapter(...) の第3引数を TOPIC_NAMEnew TopicPartitionInitialOffset(TOPIC_NAME, partition) に変更して、引数で渡された parition にメッセージを送信するようにします。
  • topic1Consumer1Flow ~ topic1Consumer3Flow メソッド内の createKafkaMessageDrivenChannelAdapter(...) を呼び出しているところで引数に 0 ~ 2 の paritionId を指定します。

Kafka+zookeeper のコンテナを再起動して topic を作成した後アプリケーションを実行すると、メッセージの数値の 1桁目が 0~5 なら topic1Consumer1Flow(★★★)、6~8 なら topic1Consumer2Flow(●●●)、9 なら topic1Consumer3Flow(▲▲▲)で受信されていることが確認できます。

f:id:ksby:20190811111801p:plain

履歴

2019/08/11
初版発行。

Picocli+Spring Boot でコマンドラインアプリケーションを作成してみる

概要

記事一覧はこちらです。

Twitter を見ていたところ picocli というライブラリの 4.0 GA release のツイートを見かけました。picocli のことを知らなかったので調べてみたところ、

  • Java で command line application を作成するための framework。考えられるものはほとんど実装されているのではないだろうか、というぐらい機能が充実している。マニュアルも分かりやすい。
  • picocli-spring-boot-starter が提供されている。
  • bashzsh 限定だが、コマンドラインから実行する時に TAB で自動補完が効くコマンドを作成できる。
  • GraalVM に対応しているらしい。GraalVM を全然理解していないので自分ではどう対応されているのか全く分かりませんでしたが。。。

調べたことをメモして残し、サンプルを作成してみます。作成したサンプルは以下の URL の場所に置いてあります。
https://github.com/ksby/ksbysample-boot-miscellaneous/tree/master/picocli-boot-cmdapp

参照したサイト・書籍

  1. picocli - a mighty tiny command line interface
    https://picocli.info/

  2. Quick Guide
    https://picocli.info/quick-guide.html

  3. remkop/picocli
    https://github.com/remkop/picocli

  4. picocli/picocli-spring-boot-starter/
    https://github.com/remkop/picocli/tree/master/picocli-spring-boot-starter

  5. Autocomplete for Java Command Line Applications
    https://picocli.info/autocomplete.html

  6. Create a Java Command Line Program with Picocli
    https://www.baeldung.com/java-picocli-create-command-line-program

  7. Spring Boot Exit Codes
    https://www.baeldung.com/spring-boot-exit-codes

  8. Including subprojects using a wildcard in a Gradle settings file
    https://stackoverflow.com/questions/2297032/including-subprojects-using-a-wildcard-in-a-gradle-settings-file

目次

  1. Spring Boot ベースのコマンドラインアプリケーションのサンプルを作成する(Subcommand なし)
  2. Spring Boot ベースのコマンドラインアプリケーションのサンプルを作成する(Subcommand あり)
  3. --version(-V)オプション指定時に build.gradle に記述した build.version を表示する
  4. TAB キー押下時に subcommand, option の候補の表示や自動補完が行われるようにする

手順

Spring Boot ベースのコマンドラインアプリケーションのサンプルを作成する(Subcommand なし)

Subcommand なしと Subcommand ありの2つのサンプルを Gradle Multi-project の中に作成します。

  • D:\project-springboot\ksbysample-boot-miscellaneous の下に picocli-boot-cmdapp ディレクトリを作成する。
  • 別のプロジェクトから Gradle Wrapper のファイルをコピーする(コピーしたのは 5.4.1)。
  • Gradle を最新バージョン(5.5.1)にする。
  • gradlew init を実行する。
  • settings.gradle を以下の内容に変更する(Including subprojects using a wildcard in a Gradle settings file 参照)。これで build.gradle があるサブプロジェクトは include 分を記述しなくても自動的に Multi-project に認識されるようになる。
rootProject.name = 'picocli-boot-cmdapp'
rootDir.eachFileRecurse { f ->
    if ( f.name == "build.gradle" ) {
        String relativePath = f.parentFile.absolutePath - rootDir.absolutePath
        String projectName = relativePath.replaceAll("[\\\\\\/]", ":")
        include projectName
    }
}

Multi-project のベースが出来ました。次に Spring Boot ベースのコマンドラインアプリケーション(Subcommand なし)の Project を作成します。以下の仕様のコマンドを作成します。

  • filetools --create <ファイル> <ファイル> ...(--create は -c も可) で指定されたファイル名の空ファイルを作成する。
  • filetools --delete <ファイル> <ファイル> ...(--delete は -d も可) で指定されたファイルを削除する。
  • --create--delete のオプションはいずれか一方が必須。どちらも指定しない、あるいはどちらも指定した場合にはエラーになる。

まず D:\project-springboot\ksbysample-boot-miscellaneous\picocli-boot-cmdapp の下に Spring Initializr で nosubcmd-cmdapp プロジェクトを作成した後、build.gradle を以下のように変更します。

buildscript {
    ext {
        group "ksby.ksbysample-boot-miscellaneous.picocli-boot-cmdapp"
        version "1.0.0-RELEASE"
    }
    repositories {
        mavenCentral()
        maven { url "https://repo.spring.io/release/" }
        maven { url "https://plugins.gradle.org/m2/" }
    }
}

plugins {
    id 'org.springframework.boot' version '2.1.6.RELEASE'
    id "io.spring.dependency-management" version "1.0.8.RELEASE"
    id 'java'
    id 'idea'
}

sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    mavenCentral()
}

dependencyManagement {
    imports {
        mavenBom(org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES)
    }
}

dependencies {
    def picocliVersion = "4.0.0"

    implementation("org.springframework.boot:spring-boot-starter")
    testImplementation("org.springframework.boot:spring-boot-starter-test")

    // picocli
    implementation("info.picocli:picocli-spring-boot-starter:${picocliVersion}")
}
  • dependencies block に Picocli の Spring Boot Starter である picocli-spring-boot-starter を追加します。

src/main/java/ksbysample/cmdapp/nosubcmd の下に FileToolsCommand.java を新規作成して、以下の内容を記述します。

package ksbysample.cmdapp.nosubcmd;

import org.springframework.stereotype.Component;
import picocli.CommandLine.*;

import java.io.File;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.FileSystemException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.concurrent.Callable;

@Component
@Command(name = "filetools", mixinStandardHelpOptions = true,
        version = "1.0.0",
        description = "create/delete file(s) command")
public class FileToolsCommand implements Callable<Integer>, IExitCodeExceptionMapper {

    // --create オプションと --delete オプションはいずれか一方しか指定できないようにする
    @ArgGroup(exclusive = true, multiplicity = "1")
    private Exclusive exclusive;

    static class Exclusive {

        @Option(names = {"-c", "--create"}, description = "create file(s)")
        private boolean isCreate;

        @Option(names = {"-d", "--delete"}, description = "delete file(s)")
        private boolean isDelete;

    }

    @Parameters(paramLabel = "ファイル", description = "作成あるいは削除するファイル")
    private File[] files;

    @Override
    public Integer call() {
        Arrays.asList(this.files).forEach(f -> {
            try {
                if (exclusive.isCreate) {
                    Files.createFile(Paths.get(f.getName()));
                    System.out.println(f.getName() + " is created.");
                } else if (exclusive.isDelete) {
                    Files.deleteIfExists(Paths.get(f.getName()));
                    System.out.println(f.getName() + " is deleted.");
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });

        return ExitCode.OK;
    }

    @Override
    public int getExitCode(Throwable exception) {
        Throwable cause = exception.getCause();
        if (cause instanceof FileAlreadyExistsException) {
            // 既に存在するファイルを作成しようとしている
            return 12;
        } else if (cause instanceof FileSystemException) {
            // 削除しようとしたファイルが別のプロセスでオープンされている等
            return 13;
        }
        return 11;
    }

}

src/main/java/ksbysample/cmdapp/nosubcmd/Application.java を以下のように変更します。

package ksbysample.cmdapp.nosubcmd;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.ExitCodeGenerator;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import picocli.CommandLine;
import picocli.CommandLine.IFactory;

@SpringBootApplication
public class Application implements CommandLineRunner, ExitCodeGenerator {

    private int exitCode;

    private final FileToolsCommand fileToolsCommand;

    private final IFactory factory;

    public Application(FileToolsCommand fileToolsCommand,
                       IFactory factory) {
        this.fileToolsCommand = fileToolsCommand;
        this.factory = factory;
    }

    public static void main(String[] args) {
        System.exit(SpringApplication.exit(SpringApplication.run(Application.class, args)));
    }

    @Override
    public void run(String... args) {
        exitCode = new CommandLine(fileToolsCommand, factory)
                .setExitCodeExceptionMapper(fileToolsCommand)
                .execute(args);
    }

    @Override
    public int getExitCode() {
        return exitCode;
    }

}

gradle の build タスクを実行して build/libs の下に nosubcmd-cmdapp-1.0.0-RELEASE.jar を生成します。

Git for Windowsbash を起動してから D:\project-springboot\ksbysample-boot-miscellaneous\picocli-boot-cmdapp\nosubcmd-cmdapp\build\libs\ の下に移動し、alias filetools='java -jar nosubcmd-cmdapp-1.0.0-RELEASE.jar' コマンドを実行して filetools だけでコマンドを実行できるようにします。

filetools コマンドを実行してみると、

f:id:ksby:20190718064717p:plain

Spring Boot のロゴと INFO ログが邪魔でした。。。 今回は表示されないようにします。

src/main/resources/application.properties に以下の内容を記載します。

spring.main.banner-mode=off
logging.level.root=OFF

build し直してから filetools コマンドを実行すると -c, -d のどちらのオプションを指定していないというエラーメッセージ(Error: Missing required argument (specify one of these): ([-c] | [-d]))とヘルプが表示されました。あれだけの記述なのにヘルプは見やすいし、オプションのところに色が付いているのがいいですね。

f:id:ksby:20190718065132p:plain

-c, -d のオプションをどちらも指定すると Error: --create, --delete are mutually exclusive (specify only one) というエラーメッセージが表示されます。

f:id:ksby:20190718065540p:plain

filetools -h コマンドを実行するとヘルプだけが表示され、filetools -V コマンドを実行するとバージョン番号だけが表示されます。

f:id:ksby:20190718070714p:plain

ファイルの作成、削除を試してみます。ディレクトリ内に jar 以外のファイルがない状態で、

f:id:ksby:20190718070004p:plain

filetools -c 1.txt 2.txt 3.txt コマンドを実行すると、

f:id:ksby:20190718065816p:plain

ディレクトリ内にファイルが作成されます。

f:id:ksby:20190718070102p:plain

filetools -d 1.txt 2.txt 3.txt コマンドを実行すると、

f:id:ksby:20190718070209p:plain

ファイルが削除されます。

f:id:ksby:20190718070252p:plain

filetools -c a.txt a.txt コマンドを実行して同じファイルを2度作成しようとすると、コマンドの戻り値が 0 ではなく 12 になります。

f:id:ksby:20190718225317p:plain

Spring Boot ベースのコマンドラインアプリケーションのサンプルを作成する(Subcommand あり)

今度は Subcommand ありのコマンドラインアプリケーションを作成してみます。Git コマンドでの git commit ...git branch ... のように commit, branch が Subcommand にあたります。

以下の仕様のコマンドを作成します。

  • cal add 数値 数値 ... で指定された数値を全て加算した結果を表示する。--avg('-a' でも可)オプションを付けると数値の個数で割った平均値を表示する。
  • cal multi 数値 数値 ... で指定された数値を全て乗算した結果を表示する。--compare 数値 オプションを付けると計算結果と数値を比較して、計算結果 < 数値なら -1、計算結果 = 数値なら 0、計算結果 > 数値なら 1 を返す。

まず D:\project-springboot\ksbysample-boot-miscellaneous\picocli-boot-cmdapp の下に Spring Initializr で subcmd-cmdapp プロジェクトを作成した後、nosubcmd-cmdapp プロジェクトの build.gradle をコピーします。

src/main/java/ksbysample/cmdapp/subcmd の下に CalCommand.java を新規作成して、以下の内容を記載します。

package ksbysample.cmdapp.subcmd;

import org.springframework.stereotype.Component;
import picocli.CommandLine.*;

import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.Callable;

@Component
@Command(name = "cal", mixinStandardHelpOptions = true,
        versionProvider = CalCommand.class,
        description = "渡された数値の加算・乗算を行うツール",
        subcommands = {
                CalCommand.AddCommand.class,
                CalCommand.MultiCommand.class
        })
public class CalCommand implements Callable<Integer>, IExitCodeExceptionMapper, IVersionProvider {

    @Override
    public Integer call() {
        return ExitCode.OK;
    }

    @Override
    public int getExitCode(Throwable exception) {
        Throwable cause = exception.getCause();
        if (cause instanceof NumberFormatException) {
            // 数値パラメータに数値以外の文字が指定された
            return 12;
        }

        return 11;
    }

    @Override
    public String[] getVersion() {
        return new String[]{"1.0.0"};
    }

    @Component
    @Command(name = "add", mixinStandardHelpOptions = true,
            versionProvider = CalCommand.class,
            description = "渡された数値を加算する")
    static class AddCommand implements Callable<Integer> {

        @Option(names = {"-a", "--avg"}, description = "平均値を算出する")
        private boolean optAvg;

        @Parameters(paramLabel = "数値", arity = "1..*", description = "加算する数値")
        private BigDecimal[] nums;

        @Override
        public Integer call() {
            BigDecimal sum =
                    Arrays.asList(nums).stream()
                            .reduce(new BigDecimal("0"), (a, v) -> a.add(v));
            Optional<BigDecimal> avg = optAvg
                    ? Optional.of(sum.divide(BigDecimal.valueOf(nums.length)))
                    : Optional.empty();
            System.out.println(avg.orElse(sum));
            return ExitCode.OK;
        }

    }

    @Component
    @Command(name = "multi", mixinStandardHelpOptions = true,
            versionProvider = CalCommand.class,
            description = "渡された数値を乗算する")
    static class MultiCommand implements Callable<Integer> {

        @Parameters(paramLabel = "数値", arity = "1..*", description = "乗算する数値")
        private BigDecimal[] nums;

        @Option(names = {"-c", "--compare"},
                description = "計算結果と比較して、計算結果 < 数値なら -1、計算結果 = 数値なら 0、計算結果 > 数値なら 1 を返す")
        private BigDecimal compareNum;

        @Override
        public Integer call() {
            BigDecimal result =
                    Arrays.asList(nums).stream()
                            .reduce(new BigDecimal("1"), (a, v) -> a.multiply(v));
            Optional<Integer> compareResult = (compareNum == null)
                    ? Optional.empty()
                    : Optional.of(result.compareTo(compareNum));
            System.out.println(compareResult.isPresent() ? compareResult.get() : result);
            return ExitCode.OK;
        }

    }

}

src/main/java/ksbysample/cmdapp/subcmd/Application.java を以下のように変更します。

package ksbysample.cmdapp.subcmd;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.ExitCodeGenerator;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import picocli.CommandLine;
import picocli.CommandLine.ExitCode;
import picocli.CommandLine.IFactory;
import picocli.CommandLine.ParameterException;
import picocli.CommandLine.ParseResult;

@SpringBootApplication
public class Application implements CommandLineRunner, ExitCodeGenerator {

    private int exitCode;

    private final CalCommand calCommand;

    private final IFactory factory;

    public Application(CalCommand calCommand
            , IFactory factory) {
        this.calCommand = calCommand;
        this.factory = factory;
    }

    public static void main(String[] args) {
        System.exit(SpringApplication.exit(SpringApplication.run(Application.class, args)));
    }

    @Override
    public void run(String... args) {
        CommandLine commandLine = new CommandLine(calCommand, factory);

        // subcommand が指定されていない場合にはエラーメッセージと usage を表示する
        try {
            ParseResult parsed = commandLine.parseArgs(args);
            if (parsed.subcommand() == null &&
                    !parsed.isUsageHelpRequested() &&
                    !parsed.isVersionHelpRequested()) {
                System.err.println("Error: at least 1 command and 1 subcommand found.");
                commandLine.usage(System.out);
                exitCode = ExitCode.USAGE;
                return;
            }
        } catch (ParameterException ignored) {
            // CommandLine#parseArgs で ParameterException が throw されても
            // CommandLine#execute を実行しないと subcommand の usage が表示されないので
            // ここでは何もしない
        }

        exitCode = commandLine
                .setExitCodeExceptionMapper(calCommand)
                .execute(args);
    }

    @Override
    public int getExitCode() {
        return exitCode;
    }

}

src/main/resources/application.properties を以下のように変更します。

spring.main.banner-mode=off
logging.level.root=OFF

bashD:\project-springboot\ksbysample-boot-miscellaneous\picocli-boot-cmdapp\subcmd-cmdapp\build\libs\ の下に移動し、alias cal='java -jar subcmd-cmdapp-1.0.0-RELEASE.jar' コマンドを実行して cal だけでコマンドを実行できるようにします。

cal コマンドだけを実行すると Subcommand が指定されていないので Error: at least 1 command and 1 subcommand found. のエラーメッセージと usage が表示されます。

f:id:ksby:20190720005515p:plain

cal add コマンドを実行すると、

f:id:ksby:20190720070032p:plain

cal multi コマンドを実行すると、

f:id:ksby:20190720070247p:plain

--version(-V)オプション指定時に build.gradle に記述した build.version を表示する

build.gradle に version を記述しているので、cal -V 実行時にこの文字列を出力するようにしてみます。

buildscript {
    ext {
        group "ksby.ksbysample-boot-miscellaneous.picocli-boot-cmdapp"
        version "1.0.0-RELEASE"
    }

まず build.gradle に springBoot { buildInfo() } を追加します。 `

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

springBoot {
    buildInfo()
}

repositories {
    mavenCentral()
}

src/main/java/ksbysample/cmdapp/subcmd/CalCommand.java を以下のように変更します。

public class CalCommand implements Callable<Integer>, IExitCodeExceptionMapper, IVersionProvider {

    // picocli.AutoComplete で generate Completion Script をする時に引数なしのコンストラクタが必要になる
    // のでコンストラクタインジェクションは使用しないこと
    // https://picocli.info/autocomplete.html 参照
    @Autowired
    private BuildProperties buildProperties;

    ..........

    @Override
    public String[] getVersion() {
        return new String[]{buildProperties.getVersion()};
    }

    ..........
  • private final BuildProperties buildProperties; を追加します。
  • getVersion メソッド内で "1.0.0"buildProperties.getVersion() に変更します。

build して jar ファイルを作成し直してから cal -V コマンドを実行すると build.gradle の version に記述した文字列が表示されます。

f:id:ksby:20190720080754p:plain

TAB キー押下時に subcommand, option の候補の表示や自動補完が行われるようにする

Autocomplete for Java Command Line Applications のマニュアルに従い、cal コマンドの subcommand, option の候補の表示や自動補完が行わえるようにしてみます。

subcmd-cmdapp-1.0.0-RELEASE.jar を zip 解凍可能なツール(今回は Explzh を使用)で開いた後、BOOT-INF/lib の下にある spring-boot-2.1.6.RELEASE.jar と picocli-4.0.0.jar を取り出します。

f:id:ksby:20190720083531p:plain f:id:ksby:20190720083724p:plain

java -cp "picocli-4.0.0.jar;subcmd-cmdapp-1.0.0-RELEASE.jar" picocli.AutoComplete -n cal ksbysample.cmdapp.subcmd.CalCommand を実行しても ClassNotFoundException が発生して動作しなかったのですが、

f:id:ksby:20190720082703p:plain

class ファイルは subcmd-cmdapp/build/classes/java/main の下に生成されているので、

f:id:ksby:20190720082830p:plain

java -cp "picocli-4.0.0.jar;spring-boot-2.1.6.RELEASE.jar;../classes/java/main" picocli.AutoComplete -n cal ksbysample.cmdapp.subcmd.CalCommand を実行します。

f:id:ksby:20190720084700p:plain

cal_completion というファイルが生成されます。

f:id:ksby:20190720084852p:plain

何もしていない時には bash 上で cal と入力してから TAB キーを2回押すととディレクトリ内のファイル一覧が表示されますが、

f:id:ksby:20190720085425p:plain

. cal_completion コマンドを実行してから cal を入力+TAB キーを2回押すと subcommand の候補が表示されます。

f:id:ksby:20190720085704p:plain

cal m とだけ入力して TABキーを1回押すと、

f:id:ksby:20190720085950p:plain

multi の文字列が自動補完されます。

f:id:ksby:20190720090039p:plain

cal multi - と入力してから TAB キーを2回押すと指定可能なオプションが表示されますし、

f:id:ksby:20190720090641p:plain

cal multi --c と入力してから TAB キーを1回押すと

f:id:ksby:20190720090930p:plain

--compare のオプションが自動補完されます。

f:id:ksby:20190720091017p:plain

履歴

2019/07/20
初版発行。

気軽にメモ書き ( 大目次 )

  1. Picocli+Spring Boot でコマンドラインアプリケーションを作成してみる
  2. IntelliJ IDEA 2019.1.4 にバージョンアップしたら Source Code Pro フォントが表示されなくなったので、別途ダウンロード&インストールする