Spring Boot + Spring Integration でいろいろ試してみる ( その15 )( RequestHandlerRetryAdvice のサンプルを作ってみる )
概要
記事一覧はこちらです。
- Spring Integration DSL で 8.9.2 Provided Advice Classes に記載されている RequestHandlerRetryAdvice を使用したサンプルを作成します。
参照したサイト・書籍
- 8.9 Adding Behavior to Endpoints
http://docs.spring.io/spring-integration/docs/4.3.7.RELEASE/reference/html/messaging-endpoints-chapter.html#message-handler-advice-chain
目次
- ksbysample-eipapp-advice プロジェクトを作成する
- C:\eipapp\ksbysample-eipapp-advice ディレクトリを作成する
- RequestHandlerRetryAdvice のサンプルを作成する
- RequestHandlerRetryAdvice ではなく直接 RetryTemplate を利用して ExceptionClassifierRetryPolicy のサンプルを作成する
- 続きます!
手順
ksbysample-eipapp-advice プロジェクトを作成する
IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。
Gradle プロジェクトの作成方法については Spring Boot + Spring Integration でいろいろ試してみる ( その8 )( MySQL のテーブルのデータを取得して PostgreSQL のテーブルへ登録する常駐型アプリケーションを作成する ) 参照。
spring-boot-starter-integration を入れると spring-retry も入りました。わざわざ build.gradle に書く必要はありませんでしたね。。。
src/main/java の下に ksbysample.eipapp.advice パッケージを作成します。
src/main/java/ksbysample/eipapp/advice の下に Application.java を作成し、リンク先の内容 を記述します。
C:\eipapp\ksbysample-eipapp-advice ディレクトリを作成する
以下の構成のディレクトリを作成します。
C:\eipapp\ksbysample-eipapp-advice ├ in01 ├ in02 ├ in03 ├ in04 └ in05
RequestHandlerRetryAdvice のサンプルを作成する
RetryPolicy, BackOffPolicy には何があるか?
RequestHandlerRetryAdvice クラスを利用してリトライする場合、リトライの終了条件を RetryPolicy インターフェースの実装クラスで指定し、リトライ時の待機時間を BackOffPolicy インターフェースの実装クラスで指定します。
RetryPolicy インターフェースの実装クラスには以下のものがあります。
クラス | 説明 |
---|---|
AlwaysRetryPolicy | 必ずリトライします(ただしリトライ対象の処理の中で例外は throw すること)。テストのためのスタブ等で使用します。 |
CompositeRetryPolicy | 複数の RetryPolicy を組み合わせることができる RetryPolicy です。例えば Exception が3回発生するか(SimpleRetryPolicy で指定)、10 秒経過する(TimeoutRetryPolicy で指定)まではリトライする、という条件が指定できます。 |
ExceptionClassifierRetryPolicy | Exception 毎に RetryPolicy を指定できる RetryPolicy です。ただし例えば ExceptionClassifierRetryPolicy で NullPointerException に TimeoutRetryPolicy が適用されるよう設定して .handle(...) 内で NullPointerException を throw しても LambdaMessageProcessor::processMessage で発生した例外が MessageHandlingException に変換されてしまうため、Spring Integration DSL+RequestHandlerRetryAdvice の組み合わせではこの RetryPolicy は使えないのでは? |
NeverRetryPolicy | 初回実行のみでリトライしません。これも AlwaysRetryPolicy と同様テスト用です。 |
SimpleRetryPolicy | リトライ回数、リトライの対象とする例外を指定できる RetryPolicy です。デフォルトではリトライ回数は3、例外は Exception.class です。通常使用するのはおそらくこれでしょう。 |
TimeoutRetryPolicy | リトライの条件を回数ではなくミリ秒数で指定します。例外が throw されても指定時間が経過するまでリトライし続けます。ただし例外が throw されずに指定された時間が経過してもリトライ対象の処理から応答が返ってこない場合には何も起きません。 |
BackOffPolicy インターフェースの実装クラスには以下のものがあります。
クラス | 説明 |
---|---|
ExponentialRandomBackOffPolicy | ExponentialBackOffPolicy と同様、初期の待機時間(ミリ秒)、最大待機時間(ミリ秒)、倍数を指定しますが、次の待機時間が固定の倍数ではなく1~倍数の間のランダムの数値の倍数になります。このクラスのソースのコメントに分かりやすい説明があります。 |
ExponentialBackOffPolicy | 初期の待機時間(ミリ秒)、最大待機時間(ミリ秒)、倍数(例えば2を指定すれば2倍)を指定し、次の待機時間が前回待機時間 * 倍数のミリ秒数になります。次回の待機時間を前回の待機時間の2倍にする、等の用途で使用します。デフォルトでは初期の待機時間は 100 ミリ秒、最大待機時間は 30000 ミリ秒、倍数は 2、です。 |
FixedBackOffPolicy | リトライ前に指定されたミリ秒間、固定で待機します。時間を指定しない場合、デフォルトでは1秒待機します。 |
UniformRandomBackOffPolicy | 最小と最大のミリ秒を指定し、その間のランダムな時間待機します。待機時間はリトライ毎に変わります。時間を指定しない場合、デフォルトでは最小は 500 ミリ秒、最大は 1500 ミリ秒です。 |
NoBackOffPolicy | リトライ前に何も実行しません。 |
こちらでメインに使うのは FixedBackOffPolicy、ExponentialBackOffPolicy の2つでしょうか。
SimpleRetryPolicy+FixedBackOffPolicy のサンプルを作成する
動作確認のためにサンプルを作成していきます。src/main/java/ksbysample/eipapp/advice/FlowConfig.java の完成形は リンク先の内容 です。
SimpleRetryPolicy でリトライ最大回数5回を指定し、FixedBackOffPolicy でリトライ時に2秒待機することを指定します。
条件を設定した SimpleRetryPolicy、FixedBackOffPolicy を RetryTemplate にセットし、RetryTemplate を RequestHandlerRetryAdvice にセットします。
RequestHandlerRetryAdvice を .handle(...)
の第2引数に e -> e.advice(retryAdvice)
と渡します。これで .handle(...)
の第1引数に渡したラムダ式を最大5回、2秒待機でリトライします。
尚、以降の RequestHandlerRetryAdvice のサンプルでは IntegrationFlow はほぼ同じものを使うので、共通で呼び出す private メソッドを作成して引数で in ディレクトリ、RetryTemplate を変更できるようにしています。
/** * 共通 retry 用 Flow * retryTemplate で設定されたリトライを実行する * * @param inDir 監視対象の in ディレクトリ * @param retryTemplate リトライ条件を設定した RetryTemplate クラスのインスタンス * @return */ private IntegrationFlow retryFlow(File inDir, RetryTemplate retryTemplate) { RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice(); retryAdvice.setRetryTemplate(retryTemplate); retryAdvice.setRecoveryCallback(context -> { // リトライが全て失敗するとこの処理が実行される MessageHandlingException e = (MessageHandlingException) context.getLastThrowable(); Message<?> message = ((MessageHandlingException) context.getLastThrowable()).getFailedMessage(); File payload = (File) message.getPayload(); log.error("●●● " + e.getRootCause().getClass().getName()); log.error("●●● " + payload.getName()); return null; }); return IntegrationFlows .from(s -> s.file(inDir) , e -> e.poller(Pollers.fixedDelay(1000))) .log() .handle((GenericHandler<Object>) (p, h) -> { RetryContext retryContext = RetrySynchronizationManager.getContext(); log.info("★★★ リトライ回数 = " + retryContext.getRetryCount()); // 例外を throw して強制的にリトライさせる if (true) { throw new RuntimeException(); } return p; }, e -> e.advice(retryAdvice)) .log() .channel(nullChannel) .get(); } /** * リトライ最大回数は5回、リトライ時は2秒待機する * * @return */ @Bean public RetryTemplate simpleAndFixedRetryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); // SimpleRetryPolicy retryTemplate.setRetryPolicy( new SimpleRetryPolicy(5, singletonMap(Exception.class, true))); // FixedBackOffPolicy FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy(); fixedBackOffPolicy.setBackOffPeriod(2000); retryTemplate.setBackOffPolicy(fixedBackOffPolicy); return retryTemplate; } @Bean public IntegrationFlow retry01Flow() { return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in01") , simpleAndFixedRetryTemplate()); }
bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in01 の下に empty.txt を置くと以下のログが出力されます。2秒待機で5回リトライしていることが確認できます。回数指定の場合、最後のリトライの処理で例外が throw されるとすぐに RequestHandlerRetryAdvice::setRecoveryCallback(...)
の処理が呼び出されています。
TimeoutRetryPolicy+ExponentialBackOffPolicy のサンプルを作成する
TimeoutRetryPolicy でリトライ最大45秒を指定し、ExponentialBackOffPolicy でリトライ時に初期値2秒、最大10秒、倍数2を指定します。
/** * リトライは最大45秒、リトライ時は初期値2秒、最大10秒、倍数2(2,4,8,10,10,...)待機する * * @return */ @Bean public RetryTemplate timeoutAndExponentialRetryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); // TimeoutRetryPolicy TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy(); timeoutRetryPolicy.setTimeout(45000); retryTemplate.setRetryPolicy(timeoutRetryPolicy); // ExponentialBackOffPolicy ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy(); exponentialBackOffPolicy.setInitialInterval(2000); exponentialBackOffPolicy.setMaxInterval(10000); exponentialBackOffPolicy.setMultiplier(2.0); retryTemplate.setBackOffPolicy(exponentialBackOffPolicy); return retryTemplate; } @Bean public IntegrationFlow retry02Flow() { return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in02") , timeoutAndExponentialRetryTemplate()); }
bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in02 の下に empty.txt を置くと以下のログが出力されます。2, 4, 8, 10, 10, 10, 10 とリトライ毎に待機時間が2倍になっており(ただし最大は10秒)、最後のリトライで 45 秒を超えたので処理が終了していることが確認できます。時間指定の場合、最後のリトライの処理で例外が throw されてもすぐには RequestHandlerRetryAdvice::setRecoveryCallback(...)
の処理が呼び出されず、待機してから呼び出されるようです。
CompositeRetryPolicy のサンプルを作成する
SimpleRetryPolicy で最大3回、TimeoutRetryPolicy で最大10秒の2つのルールを CompositeRetryPolicy に設定します。
まずは FixedBackOffPolicy でリトライ時の待機時間を1秒にして、SimpleRetryPolicy の最大3回が適用されることを確認します。
/** * 3回リトライするか、10秒を越えるまでリトライする * FixedBackOffPolicy はリトライ時の待機時間を1秒にしているので、 * 3回リトライで終了する * * @return */ @Bean public RetryTemplate compositeRetryTemplate01() { RetryTemplate retryTemplate = new RetryTemplate(); // CompositeRetryPolicy CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy(); RetryPolicy[] retryPolicies = new RetryPolicy[2]; retryPolicies[0] = new SimpleRetryPolicy(3, singletonMap(Exception.class, true)); TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy(); timeoutRetryPolicy.setTimeout(10000); retryPolicies[1] = timeoutRetryPolicy; compositeRetryPolicy.setPolicies(retryPolicies); retryTemplate.setRetryPolicy(compositeRetryPolicy); // FixedBackOffPolicy FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy(); fixedBackOffPolicy.setBackOffPeriod(1000); retryTemplate.setBackOffPolicy(fixedBackOffPolicy); return retryTemplate; } @Bean public IntegrationFlow retry03Flow() { return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in03") , compositeRetryTemplate01()); }
bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in03 の下に empty.txt を置くと以下のログが出力されます。1秒待機で3回リトライして処理が終了していることが確認できます。
今度は FixedBackOffPolicy でリトライ時の待機時間を5秒にして、TimeoutRetryPolicy の最大10秒が適用されることを確認します。
/** * 3回リトライするか、10秒を越えるまでリトライする * FixedBackOffPolicy はリトライ時の待機時間を5秒にしているので、 * 10秒リトライで終了する * * @return */ @Bean public RetryTemplate compositeRetryTemplate02() { RetryTemplate retryTemplate = new RetryTemplate(); // CompositeRetryPolicy CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy(); RetryPolicy[] retryPolicies = new RetryPolicy[2]; retryPolicies[0] = new SimpleRetryPolicy(3, singletonMap(Exception.class, true)); TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy(); timeoutRetryPolicy.setTimeout(10000); retryPolicies[1] = timeoutRetryPolicy; compositeRetryPolicy.setPolicies(retryPolicies); retryTemplate.setRetryPolicy(compositeRetryPolicy); // FixedBackOffPolicy FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy(); fixedBackOffPolicy.setBackOffPeriod(5000); retryTemplate.setBackOffPolicy(fixedBackOffPolicy); return retryTemplate; } @Bean public IntegrationFlow retry04Flow() { return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in04") , compositeRetryTemplate02()); }
bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in04 の下に empty.txt を置くと以下のログが出力されます。5秒待機で2回リトライして10秒を超えたので処理が終了していることが確認できます。
RequestHandlerRetryAdvice ではなく直接 RetryTemplate を利用して ExceptionClassifierRetryPolicy のサンプルを作成する
以下の内容で実装します。
- FileSystemAlreadyExistsException なら最大1分間リトライします。
- FileSystemNotFoundException なら最大5回リトライします。
- ExponentialBackOffPolicy で初期の待機時間2秒、最大待機時間10秒、倍数2を指定します。
最初は FileSystemAlreadyExistsException を throw します。
@Autowired private RetryTemplateMessageHandler retryTemplateMessageHandler; @Bean public IntegrationFlow retry05Flow() { return IntegrationFlows .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-advice/in05")) , e -> e.poller(Pollers.fixedDelay(1000))) .log() .handle(this.retryTemplateMessageHandler) .log() .channel(nullChannel) .get(); } @Configuration public static class RetryTemplateConfig { @Bean public RetryTemplate exceptionClassifierRetryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); // ExceptionClassifierRetryPolicy ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy(); Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>(); // FileSystemAlreadyExistsException なら TimeoutRetryPolicy で最大1分間リトライ TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy(); timeoutRetryPolicy.setTimeout(60000); policyMap.put(FileSystemAlreadyExistsException.class, timeoutRetryPolicy); // FileSystemNotFoundException なら SimpleRetryPolicy で最大5回リトライ policyMap.put(FileSystemNotFoundException.class , new SimpleRetryPolicy(5 , singletonMap(FileSystemNotFoundException.class, true))); retryPolicy.setPolicyMap(policyMap); retryTemplate.setRetryPolicy(retryPolicy); // ExponentialBackOffPolicy で初期の待機時間2秒、最大待機時間10秒、倍数2を指定 ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy(); exponentialBackOffPolicy.setInitialInterval(2000); exponentialBackOffPolicy.setMaxInterval(10000); exponentialBackOffPolicy.setMultiplier(2.0); retryTemplate.setBackOffPolicy(exponentialBackOffPolicy); return retryTemplate; } } @MessageEndpoint public static class RetryTemplateMessageHandler implements GenericHandler<File> { private final RetryTemplate exceptionClassifierRetryTemplate; public RetryTemplateMessageHandler( @Qualifier("exceptionClassifierRetryTemplate") RetryTemplate exceptionClassifierRetryTemplate) { this.exceptionClassifierRetryTemplate = exceptionClassifierRetryTemplate; } @Override public Object handle(File payload, Map<String, Object> headers) { Object result = this.exceptionClassifierRetryTemplate.execute( context -> { log.info("★★★ リトライ回数 = " + context.getRetryCount()); if (true) { throw new FileSystemAlreadyExistsException(); } return payload; }, context -> { Exception e = (Exception) context.getLastThrowable(); log.error("●●● " + e.getClass().getName()); log.error("●●● " + payload.getName()); return payload; }); return result; } }
bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in05 の下に empty.txt を置くと以下のログが出力されます。リトライをして1分を超えたら処理が終了していることが確認できます。
次は FileSystemNotFoundException を throw します。
Object result = this.exceptionClassifierRetryTemplate.execute( context -> { log.info("★★★ リトライ回数 = " + context.getRetryCount()); if (true) { // throw new FileSystemAlreadyExistsException(); throw new FileSystemNotFoundException(); } return payload; }, context -> {
bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in05 の下に empty.txt を置くと以下のログが出力されます。5回リトライして1分を超えずに処理が終了していることが確認できます。
続きます!
spring-retry って結構機能豊富なんですね。。。
次は ExpressionEvaluatingRequestHandlerAdvice を使用したサンプルを作成してみます。
ソースコード
build.gradle
group 'ksbysample' version '1.0.0-RELEASE' buildscript { ext { springBootVersion = '1.4.3.RELEASE' } repositories { mavenCentral() maven { url "http://repo.spring.io/repo/" } } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE") } } apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'idea' apply plugin: 'org.springframework.boot' apply plugin: 'io.spring.dependency-management' apply plugin: 'groovy' sourceCompatibility = 1.8 targetCompatibility = 1.8 [compileJava, compileTestGroovy, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options,-processing'] eclipse { classpath { containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER') containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8' } } idea { module { inheritOutputDirs = false outputDir = file("$buildDir/classes/main/") } } repositories { mavenCentral() maven { url "http://repo.spring.io/repo/" } } dependencyManagement { imports { mavenBom 'io.spring.platform:platform-bom:Athens-SR2' mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE' } } dependencies { // dependency-management-plugin によりバージョン番号が自動で設定されるもの // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照 compile("org.springframework.boot:spring-boot-starter-integration") compile("org.springframework.integration:spring-integration-file") compile("org.codehaus.janino:janino") testCompile("org.springframework.boot:spring-boot-starter-test") testCompile("org.spockframework:spock-core") testCompile("org.spockframework:spock-spring") // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの // http://projects.spring.io/spring-cloud/ の「Release Trains」参照 compile("org.springframework.cloud:spring-cloud-starter-zipkin") { exclude module: 'spring-boot-starter-web' } // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE") compile("org.projectlombok:lombok:1.16.12") testCompile("org.assertj:assertj-core:3.6.2") }
Application.java
package ksbysample.eipapp.advice; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
FlowConfig.java
package ksbysample.eipapp.advice; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.channel.NullChannel; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.core.Pollers; import org.springframework.integration.dsl.support.GenericHandler; import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandlingException; import org.springframework.retry.RetryContext; import org.springframework.retry.RetryPolicy; import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.backoff.FixedBackOffPolicy; import org.springframework.retry.policy.CompositeRetryPolicy; import org.springframework.retry.policy.ExceptionClassifierRetryPolicy; import org.springframework.retry.policy.SimpleRetryPolicy; import org.springframework.retry.policy.TimeoutRetryPolicy; import org.springframework.retry.support.RetrySynchronizationManager; import org.springframework.retry.support.RetryTemplate; import java.io.File; import java.nio.file.FileSystemAlreadyExistsException; import java.nio.file.FileSystemNotFoundException; import java.util.HashMap; import java.util.Map; import static java.util.Collections.singletonMap; @Slf4j @Configuration public class FlowConfig { private final NullChannel nullChannel; public FlowConfig(NullChannel nullChannel) { this.nullChannel = nullChannel; } /** * 共通 retry 用 Flow * retryTemplate で設定されたリトライを実行する * * @param inDir 監視対象の in ディレクトリ * @param retryTemplate リトライ条件を設定した RetryTemplate クラスのインスタンス * @return */ private IntegrationFlow retryFlow(File inDir, RetryTemplate retryTemplate) { RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice(); retryAdvice.setRetryTemplate(retryTemplate); retryAdvice.setRecoveryCallback(context -> { // リトライが全て失敗するとこの処理が実行される MessageHandlingException e = (MessageHandlingException) context.getLastThrowable(); Message<?> message = ((MessageHandlingException) context.getLastThrowable()).getFailedMessage(); File payload = (File) message.getPayload(); log.error("●●● " + e.getRootCause().getClass().getName()); log.error("●●● " + payload.getName()); return null; }); return IntegrationFlows .from(s -> s.file(inDir) , e -> e.poller(Pollers.fixedDelay(1000))) .log() .handle((GenericHandler<Object>) (p, h) -> { RetryContext retryContext = RetrySynchronizationManager.getContext(); log.info("★★★ リトライ回数 = " + retryContext.getRetryCount()); // 例外を throw して強制的にリトライさせる if (true) { throw new RuntimeException(); } return p; }, e -> e.advice(retryAdvice)) .log() .channel(nullChannel) .get(); } /** * リトライ最大回数は5回、リトライ時は2秒待機する * * @return */ @Bean public RetryTemplate simpleAndFixedRetryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); // SimpleRetryPolicy retryTemplate.setRetryPolicy( new SimpleRetryPolicy(5, singletonMap(Exception.class, true))); // FixedBackOffPolicy FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy(); fixedBackOffPolicy.setBackOffPeriod(2000); retryTemplate.setBackOffPolicy(fixedBackOffPolicy); return retryTemplate; } @Bean public IntegrationFlow retry01Flow() { return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in01") , simpleAndFixedRetryTemplate()); } /** * リトライは最大45秒、リトライ時は初期値2秒、最大10秒、倍数2(2,4,8,10,10,...)待機する * * @return */ @Bean public RetryTemplate timeoutAndExponentialRetryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); // TimeoutRetryPolicy TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy(); timeoutRetryPolicy.setTimeout(45000); retryTemplate.setRetryPolicy(timeoutRetryPolicy); // ExponentialBackOffPolicy ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy(); exponentialBackOffPolicy.setInitialInterval(2000); exponentialBackOffPolicy.setMaxInterval(10000); exponentialBackOffPolicy.setMultiplier(2.0); retryTemplate.setBackOffPolicy(exponentialBackOffPolicy); return retryTemplate; } @Bean public IntegrationFlow retry02Flow() { return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in02") , timeoutAndExponentialRetryTemplate()); } /** * 3回リトライするか、10秒を越えるまでリトライする * FixedBackOffPolicy はリトライ時の待機時間を1秒にしているので、 * 3回リトライで終了する * * @return */ @Bean public RetryTemplate compositeRetryTemplate01() { RetryTemplate retryTemplate = new RetryTemplate(); // CompositeRetryPolicy CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy(); RetryPolicy[] retryPolicies = new RetryPolicy[2]; retryPolicies[0] = new SimpleRetryPolicy(3, singletonMap(Exception.class, true)); TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy(); timeoutRetryPolicy.setTimeout(10000); retryPolicies[1] = timeoutRetryPolicy; compositeRetryPolicy.setPolicies(retryPolicies); retryTemplate.setRetryPolicy(compositeRetryPolicy); // FixedBackOffPolicy FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy(); fixedBackOffPolicy.setBackOffPeriod(1000); retryTemplate.setBackOffPolicy(fixedBackOffPolicy); return retryTemplate; } @Bean public IntegrationFlow retry03Flow() { return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in03") , compositeRetryTemplate01()); } /** * 3回リトライするか、10秒を越えるまでリトライする * FixedBackOffPolicy はリトライ時の待機時間を5秒にしているので、 * 10秒リトライで終了する * * @return */ @Bean public RetryTemplate compositeRetryTemplate02() { RetryTemplate retryTemplate = new RetryTemplate(); // CompositeRetryPolicy CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy(); RetryPolicy[] retryPolicies = new RetryPolicy[2]; retryPolicies[0] = new SimpleRetryPolicy(3, singletonMap(Exception.class, true)); TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy(); timeoutRetryPolicy.setTimeout(10000); retryPolicies[1] = timeoutRetryPolicy; compositeRetryPolicy.setPolicies(retryPolicies); retryTemplate.setRetryPolicy(compositeRetryPolicy); // FixedBackOffPolicy FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy(); fixedBackOffPolicy.setBackOffPeriod(5000); retryTemplate.setBackOffPolicy(fixedBackOffPolicy); return retryTemplate; } @Bean public IntegrationFlow retry04Flow() { return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in04") , compositeRetryTemplate02()); } /** * ここから下は RequestHandlerRetryAdvice は使用せず RetryTemplate を直接使用して * ExceptionClassifierRetryPolicy でリトライした例である */ @Autowired private RetryTemplateMessageHandler retryTemplateMessageHandler; @Bean public IntegrationFlow retry05Flow() { return IntegrationFlows .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-advice/in05")) , e -> e.poller(Pollers.fixedDelay(1000))) .log() .handle(this.retryTemplateMessageHandler) .log() .channel(nullChannel) .get(); } @Configuration public static class RetryTemplateConfig { @Bean public RetryTemplate exceptionClassifierRetryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); // ExceptionClassifierRetryPolicy ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy(); Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>(); // FileSystemAlreadyExistsException なら TimeoutRetryPolicy で最大1分間リトライ TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy(); timeoutRetryPolicy.setTimeout(60000); policyMap.put(FileSystemAlreadyExistsException.class, timeoutRetryPolicy); // FileSystemNotFoundException なら SimpleRetryPolicy で最大5回リトライ policyMap.put(FileSystemNotFoundException.class , new SimpleRetryPolicy(5 , singletonMap(FileSystemNotFoundException.class, true))); retryPolicy.setPolicyMap(policyMap); retryTemplate.setRetryPolicy(retryPolicy); // ExponentialBackOffPolicy で初期の待機時間2秒、最大待機時間10秒、倍数2を指定 ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy(); exponentialBackOffPolicy.setInitialInterval(2000); exponentialBackOffPolicy.setMaxInterval(10000); exponentialBackOffPolicy.setMultiplier(2.0); retryTemplate.setBackOffPolicy(exponentialBackOffPolicy); return retryTemplate; } } @MessageEndpoint public static class RetryTemplateMessageHandler implements GenericHandler<File> { private final RetryTemplate exceptionClassifierRetryTemplate; public RetryTemplateMessageHandler( @Qualifier("exceptionClassifierRetryTemplate") RetryTemplate exceptionClassifierRetryTemplate) { this.exceptionClassifierRetryTemplate = exceptionClassifierRetryTemplate; } @Override public Object handle(File payload, Map<String, Object> headers) { Object result = this.exceptionClassifierRetryTemplate.execute( context -> { log.info("★★★ リトライ回数 = " + context.getRetryCount()); if (true) { // throw new FileSystemAlreadyExistsException(); throw new FileSystemNotFoundException(); } return payload; }, context -> { Exception e = (Exception) context.getLastThrowable(); log.error("●●● " + e.getClass().getName()); log.error("●●● " + payload.getName()); return payload; }); return result; } } }
履歴
2017/02/03
初版発行。