かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その15 )( RequestHandlerRetryAdvice のサンプルを作ってみる )

概要

記事一覧はこちらです。

参照したサイト・書籍

  1. 8.9 Adding Behavior to Endpoints
    http://docs.spring.io/spring-integration/docs/4.3.7.RELEASE/reference/html/messaging-endpoints-chapter.html#message-handler-advice-chain

目次

  1. ksbysample-eipapp-advice プロジェクトを作成する
  2. C:\eipapp\ksbysample-eipapp-advice ディレクトリを作成する
  3. RequestHandlerRetryAdvice のサンプルを作成する
    1. RetryPolicy, BackOffPolicy には何があるか?
    2. SimpleRetryPolicy+FixedBackOffPolicy のサンプルを作成する
    3. TimeoutRetryPolicy+ExponentialBackOffPolicy のサンプルを作成する
    4. CompositeRetryPolicy のサンプルを作成する
  4. RequestHandlerRetryAdvice ではなく直接 RetryTemplate を利用して ExceptionClassifierRetryPolicy のサンプルを作成する
  5. 続きます!

手順

ksbysample-eipapp-advice プロジェクトを作成する

  1. IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。

  2. src/main/java の下に ksbysample.eipapp.advice パッケージを作成します。

  3. src/main/java/ksbysample/eipapp/advice の下に Application.java を作成し、リンク先の内容 を記述します。

C:\eipapp\ksbysample-eipapp-advice ディレクトリを作成する

以下の構成のディレクトリを作成します。

C:\eipapp\ksbysample-eipapp-advice
├ in01
├ in02
├ in03
├ in04
└ in05

RequestHandlerRetryAdvice のサンプルを作成する

RetryPolicy, BackOffPolicy には何があるか?

RequestHandlerRetryAdvice クラスを利用してリトライする場合、リトライの終了条件を RetryPolicy インターフェースの実装クラスで指定し、リトライ時の待機時間を BackOffPolicy インターフェースの実装クラスで指定します。

RetryPolicy インターフェースの実装クラスには以下のものがあります。

f:id:ksby:20170129231917p:plain

クラス 説明
AlwaysRetryPolicy 必ずリトライします(ただしリトライ対象の処理の中で例外は throw すること)。テストのためのスタブ等で使用します。
CompositeRetryPolicy 複数の RetryPolicy を組み合わせることができる RetryPolicy です。例えば Exception が3回発生するか(SimpleRetryPolicy で指定)、10 秒経過する(TimeoutRetryPolicy で指定)まではリトライする、という条件が指定できます。
ExceptionClassifierRetryPolicy Exception 毎に RetryPolicy を指定できる RetryPolicy です。ただし例えば ExceptionClassifierRetryPolicy で NullPointerException に TimeoutRetryPolicy が適用されるよう設定して .handle(...) 内で NullPointerException を throw しても LambdaMessageProcessor::processMessage で発生した例外が MessageHandlingException に変換されてしまうため、Spring Integration DSL+RequestHandlerRetryAdvice の組み合わせではこの RetryPolicy は使えないのでは?
NeverRetryPolicy 初回実行のみでリトライしません。これも AlwaysRetryPolicy と同様テスト用です。
SimpleRetryPolicy リトライ回数、リトライの対象とする例外を指定できる RetryPolicy です。デフォルトではリトライ回数は3、例外は Exception.class です。通常使用するのはおそらくこれでしょう。
TimeoutRetryPolicy リトライの条件を回数ではなくミリ秒数で指定します。例外が throw されても指定時間が経過するまでリトライし続けます。ただし例外が throw されずに指定された時間が経過してもリトライ対象の処理から応答が返ってこない場合には何も起きません。

BackOffPolicy インターフェースの実装クラスには以下のものがあります。

f:id:ksby:20170129225634p:plain

クラス 説明
ExponentialRandomBackOffPolicy ExponentialBackOffPolicy と同様、初期の待機時間(ミリ秒)、最大待機時間(ミリ秒)、倍数を指定しますが、次の待機時間が固定の倍数ではなく1~倍数の間のランダムの数値の倍数になります。このクラスのソースのコメントに分かりやすい説明があります。
ExponentialBackOffPolicy 初期の待機時間(ミリ秒)、最大待機時間(ミリ秒)、倍数(例えば2を指定すれば2倍)を指定し、次の待機時間が前回待機時間 * 倍数のミリ秒数になります。次回の待機時間を前回の待機時間の2倍にする、等の用途で使用します。デフォルトでは初期の待機時間は 100 ミリ秒、最大待機時間は 30000 ミリ秒、倍数は 2、です。
FixedBackOffPolicy リトライ前に指定されたミリ秒間、固定で待機します。時間を指定しない場合、デフォルトでは1秒待機します。
UniformRandomBackOffPolicy 最小と最大のミリ秒を指定し、その間のランダムな時間待機します。待機時間はリトライ毎に変わります。時間を指定しない場合、デフォルトでは最小は 500 ミリ秒、最大は 1500 ミリ秒です。
NoBackOffPolicy リトライ前に何も実行しません。

こちらでメインに使うのは FixedBackOffPolicy、ExponentialBackOffPolicy の2つでしょうか。

SimpleRetryPolicy+FixedBackOffPolicy のサンプルを作成する

動作確認のためにサンプルを作成していきます。src/main/java/ksbysample/eipapp/advice/FlowConfig.java の完成形は リンク先の内容 です。

SimpleRetryPolicy でリトライ最大回数5回を指定し、FixedBackOffPolicy でリトライ時に2秒待機することを指定します。

条件を設定した SimpleRetryPolicy、FixedBackOffPolicy を RetryTemplate にセットし、RetryTemplate を RequestHandlerRetryAdvice にセットします。

RequestHandlerRetryAdvice を .handle(...) の第2引数に e -> e.advice(retryAdvice) と渡します。これで .handle(...) の第1引数に渡したラムダ式を最大5回、2秒待機でリトライします。

尚、以降の RequestHandlerRetryAdvice のサンプルでは IntegrationFlow はほぼ同じものを使うので、共通で呼び出す private メソッドを作成して引数で in ディレクトリ、RetryTemplate を変更できるようにしています。

    /**
     * 共通 retry 用 Flow
     * retryTemplate で設定されたリトライを実行する
     *
     * @param inDir         監視対象の in ディレクトリ
     * @param retryTemplate リトライ条件を設定した RetryTemplate クラスのインスタンス
     * @return
     */
    private IntegrationFlow retryFlow(File inDir, RetryTemplate retryTemplate) {
        RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
        retryAdvice.setRetryTemplate(retryTemplate);
        retryAdvice.setRecoveryCallback(context -> {
            // リトライが全て失敗するとこの処理が実行される
            MessageHandlingException e = (MessageHandlingException) context.getLastThrowable();
            Message<?> message = ((MessageHandlingException) context.getLastThrowable()).getFailedMessage();
            File payload = (File) message.getPayload();
            log.error("●●● " + e.getRootCause().getClass().getName());
            log.error("●●● " + payload.getName());
            return null;
        });

        return IntegrationFlows
                .from(s -> s.file(inDir)
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                .handle((GenericHandler<Object>) (p, h) -> {
                    RetryContext retryContext = RetrySynchronizationManager.getContext();
                    log.info("★★★ リトライ回数 = " + retryContext.getRetryCount());

                    // 例外を throw して強制的にリトライさせる
                    if (true) {
                        throw new RuntimeException();
                    }
                    return p;
                }, e -> e.advice(retryAdvice))
                .log()
                .channel(nullChannel)
                .get();
    }

    /**
     * リトライ最大回数は5回、リトライ時は2秒待機する
     *
     * @return
     */
    @Bean
    public RetryTemplate simpleAndFixedRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // SimpleRetryPolicy
        retryTemplate.setRetryPolicy(
                new SimpleRetryPolicy(5, singletonMap(Exception.class, true)));

        // FixedBackOffPolicy
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(2000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry01Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in01")
                , simpleAndFixedRetryTemplate());
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in01 の下に empty.txt を置くと以下のログが出力されます。2秒待機で5回リトライしていることが確認できます。回数指定の場合、最後のリトライの処理で例外が throw されるとすぐに RequestHandlerRetryAdvice::setRecoveryCallback(...) の処理が呼び出されています。

f:id:ksby:20170201224951p:plain

TimeoutRetryPolicy+ExponentialBackOffPolicy のサンプルを作成する

TimeoutRetryPolicy でリトライ最大45秒を指定し、ExponentialBackOffPolicy でリトライ時に初期値2秒、最大10秒、倍数2を指定します。

    /**
     * リトライは最大45秒、リトライ時は初期値2秒、最大10秒、倍数2(2,4,8,10,10,...)待機する
     *
     * @return
     */
    @Bean
    public RetryTemplate timeoutAndExponentialRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // TimeoutRetryPolicy
        TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
        timeoutRetryPolicy.setTimeout(45000);
        retryTemplate.setRetryPolicy(timeoutRetryPolicy);

        // ExponentialBackOffPolicy
        ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
        exponentialBackOffPolicy.setInitialInterval(2000);
        exponentialBackOffPolicy.setMaxInterval(10000);
        exponentialBackOffPolicy.setMultiplier(2.0);
        retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry02Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in02")
                , timeoutAndExponentialRetryTemplate());
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in02 の下に empty.txt を置くと以下のログが出力されます。2, 4, 8, 10, 10, 10, 10 とリトライ毎に待機時間が2倍になっており(ただし最大は10秒)、最後のリトライで 45 秒を超えたので処理が終了していることが確認できます。時間指定の場合、最後のリトライの処理で例外が throw されてもすぐには RequestHandlerRetryAdvice::setRecoveryCallback(...) の処理が呼び出されず、待機してから呼び出されるようです。

f:id:ksby:20170201225601p:plain

CompositeRetryPolicy のサンプルを作成する

SimpleRetryPolicy で最大3回、TimeoutRetryPolicy で最大10秒の2つのルールを CompositeRetryPolicy に設定します。

まずは FixedBackOffPolicy でリトライ時の待機時間を1秒にして、SimpleRetryPolicy の最大3回が適用されることを確認します。

    /**
     * 3回リトライするか、10秒を越えるまでリトライする
     * FixedBackOffPolicy はリトライ時の待機時間を1秒にしているので、
     * 3回リトライで終了する
     *
     * @return
     */
    @Bean
    public RetryTemplate compositeRetryTemplate01() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // CompositeRetryPolicy
        CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
        RetryPolicy[] retryPolicies = new RetryPolicy[2];
        retryPolicies[0] = new SimpleRetryPolicy(3, singletonMap(Exception.class, true));
        TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
        timeoutRetryPolicy.setTimeout(10000);
        retryPolicies[1] = timeoutRetryPolicy;
        compositeRetryPolicy.setPolicies(retryPolicies);
        retryTemplate.setRetryPolicy(compositeRetryPolicy);

        // FixedBackOffPolicy
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(1000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry03Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in03")
                , compositeRetryTemplate01());
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in03 の下に empty.txt を置くと以下のログが出力されます。1秒待機で3回リトライして処理が終了していることが確認できます。

f:id:ksby:20170201225949p:plain

今度は FixedBackOffPolicy でリトライ時の待機時間を5秒にして、TimeoutRetryPolicy の最大10秒が適用されることを確認します。

    /**
     * 3回リトライするか、10秒を越えるまでリトライする
     * FixedBackOffPolicy はリトライ時の待機時間を5秒にしているので、
     * 10秒リトライで終了する
     *
     * @return
     */
    @Bean
    public RetryTemplate compositeRetryTemplate02() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // CompositeRetryPolicy
        CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
        RetryPolicy[] retryPolicies = new RetryPolicy[2];
        retryPolicies[0] = new SimpleRetryPolicy(3, singletonMap(Exception.class, true));
        TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
        timeoutRetryPolicy.setTimeout(10000);
        retryPolicies[1] = timeoutRetryPolicy;
        compositeRetryPolicy.setPolicies(retryPolicies);
        retryTemplate.setRetryPolicy(compositeRetryPolicy);

        // FixedBackOffPolicy
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(5000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry04Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in04")
                , compositeRetryTemplate02());
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in04 の下に empty.txt を置くと以下のログが出力されます。5秒待機で2回リトライして10秒を超えたので処理が終了していることが確認できます。

f:id:ksby:20170201230345p:plain

RequestHandlerRetryAdvice ではなく直接 RetryTemplate を利用して ExceptionClassifierRetryPolicy のサンプルを作成する

以下の内容で実装します。

  • FileSystemAlreadyExistsException なら最大1分間リトライします。
  • FileSystemNotFoundException なら最大5回リトライします。
  • ExponentialBackOffPolicy で初期の待機時間2秒、最大待機時間10秒、倍数2を指定します。

最初は FileSystemAlreadyExistsException を throw します。

    @Autowired
    private RetryTemplateMessageHandler retryTemplateMessageHandler;

    @Bean
    public IntegrationFlow retry05Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-advice/in05"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                .handle(this.retryTemplateMessageHandler)
                .log()
                .channel(nullChannel)
                .get();
    }

    @Configuration
    public static class RetryTemplateConfig {

        @Bean
        public RetryTemplate exceptionClassifierRetryTemplate() {
            RetryTemplate retryTemplate = new RetryTemplate();

            // ExceptionClassifierRetryPolicy
            ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
            Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>();
            //  FileSystemAlreadyExistsException なら TimeoutRetryPolicy で最大1分間リトライ
            TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
            timeoutRetryPolicy.setTimeout(60000);
            policyMap.put(FileSystemAlreadyExistsException.class, timeoutRetryPolicy);
            //  FileSystemNotFoundException なら SimpleRetryPolicy で最大5回リトライ
            policyMap.put(FileSystemNotFoundException.class
                    , new SimpleRetryPolicy(5
                            , singletonMap(FileSystemNotFoundException.class, true)));
            retryPolicy.setPolicyMap(policyMap);
            retryTemplate.setRetryPolicy(retryPolicy);

            // ExponentialBackOffPolicy で初期の待機時間2秒、最大待機時間10秒、倍数2を指定
            ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
            exponentialBackOffPolicy.setInitialInterval(2000);
            exponentialBackOffPolicy.setMaxInterval(10000);
            exponentialBackOffPolicy.setMultiplier(2.0);
            retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);

            return retryTemplate;
        }

    }

    @MessageEndpoint
    public static class RetryTemplateMessageHandler implements GenericHandler<File> {

        private final RetryTemplate exceptionClassifierRetryTemplate;

        public RetryTemplateMessageHandler(
                @Qualifier("exceptionClassifierRetryTemplate") RetryTemplate exceptionClassifierRetryTemplate) {
            this.exceptionClassifierRetryTemplate = exceptionClassifierRetryTemplate;
        }

        @Override
        public Object handle(File payload, Map<String, Object> headers) {
            Object result = this.exceptionClassifierRetryTemplate.execute(
                    context -> {
                        log.info("★★★ リトライ回数 = " + context.getRetryCount());

                        if (true) {
                            throw new FileSystemAlreadyExistsException();
                        }
                        return payload;
                    }, context -> {
                        Exception e = (Exception) context.getLastThrowable();
                        log.error("●●● " + e.getClass().getName());
                        log.error("●●● " + payload.getName());
                        return payload;
                    });

            return result;
        }

    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in05 の下に empty.txt を置くと以下のログが出力されます。リトライをして1分を超えたら処理が終了していることが確認できます。

f:id:ksby:20170203003657p:plain

次は FileSystemNotFoundException を throw します。

            Object result = this.exceptionClassifierRetryTemplate.execute(
                    context -> {
                        log.info("★★★ リトライ回数 = " + context.getRetryCount());

                        if (true) {
//                            throw new FileSystemAlreadyExistsException();
                            throw new FileSystemNotFoundException();
                        }
                        return payload;
                    }, context -> {

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in05 の下に empty.txt を置くと以下のログが出力されます。5回リトライして1分を超えずに処理が終了していることが確認できます。

f:id:ksby:20170203005049p:plain

続きます!

spring-retry って結構機能豊富なんですね。。。

次は ExpressionEvaluatingRequestHandlerAdvice を使用したサンプルを作成してみます。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

[compileJava, compileTestGroovy, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options,-processing']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    mavenCentral()
    maven { url "http://repo.spring.io/repo/" }
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
        mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.integration:spring-integration-file")
    compile("org.codehaus.janino:janino")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの
    // http://projects.spring.io/spring-cloud/ の「Release Trains」参照
    compile("org.springframework.cloud:spring-cloud-starter-zipkin") {
        exclude module: 'spring-boot-starter-web'
    }

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.2")
}

Application.java

package ksbysample.eipapp.advice;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

FlowConfig.java

package ksbysample.eipapp.advice;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.CompositeRetryPolicy;
import org.springframework.retry.policy.ExceptionClassifierRetryPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.policy.TimeoutRetryPolicy;
import org.springframework.retry.support.RetrySynchronizationManager;
import org.springframework.retry.support.RetryTemplate;

import java.io.File;
import java.nio.file.FileSystemAlreadyExistsException;
import java.nio.file.FileSystemNotFoundException;
import java.util.HashMap;
import java.util.Map;

import static java.util.Collections.singletonMap;

@Slf4j
@Configuration
public class FlowConfig {

    private final NullChannel nullChannel;

    public FlowConfig(NullChannel nullChannel) {
        this.nullChannel = nullChannel;
    }

    /**
     * 共通 retry 用 Flow
     * retryTemplate で設定されたリトライを実行する
     *
     * @param inDir         監視対象の in ディレクトリ
     * @param retryTemplate リトライ条件を設定した RetryTemplate クラスのインスタンス
     * @return
     */
    private IntegrationFlow retryFlow(File inDir, RetryTemplate retryTemplate) {
        RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
        retryAdvice.setRetryTemplate(retryTemplate);
        retryAdvice.setRecoveryCallback(context -> {
            // リトライが全て失敗するとこの処理が実行される
            MessageHandlingException e = (MessageHandlingException) context.getLastThrowable();
            Message<?> message = ((MessageHandlingException) context.getLastThrowable()).getFailedMessage();
            File payload = (File) message.getPayload();
            log.error("●●● " + e.getRootCause().getClass().getName());
            log.error("●●● " + payload.getName());
            return null;
        });

        return IntegrationFlows
                .from(s -> s.file(inDir)
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                .handle((GenericHandler<Object>) (p, h) -> {
                    RetryContext retryContext = RetrySynchronizationManager.getContext();
                    log.info("★★★ リトライ回数 = " + retryContext.getRetryCount());

                    // 例外を throw して強制的にリトライさせる
                    if (true) {
                        throw new RuntimeException();
                    }
                    return p;
                }, e -> e.advice(retryAdvice))
                .log()
                .channel(nullChannel)
                .get();
    }

    /**
     * リトライ最大回数は5回、リトライ時は2秒待機する
     *
     * @return
     */
    @Bean
    public RetryTemplate simpleAndFixedRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // SimpleRetryPolicy
        retryTemplate.setRetryPolicy(
                new SimpleRetryPolicy(5, singletonMap(Exception.class, true)));

        // FixedBackOffPolicy
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(2000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry01Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in01")
                , simpleAndFixedRetryTemplate());
    }

    /**
     * リトライは最大45秒、リトライ時は初期値2秒、最大10秒、倍数2(2,4,8,10,10,...)待機する
     *
     * @return
     */
    @Bean
    public RetryTemplate timeoutAndExponentialRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // TimeoutRetryPolicy
        TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
        timeoutRetryPolicy.setTimeout(45000);
        retryTemplate.setRetryPolicy(timeoutRetryPolicy);

        // ExponentialBackOffPolicy
        ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
        exponentialBackOffPolicy.setInitialInterval(2000);
        exponentialBackOffPolicy.setMaxInterval(10000);
        exponentialBackOffPolicy.setMultiplier(2.0);
        retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry02Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in02")
                , timeoutAndExponentialRetryTemplate());
    }

    /**
     * 3回リトライするか、10秒を越えるまでリトライする
     * FixedBackOffPolicy はリトライ時の待機時間を1秒にしているので、
     * 3回リトライで終了する
     *
     * @return
     */
    @Bean
    public RetryTemplate compositeRetryTemplate01() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // CompositeRetryPolicy
        CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
        RetryPolicy[] retryPolicies = new RetryPolicy[2];
        retryPolicies[0] = new SimpleRetryPolicy(3, singletonMap(Exception.class, true));
        TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
        timeoutRetryPolicy.setTimeout(10000);
        retryPolicies[1] = timeoutRetryPolicy;
        compositeRetryPolicy.setPolicies(retryPolicies);
        retryTemplate.setRetryPolicy(compositeRetryPolicy);

        // FixedBackOffPolicy
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(1000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry03Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in03")
                , compositeRetryTemplate01());
    }

    /**
     * 3回リトライするか、10秒を越えるまでリトライする
     * FixedBackOffPolicy はリトライ時の待機時間を5秒にしているので、
     * 10秒リトライで終了する
     *
     * @return
     */
    @Bean
    public RetryTemplate compositeRetryTemplate02() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // CompositeRetryPolicy
        CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
        RetryPolicy[] retryPolicies = new RetryPolicy[2];
        retryPolicies[0] = new SimpleRetryPolicy(3, singletonMap(Exception.class, true));
        TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
        timeoutRetryPolicy.setTimeout(10000);
        retryPolicies[1] = timeoutRetryPolicy;
        compositeRetryPolicy.setPolicies(retryPolicies);
        retryTemplate.setRetryPolicy(compositeRetryPolicy);

        // FixedBackOffPolicy
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(5000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry04Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in04")
                , compositeRetryTemplate02());
    }

    /**
     * ここから下は RequestHandlerRetryAdvice は使用せず RetryTemplate を直接使用して
     * ExceptionClassifierRetryPolicy でリトライした例である
     */

    @Autowired
    private RetryTemplateMessageHandler retryTemplateMessageHandler;

    @Bean
    public IntegrationFlow retry05Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-advice/in05"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                .handle(this.retryTemplateMessageHandler)
                .log()
                .channel(nullChannel)
                .get();
    }

    @Configuration
    public static class RetryTemplateConfig {

        @Bean
        public RetryTemplate exceptionClassifierRetryTemplate() {
            RetryTemplate retryTemplate = new RetryTemplate();

            // ExceptionClassifierRetryPolicy
            ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
            Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>();
            //  FileSystemAlreadyExistsException なら TimeoutRetryPolicy で最大1分間リトライ
            TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
            timeoutRetryPolicy.setTimeout(60000);
            policyMap.put(FileSystemAlreadyExistsException.class, timeoutRetryPolicy);
            //  FileSystemNotFoundException なら SimpleRetryPolicy で最大5回リトライ
            policyMap.put(FileSystemNotFoundException.class
                    , new SimpleRetryPolicy(5
                            , singletonMap(FileSystemNotFoundException.class, true)));
            retryPolicy.setPolicyMap(policyMap);
            retryTemplate.setRetryPolicy(retryPolicy);

            // ExponentialBackOffPolicy で初期の待機時間2秒、最大待機時間10秒、倍数2を指定
            ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
            exponentialBackOffPolicy.setInitialInterval(2000);
            exponentialBackOffPolicy.setMaxInterval(10000);
            exponentialBackOffPolicy.setMultiplier(2.0);
            retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);

            return retryTemplate;
        }

    }

    @MessageEndpoint
    public static class RetryTemplateMessageHandler implements GenericHandler<File> {

        private final RetryTemplate exceptionClassifierRetryTemplate;

        public RetryTemplateMessageHandler(
                @Qualifier("exceptionClassifierRetryTemplate") RetryTemplate exceptionClassifierRetryTemplate) {
            this.exceptionClassifierRetryTemplate = exceptionClassifierRetryTemplate;
        }

        @Override
        public Object handle(File payload, Map<String, Object> headers) {
            Object result = this.exceptionClassifierRetryTemplate.execute(
                    context -> {
                        log.info("★★★ リトライ回数 = " + context.getRetryCount());

                        if (true) {
//                            throw new FileSystemAlreadyExistsException();
                            throw new FileSystemNotFoundException();
                        }
                        return payload;
                    }, context -> {
                        Exception e = (Exception) context.getLastThrowable();
                        log.error("●●● " + e.getClass().getName());
                        log.error("●●● " + payload.getName());
                        return payload;
                    });

            return result;
        }

    }

}

履歴

2017/02/03
初版発行。