読者です 読者をやめる 読者になる 読者になる

かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その16 )( ExpressionEvaluatingRequestHandlerAdvice のサンプルを作ってみる )

概要

記事一覧はこちらです。

Spring Boot + Spring Integration でいろいろ試してみる ( その15 )( RequestHandlerRetryAdvice のサンプルを作ってみる ) の続きです。

ExpressionEvaluatingRequestHandlerAdvice のサンプルを作成します。ExpressionEvaluatingRequestHandlerAdvice は以下の2つを指定するための RequestHandlerAdvice です。

  • 成功、失敗(例外が throw された)時の処理を SpEL で記述できる。
  • 成功・失敗時に送信する MessageChannel を指定できる。

参照したサイト・書籍

  1. Spring Integration Reference Manual - 8.9.2 Provided Advice Classes - Expression Evaluating Advice
    http://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints-chapter.html#expression-advice

  2. Move file with file-adapter with SI
    http://stackoverflow.com/questions/33835657/move-file-with-file-adapter-with-si

  3. instanceof in SpEL
    http://stackoverflow.com/questions/7628437/instanceof-in-spel

目次

  1. Spring Boot を 1.4.3 → 1.4.4 へ、Spring IO Platform を Athens-SR2 → Athens-SR3 へバージョンアップする
  2. C:\eipapp\ksbysample-eipapp-advice ディレクトリを変更する
  3. application.properties, logback-spring.xml を追加する
  4. ExpressionEvaluatingRequestHandlerAdvice のサンプルを作成する
    1. setOnSuccessExpressionString, setOnFailureExpressionString だけ指定する
    2. setSuccessChannelName, setFailureChannelName だけ指定することはできない
    3. setOnSuccessExpressionString, setOnFailureExpressionString+setSuccessChannelName, setFailureChannelName の組み合わせで指定する
    4. setOnSuccessExpressionString, setOnFailureExpressionString で Success, Failure 用の MessageChannel に渡す payload の型を変更する
  5. ExpressionEvaluatingRequestHandlerAdvice を使用した Advice は Bean として定義しないと機能しない?
  6. ExpressionEvaluatingRequestHandlerAdvice を e -> e.advice(…) で指定した後に処理を書いたらどうなる?
  7. RequestHandlerRetryAdvice と一緒に指定してみる
  8. 続くのか。。。?

手順

Spring Boot を 1.4.3 → 1.4.4 へ、Spring IO Platform を Athens-SR2 → Athens-SR3 へバージョンアップする

Spring Boot、Spring IO Platform がバージョンアップされていますので、build.gradle を リンク先の内容 に変更してバージョンアップします。これにより Spring Integration も 4.3.6 → 4.3.7 へバージョンアップされます。

build.gradle 変更後、Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして build.gradle を反映します。

ちょうど試そうとしていた ExpressionEvaluatingRequestHandlerAdvice が、Spring Integration 4.3.7 から以下の変更が入っていました。

  • setOnSuccessExpression(String)、setOnFailureExpression(String) が Deprecated となり、setOnSuccessExpressionString(String)、setOnFailureExpressionString(String) に変わりました。

C:\eipapp\ksbysample-eipapp-advice ディレクトリを変更する

in06, in07, in08, in09, success, error ディレクトリを追加します。

C:\eipapp\ksbysample-eipapp-advice
├ error
├ in01
├ in02
├ in03
├ in04
├ in05
├ in06
├ in07
├ in08
├ in09
└ success

application.properties, logback-spring.xml を追加する

SpEL を使用するので org.springframework.integration.expression.ExpressionUtils の WARN ログが出力されないようにします。

  1. src/main/resources の下に application.properties を作成し、リンク先の内容 を記述します。

  2. src/main/resources の下に logback-spring.xml を作成し、リンク先の内容 を記述します。

ExpressionEvaluatingRequestHandlerAdvice のサンプルを作成する

動作確認のためにサンプルを作成していきます。サンプルは src/main/java/ksbysample/eipapp/advice の下に SuccessOrFailureFlowConfig.java を作成して、その中に記述します。完成形は リンク先の内容 です。

setOnSuccessExpressionString, setOnFailureExpressionString だけ指定する

成功時にはファイルを削除し、失敗時にはファイルを error ディレクトリへ移動するサンプルを作成します。

まずは成功する場合を試します。advice 対象の処理内で例外を throw しません。

    private final String EIPAPP_ADVICE_ROOT_DIR = "C:/eipapp/ksbysample-eipapp-advice";


    // setOnSuccessExpressionString, setOnFailureExpressionString だけ指定するサンプル
    //  ・成功時にはファイルを削除する。
    //  ・失敗時にはファイルを error ディレクトリへ移動する。
    //  ・削除、移動の処理は SpEL で記述する。

    @Bean
    public Advice deleteOrMoveAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload.delete()");
        advice.setOnFailureExpressionString(
                "payload.renameTo(new java.io.File('" + EIPAPP_ADVICE_ROOT_DIR + "/error/' + payload.name))");
        // setTrapException(true) を指定すると throw された例外が再 throw されず、
        // ログに出力されない
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow in06Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in06"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return null;
                }, e -> e.advice(deleteOrMoveAdvice()))
                .get();
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in06 の下に empty.txt を置くとファイルは削除されました。

f:id:ksby:20170205112733p:plain
↓↓↓
f:id:ksby:20170205113033p:plain

今度は失敗する場合を試します。advice 対象の処理内で例外を throw します。

                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
                    if (true) {
                        throw new RuntimeException("エラーです");
                    }
                    return null;
                }, e -> e.advice(deleteOrMoveAdvice()))

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in06 の下に empty.txt を置くとファイルは error ディレクトリへ移動しました。

f:id:ksby:20170205113837p:plain
↓↓↓
f:id:ksby:20170205114107p:plain

また今回、以下のように記述していますが、

                .<File>handle((p, h) -> {

これは以下と同じです。

                .handle((GenericHandler<File>) (p, h) -> {

第2引数の e -> ... を記述する場合にはどちらかのパターンで記述しないとコンパイルエラーになります。IntelliJ IDEA では記述がない場合 .handle に赤波下線が表示され、Alt+Enter を押すと後者のパターンで補完されます(ただし、File ではなく Object になります)。

setSuccessChannelName, setFailureChannelName だけ指定することはできない

以下の実装では動作しません。

    @Bean
    public Advice successOrFailureChannelAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("successFlow.input");
        advice.setFailureChannelName("failureFlow.input");
        advice.setTrapException(true);
        return advice;
    }

payload をそのまま Success, Failure 用の MessageChannel に渡す場合には advice.setOnSuccessExpressionString("payload"); のように "payload" とだけ記述した setOn…ExpressionString(…) を書く必要があります。

    @Bean
    public Advice successOrFailureChannelAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload");
        advice.setSuccessChannelName("successFlow.input");
        advice.setOnFailureExpressionString("payload");
        advice.setFailureChannelName("failureFlow.input");
        advice.setTrapException(true);
        return advice;
    }

setOnSuccessExpressionString, setOnFailureExpressionString+setSuccessChannelName, setFailureChannelName の組み合わせで指定する

サンプルの動作は上で書いた成功時にはファイルを削除し、失敗時にはファイルを error ディレクトリへ移動するというものですが、今度は SpEL ではなく転送した MessageChannel の先の処理で削除、移動します。

まずは成功する場合を試します。advice 対象の処理内で例外を throw しません。

    // setOnSuccessExpressionString, setOnFailureExpressionString+setSuccessChannelName, setFailureChannelName
    // の組み合わせで指定するサンプル
    //  ・成功時には successFlow.input へ Message を送信する。
    //    successFlow ではファイルを削除する。
    //  ・失敗時には failureFlow.input へ Message を送信する。
    //    failureFlow ではファイルを error ディレクトリへ移動する。

    @Bean
    public Advice successOrFailureChannelAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload");
        advice.setSuccessChannelName("successFlow.input");
        // setOnFailureExpressionString に "payload" と記述しても Failure 用の MessageChannel には
        // File クラスではなく MessageHandlingExpressionEvaluatingAdviceException クラスの payload が渡される
        advice.setOnFailureExpressionString("payload");
        advice.setFailureChannelName("failureFlow.input");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow in07Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in07"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return null;
                }, e -> e.advice(successOrFailureChannelAdvice()))
                .get();
    }

    @Bean
    public IntegrationFlow successFlow() {
        return f -> f
                .<File>handle((p, h) -> {
                    // ファイルを削除する
                    try {
                        Files.delete(Paths.get(p.getAbsolutePath()));
                        log.info("ファイルを削除しました ( {} )", p.getAbsolutePath());
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                });
    }

    @Bean
    public IntegrationFlow failureFlow() {
        return f -> f
                .<ExpressionEvaluatingRequestHandlerAdvice.MessageHandlingExpressionEvaluatingAdviceException>
                        handle((p, h) -> {
                    // MessageHandlingExpressionEvaluatingAdviceException クラスの payload から
                    // Exception 発生前の File クラスの payload を取得する
                    File file = (File) p.getEvaluationResult();

                    // ファイルを error ディレクトリへ移動する
                    Path src = Paths.get(file.getAbsolutePath());
                    Path dst = Paths.get(EIPAPP_ADVICE_ROOT_DIR + "/error/" + file.getName());
                    try {
                        Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
                        log.info("ファイルを移動しました ( {} --> {} )"
                                , src.toAbsolutePath(), dst.toAbsolutePath());
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                });
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in07 の下に empty.txt を置くとファイルは削除されました。

f:id:ksby:20170205142858p:plain
↓↓↓
f:id:ksby:20170205143117p:plain

今度は失敗する場合を試します。advice 対象の処理内で例外を throw します。

                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
                    if (true) {
                        throw new RuntimeException("エラーです");
                    }
                    return null;
                }, e -> e.advice(successOrFailureChannelAdvice()))

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in07 の下に empty.txt を置くとファイルは error ディレクトリへ移動しました。

f:id:ksby:20170205143420p:plain
↓↓↓
f:id:ksby:20170205143902p:plain

SpEL だけで処理が書けるならそれで済ませた方がコード量は少なくて楽ですね。

setOnSuccessExpressionString, setOnFailureExpressionString で Success, Failure 用の MessageChannel に渡す payload の型を変更する

setOnSuccessExpressionString, setOnFailureExpressionString に記述する SpEL の結果により Success, Failure 用の MessageChannel に送信される Message の payload にセットされるデータの型を変えることができます。

以下のように実装すると元の Message の payload は File クラスですが、Success, Failure 用の MessageChannel に送信される Message の payload は String クラスになります(まあ Failure 用に送信される Message の payload は更に MessageHandlingExpressionEvaluatingAdviceException になるので実際は少し面倒ですが)。

    // setOnSuccessExpressionString, setOnFailureExpressionString で Success, Failure 用の
    // MessageChannel に渡す payload の型を変更するサンプル
    //  ・元の File クラスの payload から String クラスの payload の Message に変換して
    //    Success, Failure 用の MessageChannel に送信する

    @Bean
    public Advice convertFileToStringAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload + ' の処理に成功しました。'");
        advice.setSuccessChannelName("printFlow.input");
        advice.setOnFailureExpressionString(
                "payload + ' の処理に失敗しました ( ' + #exception.class.name + ', ' + #exception.cause.message + ' )'");
        advice.setFailureChannelName("printFlow.input");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow in08Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in08"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return null;
                }, e -> e.advice(convertFileToStringAdvice()))
                .get();
    }

    @Bean
    public IntegrationFlow printFlow() {
        return f -> f
                // setFailureChannelName(...) の指定で転送された Message は
                // MessageHandlingExpressionEvaluatingAdviceException クラスなので、
                // .transform(...) で SpEL を利用して元の String を取得する
                .transform("payload instanceof T(org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException)"
                        + " ? payload.evaluationResult : payload")
                .handle((p, h) -> {
                    System.out.println("●●● " + p);
                    return null;
                });

    }

まずは成功する場合を試します。advice 対象の処理内で例外を throw しません。

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in08 の下に empty.txt を置くと以下のログが出力されます。

f:id:ksby:20170205161843p:plain

今度は失敗する場合を試します。advice 対象の処理内で例外を throw します。

                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
                    if (true) {
                        throw new RuntimeException("エラーです");
                    }
                    return null;
                }, e -> e.advice(convertFileToStringAdvice()))

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in08 の下に empty.txt を置くと以下のログが出力されます。

f:id:ksby:20170205162204p:plain

ExpressionEvaluatingRequestHandlerAdvice を使用した Advice は Bean として定義しないと機能しない?

結論から言うと setSuccessChannelName, setFailureChannelName 等のメソッドを使用して成功、失敗時に別の MessageChannel に Message を送信しないのであれば Bean を作成する必要はありませんが、送信する場合には必ず Bean にする必要があります。

例えば in06Flow を以下のようにメソッド内で ExpressionEvaluatingRequestHandlerAdvice のインスタンスを生成するよう変更します。

    @Bean
    public IntegrationFlow in06Flow() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload.delete()");
        advice.setOnFailureExpressionString(
                "payload.renameTo(new java.io.File('" + EIPAPP_ADVICE_ROOT_DIR + "/error/' + payload.name))");
        advice.setTrapException(true);

        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in06"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return null;
                }, e -> e.advice(advice))
                .get();
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in06 の下に empty.txt を置くとファイルは削除されます。

f:id:ksby:20170205170845p:plain
↓↓↓
f:id:ksby:20170205171202p:plain

今度は in07Flow を以下のようにメソッド内で ExpressionEvaluatingRequestHandlerAdvice のインスタンスを生成するよう変更します。

    @Bean
    public IntegrationFlow in07Flow() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload");
        advice.setSuccessChannelName("successFlow.input");
        advice.setOnFailureExpressionString("payload");
        advice.setFailureChannelName("failureFlow.input");
        advice.setTrapException(true);

        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in07"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return null;
                }, e -> e.advice(advice))
                .get();
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in07 の下に empty.txt を置いてもファイルは削除されませんでした。

f:id:ksby:20170205171851p:plain
↓↓↓
f:id:ksby:20170205172155p:plain

ログには Caused by: java.lang.IllegalArgumentException: BeanFactory must not be null の例外が throw されていました。

f:id:ksby:20170205172343p:plain

ExpressionEvaluatingRequestHandlerAdvice を e -> e.advice(…) で指定した後に処理を書いたらどうなる?

成功、失敗時に別の MessageChannel に Message を送信する場合としない場合、それぞれで動作を確認してみます。

最初は送信しない場合です。in06Flow を以下のように変更します。

    @Bean
    public IntegrationFlow in06Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in06"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ PASS1 " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return p;
                }, e -> e.advice(deleteOrMoveAdvice()))
                .handle((p, h) -> {
                    log.info("★★★ PASS2 " + p.getClass().getName());
                    return null;
                })
                .get();
    }
  • advice を指定している処理で return null;return p; に変更して Message を次の処理に渡すようにします。またログに “PASS1” の文字列を追加します。
  • 2つ目の .handle(...) を追加します。

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in06 の下に empty.txt を置くと以下のログが出力されました。追加した .handle(...) の処理が実行されています。

f:id:ksby:20170205182324p:plain

in06 ディレクトリの下に置いた empty.txt も削除されていました。ExpressionEvaluatingRequestHandlerAdvice で指定した処理も実行されていました。

f:id:ksby:20170205181614p:plain
↓↓↓
f:id:ksby:20170205182056p:plain

次は送信する場合です。in07Flow を以下のように変更します。

    @Bean
    public IntegrationFlow in07Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in07"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ PASS1 " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return p;
                }, e -> e.advice(successOrFailureChannelAdvice()))
                .handle((p, h) -> {
                    log.info("★★★ PASS2 " + p.getClass().getName());
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow successFlow() {
        return f -> f
                .<File>handle((p, h) -> {
                    // ファイルを削除する
                    try {
                        Files.delete(Paths.get(p.getAbsolutePath()));
                        log.info("★★★ PASS3 ファイルを削除しました ( {} )", p.getAbsolutePath());
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                });
    }
  • advice を指定している処理で return null;return p; に変更して Message を次の処理に渡すようにします。またログに “PASS1” の文字列を追加します。
  • 2つ目の .handle(...) を追加します。
  • successFlow のログに “★★★ PASS3” の文字列を追加します。

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in07 の下に empty.txt を置くと以下のログが出力されました。追加した .handle(...) の処理が実行されています。

f:id:ksby:20170205183630p:plain

in07 ディレクトリの下に置いた empty.txt も削除されていました。ExpressionEvaluatingRequestHandlerAdvice で指定した処理も実行されていました。

f:id:ksby:20170205183303p:plain
↓↓↓
f:id:ksby:20170205183527p:plain

以上の結果から、以下のことが分かりました。

  • advice を書いても次の処理は実行されます。
  • 1つ目の .handle(...) の処理 → ExpressionEvaluatingRequestHandlerAdvice で指定した成功時の処理 → 2つ目の .handle(...) の処理、の順で実行されます。

RequestHandlerRetryAdvice と一緒に指定してみる

以下のコードを追加します。

    // RequestHandlerRetryAdvice と ExpressionEvaluatingRequestHandlerAdvice を一緒に指定するサンプル
    //  ・RequestHandlerRetryAdvice に指定する RetryTemplate には FlowConfig.java に書いた
    //    simpleAndFixedRetryTemplate Bean を使用する

    @Autowired
    @Qualifier("simpleAndFixedRetryTemplate")
    private RetryTemplate simpleAndFixedRetryTemplate;

    @Bean
    public IntegrationFlow in09Flow() {
        RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
        retryAdvice.setRetryTemplate(this.simpleAndFixedRetryTemplate);
        retryAdvice.setRecoveryCallback(context -> {
            // リトライが全て失敗するとこの処理が実行される
            MessageHandlingException e = (MessageHandlingException) context.getLastThrowable();
            Message<?> message = ((MessageHandlingException) context.getLastThrowable()).getFailedMessage();
            File payload = (File) message.getPayload();
            log.error("●●● " + e.getRootCause().getClass().getName());
            log.error("●●● " + payload.getName());
            // 例外を再 throw して ExpressionEvaluatingRequestHandlerAdvice の失敗時の処理
            // が実行されるようにする
            throw e;
        });

        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in09"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    RetryContext retryContext = RetrySynchronizationManager.getContext();
                    log.info("★★★ リトライ回数 = " + retryContext.getRetryCount());

                    // 例外を throw して必ずリトライさせる
                    if (true) {
                        throw new RuntimeException("エラーです");
                    }
                    return null;
                }, e -> e
                        // RequestHandlerRetryAdvice と ExpressionEvaluatingRequestHandlerAdvice
                        // を一緒に指定する場合には RequestHandlerRetryAdvice を後に書くこと。
                        // 最初に書くとリトライしてくれない。
                        .advice(successOrFailureChannelAdvice())
                        .advice(retryAdvice))
                .get();
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in09 の下に empty.txt を置くと5回リトライした後、error ディレクトリにファイルを移動しました。

f:id:ksby:20170205192401p:plain

ちなみにコメントに書いてありますが、RequestHandlerRetryAdvice を先に書くとこうなります。

                }, e -> e
                        // RequestHandlerRetryAdvice と ExpressionEvaluatingRequestHandlerAdvice
                        // を一緒に指定する場合には RequestHandlerRetryAdvice を後に書くこと。
                        // 最初に書くとリトライしてくれない。
                        .advice(retryAdvice)
                        .advice(successOrFailureChannelAdvice()))
                .get();

f:id:ksby:20170205192650p:plain

リトライされずにすぐに ExpressionEvaluatingRequestHandlerAdvice の失敗時の処理が実行されます。

上の結果は AOP の処理がどう挿入されるかに依存するので、一緒に指定した場合の処理順序はもしかすると今後ライブラリの実装内容によって変わるかもしれません。リトライ処理は RequestHandlerRetryAdvice は使用せずに RetryTemplate で直接 .handle(...) 内に記述して、advice で指定するのは ExpressionEvaluatingRequestHandlerAdvice だけにする方が無難かもしれません。

続くのか。。。?

あと1つ残った RequestHandlerCircuitBreakerAdvice や、TransactionSynchronizationFactory を使用して Flow 全体の正常、失敗を見て処理を行う方法をまとめておきたいですが、RequestHandlerRetryAdvice, ExpressionEvaluatingRequestHandlerAdvice が調べると意外にボリュームがあって重かったので、一旦サンプル作成に戻ります。気が向いたらまた書きます。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.4.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

..........

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR3'
        mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE'
    }
}

..........
  • buildscript の中の springBootVersion = '1.4.3.RELEASE'springBootVersion = '1.4.4.RELEASE' へ変更します。
  • dependencyManagement の中の mavenBom 'io.spring.platform:platform-bom:Athens-SR2'mavenBom 'io.spring.platform:platform-bom:Athens-SR3' へ変更します。

application.properties

spring.application.name=advice
spring.zipkin.base-url=http://localhost:9411/
spring.sleuth.sampler.percentage=1.0

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>

    <springProperty scope="context" name="springAppName" source="spring.application.name"/>
    <property name="CONSOLE_LOG_PATTERN"
              value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${level:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId},%X{X-B3-SpanId}]){yellow} %clr(${PID:-}){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>
    <include resource="org/springframework/boot/logging/logback/console-appender.xml"/>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
    <logger name="org.springframework.integration.expression.ExpressionUtils" level="ERROR"/>
</configuration>

<logger name="org.springframework.integration.expression.ExpressionUtils" level="ERROR"/> 以外に、以前と比較して以下の点を変更しました。

  • %X{X-B3-TraceId:-}%X{X-B3-TraceId} へ変更しました。
  • %X{X-B3-SpanId:-}%X{X-B3-SpanId} へ変更しました。
  • ,%X{X-Span-Export:-} を削除しました。
  • <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> ... </appender> と独自で指定せずに <include resource="org/springframework/boot/logging/logback/console-appender.xml"/> を使用するようにしました。

SuccessOrFailureFlowConfig.java

package ksbysample.eipapp.advice;

import lombok.extern.slf4j.Slf4j;
import org.aopalliance.aop.Advice;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.retry.RetryContext;
import org.springframework.retry.support.RetrySynchronizationManager;
import org.springframework.retry.support.RetryTemplate;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

@Slf4j
@Configuration
public class SuccessOrFailureFlowConfig {

    private final String EIPAPP_ADVICE_ROOT_DIR = "C:/eipapp/ksbysample-eipapp-advice";


    // setOnSuccessExpressionString, setOnFailureExpressionString だけ指定するサンプル
    //  ・成功時にはファイルを削除する。
    //  ・失敗時にはファイルを error ディレクトリへ移動する。
    //  ・削除、移動の処理は SpEL で記述する。

    @Bean
    public Advice deleteOrMoveAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload.delete()");
        advice.setOnFailureExpressionString(
                "payload.renameTo(new java.io.File('" + EIPAPP_ADVICE_ROOT_DIR + "/error/' + payload.name))");
        // setTrapException(true) を指定すると throw された例外が再 throw されず、
        // ログに出力されない
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow in06Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in06"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return null;
                }, e -> e.advice(deleteOrMoveAdvice()))
                .get();
    }


    // setOnSuccessExpressionString, setOnFailureExpressionString+setSuccessChannelName, setFailureChannelName
    // の組み合わせで指定するサンプル
    //  ・成功時には successFlow.input へ Message を送信する。
    //    successFlow ではファイルを削除する。
    //  ・失敗時には failureFlow.input へ Message を送信する。
    //    failureFlow ではファイルを error ディレクトリへ移動する。

    @Bean
    public Advice successOrFailureChannelAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload");
        advice.setSuccessChannelName("successFlow.input");
        // setOnFailureExpressionString に "payload" と記述しても Failure 用の MessageChannel には
        // File クラスではなく MessageHandlingExpressionEvaluatingAdviceException クラスの payload が渡される
        advice.setOnFailureExpressionString("payload");
        advice.setFailureChannelName("failureFlow.input");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow in07Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in07"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return null;
                }, e -> e.advice(successOrFailureChannelAdvice()))
                .get();
    }

    @Bean
    public IntegrationFlow successFlow() {
        return f -> f
                .<File>handle((p, h) -> {
                    // ファイルを削除する
                    try {
                        Files.delete(Paths.get(p.getAbsolutePath()));
                        log.info("ファイルを削除しました ( {} )", p.getAbsolutePath());
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                });
    }

    @Bean
    public IntegrationFlow failureFlow() {
        return f -> f
                .<ExpressionEvaluatingRequestHandlerAdvice.MessageHandlingExpressionEvaluatingAdviceException>
                        handle((p, h) -> {
                    // MessageHandlingExpressionEvaluatingAdviceException クラスの payload から
                    // Exception 発生前の File クラスの payload を取得する
                    File file = (File) p.getEvaluationResult();

                    // ファイルを error ディレクトリへ移動する
                    Path src = Paths.get(file.getAbsolutePath());
                    Path dst = Paths.get(EIPAPP_ADVICE_ROOT_DIR + "/error/" + file.getName());
                    try {
                        Files.move(src, dst, StandardCopyOption.REPLACE_EXISTING);
                        log.info("ファイルを移動しました ( {} --> {} )"
                                , src.toAbsolutePath(), dst.toAbsolutePath());
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                });
    }


    // setOnSuccessExpressionString, setOnFailureExpressionString で Success, Failure 用の
    // MessageChannel に渡す payload の型を変更するサンプル
    //  ・元の File クラスの payload から String クラスの payload の Message に変換して
    //    Success, Failure 用の MessageChannel に送信する

    @Bean
    public Advice convertFileToStringAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload + ' の処理に成功しました。'");
        advice.setSuccessChannelName("printFlow.input");
        advice.setOnFailureExpressionString(
                "payload + ' の処理に失敗しました ( ' + #exception.class.name + ', ' + #exception.cause.message + ' )'");
        advice.setFailureChannelName("printFlow.input");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow in08Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in08"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    log.info("★★★ " + p.getAbsolutePath());
//                    if (true) {
//                        throw new RuntimeException("エラーです");
//                    }
                    return null;
                }, e -> e.advice(convertFileToStringAdvice()))
                .get();
    }

    @Bean
    public IntegrationFlow printFlow() {
        return f -> f
                // setFailureChannelName(...) の指定で転送された Message は
                // MessageHandlingExpressionEvaluatingAdviceException クラスなので、
                // .transform(...) で SpEL を利用して元の String を取得する
                .transform("payload instanceof T(org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice$MessageHandlingExpressionEvaluatingAdviceException)"
                        + " ? payload.evaluationResult : payload")
                .handle((p, h) -> {
                    System.out.println("●●● " + p);
                    return null;
                });

    }


    // RequestHandlerRetryAdvice と ExpressionEvaluatingRequestHandlerAdvice を一緒に指定するサンプル
    //  ・RequestHandlerRetryAdvice に指定する RetryTemplate には FlowConfig.java に書いた
    //    simpleAndFixedRetryTemplate Bean を使用する

    @Autowired
    @Qualifier("simpleAndFixedRetryTemplate")
    private RetryTemplate simpleAndFixedRetryTemplate;

    @Bean
    public IntegrationFlow in09Flow() {
        RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
        retryAdvice.setRetryTemplate(this.simpleAndFixedRetryTemplate);
        retryAdvice.setRecoveryCallback(context -> {
            // リトライが全て失敗するとこの処理が実行される
            MessageHandlingException e = (MessageHandlingException) context.getLastThrowable();
            Message<?> message = ((MessageHandlingException) context.getLastThrowable()).getFailedMessage();
            File payload = (File) message.getPayload();
            log.error("●●● " + e.getRootCause().getClass().getName());
            log.error("●●● " + payload.getName());
            // 例外を再 throw して ExpressionEvaluatingRequestHandlerAdvice の失敗時の処理
            // が実行されるようにする
            throw e;
        });

        return IntegrationFlows
                .from(s -> s.file(new File(EIPAPP_ADVICE_ROOT_DIR + "/in09"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<File>handle((p, h) -> {
                    RetryContext retryContext = RetrySynchronizationManager.getContext();
                    log.info("★★★ リトライ回数 = " + retryContext.getRetryCount());

                    // 例外を throw して必ずリトライさせる
                    if (true) {
                        throw new RuntimeException("エラーです");
                    }
                    return null;
                }, e -> e
                        // RequestHandlerRetryAdvice と ExpressionEvaluatingRequestHandlerAdvice
                        // を一緒に指定する場合には RequestHandlerRetryAdvice を後に書くこと。
                        // 最初に書くとリトライしてくれない。
                        .advice(successOrFailureChannelAdvice())
                        .advice(retryAdvice))
                .get();
    }

}

履歴

2017/02/05
初版発行。