かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot 1.3.x の Web アプリを 1.4.x へバージョンアップする ( 大目次 )

今回から大目次を先に書きます。

  1. その1 ( 概要 )
  2. その2 ( IntelliJ IDEA の Gradle Tool Window の「Refresh all Gradle projects」を押してもエラーが出ないようにする )
  3. その3 ( build.gradle の修正 )
  4. その4 ( build.gradle 修正後の Rebuild で出た Warning を解消する )
  5. その5 ( メールのテンプレートに使用していた Velocity を FreeMarker に変更する )
  6. その6 ( 「Run ‘All Tests’ with Coverage」実行時のエラーを解消する+build タスク実行時の警告を解消する )
  7. その7 ( Google の Java コンパイル時バグチェックツール? Error Prone を試してみる )
  8. その8 ( build.gradle への checkstyle, findbugs の導入+CheckStyle-IDEA, FindBugs-IDEA Plugin の導入 )
  9. その9 ( 1.3系 → 1.4系で実装方法が変更された点を修正する )
  10. その10 ( インジェクションの方法を @Autowired によるフィールドインジェクション → コンストラクタインジェクションへ変更する )
  11. その11 ( Error Prone を 2.0.15 → 2.0.18 へバージョンアップ。。。できませんでした )
  12. その12 ( RestTemplateBuilder を使用するように変更したらテストが失敗するようになった理由とは? )
  13. その13 ( RestTemplate で WebAPI を呼び出している処理に spring-retry でリトライ処理を入れる )
  14. 番外編 ( IntelliJ IDEA に Request mapper Plugin をインストールする )
  15. その14 ( spring-boot-gradle-plugin は dependency-management-plugin を自動的に適用するので build.gradle に記述する必要がありませんでした )
  16. その15 ( テストクラスのアノテーションを 1.4 のものに変更する )
  17. その16 ( テストクラスのモックを @MockBean + Mockito で作り直す )
  18. その17 ( テストクラスのモックを @MockBean + Mockito で作り直す2 )
  19. その18 ( Gradle のバージョンを 2.13 → 3.x へバージョンアップ。。。しようと思いましたが止めました )
  20. 番外編 ( Optional をもう少しまともに使ってみる )
  21. その19 ( Spring Boot を 1.4.4 → 1.4.5 にバージョンアップする )
  22. その20 ( 気になった点を修正する )
  23. その21 ( Log4jdbc Spring Boot Starter を入れてみる )
  24. その22 ( application.properties に記述する spring.datasource.tomcat.~ の設定を見直す )
  25. その23 ( Spring Security 関連で修正した方がよい箇所を見直す )
  26. その24 ( Spring Boot を 1.4.5 → 1.4.6 にバージョンアップする )
  27. その25 ( jar ファイルを作成して動作確認する )
  28. その26 ( jar ファイルを作成して動作確認する2 )
  29. 番外編 ( Thymeleaf 3 へのバージョンアップを試してみる )
  30. 番外編 ( Thymeleaf 3 へのバージョンアップを試してみる2 )
  31. その27 ( Thymeleaf parser-level comment blocks で @thymesVar のコメント文が HTML に出力されないようにする )
  32. 感想

IntelliJ IDEA を 2016.3.3 → 2016.3.4 へ、Git for Windows を 2.11.0(3) → 2.11.1 へバージョンアップ

IntelliJ IDEA を 2016.3.3 → 2016.3.4 へバージョンアップする

IntelliJ IDEA の 2016.3.4 がリリースされたのでバージョンアップします。

※ksbysample-nexus-repomng/ksbysample-library-depend-nospring プロジェクトを開いた状態でバージョンアップしています。

  1. IntelliJ IDEA のメインメニューから「Help」-「Check for Updates…」を選択します。

  2. 「Platform and Plugin Updates」ダイアログが表示されます。左下に「Update and Restart」ボタンが表示されていますので、「Update and Restart」ボタンをクリックします。

    f:id:ksby:20170206001521p:plain

  3. Patch がダウンロードされて IntelliJ IDEA が再起動します。

  4. メイン画面が表示されると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。

  5. 処理が終了すると Gradle Tool Window のツリーの表示が other グループしかない初期の状態に戻っていますので、左上の「Refresh all Gradle projects」ボタンをクリックして更新します。更新が完了すると build グループ等が表示されます。

  6. clean タスク実行 → Rebuild Project 実行をした後、build タスクを実行して “BUILD SUCCESSFUL” のメッセージが表示されることを確認します。

    f:id:ksby:20170206002118p:plain

  7. Project Tool Window で src/test を選択した後、コンテキストメニューを表示して「Run ‘All Tests’ with Coverage」を選択し、テストが全て成功することを確認します。

    f:id:ksby:20170206002300p:plain

Git for Windows を 2.11.0(3) → 2.11.1 へバージョンアップする

Git for Windows の 2.11.1 がリリースされていたのでバージョンアップします。

  1. https://git-for-windows.github.io/ の「Download」ボタンをクリックして Git-2.11.1-64-bit.exe をダウンロードします。

  2. Git-2.11.1-64-bit.exe を実行します。

  3. 「Git 2.11.1 Setup」ダイアログが表示されます。[Next >]ボタンをクリックします。

  4. 「Select Components」画面が表示されます。全てのチェックが外れたままであることを確認した後、[Next >]ボタンをクリックします。

  5. 「Adjusting your PATH environment」画面が表示されます。中央の「Use Git from the Windows Command Prompt」が選択されていることを確認後、[Next >]ボタンをクリックします。

  6. 「Configuring the line ending conversions」画面が表示されます。「Checkout Windows-style, commit Unix-style line endings」が選択されていることを確認した後、[Next >]ボタンをクリックします。

  7. 「Configuring the terminal emulator to use with Git Bash」画面が表示されます。「Use Windows'default console window」が選択されていることを確認した後、[Next >]ボタンをクリックします。

  8. 「Configuring extra options」画面が表示されます。「Enable file system caching」だけがチェックされていることを確認した後、[Next >]ボタンをクリックします。

  9. 「Configuring experimental options」画面が表示されます。全てのチェックが外れたままであることを確認した後、[Install]ボタンをクリックします。

  10. インストールが完了すると「Completing the Git Setup Wizard」のメッセージが表示された画面が表示されます。中央の「View ReleaseNotes.html」のチェックを外した後、「Finish」ボタンをクリックしてインストーラーを終了します。

  11. コマンドプロンプトを起動して git のバージョンが git version 2.11.1.windows.1 になっていることを確認します。

    f:id:ksby:20170206005119p:plain

  12. git-cmd.exe を起動して日本語の表示・入力が問題ないかを確認します。

    f:id:ksby:20170206005311p:plain

  13. 特に問題はないようですので、2.11.1 で作業を進めたいと思います。

Spring Boot + Spring Integration でいろいろ試してみる ( その16 )( ExpressionEvaluatingRequestHandlerAdvice のサンプルを作ってみる )

概要

記事一覧はこちらです。

Spring Boot + Spring Integration でいろいろ試してみる ( その15 )( RequestHandlerRetryAdvice のサンプルを作ってみる ) の続きです。

ExpressionEvaluatingRequestHandlerAdvice のサンプルを作成します。ExpressionEvaluatingRequestHandlerAdvice は以下の2つを指定するための RequestHandlerAdvice です。

  • 成功、失敗(例外が throw された)時の処理を SpEL で記述できる。
  • 成功・失敗時に送信する MessageChannel を指定できる。

参照したサイト・書籍

  1. Spring Integration Reference Manual - 8.9.2 Provided Advice Classes - Expression Evaluating Advice
    http://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints-chapter.html#expression-advice

  2. Move file with file-adapter with SI
    http://stackoverflow.com/questions/33835657/move-file-with-file-adapter-with-si

  3. instanceof in SpEL
    http://stackoverflow.com/questions/7628437/instanceof-in-spel

目次

  1. Spring Boot を 1.4.3 → 1.4.4 へ、Spring IO Platform を Athens-SR2 → Athens-SR3 へバージョンアップする
  2. C:\eipapp\ksbysample-eipapp-advice ディレクトリを変更する
  3. application.properties, logback-spring.xml を追加する
  4. ExpressionEvaluatingRequestHandlerAdvice のサンプルを作成する
    1. setOnSuccessExpressionString, setOnFailureExpressionString だけ指定する
    2. setSuccessChannelName, setFailureChannelName だけ指定することはできない
    3. setOnSuccessExpressionString, setOnFailureExpressionString+setSuccessChannelName, setFailureChannelName の組み合わせで指定する
    4. setOnSuccessExpressionString, setOnFailureExpressionString で Success, Failure 用の MessageChannel に渡す payload の型を変更する
  5. ExpressionEvaluatingRequestHandlerAdvice を使用した Advice は Bean として定義しないと機能しない?
  6. ExpressionEvaluatingRequestHandlerAdvice を e -> e.advice(…) で指定した後に処理を書いたらどうなる?
  7. RequestHandlerRetryAdvice と一緒に指定してみる
  8. 続くのか。。。?

手順

Spring Boot を 1.4.3 → 1.4.4 へ、Spring IO Platform を Athens-SR2 → Athens-SR3 へバージョンアップする

Spring Boot、Spring IO Platform がバージョンアップされていますので、build.gradle を リンク先の内容 に変更してバージョンアップします。これにより Spring Integration も 4.3.6 → 4.3.7 へバージョンアップされます。

build.gradle 変更後、Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして build.gradle を反映します。

ちょうど試そうとしていた ExpressionEvaluatingRequestHandlerAdvice が、Spring Integration 4.3.7 から以下の変更が入っていました。

  • setOnSuccessExpression(String)、setOnFailureExpression(String) が Deprecated となり、setOnSuccessExpressionString(String)、setOnFailureExpressionString(String) に変わりました。

C:\eipapp\ksbysample-eipapp-advice ディレクトリを変更する

in06, in07, in08, in09, success, error ディレクトリを追加します。

C:\eipapp\ksbysample-eipapp-advice
├ error
├ in01
├ in02
├ in03
├ in04
├ in05
├ in06
├ in07
├ in08
├ in09
└ success

application.properties, logback-spring.xml を追加する

SpEL を使用するので org.springframework.integration.expression.ExpressionUtils の WARN ログが出力されないようにします。

  1. src/main/resources の下に application.properties を作成し、リンク先の内容 を記述します。

  2. src/main/resources の下に logback-spring.xml を作成し、リンク先の内容 を記述します。

ExpressionEvaluatingRequestHandlerAdvice のサンプルを作成する

動作確認のためにサンプルを作成していきます。サンプルは src/main/java/ksbysample/eipapp/advice の下に SuccessOrFailureFlowConfig.java を作成して、その中に記述します。完成形は リンク先の内容 です。

setOnSuccessExpressionString, setOnFailureExpressionString だけ指定する

成功時にはファイルを削除し、失敗時にはファイルを error ディレクトリへ移動するサンプルを作成します。

まずは成功する場合を試します。advice 対象の処理内で例外を throw しません。

    private final String EIPAPP_ADVICE_ROOT_DIR = "C:/eipapp/ksbysample-eipapp-advice";


    // setOnSuccessExpressionString, setOnFailureExpressionString だけ指定するサンプル
    //  ・成功時にはファイルを削除する。
    //  ・失敗時にはファイルを error ディレクトリへ移動する。
    //  ・削除、移動の処理は SpEL で記述する。

    @Bean
    public Advice deleteOrMoveAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload.delete()");
        advice.setOnFailureExpressionString(
                "payload.renameTo(new java.io.File('" + EIPAPP_ADVICE_ROOT_DIR + "/error/' + payload.name))");
        // setTrapException(true) を指定すると throw された例外が再 throw されず、
        // ログに出力されない
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow in06Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in06"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return null;
                }, e -> e.advice(deleteOrMoveAdvice()))
                .get();
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in06 の下に empty.txt を置くとファイルは削除されました。

f:id:ksby:20170205112733p:plain
↓↓↓
f:id:ksby:20170205113033p:plain

今度は失敗する場合を試します。advice 対象の処理内で例外を throw します。

                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
                    if (true) {
                        throw new RuntimeException("エラーです");
                    }
                    return null;
                }, e -> e.advice(deleteOrMoveAdvice()))

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in06 の下に empty.txt を置くとファイルは error ディレクトリへ移動しました。

f:id:ksby:20170205113837p:plain
↓↓↓
f:id:ksby:20170205114107p:plain

また今回、以下のように記述していますが、

                .<File>handle((p, h) -> {

これは以下と同じです。

                .handle((GenericHandler<File>) (p, h) -> {

第2引数の e -> ... を記述する場合にはどちらかのパターンで記述しないとコンパイルエラーになります。IntelliJ IDEA では記述がない場合 .handle に赤波下線が表示され、Alt+Enter を押すと後者のパターンで補完されます(ただし、File ではなく Object になります)。

setSuccessChannelName, setFailureChannelName だけ指定することはできない

以下の実装では動作しません。

    @Bean
    public Advice successOrFailureChannelAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("successFlow.input");
        advice.setFailureChannelName("failureFlow.input");
        advice.setTrapException(true);
        return advice;
    }

payload をそのまま Success, Failure 用の MessageChannel に渡す場合には advice.setOnSuccessExpressionString("payload"); のように "payload" とだけ記述した setOn…ExpressionString(…) を書く必要があります。

    @Bean
    public Advice successOrFailureChannelAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload");
        advice.setSuccessChannelName("successFlow.input");
        advice.setOnFailureExpressionString("payload");
        advice.setFailureChannelName("failureFlow.input");
        advice.setTrapException(true);
        return advice;
    }

setOnSuccessExpressionString, setOnFailureExpressionString+setSuccessChannelName, setFailureChannelName の組み合わせで指定する

サンプルの動作は上で書いた成功時にはファイルを削除し、失敗時にはファイルを error ディレクトリへ移動するというものですが、今度は SpEL ではなく転送した MessageChannel の先の処理で削除、移動します。

まずは成功する場合を試します。advice 対象の処理内で例外を throw しません。

    // setOnSuccessExpressionString, setOnFailureExpressionString+setSuccessChannelName, setFailureChannelName
    // の組み合わせで指定するサンプル
    //  ・成功時には successFlow.input へ Message を送信する。
    //    successFlow ではファイルを削除する。
    //  ・失敗時には failureFlow.input へ Message を送信する。
    //    failureFlow ではファイルを error ディレクトリへ移動する。

    @Bean
    public Advice successOrFailureChannelAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload");
        advice.setSuccessChannelName("successFlow.input");
        // setOnFailureExpressionString に "payload" と記述しても Failure 用の MessageChannel には
        // File クラスではなく MessageHandlingExpressionEvaluatingAdviceException クラスの payload が渡される
        advice.setOnFailureExpressionString("payload");
        advice.setFailureChannelName("failureFlow.input");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow in07Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in07"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return null;
                }, e -> e.advice(successOrFailureChannelAdvice()))
                .get();
    }

    @Bean
    public IntegrationFlow successFlow() {
        return f -> f
                .<File>handle((p, h) -> {
                    // ファイルを削除する
                    try {
                        Files.delete(Paths.get(p.getAbsolutePath()));
                        log.info("ファイルを削除しました ( {} )", p.getAbsolutePath());
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                });
    }

    @Bean
    public IntegrationFlow failureFlow() {
        return f -> f
                .<ExpressionEvaluatingRequestHandlerAdvice.MessageHandlingExpressionEvaluatingAdviceException>
                        handle((p, h) -> {
                    // MessageHandlingExpressionEvaluatingAdviceException クラスの payload から
                    // Exception 発生前の File クラスの payload を取得する
                    File file = (File) p.getEvaluationResult();

                    // ファイルを error ディレクトリへ移動する
                    Path src = Paths.get(file.getAbsolutePath());
                    Path dst = Paths.get(EIPAPP_ADVICE_ROOT_DIR + "/error/" + file.getName());
                    try {
                        Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
                        log.info("ファイルを移動しました ( {} --> {} )"
                                , src.toAbsolutePath(), dst.toAbsolutePath());
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                });
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in07 の下に empty.txt を置くとファイルは削除されました。

f:id:ksby:20170205142858p:plain
↓↓↓
f:id:ksby:20170205143117p:plain

今度は失敗する場合を試します。advice 対象の処理内で例外を throw します。

                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
                    if (true) {
                        throw new RuntimeException("エラーです");
                    }
                    return null;
                }, e -> e.advice(successOrFailureChannelAdvice()))

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in07 の下に empty.txt を置くとファイルは error ディレクトリへ移動しました。

f:id:ksby:20170205143420p:plain
↓↓↓
f:id:ksby:20170205143902p:plain

SpEL だけで処理が書けるならそれで済ませた方がコード量は少なくて楽ですね。

setOnSuccessExpressionString, setOnFailureExpressionString で Success, Failure 用の MessageChannel に渡す payload の型を変更する

setOnSuccessExpressionString, setOnFailureExpressionString に記述する SpEL の結果により Success, Failure 用の MessageChannel に送信される Message の payload にセットされるデータの型を変えることができます。

以下のように実装すると元の Message の payload は File クラスですが、Success, Failure 用の MessageChannel に送信される Message の payload は String クラスになります(まあ Failure 用に送信される Message の payload は更に MessageHandlingExpressionEvaluatingAdviceException になるので実際は少し面倒ですが)。

    // setOnSuccessExpressionString, setOnFailureExpressionString で Success, Failure 用の
    // MessageChannel に渡す payload の型を変更するサンプル
    //  ・元の File クラスの payload から String クラスの payload の Message に変換して
    //    Success, Failure 用の MessageChannel に送信する

    @Bean
    public Advice convertFileToStringAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload + ' の処理に成功しました。'");
        advice.setSuccessChannelName("printFlow.input");
        advice.setOnFailureExpressionString(
                "payload + ' の処理に失敗しました ( ' + #exception.class.name + ', ' + #exception.cause.message + ' )'");
        advice.setFailureChannelName("printFlow.input");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow in08Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in08"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return null;
                }, e -> e.advice(convertFileToStringAdvice()))
                .get();
    }

    @Bean
    public IntegrationFlow printFlow() {
        return f -> f
                // setFailureChannelName(...) の指定で転送された Message は
                // MessageHandlingExpressionEvaluatingAdviceException クラスなので、
                // .transform(...) で SpEL を利用して元の String を取得する
                .transform("payload instanceof T(org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException)"
                        + " ? payload.evaluationResult : payload")
                .handle((p, h) -> {
                    System.out.println("●●● " + p);
                    return null;
                });

    }

まずは成功する場合を試します。advice 対象の処理内で例外を throw しません。

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in08 の下に empty.txt を置くと以下のログが出力されます。

f:id:ksby:20170205161843p:plain

今度は失敗する場合を試します。advice 対象の処理内で例外を throw します。

                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
                    if (true) {
                        throw new RuntimeException("エラーです");
                    }
                    return null;
                }, e -> e.advice(convertFileToStringAdvice()))

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in08 の下に empty.txt を置くと以下のログが出力されます。

f:id:ksby:20170205162204p:plain

ExpressionEvaluatingRequestHandlerAdvice を使用した Advice は Bean として定義しないと機能しない?

結論から言うと setSuccessChannelName, setFailureChannelName 等のメソッドを使用して成功、失敗時に別の MessageChannel に Message を送信しないのであれば Bean を作成する必要はありませんが、送信する場合には必ず Bean にする必要があります。

例えば in06Flow を以下のようにメソッド内で ExpressionEvaluatingRequestHandlerAdvice のインスタンスを生成するよう変更します。

    @Bean
    public IntegrationFlow in06Flow() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload.delete()");
        advice.setOnFailureExpressionString(
                "payload.renameTo(new java.io.File('" + EIPAPP_ADVICE_ROOT_DIR + "/error/' + payload.name))");
        advice.setTrapException(true);

        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in06"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return null;
                }, e -> e.advice(advice))
                .get();
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in06 の下に empty.txt を置くとファイルは削除されます。

f:id:ksby:20170205170845p:plain
↓↓↓
f:id:ksby:20170205171202p:plain

今度は in07Flow を以下のようにメソッド内で ExpressionEvaluatingRequestHandlerAdvice のインスタンスを生成するよう変更します。

    @Bean
    public IntegrationFlow in07Flow() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload");
        advice.setSuccessChannelName("successFlow.input");
        advice.setOnFailureExpressionString("payload");
        advice.setFailureChannelName("failureFlow.input");
        advice.setTrapException(true);

        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in07"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return null;
                }, e -> e.advice(advice))
                .get();
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in07 の下に empty.txt を置いてもファイルは削除されませんでした。

f:id:ksby:20170205171851p:plain
↓↓↓
f:id:ksby:20170205172155p:plain

ログには Caused by: java.lang.IllegalArgumentException: BeanFactory must not be null の例外が throw されていました。

f:id:ksby:20170205172343p:plain

ExpressionEvaluatingRequestHandlerAdvice を e -> e.advice(…) で指定した後に処理を書いたらどうなる?

成功、失敗時に別の MessageChannel に Message を送信する場合としない場合、それぞれで動作を確認してみます。

最初は送信しない場合です。in06Flow を以下のように変更します。

    @Bean
    public IntegrationFlow in06Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in06"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ PASS1 " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return p;
                }, e -> e.advice(deleteOrMoveAdvice()))
                .handle((p, h) -> {
                    log.info("★★★ PASS2 " + p.getClass().getName());
                    return null;
                })
                .get();
    }
  • advice を指定している処理で return null;return p; に変更して Message を次の処理に渡すようにします。またログに “PASS1” の文字列を追加します。
  • 2つ目の .handle(...) を追加します。

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in06 の下に empty.txt を置くと以下のログが出力されました。追加した .handle(...) の処理が実行されています。

f:id:ksby:20170205182324p:plain

in06 ディレクトリの下に置いた empty.txt も削除されていました。ExpressionEvaluatingRequestHandlerAdvice で指定した処理も実行されていました。

f:id:ksby:20170205181614p:plain
↓↓↓
f:id:ksby:20170205182056p:plain

次は送信する場合です。in07Flow を以下のように変更します。

    @Bean
    public IntegrationFlow in07Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in07"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ PASS1 " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return p;
                }, e -> e.advice(successOrFailureChannelAdvice()))
                .handle((p, h) -> {
                    log.info("★★★ PASS2 " + p.getClass().getName());
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow successFlow() {
        return f -> f
                .<File>handle((p, h) -> {
                    // ファイルを削除する
                    try {
                        Files.delete(Paths.get(p.getAbsolutePath()));
                        log.info("★★★ PASS3 ファイルを削除しました ( {} )", p.getAbsolutePath());
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                });
    }
  • advice を指定している処理で return null;return p; に変更して Message を次の処理に渡すようにします。またログに “PASS1” の文字列を追加します。
  • 2つ目の .handle(...) を追加します。
  • successFlow のログに “★★★ PASS3” の文字列を追加します。

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in07 の下に empty.txt を置くと以下のログが出力されました。追加した .handle(...) の処理が実行されています。

f:id:ksby:20170205183630p:plain

in07 ディレクトリの下に置いた empty.txt も削除されていました。ExpressionEvaluatingRequestHandlerAdvice で指定した処理も実行されていました。

f:id:ksby:20170205183303p:plain
↓↓↓
f:id:ksby:20170205183527p:plain

以上の結果から、以下のことが分かりました。

  • advice を書いても次の処理は実行されます。
  • 1つ目の .handle(...) の処理 → ExpressionEvaluatingRequestHandlerAdvice で指定した成功時の処理 → 2つ目の .handle(...) の処理、の順で実行されます。

RequestHandlerRetryAdvice と一緒に指定してみる

以下のコードを追加します。

    // RequestHandlerRetryAdvice と ExpressionEvaluatingRequestHandlerAdvice を一緒に指定するサンプル
    //  ・RequestHandlerRetryAdvice に指定する RetryTemplate には FlowConfig.java に書いた
    //    simpleAndFixedRetryTemplate Bean を使用する

    @Autowired
    @Qualifier("simpleAndFixedRetryTemplate")
    private RetryTemplate simpleAndFixedRetryTemplate;

    @Bean
    public IntegrationFlow in09Flow() {
        RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
        retryAdvice.setRetryTemplate(this.simpleAndFixedRetryTemplate);
        retryAdvice.setRecoveryCallback(context -> {
            // リトライが全て失敗するとこの処理が実行される
            MessageHandlingException e = (MessageHandlingException) context.getLastThrowable();
            Message<?> message = ((MessageHandlingException) context.getLastThrowable()).getFailedMessage();
            File payload = (File) message.getPayload();
            log.error("●●● " + e.getRootCause().getClass().getName());
            log.error("●●● " + payload.getName());
            // 例外を再 throw して ExpressionEvaluatingRequestHandlerAdvice の失敗時の処理
            // が実行されるようにする
            throw e;
        });

        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in09"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    RetryContext retryContext = RetrySynchronizationManager.getContext();
                    log.info("★★★ リトライ回数 = " + retryContext.getRetryCount());

                    // 例外を throw して必ずリトライさせる
                    if (true) {
                        throw new RuntimeException("エラーです");
                    }
                    return null;
                }, e -> e
                        // RequestHandlerRetryAdvice と ExpressionEvaluatingRequestHandlerAdvice
                        // を一緒に指定する場合には RequestHandlerRetryAdvice を後に書くこと。
                        // 最初に書くとリトライしてくれない。
                        .advice(successOrFailureChannelAdvice())
                        .advice(retryAdvice))
                .get();
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in09 の下に empty.txt を置くと5回リトライした後、error ディレクトリにファイルを移動しました。

f:id:ksby:20170205192401p:plain

ちなみにコメントに書いてありますが、RequestHandlerRetryAdvice を先に書くとこうなります。

                }, e -> e
                        // RequestHandlerRetryAdvice と ExpressionEvaluatingRequestHandlerAdvice
                        // を一緒に指定する場合には RequestHandlerRetryAdvice を後に書くこと。
                        // 最初に書くとリトライしてくれない。
                        .advice(retryAdvice)
                        .advice(successOrFailureChannelAdvice()))
                .get();

f:id:ksby:20170205192650p:plain

リトライされずにすぐに ExpressionEvaluatingRequestHandlerAdvice の失敗時の処理が実行されます。

上の結果は AOP の処理がどう挿入されるかに依存するので、一緒に指定した場合の処理順序はもしかすると今後ライブラリの実装内容によって変わるかもしれません。リトライ処理は RequestHandlerRetryAdvice は使用せずに RetryTemplate で直接 .handle(...) 内に記述して、advice で指定するのは ExpressionEvaluatingRequestHandlerAdvice だけにする方が無難かもしれません。

続くのか。。。?

あと1つ残った RequestHandlerCircuitBreakerAdvice や、TransactionSynchronizationFactory を使用して Flow 全体の正常、失敗を見て処理を行う方法をまとめておきたいですが、RequestHandlerRetryAdvice, ExpressionEvaluatingRequestHandlerAdvice が調べると意外にボリュームがあって重かったので、一旦サンプル作成に戻ります。気が向いたらまた書きます。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.4.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

..........

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR3'
        mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE'
    }
}

..........
  • buildscript の中の springBootVersion = '1.4.3.RELEASE'springBootVersion = '1.4.4.RELEASE' へ変更します。
  • dependencyManagement の中の mavenBom 'io.spring.platform:platform-bom:Athens-SR2'mavenBom 'io.spring.platform:platform-bom:Athens-SR3' へ変更します。

application.properties

spring.application.name=advice
spring.zipkin.base-url=http://localhost:9411/
spring.sleuth.sampler.percentage=1.0

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>

    <springProperty scope="context" name="springAppName" source="spring.application.name"/>
    <property name="CONSOLE_LOG_PATTERN"
              value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${level:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId},%X{X-B3-SpanId}]){yellow} %clr(${PID:-}){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>
    <include resource="org/springframework/boot/logging/logback/console-appender.xml"/>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
    <logger name="org.springframework.integration.expression.ExpressionUtils" level="ERROR"/>
</configuration>

<logger name="org.springframework.integration.expression.ExpressionUtils" level="ERROR"/> 以外に、以前と比較して以下の点を変更しました。

  • %X{X-B3-TraceId:-}%X{X-B3-TraceId} へ変更しました。
  • %X{X-B3-SpanId:-}%X{X-B3-SpanId} へ変更しました。
  • ,%X{X-Span-Export:-} を削除しました。
  • <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> ... </appender> と独自で指定せずに <include resource="org/springframework/boot/logging/logback/console-appender.xml"/> を使用するようにしました。

SuccessOrFailureFlowConfig.java

package ksbysample.eipapp.advice;

import lombok.extern.slf4j.Slf4j;
import org.aopalliance.aop.Advice;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.retry.RetryContext;
import org.springframework.retry.support.RetrySynchronizationManager;
import org.springframework.retry.support.RetryTemplate;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

@Slf4j
@Configuration
public class SuccessOrFailureFlowConfig {

    private final String EIPAPP_ADVICE_ROOT_DIR = "C:/eipapp/ksbysample-eipapp-advice";


    // setOnSuccessExpressionString, setOnFailureExpressionString だけ指定するサンプル
    //  ・成功時にはファイルを削除する。
    //  ・失敗時にはファイルを error ディレクトリへ移動する。
    //  ・削除、移動の処理は SpEL で記述する。

    @Bean
    public Advice deleteOrMoveAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload.delete()");
        advice.setOnFailureExpressionString(
                "payload.renameTo(new java.io.File('" + EIPAPP_ADVICE_ROOT_DIR + "/error/' + payload.name))");
        // setTrapException(true) を指定すると throw された例外が再 throw されず、
        // ログに出力されない
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow in06Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in06"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return null;
                }, e -> e.advice(deleteOrMoveAdvice()))
                .get();
    }


    // setOnSuccessExpressionString, setOnFailureExpressionString+setSuccessChannelName, setFailureChannelName
    // の組み合わせで指定するサンプル
    //  ・成功時には successFlow.input へ Message を送信する。
    //    successFlow ではファイルを削除する。
    //  ・失敗時には failureFlow.input へ Message を送信する。
    //    failureFlow ではファイルを error ディレクトリへ移動する。

    @Bean
    public Advice successOrFailureChannelAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload");
        advice.setSuccessChannelName("successFlow.input");
        // setOnFailureExpressionString に "payload" と記述しても Failure 用の MessageChannel には
        // File クラスではなく MessageHandlingExpressionEvaluatingAdviceException クラスの payload が渡される
        advice.setOnFailureExpressionString("payload");
        advice.setFailureChannelName("failureFlow.input");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow in07Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in07"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return null;
                }, e -> e.advice(successOrFailureChannelAdvice()))
                .get();
    }

    @Bean
    public IntegrationFlow successFlow() {
        return f -> f
                .<File>handle((p, h) -> {
                    // ファイルを削除する
                    try {
                        Files.delete(Paths.get(p.getAbsolutePath()));
                        log.info("ファイルを削除しました ( {} )", p.getAbsolutePath());
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                });
    }

    @Bean
    public IntegrationFlow failureFlow() {
        return f -> f
                .<ExpressionEvaluatingRequestHandlerAdvice.MessageHandlingExpressionEvaluatingAdviceException>
                        handle((p, h) -> {
                    // MessageHandlingExpressionEvaluatingAdviceException クラスの payload から
                    // Exception 発生前の File クラスの payload を取得する
                    File file = (File) p.getEvaluationResult();

                    // ファイルを error ディレクトリへ移動する
                    Path src = Paths.get(file.getAbsolutePath());
                    Path dst = Paths.get(EIPAPP_ADVICE_ROOT_DIR + "/error/" + file.getName());
                    try {
                        Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
                        log.info("ファイルを移動しました ( {} --> {} )"
                                , src.toAbsolutePath(), dst.toAbsolutePath());
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                });
    }


    // setOnSuccessExpressionString, setOnFailureExpressionString で Success, Failure 用の
    // MessageChannel に渡す payload の型を変更するサンプル
    //  ・元の File クラスの payload から String クラスの payload の Message に変換して
    //    Success, Failure 用の MessageChannel に送信する

    @Bean
    public Advice convertFileToStringAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload + ' の処理に成功しました。'");
        advice.setSuccessChannelName("printFlow.input");
        advice.setOnFailureExpressionString(
                "payload + ' の処理に失敗しました ( ' + #exception.class.name + ', ' + #exception.cause.message + ' )'");
        advice.setFailureChannelName("printFlow.input");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow in08Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in08"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return null;
                }, e -> e.advice(convertFileToStringAdvice()))
                .get();
    }

    @Bean
    public IntegrationFlow printFlow() {
        return f -> f
                // setFailureChannelName(...) の指定で転送された Message は
                // MessageHandlingExpressionEvaluatingAdviceException クラスなので、
                // .transform(...) で SpEL を利用して元の String を取得する
                .transform("payload instanceof T(org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException)"
                        + " ? payload.evaluationResult : payload")
                .handle((p, h) -> {
                    System.out.println("●●● " + p);
                    return null;
                });

    }


    // RequestHandlerRetryAdvice と ExpressionEvaluatingRequestHandlerAdvice を一緒に指定するサンプル
    //  ・RequestHandlerRetryAdvice に指定する RetryTemplate には FlowConfig.java に書いた
    //    simpleAndFixedRetryTemplate Bean を使用する

    @Autowired
    @Qualifier("simpleAndFixedRetryTemplate")
    private RetryTemplate simpleAndFixedRetryTemplate;

    @Bean
    public IntegrationFlow in09Flow() {
        RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
        retryAdvice.setRetryTemplate(this.simpleAndFixedRetryTemplate);
        retryAdvice.setRecoveryCallback(context -> {
            // リトライが全て失敗するとこの処理が実行される
            MessageHandlingException e = (MessageHandlingException) context.getLastThrowable();
            Message<?> message = ((MessageHandlingException) context.getLastThrowable()).getFailedMessage();
            File payload = (File) message.getPayload();
            log.error("●●● " + e.getRootCause().getClass().getName());
            log.error("●●● " + payload.getName());
            // 例外を再 throw して ExpressionEvaluatingRequestHandlerAdvice の失敗時の処理
            // が実行されるようにする
            throw e;
        });

        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in09"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    RetryContext retryContext = RetrySynchronizationManager.getContext();
                    log.info("★★★ リトライ回数 = " + retryContext.getRetryCount());

                    // 例外を throw して必ずリトライさせる
                    if (true) {
                        throw new RuntimeException("エラーです");
                    }
                    return null;
                }, e -> e
                        // RequestHandlerRetryAdvice と ExpressionEvaluatingRequestHandlerAdvice
                        // を一緒に指定する場合には RequestHandlerRetryAdvice を後に書くこと。
                        // 最初に書くとリトライしてくれない。
                        .advice(successOrFailureChannelAdvice())
                        .advice(retryAdvice))
                .get();
    }

}

履歴

2017/02/05
初版発行。

Spring Boot + Spring Integration でいろいろ試してみる ( その15 )( RequestHandlerRetryAdvice のサンプルを作ってみる )

概要

記事一覧はこちらです。

参照したサイト・書籍

  1. 8.9 Adding Behavior to Endpoints
    http://docs.spring.io/spring-integration/docs/4.3.7.RELEASE/reference/html/messaging-endpoints-chapter.html#message-handler-advice-chain

目次

  1. ksbysample-eipapp-advice プロジェクトを作成する
  2. C:\eipapp\ksbysample-eipapp-advice ディレクトリを作成する
  3. RequestHandlerRetryAdvice のサンプルを作成する
    1. RetryPolicy, BackOffPolicy には何があるか?
    2. SimpleRetryPolicy+FixedBackOffPolicy のサンプルを作成する
    3. TimeoutRetryPolicy+ExponentialBackOffPolicy のサンプルを作成する
    4. CompositeRetryPolicy のサンプルを作成する
  4. RequestHandlerRetryAdvice ではなく直接 RetryTemplate を利用して ExceptionClassifierRetryPolicy のサンプルを作成する
  5. 続きます!

手順

ksbysample-eipapp-advice プロジェクトを作成する

  1. IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。

  2. src/main/java の下に ksbysample.eipapp.advice パッケージを作成します。

  3. src/main/java/ksbysample/eipapp/advice の下に Application.java を作成し、リンク先の内容 を記述します。

C:\eipapp\ksbysample-eipapp-advice ディレクトリを作成する

以下の構成のディレクトリを作成します。

C:\eipapp\ksbysample-eipapp-advice
├ in01
├ in02
├ in03
├ in04
└ in05

RequestHandlerRetryAdvice のサンプルを作成する

RetryPolicy, BackOffPolicy には何があるか?

RequestHandlerRetryAdvice クラスを利用してリトライする場合、リトライの終了条件を RetryPolicy インターフェースの実装クラスで指定し、リトライ時の待機時間を BackOffPolicy インターフェースの実装クラスで指定します。

RetryPolicy インターフェースの実装クラスには以下のものがあります。

f:id:ksby:20170129231917p:plain

クラス 説明
AlwaysRetryPolicy 必ずリトライします(ただしリトライ対象の処理の中で例外は throw すること)。テストのためのスタブ等で使用します。
CompositeRetryPolicy 複数の RetryPolicy を組み合わせることができる RetryPolicy です。例えば Exception が3回発生するか(SimpleRetryPolicy で指定)、10 秒経過する(TimeoutRetryPolicy で指定)まではリトライする、という条件が指定できます。
ExceptionClassifierRetryPolicy Exception 毎に RetryPolicy を指定できる RetryPolicy です。ただし例えば ExceptionClassifierRetryPolicy で NullPointerException に TimeoutRetryPolicy が適用されるよう設定して .handle(...) 内で NullPointerException を throw しても LambdaMessageProcessor::processMessage で発生した例外が MessageHandlingException に変換されてしまうため、Spring Integration DSL+RequestHandlerRetryAdvice の組み合わせではこの RetryPolicy は使えないのでは?
NeverRetryPolicy 初回実行のみでリトライしません。これも AlwaysRetryPolicy と同様テスト用です。
SimpleRetryPolicy リトライ回数、リトライの対象とする例外を指定できる RetryPolicy です。デフォルトではリトライ回数は3、例外は Exception.class です。通常使用するのはおそらくこれでしょう。
TimeoutRetryPolicy リトライの条件を回数ではなくミリ秒数で指定します。例外が throw されても指定時間が経過するまでリトライし続けます。ただし例外が throw されずに指定された時間が経過してもリトライ対象の処理から応答が返ってこない場合には何も起きません。

BackOffPolicy インターフェースの実装クラスには以下のものがあります。

f:id:ksby:20170129225634p:plain

クラス 説明
ExponentialRandomBackOffPolicy ExponentialBackOffPolicy と同様、初期の待機時間(ミリ秒)、最大待機時間(ミリ秒)、倍数を指定しますが、次の待機時間が固定の倍数ではなく1~倍数の間のランダムの数値の倍数になります。このクラスのソースのコメントに分かりやすい説明があります。
ExponentialBackOffPolicy 初期の待機時間(ミリ秒)、最大待機時間(ミリ秒)、倍数(例えば2を指定すれば2倍)を指定し、次の待機時間が前回待機時間 * 倍数のミリ秒数になります。次回の待機時間を前回の待機時間の2倍にする、等の用途で使用します。デフォルトでは初期の待機時間は 100 ミリ秒、最大待機時間は 30000 ミリ秒、倍数は 2、です。
FixedBackOffPolicy リトライ前に指定されたミリ秒間、固定で待機します。時間を指定しない場合、デフォルトでは1秒待機します。
UniformRandomBackOffPolicy 最小と最大のミリ秒を指定し、その間のランダムな時間待機します。待機時間はリトライ毎に変わります。時間を指定しない場合、デフォルトでは最小は 500 ミリ秒、最大は 1500 ミリ秒です。
NoBackOffPolicy リトライ前に何も実行しません。

こちらでメインに使うのは FixedBackOffPolicy、ExponentialBackOffPolicy の2つでしょうか。

SimpleRetryPolicy+FixedBackOffPolicy のサンプルを作成する

動作確認のためにサンプルを作成していきます。src/main/java/ksbysample/eipapp/advice/FlowConfig.java の完成形は リンク先の内容 です。

SimpleRetryPolicy でリトライ最大回数5回を指定し、FixedBackOffPolicy でリトライ時に2秒待機することを指定します。

条件を設定した SimpleRetryPolicy、FixedBackOffPolicy を RetryTemplate にセットし、RetryTemplate を RequestHandlerRetryAdvice にセットします。

RequestHandlerRetryAdvice を .handle(...) の第2引数に e -> e.advice(retryAdvice) と渡します。これで .handle(...) の第1引数に渡したラムダ式を最大5回、2秒待機でリトライします。

尚、以降の RequestHandlerRetryAdvice のサンプルでは IntegrationFlow はほぼ同じものを使うので、共通で呼び出す private メソッドを作成して引数で in ディレクトリ、RetryTemplate を変更できるようにしています。

    /**
     * 共通 retry 用 Flow
     * retryTemplate で設定されたリトライを実行する
     *
     * @param inDir         監視対象の in ディレクトリ
     * @param retryTemplate リトライ条件を設定した RetryTemplate クラスのインスタンス
     * @return
     */
    private IntegrationFlow retryFlow(File inDir, RetryTemplate retryTemplate) {
        RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
        retryAdvice.setRetryTemplate(retryTemplate);
        retryAdvice.setRecoveryCallback(context -> {
            // リトライが全て失敗するとこの処理が実行される
            MessageHandlingException e = (MessageHandlingException) context.getLastThrowable();
            Message<?> message = ((MessageHandlingException) context.getLastThrowable()).getFailedMessage();
            File payload = (File) message.getPayload();
            log.error("●●● " + e.getRootCause().getClass().getName());
            log.error("●●● " + payload.getName());
            return null;
        });

        return IntegrationFlows
                .from(s -> s.file(inDir)
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                .handle((GenericHandler<Object>) (p, h) -> {
                    RetryContext retryContext = RetrySynchronizationManager.getContext();
                    log.info("★★★ リトライ回数 = " + retryContext.getRetryCount());

                    // 例外を throw して強制的にリトライさせる
                    if (true) {
                        throw new RuntimeException();
                    }
                    return p;
                }, e -> e.advice(retryAdvice))
                .log()
                .channel(nullChannel)
                .get();
    }

    /**
     * リトライ最大回数は5回、リトライ時は2秒待機する
     *
     * @return
     */
    @Bean
    public RetryTemplate simpleAndFixedRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // SimpleRetryPolicy
        retryTemplate.setRetryPolicy(
                new SimpleRetryPolicy(5, singletonMap(Exception.class, true)));

        // FixedBackOffPolicy
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(2000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry01Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in01")
                , simpleAndFixedRetryTemplate());
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in01 の下に empty.txt を置くと以下のログが出力されます。2秒待機で5回リトライしていることが確認できます。回数指定の場合、最後のリトライの処理で例外が throw されるとすぐに RequestHandlerRetryAdvice::setRecoveryCallback(...) の処理が呼び出されています。

f:id:ksby:20170201224951p:plain

TimeoutRetryPolicy+ExponentialBackOffPolicy のサンプルを作成する

TimeoutRetryPolicy でリトライ最大45秒を指定し、ExponentialBackOffPolicy でリトライ時に初期値2秒、最大10秒、倍数2を指定します。

    /**
     * リトライは最大45秒、リトライ時は初期値2秒、最大10秒、倍数2(2,4,8,10,10,...)待機する
     *
     * @return
     */
    @Bean
    public RetryTemplate timeoutAndExponentialRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // TimeoutRetryPolicy
        TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
        timeoutRetryPolicy.setTimeout(45000);
        retryTemplate.setRetryPolicy(timeoutRetryPolicy);

        // ExponentialBackOffPolicy
        ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
        exponentialBackOffPolicy.setInitialInterval(2000);
        exponentialBackOffPolicy.setMaxInterval(10000);
        exponentialBackOffPolicy.setMultiplier(2.0);
        retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry02Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in02")
                , timeoutAndExponentialRetryTemplate());
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in02 の下に empty.txt を置くと以下のログが出力されます。2, 4, 8, 10, 10, 10, 10 とリトライ毎に待機時間が2倍になっており(ただし最大は10秒)、最後のリトライで 45 秒を超えたので処理が終了していることが確認できます。時間指定の場合、最後のリトライの処理で例外が throw されてもすぐには RequestHandlerRetryAdvice::setRecoveryCallback(...) の処理が呼び出されず、待機してから呼び出されるようです。

f:id:ksby:20170201225601p:plain

CompositeRetryPolicy のサンプルを作成する

SimpleRetryPolicy で最大3回、TimeoutRetryPolicy で最大10秒の2つのルールを CompositeRetryPolicy に設定します。

まずは FixedBackOffPolicy でリトライ時の待機時間を1秒にして、SimpleRetryPolicy の最大3回が適用されることを確認します。

    /**
     * 3回リトライするか、10秒を越えるまでリトライする
     * FixedBackOffPolicy はリトライ時の待機時間を1秒にしているので、
     * 3回リトライで終了する
     *
     * @return
     */
    @Bean
    public RetryTemplate compositeRetryTemplate01() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // CompositeRetryPolicy
        CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
        RetryPolicy[] retryPolicies = new RetryPolicy[2];
        retryPolicies[0] = new SimpleRetryPolicy(3, singletonMap(Exception.class, true));
        TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
        timeoutRetryPolicy.setTimeout(10000);
        retryPolicies[1] = timeoutRetryPolicy;
        compositeRetryPolicy.setPolicies(retryPolicies);
        retryTemplate.setRetryPolicy(compositeRetryPolicy);

        // FixedBackOffPolicy
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(1000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry03Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in03")
                , compositeRetryTemplate01());
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in03 の下に empty.txt を置くと以下のログが出力されます。1秒待機で3回リトライして処理が終了していることが確認できます。

f:id:ksby:20170201225949p:plain

今度は FixedBackOffPolicy でリトライ時の待機時間を5秒にして、TimeoutRetryPolicy の最大10秒が適用されることを確認します。

    /**
     * 3回リトライするか、10秒を越えるまでリトライする
     * FixedBackOffPolicy はリトライ時の待機時間を5秒にしているので、
     * 10秒リトライで終了する
     *
     * @return
     */
    @Bean
    public RetryTemplate compositeRetryTemplate02() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // CompositeRetryPolicy
        CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
        RetryPolicy[] retryPolicies = new RetryPolicy[2];
        retryPolicies[0] = new SimpleRetryPolicy(3, singletonMap(Exception.class, true));
        TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
        timeoutRetryPolicy.setTimeout(10000);
        retryPolicies[1] = timeoutRetryPolicy;
        compositeRetryPolicy.setPolicies(retryPolicies);
        retryTemplate.setRetryPolicy(compositeRetryPolicy);

        // FixedBackOffPolicy
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(5000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry04Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in04")
                , compositeRetryTemplate02());
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in04 の下に empty.txt を置くと以下のログが出力されます。5秒待機で2回リトライして10秒を超えたので処理が終了していることが確認できます。

f:id:ksby:20170201230345p:plain

RequestHandlerRetryAdvice ではなく直接 RetryTemplate を利用して ExceptionClassifierRetryPolicy のサンプルを作成する

以下の内容で実装します。

  • FileSystemAlreadyExistsException なら最大1分間リトライします。
  • FileSystemNotFoundException なら最大5回リトライします。
  • ExponentialBackOffPolicy で初期の待機時間2秒、最大待機時間10秒、倍数2を指定します。

最初は FileSystemAlreadyExistsException を throw します。

    @Autowired
    private RetryTemplateMessageHandler retryTemplateMessageHandler;

    @Bean
    public IntegrationFlow retry05Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-advice/in05"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                .handle(this.retryTemplateMessageHandler)
                .log()
                .channel(nullChannel)
                .get();
    }

    @Configuration
    public static class RetryTemplateConfig {

        @Bean
        public RetryTemplate exceptionClassifierRetryTemplate() {
            RetryTemplate retryTemplate = new RetryTemplate();

            // ExceptionClassifierRetryPolicy
            ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
            Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>();
            //  FileSystemAlreadyExistsException なら TimeoutRetryPolicy で最大1分間リトライ
            TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
            timeoutRetryPolicy.setTimeout(60000);
            policyMap.put(FileSystemAlreadyExistsException.class, timeoutRetryPolicy);
            //  FileSystemNotFoundException なら SimpleRetryPolicy で最大5回リトライ
            policyMap.put(FileSystemNotFoundException.class
                    , new SimpleRetryPolicy(5
                            , singletonMap(FileSystemNotFoundException.class, true)));
            retryPolicy.setPolicyMap(policyMap);
            retryTemplate.setRetryPolicy(retryPolicy);

            // ExponentialBackOffPolicy で初期の待機時間2秒、最大待機時間10秒、倍数2を指定
            ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
            exponentialBackOffPolicy.setInitialInterval(2000);
            exponentialBackOffPolicy.setMaxInterval(10000);
            exponentialBackOffPolicy.setMultiplier(2.0);
            retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);

            return retryTemplate;
        }

    }

    @MessageEndpoint
    public static class RetryTemplateMessageHandler implements GenericHandler<File> {

        private final RetryTemplate exceptionClassifierRetryTemplate;

        public RetryTemplateMessageHandler(
                @Qualifier("exceptionClassifierRetryTemplate") RetryTemplate exceptionClassifierRetryTemplate) {
            this.exceptionClassifierRetryTemplate = exceptionClassifierRetryTemplate;
        }

        @Override
        public Object handle(File payload, Map<String, Object> headers) {
            Object result = this.exceptionClassifierRetryTemplate.execute(
                    context -> {
                        log.info("★★★ リトライ回数 = " + context.getRetryCount());

                        if (true) {
                            throw new FileSystemAlreadyExistsException();
                        }
                        return payload;
                    }, context -> {
                        Exception e = (Exception) context.getLastThrowable();
                        log.error("●●● " + e.getClass().getName());
                        log.error("●●● " + payload.getName());
                        return payload;
                    });

            return result;
        }

    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in05 の下に empty.txt を置くと以下のログが出力されます。リトライをして1分を超えたら処理が終了していることが確認できます。

f:id:ksby:20170203003657p:plain

次は FileSystemNotFoundException を throw します。

            Object result = this.exceptionClassifierRetryTemplate.execute(
                    context -> {
                        log.info("★★★ リトライ回数 = " + context.getRetryCount());

                        if (true) {
//                            throw new FileSystemAlreadyExistsException();
                            throw new FileSystemNotFoundException();
                        }
                        return payload;
                    }, context -> {

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in05 の下に empty.txt を置くと以下のログが出力されます。5回リトライして1分を超えずに処理が終了していることが確認できます。

f:id:ksby:20170203005049p:plain

続きます!

spring-retry って結構機能豊富なんですね。。。

次は ExpressionEvaluatingRequestHandlerAdvice を使用したサンプルを作成してみます。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

[compileJava, compileTestGroovy, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options,-processing']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    mavenCentral()
    maven { url "http://repo.spring.io/repo/" }
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
        mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.integration:spring-integration-file")
    compile("org.codehaus.janino:janino")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの
    // http://projects.spring.io/spring-cloud/ の「Release Trains」参照
    compile("org.springframework.cloud:spring-cloud-starter-zipkin") {
        exclude module: 'spring-boot-starter-web'
    }

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.2")
}

Application.java

package ksbysample.eipapp.advice;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

FlowConfig.java

package ksbysample.eipapp.advice;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.CompositeRetryPolicy;
import org.springframework.retry.policy.ExceptionClassifierRetryPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.policy.TimeoutRetryPolicy;
import org.springframework.retry.support.RetrySynchronizationManager;
import org.springframework.retry.support.RetryTemplate;

import java.io.File;
import java.nio.file.FileSystemAlreadyExistsException;
import java.nio.file.FileSystemNotFoundException;
import java.util.HashMap;
import java.util.Map;

import static java.util.Collections.singletonMap;

@Slf4j
@Configuration
public class FlowConfig {

    private final NullChannel nullChannel;

    public FlowConfig(NullChannel nullChannel) {
        this.nullChannel = nullChannel;
    }

    /**
     * 共通 retry 用 Flow
     * retryTemplate で設定されたリトライを実行する
     *
     * @param inDir         監視対象の in ディレクトリ
     * @param retryTemplate リトライ条件を設定した RetryTemplate クラスのインスタンス
     * @return
     */
    private IntegrationFlow retryFlow(File inDir, RetryTemplate retryTemplate) {
        RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
        retryAdvice.setRetryTemplate(retryTemplate);
        retryAdvice.setRecoveryCallback(context -> {
            // リトライが全て失敗するとこの処理が実行される
            MessageHandlingException e = (MessageHandlingException) context.getLastThrowable();
            Message<?> message = ((MessageHandlingException) context.getLastThrowable()).getFailedMessage();
            File payload = (File) message.getPayload();
            log.error("●●● " + e.getRootCause().getClass().getName());
            log.error("●●● " + payload.getName());
            return null;
        });

        return IntegrationFlows
                .from(s -> s.file(inDir)
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                .handle((GenericHandler<Object>) (p, h) -> {
                    RetryContext retryContext = RetrySynchronizationManager.getContext();
                    log.info("★★★ リトライ回数 = " + retryContext.getRetryCount());

                    // 例外を throw して強制的にリトライさせる
                    if (true) {
                        throw new RuntimeException();
                    }
                    return p;
                }, e -> e.advice(retryAdvice))
                .log()
                .channel(nullChannel)
                .get();
    }

    /**
     * リトライ最大回数は5回、リトライ時は2秒待機する
     *
     * @return
     */
    @Bean
    public RetryTemplate simpleAndFixedRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // SimpleRetryPolicy
        retryTemplate.setRetryPolicy(
                new SimpleRetryPolicy(5, singletonMap(Exception.class, true)));

        // FixedBackOffPolicy
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(2000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry01Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in01")
                , simpleAndFixedRetryTemplate());
    }

    /**
     * リトライは最大45秒、リトライ時は初期値2秒、最大10秒、倍数2(2,4,8,10,10,...)待機する
     *
     * @return
     */
    @Bean
    public RetryTemplate timeoutAndExponentialRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // TimeoutRetryPolicy
        TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
        timeoutRetryPolicy.setTimeout(45000);
        retryTemplate.setRetryPolicy(timeoutRetryPolicy);

        // ExponentialBackOffPolicy
        ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
        exponentialBackOffPolicy.setInitialInterval(2000);
        exponentialBackOffPolicy.setMaxInterval(10000);
        exponentialBackOffPolicy.setMultiplier(2.0);
        retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry02Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in02")
                , timeoutAndExponentialRetryTemplate());
    }

    /**
     * 3回リトライするか、10秒を越えるまでリトライする
     * FixedBackOffPolicy はリトライ時の待機時間を1秒にしているので、
     * 3回リトライで終了する
     *
     * @return
     */
    @Bean
    public RetryTemplate compositeRetryTemplate01() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // CompositeRetryPolicy
        CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
        RetryPolicy[] retryPolicies = new RetryPolicy[2];
        retryPolicies[0] = new SimpleRetryPolicy(3, singletonMap(Exception.class, true));
        TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
        timeoutRetryPolicy.setTimeout(10000);
        retryPolicies[1] = timeoutRetryPolicy;
        compositeRetryPolicy.setPolicies(retryPolicies);
        retryTemplate.setRetryPolicy(compositeRetryPolicy);

        // FixedBackOffPolicy
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(1000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry03Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in03")
                , compositeRetryTemplate01());
    }

    /**
     * 3回リトライするか、10秒を越えるまでリトライする
     * FixedBackOffPolicy はリトライ時の待機時間を5秒にしているので、
     * 10秒リトライで終了する
     *
     * @return
     */
    @Bean
    public RetryTemplate compositeRetryTemplate02() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // CompositeRetryPolicy
        CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
        RetryPolicy[] retryPolicies = new RetryPolicy[2];
        retryPolicies[0] = new SimpleRetryPolicy(3, singletonMap(Exception.class, true));
        TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
        timeoutRetryPolicy.setTimeout(10000);
        retryPolicies[1] = timeoutRetryPolicy;
        compositeRetryPolicy.setPolicies(retryPolicies);
        retryTemplate.setRetryPolicy(compositeRetryPolicy);

        // FixedBackOffPolicy
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(5000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry04Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in04")
                , compositeRetryTemplate02());
    }

    /**
     * ここから下は RequestHandlerRetryAdvice は使用せず RetryTemplate を直接使用して
     * ExceptionClassifierRetryPolicy でリトライした例である
     */

    @Autowired
    private RetryTemplateMessageHandler retryTemplateMessageHandler;

    @Bean
    public IntegrationFlow retry05Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-advice/in05"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                .handle(this.retryTemplateMessageHandler)
                .log()
                .channel(nullChannel)
                .get();
    }

    @Configuration
    public static class RetryTemplateConfig {

        @Bean
        public RetryTemplate exceptionClassifierRetryTemplate() {
            RetryTemplate retryTemplate = new RetryTemplate();

            // ExceptionClassifierRetryPolicy
            ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
            Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>();
            //  FileSystemAlreadyExistsException なら TimeoutRetryPolicy で最大1分間リトライ
            TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
            timeoutRetryPolicy.setTimeout(60000);
            policyMap.put(FileSystemAlreadyExistsException.class, timeoutRetryPolicy);
            //  FileSystemNotFoundException なら SimpleRetryPolicy で最大5回リトライ
            policyMap.put(FileSystemNotFoundException.class
                    , new SimpleRetryPolicy(5
                            , singletonMap(FileSystemNotFoundException.class, true)));
            retryPolicy.setPolicyMap(policyMap);
            retryTemplate.setRetryPolicy(retryPolicy);

            // ExponentialBackOffPolicy で初期の待機時間2秒、最大待機時間10秒、倍数2を指定
            ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
            exponentialBackOffPolicy.setInitialInterval(2000);
            exponentialBackOffPolicy.setMaxInterval(10000);
            exponentialBackOffPolicy.setMultiplier(2.0);
            retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);

            return retryTemplate;
        }

    }

    @MessageEndpoint
    public static class RetryTemplateMessageHandler implements GenericHandler<File> {

        private final RetryTemplate exceptionClassifierRetryTemplate;

        public RetryTemplateMessageHandler(
                @Qualifier("exceptionClassifierRetryTemplate") RetryTemplate exceptionClassifierRetryTemplate) {
            this.exceptionClassifierRetryTemplate = exceptionClassifierRetryTemplate;
        }

        @Override
        public Object handle(File payload, Map<String, Object> headers) {
            Object result = this.exceptionClassifierRetryTemplate.execute(
                    context -> {
                        log.info("★★★ リトライ回数 = " + context.getRetryCount());

                        if (true) {
//                            throw new FileSystemAlreadyExistsException();
                            throw new FileSystemNotFoundException();
                        }
                        return payload;
                    }, context -> {
                        Exception e = (Exception) context.getLastThrowable();
                        log.error("●●● " + e.getClass().getName());
                        log.error("●●● " + payload.getName());
                        return payload;
                    });

            return result;
        }

    }

}

履歴

2017/02/03
初版発行。

Spring Boot + Spring Integration でいろいろ試してみる ( その14 )( delayer のサンプルを作ってみる )

概要

記事一覧はこちらです。

  • Spring Integration DSL8.6 Delayer を使用したサンプルを作成します。
  • Delayer は指定した時間 Message を待機させる機能です。
  • RequestHandlerAdvice、.bridge(…)、MessageStore も初めて使っていますが今回説明は入れていません。RequestHandlerAdvice、MessageStore は慣れておきたいので、その内調べてまとめておきたいですね。。。

参照したサイト・書籍

  1. Spring Integration Reference Manual - 8.6 Delayer
    http://docs.spring.io/spring-integration/reference/htmlsingle/#delayer

  2. handling file backup/archive on success and retry on failure
    http://stackoverflow.com/questions/24016152/handling-file-backup-archive-on-success-and-retry-on-failure

    • ExpressionEvaluatingRequestHandlerAdvice の使い方の参考にしました。

目次

  1. ksbysample-eipapp-delayer プロジェクトを作成する
  2. C:\eipapp\ksbysample-eipapp-delayer ディレクトリを作成する
  3. まずはシンプルに時間を指定して待機させてみる
  4. Message の header, payload に指定した時間で待機させてみる
  5. 待機する時間を動的に変えてみる
  6. .delay(...) の第1引数には GROUP_ID を指定するが、何かを MessageGroupStore に保存するのか?

手順

ksbysample-eipapp-delayer プロジェクトを作成する

  1. IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。

  2. src/main/java の下に ksbysample.eipapp.delayer パッケージを作成します。

  3. src/main/java/ksbysample/eipapp/delayer の下に Application.java を作成し、リンク先の内容 を記述します。

C:\eipapp\ksbysample-eipapp-delayer ディレクトリを作成する

以下の構成のディレクトリを作成します。

C:\eipapp\ksbysample-eipapp-delayer
├ in01
├ in02
├ in03
└ in04

処理開始用のファイルには Spring Boot + Spring Integration でいろいろ試してみる ( その10 )( URL一覧のファイルが置かれたらアクセス可能かチェックして結果ファイルに出力する ) で作成した 2014.txt と、必要に応じて作成したもの使用します。

まずはシンプルに時間を指定して待機させてみる

  1. src/main/java/ksbysample/eipapp/delayer の下に FlowConfig.java を作成し、リンク先のその1の内容 を記述します。

  2. bootRun タスクを実行した後、C:\eipapp\ksbysample-eipapp-delayer\in01 に 2014.txt を置きます。

    以下のログが出力されて、実装通り3秒待機→6秒待機していることが確認できます。

    f:id:ksby:20170129023442p:plain

  3. bootRun タスクを停止します。

Message の header, payload に指定した時間で待機させてみる

  1. src/main/java/ksbysample/eipapp/delayer の下の FlowConfig.javaリンク先のその2の内容 に変更します。

  2. 動作確認用に、中が空の empty.txt と、中に “10000” と記述した 10000.txt を作成します。

  3. bootRun タスクを実行した後、最初に C:\eipapp\ksbysample-eipapp-delayer\in02 に empty.txt を置きます。

    以下のログが出力されて、(DelayerEndpointSpec e) -> e.defaultDelay(...) で指定されているデフォルトの待機秒数が使用されて、8秒待機→2秒待機していることが確認できます。

    f:id:ksby:20170129024605p:plain

  4. 次に C:\eipapp\ksbysample-eipapp-delayer\in02 に 10000.txt を置きます。

    以下のログが出力されて、ファイルに記述した待機秒数が使用されて、10秒待機→10秒待機していることが確認できます。

    f:id:ksby:20170129025132p:plain

  5. bootRun タスクを停止します。

待機する時間を動的に変えてみる

Message の headers にセットされた delayInit, delayCount の2つを参照して delayInit + delayCount * 1000 ミリ秒待機する、というサンプルを作成します。

  1. src/main/java/ksbysample/eipapp/delayer の下の FlowConfig.javaリンク先のその3の内容 に変更します。

  2. bootRun タスクを実行した後、C:\eipapp\ksbysample-eipapp-delayer\in03 に 2014.txt を置きます。

    以下のログが出力されて、待機秒数が3秒→4秒→5秒と増えていることが確認できます。

    f:id:ksby:20170129094149p:plain

  3. bootRun タスクを停止します。

.delay(...) の第1引数には GROUP_ID を指定するが、何かを MessageGroupStore に保存するのか?

org.springframework.integration.handler の DelayHandler.java の doInit メソッドを見ると messageStore が指定されていない場合には this.messageStore = new SimpleMessageStore(); と実装されているので、デフォルトでは delay 中は SimpleMessageStore に何かのデータを入れており、その時に GROUP_ID を使用するようです。

SimpleMessageStore のクラス図を IntelliJ IDEA で作成してみると MessageGroupStore インターフェースを実装していることが分かります。

f:id:ksby:20170129165355p:plain

何を MessageGroupStore に入れるのか確認してみます。

  1. src/main/java/ksbysample/eipapp/delayer の下の FlowConfig.javaリンク先のその4の内容 に変更します。

  2. bootRun タスクを実行した後、C:\eipapp\ksbysample-eipapp-delayer\in04 に 2014.txt を置きます。

    以下のログが出力されます。

    f:id:ksby:20170129165607p:plain

    • delay が開始されると指定した simpleMessageStore にデータがセットされて、delay が終了すると削除されることが確認できます。
    • header は id, timestamp のみでした。カスタムの header が追加されてはいないようです。
    • payload は org.springframework.integration.handler の DelayHandler クラス内で定義されている DelayedMessageWrapper クラスのデータがセットされていました。DelayedMessageWrapper クラスは処理開始時点の日時がセットされる long requestDate と、オリジナルの Message がセットされる Message<?> original をフィールドに持つクラスでした。
  3. bootRun タスクを停止します。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

[compileJava, compileTestGroovy, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options,-processing']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    mavenCentral()
    maven { url "http://repo.spring.io/repo/" }
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
        mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.integration:spring-integration-file")
    compile("org.codehaus.janino:janino")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.2")
}

Application.java

package ksbysample.eipapp.delayer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

FlowConfig.java

■その1

package ksbysample.eipapp.delayer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.dsl.DelayerEndpointSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;

import java.io.File;

@Slf4j
@Configuration
public class FlowConfig {

    private static final String GROUP_ID_DELAY = "DELAYFLOW_DELAY";

    private final NullChannel nullChannel;

    public FlowConfig(NullChannel nullChannel) {
        this.nullChannel = nullChannel;
    }

    /**
     * 正常処理時に payload にセットされた File クラスのファイルを削除する RequestHandlerAdvice
     *
     * @return
     */
    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice fileDeleteAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice requestHandlerAdvice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        requestHandlerAdvice.setOnSuccessExpression("payload.delete()");
        return requestHandlerAdvice;
    }


    @Bean
    public IntegrationFlow delayFlow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in01"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                // 3秒待機する
                .delay(GROUP_ID_DELAY, (DelayerEndpointSpec e) -> e.defaultDelay(3000))
                .log()
                // 6秒待機する
                .delay(GROUP_ID_DELAY, "6000")
                .log()
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .channel(nullChannel)
                .get();
    }

}

■その2

@Slf4j
@Configuration
public class FlowConfig {

    ..........

    @Bean
    public IntegrationFlow delayByMessageFlow() {
        return IntegrationFlows
                // C:/eipapp/ksbysample-eipapp-delayer/in02 には待機秒数を記述したファイルを置く
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in02"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // payload にセットされた File クラスを headers.originalPayload へセットする
                .enrichHeaders(h -> h.headerExpression("originalPayload", "payload"))
                // ファイルの内容を読み込んで payload にセットする
                // ※payload のクラスが File クラス --> String クラスに変わる
                .transform(Transformers.fileToString())
                .log()
                // payload にセットされた秒数待機する
                .delay(GROUP_ID_DELAY, "payload", (DelayerEndpointSpec e) -> e.defaultDelay(8000))
                .log()
                // headers.delay にセットされた秒数待機する
                .enrichHeaders(h -> h.headerExpression("delay", "payload"))
                .delay(GROUP_ID_DELAY, "headers.delay", (DelayerEndpointSpec e) -> e.defaultDelay(2000))
                .log()
                // headers.originalPayload にセットされている File クラスを payload へ戻す
                .transform("headers.originalPayload")
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .channel(nullChannel)
                .get();
    }

}

■その3

@Slf4j
@Configuration
public class FlowConfig {

    ..........


    @Bean
    public IntegrationFlow delayDynamicFlow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in03"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // 待機秒数を計算するための delayInit, delayCount の初期値を header にセットする
                .enrichHeaders(h -> h
                        .header("delayInit", 3000)
                        .header("delayCount", 0))
                .log()
                // delayCount が 3 になるまで処理をループさせたいので、別の Flow にする
                // return f -> f.~ で書いた場合、Bean名 + ".input" という MessageChannel
                // が自動生成されて、ここに Message を送信すると処理が開始される
                .channel("loopCountFlow.input")
                .get();
    }

    @Bean
    public IntegrationFlow loopCountFlow() {
        return f -> f
                // delayInit + delayCount * 1000 のミリ秒数待機する
                .delay(GROUP_ID_DELAY, m -> {
                    return ((int) m.getHeaders().get("delayInit"))
                            + (((int) m.getHeaders().get("delayCount")) * 1000);
                })
                .log()
                // headers.delayCount を +1 する
                .enrichHeaders(h -> h.headerFunction("delayCount"
                        , m -> ((int) m.getHeaders().get("delayCount")) + 1
                        , true))
                // delayCount が3未満なら loopCountFlow の最初に戻る
                // そうでなければ次の処理へ
                .routeToRecipients(r -> r
                        .recipientFlow("headers.delayCount < 3"
                                , sf -> sf.channel("loopCountFlow.input"))
                        .defaultOutputToParentFlow())
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .handle((p, h) -> {
                    System.out.println("ファイルを削除しました");
                    return null;
                });
    }

}

■その4

@Slf4j
@Configuration
public class FlowConfig {

    ..........

    @Bean
    public MessageGroupStore simpleMessageStore() {
        return new SimpleMessageStore();
    }

    /**
     * "" の String の Message を返す MessageSource
     *
     * @return
     */
    @Bean
    public MessageSource<String> stringMessageSource() {
        return () -> MessageBuilder.withPayload("").build();
    }

    @Bean
    public IntegrationFlow printMessageStoreFlow() {
        return IntegrationFlows
                // 1秒毎に処理を実行したいので、1秒毎に Message を送信させる
                .from(stringMessageSource()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // MessageStore に格納された Message の headers, payload を出力する
                .handle((p, h) -> {
                    Collection<Message<?>> delayMessagesList
                            = simpleMessageStore().getMessagesForGroup(GROUP_ID_DELAY);
                    delayMessagesList.forEach(m -> {
                        m.getHeaders().entrySet().forEach(entry -> {
                            System.out.println("[header ] " + entry.getKey() + " = " + entry.getValue());
                        });
                        System.out.println("[payload] " + m.getPayload());
                    });
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow checkMessageStoreFlow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in04"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                // 3秒待機する
                .delay(GROUP_ID_DELAY
                        , (DelayerEndpointSpec e) -> e.defaultDelay(3000)
                                .messageStore(simpleMessageStore()))
                .log()
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .channel(nullChannel)
                .get();
    }

}
  • checkMessageStoreFlow は in04 ディレクトリにファイルが置かれる→3秒待機→ファイル削除する、Flow です。MessageGroupStore のデータを確認したいので .messageStore(simpleMessageStore()) で指定しています。
  • printMessageStoreFlow は1秒毎に MessageGroupStore をチェックしてデータがあれば出力する Flow です。checkMessageStoreFlow とは別に並行して処理が実行されます。

■最終形

package ksbysample.eipapp.delayer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.DelayerEndpointSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.support.Transformers;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import java.io.File;
import java.util.Collection;
import java.util.Map;

@Slf4j
@Configuration
public class FlowConfig {

    private static final String GROUP_ID_DELAY = "DELAYFLOW_DELAY";

    private final NullChannel nullChannel;

    public FlowConfig(NullChannel nullChannel) {
        this.nullChannel = nullChannel;
    }

    /**
     * 正常処理時に payload にセットされた File クラスのファイルを削除する RequestHandlerAdvice
     *
     * @return
     */
    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice fileDeleteAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice requestHandlerAdvice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        requestHandlerAdvice.setOnSuccessExpression("payload.delete()");
        return requestHandlerAdvice;
    }

    @Bean
    public IntegrationFlow delayFlow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in01"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                // 3秒待機する
                .delay(GROUP_ID_DELAY
                        , (DelayerEndpointSpec e) -> e.defaultDelay(3000))
                .log()
                // 6秒待機する
                .delay(GROUP_ID_DELAY, "6000")
                .log()
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .channel(nullChannel)
                .get();
    }

    @Bean
    public IntegrationFlow delayByMessageFlow() {
        return IntegrationFlows
                // C:/eipapp/ksbysample-eipapp-delayer/in02 には待機秒数を記述したファイルを置く
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in02"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // payload にセットされた File クラスを headers.originalPayload へセットする
                .enrichHeaders(h -> h.headerExpression("originalPayload", "payload"))
                // ファイルの内容を読み込んで payload にセットする
                // ※payload のクラスが File クラス --> String クラスに変わる
                .transform(Transformers.fileToString())
                .log()
                // payload にセットされた秒数待機する
                .delay(GROUP_ID_DELAY, "payload", (DelayerEndpointSpec e) -> e.defaultDelay(8000))
                .log()
                // headers.delay にセットされた秒数待機する
                .enrichHeaders(h -> h.headerExpression("delay", "payload"))
                .delay(GROUP_ID_DELAY, "headers.delay", (DelayerEndpointSpec e) -> e.defaultDelay(2000))
                .log()
                // headers.originalPayload にセットされている File クラスを payload へ戻す
                .transform("headers.originalPayload")
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .channel(nullChannel)
                .get();
    }

    @Bean
    public IntegrationFlow delayDynamicFlow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in03"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // 待機秒数を計算するための delayInit, delayCount の初期値を header にセットする
                .enrichHeaders(h -> h
                        .header("delayInit", 3000)
                        .header("delayCount", 0))
                .log()
                // delayCount が 3 になるまで処理をループさせたいので、別の Flow にする
                // return f -> f.~ で書いた場合、Bean名 + ".input" という MessageChannel
                // が自動生成されて、ここに Message を送信すると処理が開始される
                .channel("loopCountFlow.input")
                .get();
    }

    @Bean
    public IntegrationFlow loopCountFlow() {
        return f -> f
                // delayInit + delayCount * 1000 のミリ秒数待機する
                .delay(GROUP_ID_DELAY, m -> {
                    return ((int) m.getHeaders().get("delayInit"))
                            + (((int) m.getHeaders().get("delayCount")) * 1000);
                })
                .log()
                // headers.delayCount を +1 する
                .enrichHeaders(h -> h.headerFunction("delayCount"
                        , m -> ((int) m.getHeaders().get("delayCount")) + 1
                        , true))
                // delayCount が3未満なら loopCountFlow の最初に戻る
                // そうでなければ次の処理へ
                .routeToRecipients(r -> r
                        .recipientFlow("headers.delayCount < 3"
                                , sf -> sf.channel("loopCountFlow.input"))
                        .defaultOutputToParentFlow())
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .handle((p, h) -> {
                    System.out.println("ファイルを削除しました");
                    return null;
                });
    }

    @Bean
    public MessageGroupStore simpleMessageStore() {
        return new SimpleMessageStore();
    }

    /**
     * "" の String の Message を返す MessageSource
     *
     * @return
     */
    @Bean
    public MessageSource<String> stringMessageSource() {
        return () -> MessageBuilder.withPayload("").build();
    }

    @Bean
    public IntegrationFlow printMessageStoreFlow() {
        return IntegrationFlows
                // 1秒毎に処理を実行したいので、1秒毎に Message を送信させる
                .from(stringMessageSource()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // MessageStore に格納された Message の headers, payload を出力する
                .handle((p, h) -> {
                    Collection<Message<?>> delayMessagesList
                            = simpleMessageStore().getMessagesForGroup(GROUP_ID_DELAY);
                    delayMessagesList.forEach(m -> {
                        m.getHeaders().entrySet().forEach(entry -> {
                            System.out.println("[header ] " + entry.getKey() + " = " + entry.getValue());
                        });
                        System.out.println("[payload] " + m.getPayload());
                    });
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow checkMessageStoreFlow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in04"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                // 3秒待機する
                .delay(GROUP_ID_DELAY
                        , (DelayerEndpointSpec e) -> e.defaultDelay(3000)
                                .messageStore(simpleMessageStore()))
                .log()
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .channel(nullChannel)
                .get();
    }

}

履歴

2017/01/29
初版発行。

IntelliJ IDEA 2016.3 の新機能 Semantic highlighting がとても気に入りました!

共有ライブラリを管理するために Sonatype の Nexus Repository Manager OSS を使用する ( 番外編 )( IntelliJ IDEA 2016.3 の新機能を試してみる ) でいくつか 2016.3 の新機能を使う設定にして IntelliJ IDEA を使い続けていましたが、設定当初はあまり期待していなかった Semantic highlighting が使っていくうちに一番気に入りました。ソースが非常に読みやすくなります!

標準では有効になっていないので、設定していない人は是非設定することをオススメします。白い画面だと Semantic highlighting は見にくいので、Scheme は Darcula にしましょう。

以下が画面サンプルです。

Scheme: Default、Semantic highlighting 無効 f:id:ksby:20170128224943p:plain

Scheme: Default、Semantic highlighting 有効 f:id:ksby:20170128225206p:plain

Scheme: Darcula、Semantic highlighting 無効 f:id:ksby:20170128225417p:plain

Scheme: Darcula、Semantic highlighting 有効 f:id:ksby:20170128225625p:plain

また他の機能ですが、

  • 「Enable font ligatures」はソースによってあまりに表示・編集が重くなりすぎるので、無効にしました。
  • フォントも「Enable font ligatures」を有効にしないなら Fira Code font はあまり好きなフォントではなかったので、Source Code Pro に戻して Size も 12pt に戻しています。
  • Parameter hints は Semantic highlighting 程ではありませんが、気に入っています。変数名が出ると便利です。

Java SE を 8u112 → 8u121 へ、IntelliJ IDEA を 2016.3.2 → 2016.3.3 へ、Git for Windows を 2.11.0 → 2.11.0(3) へバージョンアップ

Java SE を 8u112 → 8u121 へバージョンアップする

  1. OracleJava SE Downloads を見ると 8u121 がダウンロードできるようになっていました。以下のページに説明があります。

    8u121 へバージョンアップします。

  2. jdk-8u121-windows-x64.exe をダウンロードして C:\Java\jdk1.8.0_121 へインストールした後、環境変数 JAVA_HOME のパスを C:\Java\jdk1.8.0_121 へ変更します。

    コマンドプロンプトから java -version を実行し、1.8.0_121 に変更されていることを確認します。

    f:id:ksby:20170128192340p:plain

  3. IntelliJ IDEA を再起動した後、プロジェクトで使用する Java SE を 8u121 へ変更します。

  4. 開いているプロジェクトを閉じて「Welcome to IntelliJ IDEA」ダイアログを表示します。

  5. ダイアログ下部の「Configure」-「Project Defaults」-「Project Structure」を選択します。

    f:id:ksby:20170128192957p:plain

  6. 「Default Project Structure」ダイアログが表示されます。画面左側で「Project Settings」-「Project」を選択後、画面右側の「Project SDK」の「New…」ボタンをクリックし、表示されるメニューから「JDK」を選択します。

    f:id:ksby:20170128193323p:plain

  7. 「Select Home Directory for JDK」ダイアログが表示されます。環境変数 JAVA_HOME のディレクトリが選択された状態で表示されますので、そのまま「OK」ボタンをクリックします。

    f:id:ksby:20170128193928p:plain

  8. 「Default Project Structure」ダイアログに戻るので、今度は「Project SDK」の「Edit」ボタンをクリックします。

    f:id:ksby:20170128194842p:plain

  9. 画面左側で「Platform Settings」-「SDKs」が選択された状態になるので、画面右上の入力フィールドで “1.8” → “1.8.0_121” へ変更します。

    f:id:ksby:20170128195507p:plain

  10. 次に中央のリストから「1.8.0_112」を選択した後、リストの上の「-」ボタンをクリックして削除します。

    f:id:ksby:20170128195720p:plain

  11. 「OK」ボタンをクリックして「Default Project Structure」ダイアログを閉じます。

  12. 「Welcome to IntelliJ IDEA」ダイアログに戻ったら、ksbysample-library-depend-nospring プロジェクトを開きます。

  13. IntelliJ IDEA のメイン画面が開いたら、メニューから「File」-「Project Structure…」を選択します。

  14. 「Project Structure」ダイアログが表示されます。以下の画像の状態になっているので、

    f:id:ksby:20170128200107p:plain

    「Project SDK」と「Project language level」を選択し直します。

    f:id:ksby:20170128200300p:plain

  15. 「OK」ボタンをクリックして「Project Structure」ダイアログを閉じます。

  16. メイン画面に戻ると画面右下に「Indexing…」の表示が出るので、終了するまで待ちます。

    f:id:ksby:20170128200514p:plain

  17. Build 及びテストで問題がないか確認します。Gradle projects View から clean タスクの実行→「Rebuild Project」メニューの実行→build タスクの実行を行い、"BUILD SUCCESSFUL" のメッセージが出力されることを確認します。

    f:id:ksby:20170128201033p:plain

  18. 特に問題は発生しませんでした。8u121 で開発を進めます。

IntelliJ IDEA を 2016.3.2 → 2016.3.3 へバージョンアップする

IntelliJ IDEA の 2016.3.3 がリリースされたのでバージョンアップします。

※上の Java SE のバージョンアップからの続きで ksbysample-nexus-repomng/ksbysample-library-depend-nospring プロジェクトを開いた状態でバージョンアップしています。

  1. IntelliJ IDEA のメインメニューから「Help」-「Check for Updates…」を選択します。

  2. 「Platform and Plugin Updates」ダイアログが表示されます。左下に「Update and Restart」ボタンが表示されていますので、「Update and Restart」ボタンをクリックします。

    f:id:ksby:20170128202000p:plain

  3. JRebel for IntelliJ と Lombok Plugin の update も表示されたので、チェックしたまま「Update and Restart」ボタンをクリックします。

    f:id:ksby:20170128202133p:plain

  4. Patch がダウンロードされて IntelliJ IDEA が再起動します。今回は再起動時に以下の画像の「Update」ダイアログが表示されたので、「Proceed」ボタンをクリックします。

    f:id:ksby:20170128203827p:plain

  5. メイン画面が表示されると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。

    f:id:ksby:20170128204111p:plain

  6. 処理が終了すると Gradle Tool Window のツリーの表示が other グループしかない初期の状態に戻っていますので、左上の「Refresh all Gradle projects」ボタンをクリックして更新します。更新が完了すると build グループ等が表示されます。

  7. clean タスク実行 → Rebuild Project 実行をした後、build タスクを実行して “BUILD SUCCESSFUL” のメッセージが表示されることを確認します。

    f:id:ksby:20170128204529p:plain

  8. Project Tool Window で src/test を選択した後、コンテキストメニューを表示して「Run ‘All Tests’ with Coverage」を選択し、テストが全て成功することを確認します。

    f:id:ksby:20170128204732p:plain

Git for Windows を 2.11.0 → 2.11.0(3) へバージョンアップする

Git for Windows の 2.11.0(3) がリリースされていたのでバージョンアップします。実は前回バージョンアップした時に気づかなかったのですが、git-cmd.exe を起動した画面で日本語が表示・入力できなくなっていました。

f:id:ksby:20170128205402p:plain

  1. https://git-for-windows.github.io/ の「Download」ボタンをクリックして Git-2.11.0.3-64-bit.exe をダウンロードします。

  2. Git-2.11.0.3-64-bit.exe を実行します。

  3. 「Git 2.11.0.3 Setup」ダイアログが表示されます。[Next >]ボタンをクリックします。

  4. 「Select Components」画面が表示されます。全てのチェックが外れたままであることを確認した後、[Next >]ボタンをクリックします。

  5. 「Adjusting your PATH environment」画面が表示されます。中央の「Use Git from the Windows Command Prompt」が選択されていることを確認後、[Next >]ボタンをクリックします。

  6. 「Configuring the line ending conversions」画面が表示されます。「Checkout Windows-style, commit Unix-style line endings」が選択されていることを確認した後、[Next >]ボタンをクリックします。

  7. 「Configuring the terminal emulator to use with Git Bash」画面が表示されます。「Use Windows'default console window」が選択されていることを確認した後、[Next >]ボタンをクリックします。

  8. 「Configuring extra options」画面が表示されます。「Enable file system caching」だけがチェックされていることを確認した後、[Next >]ボタンをクリックします。

  9. 「Configuring experimental options」画面が表示されます。全てのチェックが外れたままであることを確認した後、[Install]ボタンをクリックします。

  10. インストールが完了すると「Completing the Git Setup Wizard」のメッセージが表示された画面が表示されます。中央の「View ReleaseNotes.html」のチェックを外した後、「Finish」ボタンをクリックしてインストーラーを終了します。

  11. コマンドプロンプトを起動して git のバージョンが git version 2.11.0.windows.3 になっていることを確認します。

    f:id:ksby:20170128221500p:plain

  12. git-cmd.exe を起動して日本語の表示・入力が問題ないかを確認します。

    f:id:ksby:20170128221826p:plain

  13. 特に問題はないようですので、2.11.0(3) で作業を進めたいと思います。