かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その15 )( RequestHandlerRetryAdvice のサンプルを作ってみる )

概要

記事一覧はこちらです。

参照したサイト・書籍

  1. 8.9 Adding Behavior to Endpoints
    http://docs.spring.io/spring-integration/docs/4.3.7.RELEASE/reference/html/messaging-endpoints-chapter.html#message-handler-advice-chain

目次

  1. ksbysample-eipapp-advice プロジェクトを作成する
  2. C:\eipapp\ksbysample-eipapp-advice ディレクトリを作成する
  3. RequestHandlerRetryAdvice のサンプルを作成する
    1. RetryPolicy, BackOffPolicy には何があるか?
    2. SimpleRetryPolicy+FixedBackOffPolicy のサンプルを作成する
    3. TimeoutRetryPolicy+ExponentialBackOffPolicy のサンプルを作成する
    4. CompositeRetryPolicy のサンプルを作成する
  4. RequestHandlerRetryAdvice ではなく直接 RetryTemplate を利用して ExceptionClassifierRetryPolicy のサンプルを作成する
  5. 続きます!

手順

ksbysample-eipapp-advice プロジェクトを作成する

  1. IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。

  2. src/main/java の下に ksbysample.eipapp.advice パッケージを作成します。

  3. src/main/java/ksbysample/eipapp/advice の下に Application.java を作成し、リンク先の内容 を記述します。

C:\eipapp\ksbysample-eipapp-advice ディレクトリを作成する

以下の構成のディレクトリを作成します。

C:\eipapp\ksbysample-eipapp-advice
├ in01
├ in02
├ in03
├ in04
└ in05

RequestHandlerRetryAdvice のサンプルを作成する

RetryPolicy, BackOffPolicy には何があるか?

RequestHandlerRetryAdvice クラスを利用してリトライする場合、リトライの終了条件を RetryPolicy インターフェースの実装クラスで指定し、リトライ時の待機時間を BackOffPolicy インターフェースの実装クラスで指定します。

RetryPolicy インターフェースの実装クラスには以下のものがあります。

f:id:ksby:20170129231917p:plain

クラス 説明
AlwaysRetryPolicy 必ずリトライします(ただしリトライ対象の処理の中で例外は throw すること)。テストのためのスタブ等で使用します。
CompositeRetryPolicy 複数の RetryPolicy を組み合わせることができる RetryPolicy です。例えば Exception が3回発生するか(SimpleRetryPolicy で指定)、10 秒経過する(TimeoutRetryPolicy で指定)まではリトライする、という条件が指定できます。
ExceptionClassifierRetryPolicy Exception 毎に RetryPolicy を指定できる RetryPolicy です。ただし例えば ExceptionClassifierRetryPolicy で NullPointerException に TimeoutRetryPolicy が適用されるよう設定して .handle(...) 内で NullPointerException を throw しても LambdaMessageProcessor::processMessage で発生した例外が MessageHandlingException に変換されてしまうため、Spring Integration DSL+RequestHandlerRetryAdvice の組み合わせではこの RetryPolicy は使えないのでは?
NeverRetryPolicy 初回実行のみでリトライしません。これも AlwaysRetryPolicy と同様テスト用です。
SimpleRetryPolicy リトライ回数、リトライの対象とする例外を指定できる RetryPolicy です。デフォルトではリトライ回数は3、例外は Exception.class です。通常使用するのはおそらくこれでしょう。
TimeoutRetryPolicy リトライの条件を回数ではなくミリ秒数で指定します。例外が throw されても指定時間が経過するまでリトライし続けます。ただし例外が throw されずに指定された時間が経過してもリトライ対象の処理から応答が返ってこない場合には何も起きません。

BackOffPolicy インターフェースの実装クラスには以下のものがあります。

f:id:ksby:20170129225634p:plain

クラス 説明
ExponentialRandomBackOffPolicy ExponentialBackOffPolicy と同様、初期の待機時間(ミリ秒)、最大待機時間(ミリ秒)、倍数を指定しますが、次の待機時間が固定の倍数ではなく1~倍数の間のランダムの数値の倍数になります。このクラスのソースのコメントに分かりやすい説明があります。
ExponentialBackOffPolicy 初期の待機時間(ミリ秒)、最大待機時間(ミリ秒)、倍数(例えば2を指定すれば2倍)を指定し、次の待機時間が前回待機時間 * 倍数のミリ秒数になります。次回の待機時間を前回の待機時間の2倍にする、等の用途で使用します。デフォルトでは初期の待機時間は 100 ミリ秒、最大待機時間は 30000 ミリ秒、倍数は 2、です。
FixedBackOffPolicy リトライ前に指定されたミリ秒間、固定で待機します。時間を指定しない場合、デフォルトでは1秒待機します。
UniformRandomBackOffPolicy 最小と最大のミリ秒を指定し、その間のランダムな時間待機します。待機時間はリトライ毎に変わります。時間を指定しない場合、デフォルトでは最小は 500 ミリ秒、最大は 1500 ミリ秒です。
NoBackOffPolicy リトライ前に何も実行しません。

こちらでメインに使うのは FixedBackOffPolicy、ExponentialBackOffPolicy の2つでしょうか。

SimpleRetryPolicy+FixedBackOffPolicy のサンプルを作成する

動作確認のためにサンプルを作成していきます。src/main/java/ksbysample/eipapp/advice/FlowConfig.java の完成形は リンク先の内容 です。

SimpleRetryPolicy でリトライ最大回数5回を指定し、FixedBackOffPolicy でリトライ時に2秒待機することを指定します。

条件を設定した SimpleRetryPolicy、FixedBackOffPolicy を RetryTemplate にセットし、RetryTemplate を RequestHandlerRetryAdvice にセットします。

RequestHandlerRetryAdvice を .handle(...) の第2引数に e -> e.advice(retryAdvice) と渡します。これで .handle(...) の第1引数に渡したラムダ式を最大5回、2秒待機でリトライします。

尚、以降の RequestHandlerRetryAdvice のサンプルでは IntegrationFlow はほぼ同じものを使うので、共通で呼び出す private メソッドを作成して引数で in ディレクトリ、RetryTemplate を変更できるようにしています。

    /**
     * 共通 retry 用 Flow
     * retryTemplate で設定されたリトライを実行する
     *
     * @param inDir         監視対象の in ディレクトリ
     * @param retryTemplate リトライ条件を設定した RetryTemplate クラスのインスタンス
     * @return
     */
    private IntegrationFlow retryFlow(File inDir, RetryTemplate retryTemplate) {
        RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
        retryAdvice.setRetryTemplate(retryTemplate);
        retryAdvice.setRecoveryCallback(context -> {
            // リトライが全て失敗するとこの処理が実行される
            MessageHandlingException e = (MessageHandlingException) context.getLastThrowable();
            Message<?> message = ((MessageHandlingException) context.getLastThrowable()).getFailedMessage();
            File payload = (File) message.getPayload();
            log.error("●●● " + e.getRootCause().getClass().getName());
            log.error("●●● " + payload.getName());
            return null;
        });

        return IntegrationFlows
                .from(s -> s.file(inDir)
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                .handle((GenericHandler<Object>) (p, h) -> {
                    RetryContext retryContext = RetrySynchronizationManager.getContext();
                    log.info("★★★ リトライ回数 = " + retryContext.getRetryCount());

                    // 例外を throw して強制的にリトライさせる
                    if (true) {
                        throw new RuntimeException();
                    }
                    return p;
                }, e -> e.advice(retryAdvice))
                .log()
                .channel(nullChannel)
                .get();
    }

    /**
     * リトライ最大回数は5回、リトライ時は2秒待機する
     *
     * @return
     */
    @Bean
    public RetryTemplate simpleAndFixedRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // SimpleRetryPolicy
        retryTemplate.setRetryPolicy(
                new SimpleRetryPolicy(5, singletonMap(Exception.class, true)));

        // FixedBackOffPolicy
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(2000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry01Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in01")
                , simpleAndFixedRetryTemplate());
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in01 の下に empty.txt を置くと以下のログが出力されます。2秒待機で5回リトライしていることが確認できます。回数指定の場合、最後のリトライの処理で例外が throw されるとすぐに RequestHandlerRetryAdvice::setRecoveryCallback(...) の処理が呼び出されています。

f:id:ksby:20170201224951p:plain

TimeoutRetryPolicy+ExponentialBackOffPolicy のサンプルを作成する

TimeoutRetryPolicy でリトライ最大45秒を指定し、ExponentialBackOffPolicy でリトライ時に初期値2秒、最大10秒、倍数2を指定します。

    /**
     * リトライは最大45秒、リトライ時は初期値2秒、最大10秒、倍数2(2,4,8,10,10,...)待機する
     *
     * @return
     */
    @Bean
    public RetryTemplate timeoutAndExponentialRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // TimeoutRetryPolicy
        TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
        timeoutRetryPolicy.setTimeout(45000);
        retryTemplate.setRetryPolicy(timeoutRetryPolicy);

        // ExponentialBackOffPolicy
        ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
        exponentialBackOffPolicy.setInitialInterval(2000);
        exponentialBackOffPolicy.setMaxInterval(10000);
        exponentialBackOffPolicy.setMultiplier(2.0);
        retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry02Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in02")
                , timeoutAndExponentialRetryTemplate());
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in02 の下に empty.txt を置くと以下のログが出力されます。2, 4, 8, 10, 10, 10, 10 とリトライ毎に待機時間が2倍になっており(ただし最大は10秒)、最後のリトライで 45 秒を超えたので処理が終了していることが確認できます。時間指定の場合、最後のリトライの処理で例外が throw されてもすぐには RequestHandlerRetryAdvice::setRecoveryCallback(...) の処理が呼び出されず、待機してから呼び出されるようです。

f:id:ksby:20170201225601p:plain

CompositeRetryPolicy のサンプルを作成する

SimpleRetryPolicy で最大3回、TimeoutRetryPolicy で最大10秒の2つのルールを CompositeRetryPolicy に設定します。

まずは FixedBackOffPolicy でリトライ時の待機時間を1秒にして、SimpleRetryPolicy の最大3回が適用されることを確認します。

    /**
     * 3回リトライするか、10秒を越えるまでリトライする
     * FixedBackOffPolicy はリトライ時の待機時間を1秒にしているので、
     * 3回リトライで終了する
     *
     * @return
     */
    @Bean
    public RetryTemplate compositeRetryTemplate01() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // CompositeRetryPolicy
        CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
        RetryPolicy[] retryPolicies = new RetryPolicy[2];
        retryPolicies[0] = new SimpleRetryPolicy(3, singletonMap(Exception.class, true));
        TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
        timeoutRetryPolicy.setTimeout(10000);
        retryPolicies[1] = timeoutRetryPolicy;
        compositeRetryPolicy.setPolicies(retryPolicies);
        retryTemplate.setRetryPolicy(compositeRetryPolicy);

        // FixedBackOffPolicy
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(1000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry03Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in03")
                , compositeRetryTemplate01());
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in03 の下に empty.txt を置くと以下のログが出力されます。1秒待機で3回リトライして処理が終了していることが確認できます。

f:id:ksby:20170201225949p:plain

今度は FixedBackOffPolicy でリトライ時の待機時間を5秒にして、TimeoutRetryPolicy の最大10秒が適用されることを確認します。

    /**
     * 3回リトライするか、10秒を越えるまでリトライする
     * FixedBackOffPolicy はリトライ時の待機時間を5秒にしているので、
     * 10秒リトライで終了する
     *
     * @return
     */
    @Bean
    public RetryTemplate compositeRetryTemplate02() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // CompositeRetryPolicy
        CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
        RetryPolicy[] retryPolicies = new RetryPolicy[2];
        retryPolicies[0] = new SimpleRetryPolicy(3, singletonMap(Exception.class, true));
        TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
        timeoutRetryPolicy.setTimeout(10000);
        retryPolicies[1] = timeoutRetryPolicy;
        compositeRetryPolicy.setPolicies(retryPolicies);
        retryTemplate.setRetryPolicy(compositeRetryPolicy);

        // FixedBackOffPolicy
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(5000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry04Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in04")
                , compositeRetryTemplate02());
    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in04 の下に empty.txt を置くと以下のログが出力されます。5秒待機で2回リトライして10秒を超えたので処理が終了していることが確認できます。

f:id:ksby:20170201230345p:plain

RequestHandlerRetryAdvice ではなく直接 RetryTemplate を利用して ExceptionClassifierRetryPolicy のサンプルを作成する

以下の内容で実装します。

  • FileSystemAlreadyExistsException なら最大1分間リトライします。
  • FileSystemNotFoundException なら最大5回リトライします。
  • ExponentialBackOffPolicy で初期の待機時間2秒、最大待機時間10秒、倍数2を指定します。

最初は FileSystemAlreadyExistsException を throw します。

    @Autowired
    private RetryTemplateMessageHandler retryTemplateMessageHandler;

    @Bean
    public IntegrationFlow retry05Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-advice/in05"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                .handle(this.retryTemplateMessageHandler)
                .log()
                .channel(nullChannel)
                .get();
    }

    @Configuration
    public static class RetryTemplateConfig {

        @Bean
        public RetryTemplate exceptionClassifierRetryTemplate() {
            RetryTemplate retryTemplate = new RetryTemplate();

            // ExceptionClassifierRetryPolicy
            ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
            Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>();
            //  FileSystemAlreadyExistsException なら TimeoutRetryPolicy で最大1分間リトライ
            TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
            timeoutRetryPolicy.setTimeout(60000);
            policyMap.put(FileSystemAlreadyExistsException.class, timeoutRetryPolicy);
            //  FileSystemNotFoundException なら SimpleRetryPolicy で最大5回リトライ
            policyMap.put(FileSystemNotFoundException.class
                    , new SimpleRetryPolicy(5
                            , singletonMap(FileSystemNotFoundException.class, true)));
            retryPolicy.setPolicyMap(policyMap);
            retryTemplate.setRetryPolicy(retryPolicy);

            // ExponentialBackOffPolicy で初期の待機時間2秒、最大待機時間10秒、倍数2を指定
            ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
            exponentialBackOffPolicy.setInitialInterval(2000);
            exponentialBackOffPolicy.setMaxInterval(10000);
            exponentialBackOffPolicy.setMultiplier(2.0);
            retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);

            return retryTemplate;
        }

    }

    @MessageEndpoint
    public static class RetryTemplateMessageHandler implements GenericHandler<File> {

        private final RetryTemplate exceptionClassifierRetryTemplate;

        public RetryTemplateMessageHandler(
                @Qualifier("exceptionClassifierRetryTemplate") RetryTemplate exceptionClassifierRetryTemplate) {
            this.exceptionClassifierRetryTemplate = exceptionClassifierRetryTemplate;
        }

        @Override
        public Object handle(File payload, Map<String, Object> headers) {
            Object result = this.exceptionClassifierRetryTemplate.execute(
                    context -> {
                        log.info("★★★ リトライ回数 = " + context.getRetryCount());

                        if (true) {
                            throw new FileSystemAlreadyExistsException();
                        }
                        return payload;
                    }, context -> {
                        Exception e = (Exception) context.getLastThrowable();
                        log.error("●●● " + e.getClass().getName());
                        log.error("●●● " + payload.getName());
                        return payload;
                    });

            return result;
        }

    }

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in05 の下に empty.txt を置くと以下のログが出力されます。リトライをして1分を超えたら処理が終了していることが確認できます。

f:id:ksby:20170203003657p:plain

次は FileSystemNotFoundException を throw します。

            Object result = this.exceptionClassifierRetryTemplate.execute(
                    context -> {
                        log.info("★★★ リトライ回数 = " + context.getRetryCount());

                        if (true) {
//                            throw new FileSystemAlreadyExistsException();
                            throw new FileSystemNotFoundException();
                        }
                        return payload;
                    }, context -> {

bootRun タスクを実行した後 C:\eipapp\ksbysample-eipapp-advice\in05 の下に empty.txt を置くと以下のログが出力されます。5回リトライして1分を超えずに処理が終了していることが確認できます。

f:id:ksby:20170203005049p:plain

続きます!

spring-retry って結構機能豊富なんですね。。。

次は ExpressionEvaluatingRequestHandlerAdvice を使用したサンプルを作成してみます。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

[compileJava, compileTestGroovy, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options,-processing']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    mavenCentral()
    maven { url "http://repo.spring.io/repo/" }
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
        mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.integration:spring-integration-file")
    compile("org.codehaus.janino:janino")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの
    // http://projects.spring.io/spring-cloud/ の「Release Trains」参照
    compile("org.springframework.cloud:spring-cloud-starter-zipkin") {
        exclude module: 'spring-boot-starter-web'
    }

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.2")
}

Application.java

package ksbysample.eipapp.advice;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

FlowConfig.java

package ksbysample.eipapp.advice;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.CompositeRetryPolicy;
import org.springframework.retry.policy.ExceptionClassifierRetryPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.policy.TimeoutRetryPolicy;
import org.springframework.retry.support.RetrySynchronizationManager;
import org.springframework.retry.support.RetryTemplate;

import java.io.File;
import java.nio.file.FileSystemAlreadyExistsException;
import java.nio.file.FileSystemNotFoundException;
import java.util.HashMap;
import java.util.Map;

import static java.util.Collections.singletonMap;

@Slf4j
@Configuration
public class FlowConfig {

    private final NullChannel nullChannel;

    public FlowConfig(NullChannel nullChannel) {
        this.nullChannel = nullChannel;
    }

    /**
     * 共通 retry 用 Flow
     * retryTemplate で設定されたリトライを実行する
     *
     * @param inDir         監視対象の in ディレクトリ
     * @param retryTemplate リトライ条件を設定した RetryTemplate クラスのインスタンス
     * @return
     */
    private IntegrationFlow retryFlow(File inDir, RetryTemplate retryTemplate) {
        RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
        retryAdvice.setRetryTemplate(retryTemplate);
        retryAdvice.setRecoveryCallback(context -> {
            // リトライが全て失敗するとこの処理が実行される
            MessageHandlingException e = (MessageHandlingException) context.getLastThrowable();
            Message<?> message = ((MessageHandlingException) context.getLastThrowable()).getFailedMessage();
            File payload = (File) message.getPayload();
            log.error("●●● " + e.getRootCause().getClass().getName());
            log.error("●●● " + payload.getName());
            return null;
        });

        return IntegrationFlows
                .from(s -> s.file(inDir)
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                .handle((GenericHandler<Object>) (p, h) -> {
                    RetryContext retryContext = RetrySynchronizationManager.getContext();
                    log.info("★★★ リトライ回数 = " + retryContext.getRetryCount());

                    // 例外を throw して強制的にリトライさせる
                    if (true) {
                        throw new RuntimeException();
                    }
                    return p;
                }, e -> e.advice(retryAdvice))
                .log()
                .channel(nullChannel)
                .get();
    }

    /**
     * リトライ最大回数は5回、リトライ時は2秒待機する
     *
     * @return
     */
    @Bean
    public RetryTemplate simpleAndFixedRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // SimpleRetryPolicy
        retryTemplate.setRetryPolicy(
                new SimpleRetryPolicy(5, singletonMap(Exception.class, true)));

        // FixedBackOffPolicy
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(2000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry01Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in01")
                , simpleAndFixedRetryTemplate());
    }

    /**
     * リトライは最大45秒、リトライ時は初期値2秒、最大10秒、倍数2(2,4,8,10,10,...)待機する
     *
     * @return
     */
    @Bean
    public RetryTemplate timeoutAndExponentialRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // TimeoutRetryPolicy
        TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
        timeoutRetryPolicy.setTimeout(45000);
        retryTemplate.setRetryPolicy(timeoutRetryPolicy);

        // ExponentialBackOffPolicy
        ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
        exponentialBackOffPolicy.setInitialInterval(2000);
        exponentialBackOffPolicy.setMaxInterval(10000);
        exponentialBackOffPolicy.setMultiplier(2.0);
        retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry02Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in02")
                , timeoutAndExponentialRetryTemplate());
    }

    /**
     * 3回リトライするか、10秒を越えるまでリトライする
     * FixedBackOffPolicy はリトライ時の待機時間を1秒にしているので、
     * 3回リトライで終了する
     *
     * @return
     */
    @Bean
    public RetryTemplate compositeRetryTemplate01() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // CompositeRetryPolicy
        CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
        RetryPolicy[] retryPolicies = new RetryPolicy[2];
        retryPolicies[0] = new SimpleRetryPolicy(3, singletonMap(Exception.class, true));
        TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
        timeoutRetryPolicy.setTimeout(10000);
        retryPolicies[1] = timeoutRetryPolicy;
        compositeRetryPolicy.setPolicies(retryPolicies);
        retryTemplate.setRetryPolicy(compositeRetryPolicy);

        // FixedBackOffPolicy
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(1000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry03Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in03")
                , compositeRetryTemplate01());
    }

    /**
     * 3回リトライするか、10秒を越えるまでリトライする
     * FixedBackOffPolicy はリトライ時の待機時間を5秒にしているので、
     * 10秒リトライで終了する
     *
     * @return
     */
    @Bean
    public RetryTemplate compositeRetryTemplate02() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // CompositeRetryPolicy
        CompositeRetryPolicy compositeRetryPolicy = new CompositeRetryPolicy();
        RetryPolicy[] retryPolicies = new RetryPolicy[2];
        retryPolicies[0] = new SimpleRetryPolicy(3, singletonMap(Exception.class, true));
        TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
        timeoutRetryPolicy.setTimeout(10000);
        retryPolicies[1] = timeoutRetryPolicy;
        compositeRetryPolicy.setPolicies(retryPolicies);
        retryTemplate.setRetryPolicy(compositeRetryPolicy);

        // FixedBackOffPolicy
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        fixedBackOffPolicy.setBackOffPeriod(5000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

        return retryTemplate;
    }

    @Bean
    public IntegrationFlow retry04Flow() {
        return retryFlow(new File("C:/eipapp/ksbysample-eipapp-advice/in04")
                , compositeRetryTemplate02());
    }

    /**
     * ここから下は RequestHandlerRetryAdvice は使用せず RetryTemplate を直接使用して
     * ExceptionClassifierRetryPolicy でリトライした例である
     */

    @Autowired
    private RetryTemplateMessageHandler retryTemplateMessageHandler;

    @Bean
    public IntegrationFlow retry05Flow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-advice/in05"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                .handle(this.retryTemplateMessageHandler)
                .log()
                .channel(nullChannel)
                .get();
    }

    @Configuration
    public static class RetryTemplateConfig {

        @Bean
        public RetryTemplate exceptionClassifierRetryTemplate() {
            RetryTemplate retryTemplate = new RetryTemplate();

            // ExceptionClassifierRetryPolicy
            ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
            Map<Class<? extends Throwable>, RetryPolicy> policyMap = new HashMap<>();
            //  FileSystemAlreadyExistsException なら TimeoutRetryPolicy で最大1分間リトライ
            TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
            timeoutRetryPolicy.setTimeout(60000);
            policyMap.put(FileSystemAlreadyExistsException.class, timeoutRetryPolicy);
            //  FileSystemNotFoundException なら SimpleRetryPolicy で最大5回リトライ
            policyMap.put(FileSystemNotFoundException.class
                    , new SimpleRetryPolicy(5
                            , singletonMap(FileSystemNotFoundException.class, true)));
            retryPolicy.setPolicyMap(policyMap);
            retryTemplate.setRetryPolicy(retryPolicy);

            // ExponentialBackOffPolicy で初期の待機時間2秒、最大待機時間10秒、倍数2を指定
            ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
            exponentialBackOffPolicy.setInitialInterval(2000);
            exponentialBackOffPolicy.setMaxInterval(10000);
            exponentialBackOffPolicy.setMultiplier(2.0);
            retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);

            return retryTemplate;
        }

    }

    @MessageEndpoint
    public static class RetryTemplateMessageHandler implements GenericHandler<File> {

        private final RetryTemplate exceptionClassifierRetryTemplate;

        public RetryTemplateMessageHandler(
                @Qualifier("exceptionClassifierRetryTemplate") RetryTemplate exceptionClassifierRetryTemplate) {
            this.exceptionClassifierRetryTemplate = exceptionClassifierRetryTemplate;
        }

        @Override
        public Object handle(File payload, Map<String, Object> headers) {
            Object result = this.exceptionClassifierRetryTemplate.execute(
                    context -> {
                        log.info("★★★ リトライ回数 = " + context.getRetryCount());

                        if (true) {
//                            throw new FileSystemAlreadyExistsException();
                            throw new FileSystemNotFoundException();
                        }
                        return payload;
                    }, context -> {
                        Exception e = (Exception) context.getLastThrowable();
                        log.error("●●● " + e.getClass().getName());
                        log.error("●●● " + payload.getName());
                        return payload;
                    });

            return result;
        }

    }

}

履歴

2017/02/03
初版発行。

Spring Boot + Spring Integration でいろいろ試してみる ( その14 )( delayer のサンプルを作ってみる )

概要

記事一覧はこちらです。

  • Spring Integration DSL8.6 Delayer を使用したサンプルを作成します。
  • Delayer は指定した時間 Message を待機させる機能です。
  • RequestHandlerAdvice、.bridge(…)、MessageStore も初めて使っていますが今回説明は入れていません。RequestHandlerAdvice、MessageStore は慣れておきたいので、その内調べてまとめておきたいですね。。。

参照したサイト・書籍

  1. Spring Integration Reference Manual - 8.6 Delayer
    http://docs.spring.io/spring-integration/reference/htmlsingle/#delayer

  2. handling file backup/archive on success and retry on failure
    http://stackoverflow.com/questions/24016152/handling-file-backup-archive-on-success-and-retry-on-failure

    • ExpressionEvaluatingRequestHandlerAdvice の使い方の参考にしました。

目次

  1. ksbysample-eipapp-delayer プロジェクトを作成する
  2. C:\eipapp\ksbysample-eipapp-delayer ディレクトリを作成する
  3. まずはシンプルに時間を指定して待機させてみる
  4. Message の header, payload に指定した時間で待機させてみる
  5. 待機する時間を動的に変えてみる
  6. .delay(...) の第1引数には GROUP_ID を指定するが、何かを MessageGroupStore に保存するのか?

手順

ksbysample-eipapp-delayer プロジェクトを作成する

  1. IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。

  2. src/main/java の下に ksbysample.eipapp.delayer パッケージを作成します。

  3. src/main/java/ksbysample/eipapp/delayer の下に Application.java を作成し、リンク先の内容 を記述します。

C:\eipapp\ksbysample-eipapp-delayer ディレクトリを作成する

以下の構成のディレクトリを作成します。

C:\eipapp\ksbysample-eipapp-delayer
├ in01
├ in02
├ in03
└ in04

処理開始用のファイルには Spring Boot + Spring Integration でいろいろ試してみる ( その10 )( URL一覧のファイルが置かれたらアクセス可能かチェックして結果ファイルに出力する ) で作成した 2014.txt と、必要に応じて作成したもの使用します。

まずはシンプルに時間を指定して待機させてみる

  1. src/main/java/ksbysample/eipapp/delayer の下に FlowConfig.java を作成し、リンク先のその1の内容 を記述します。

  2. bootRun タスクを実行した後、C:\eipapp\ksbysample-eipapp-delayer\in01 に 2014.txt を置きます。

    以下のログが出力されて、実装通り3秒待機→6秒待機していることが確認できます。

    f:id:ksby:20170129023442p:plain

  3. bootRun タスクを停止します。

Message の header, payload に指定した時間で待機させてみる

  1. src/main/java/ksbysample/eipapp/delayer の下の FlowConfig.javaリンク先のその2の内容 に変更します。

  2. 動作確認用に、中が空の empty.txt と、中に “10000” と記述した 10000.txt を作成します。

  3. bootRun タスクを実行した後、最初に C:\eipapp\ksbysample-eipapp-delayer\in02 に empty.txt を置きます。

    以下のログが出力されて、(DelayerEndpointSpec e) -> e.defaultDelay(...) で指定されているデフォルトの待機秒数が使用されて、8秒待機→2秒待機していることが確認できます。

    f:id:ksby:20170129024605p:plain

  4. 次に C:\eipapp\ksbysample-eipapp-delayer\in02 に 10000.txt を置きます。

    以下のログが出力されて、ファイルに記述した待機秒数が使用されて、10秒待機→10秒待機していることが確認できます。

    f:id:ksby:20170129025132p:plain

  5. bootRun タスクを停止します。

待機する時間を動的に変えてみる

Message の headers にセットされた delayInit, delayCount の2つを参照して delayInit + delayCount * 1000 ミリ秒待機する、というサンプルを作成します。

  1. src/main/java/ksbysample/eipapp/delayer の下の FlowConfig.javaリンク先のその3の内容 に変更します。

  2. bootRun タスクを実行した後、C:\eipapp\ksbysample-eipapp-delayer\in03 に 2014.txt を置きます。

    以下のログが出力されて、待機秒数が3秒→4秒→5秒と増えていることが確認できます。

    f:id:ksby:20170129094149p:plain

  3. bootRun タスクを停止します。

.delay(...) の第1引数には GROUP_ID を指定するが、何かを MessageGroupStore に保存するのか?

org.springframework.integration.handler の DelayHandler.java の doInit メソッドを見ると messageStore が指定されていない場合には this.messageStore = new SimpleMessageStore(); と実装されているので、デフォルトでは delay 中は SimpleMessageStore に何かのデータを入れており、その時に GROUP_ID を使用するようです。

SimpleMessageStore のクラス図を IntelliJ IDEA で作成してみると MessageGroupStore インターフェースを実装していることが分かります。

f:id:ksby:20170129165355p:plain

何を MessageGroupStore に入れるのか確認してみます。

  1. src/main/java/ksbysample/eipapp/delayer の下の FlowConfig.javaリンク先のその4の内容 に変更します。

  2. bootRun タスクを実行した後、C:\eipapp\ksbysample-eipapp-delayer\in04 に 2014.txt を置きます。

    以下のログが出力されます。

    f:id:ksby:20170129165607p:plain

    • delay が開始されると指定した simpleMessageStore にデータがセットされて、delay が終了すると削除されることが確認できます。
    • header は id, timestamp のみでした。カスタムの header が追加されてはいないようです。
    • payload は org.springframework.integration.handler の DelayHandler クラス内で定義されている DelayedMessageWrapper クラスのデータがセットされていました。DelayedMessageWrapper クラスは処理開始時点の日時がセットされる long requestDate と、オリジナルの Message がセットされる Message<?> original をフィールドに持つクラスでした。
  3. bootRun タスクを停止します。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

[compileJava, compileTestGroovy, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options,-processing']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    mavenCentral()
    maven { url "http://repo.spring.io/repo/" }
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
        mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.integration:spring-integration-file")
    compile("org.codehaus.janino:janino")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.2")
}

Application.java

package ksbysample.eipapp.delayer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

FlowConfig.java

■その1

package ksbysample.eipapp.delayer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.dsl.DelayerEndpointSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;

import java.io.File;

@Slf4j
@Configuration
public class FlowConfig {

    private static final String GROUP_ID_DELAY = "DELAYFLOW_DELAY";

    private final NullChannel nullChannel;

    public FlowConfig(NullChannel nullChannel) {
        this.nullChannel = nullChannel;
    }

    /**
     * 正常処理時に payload にセットされた File クラスのファイルを削除する RequestHandlerAdvice
     *
     * @return
     */
    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice fileDeleteAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice requestHandlerAdvice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        requestHandlerAdvice.setOnSuccessExpression("payload.delete()");
        return requestHandlerAdvice;
    }


    @Bean
    public IntegrationFlow delayFlow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in01"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                // 3秒待機する
                .delay(GROUP_ID_DELAY, (DelayerEndpointSpec e) -> e.defaultDelay(3000))
                .log()
                // 6秒待機する
                .delay(GROUP_ID_DELAY, "6000")
                .log()
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .channel(nullChannel)
                .get();
    }

}

■その2

@Slf4j
@Configuration
public class FlowConfig {

    ..........

    @Bean
    public IntegrationFlow delayByMessageFlow() {
        return IntegrationFlows
                // C:/eipapp/ksbysample-eipapp-delayer/in02 には待機秒数を記述したファイルを置く
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in02"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // payload にセットされた File クラスを headers.originalPayload へセットする
                .enrichHeaders(h -> h.headerExpression("originalPayload", "payload"))
                // ファイルの内容を読み込んで payload にセットする
                // ※payload のクラスが File クラス --> String クラスに変わる
                .transform(Transformers.fileToString())
                .log()
                // payload にセットされた秒数待機する
                .delay(GROUP_ID_DELAY, "payload", (DelayerEndpointSpec e) -> e.defaultDelay(8000))
                .log()
                // headers.delay にセットされた秒数待機する
                .enrichHeaders(h -> h.headerExpression("delay", "payload"))
                .delay(GROUP_ID_DELAY, "headers.delay", (DelayerEndpointSpec e) -> e.defaultDelay(2000))
                .log()
                // headers.originalPayload にセットされている File クラスを payload へ戻す
                .transform("headers.originalPayload")
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .channel(nullChannel)
                .get();
    }

}

■その3

@Slf4j
@Configuration
public class FlowConfig {

    ..........


    @Bean
    public IntegrationFlow delayDynamicFlow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in03"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // 待機秒数を計算するための delayInit, delayCount の初期値を header にセットする
                .enrichHeaders(h -> h
                        .header("delayInit", 3000)
                        .header("delayCount", 0))
                .log()
                // delayCount が 3 になるまで処理をループさせたいので、別の Flow にする
                // return f -> f.~ で書いた場合、Bean名 + ".input" という MessageChannel
                // が自動生成されて、ここに Message を送信すると処理が開始される
                .channel("loopCountFlow.input")
                .get();
    }

    @Bean
    public IntegrationFlow loopCountFlow() {
        return f -> f
                // delayInit + delayCount * 1000 のミリ秒数待機する
                .delay(GROUP_ID_DELAY, m -> {
                    return ((int) m.getHeaders().get("delayInit"))
                            + (((int) m.getHeaders().get("delayCount")) * 1000);
                })
                .log()
                // headers.delayCount を +1 する
                .enrichHeaders(h -> h.headerFunction("delayCount"
                        , m -> ((int) m.getHeaders().get("delayCount")) + 1
                        , true))
                // delayCount が3未満なら loopCountFlow の最初に戻る
                // そうでなければ次の処理へ
                .routeToRecipients(r -> r
                        .recipientFlow("headers.delayCount < 3"
                                , sf -> sf.channel("loopCountFlow.input"))
                        .defaultOutputToParentFlow())
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .handle((p, h) -> {
                    System.out.println("ファイルを削除しました");
                    return null;
                });
    }

}

■その4

@Slf4j
@Configuration
public class FlowConfig {

    ..........

    @Bean
    public MessageGroupStore simpleMessageStore() {
        return new SimpleMessageStore();
    }

    /**
     * "" の String の Message を返す MessageSource
     *
     * @return
     */
    @Bean
    public MessageSource<String> stringMessageSource() {
        return () -> MessageBuilder.withPayload("").build();
    }

    @Bean
    public IntegrationFlow printMessageStoreFlow() {
        return IntegrationFlows
                // 1秒毎に処理を実行したいので、1秒毎に Message を送信させる
                .from(stringMessageSource()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // MessageStore に格納された Message の headers, payload を出力する
                .handle((p, h) -> {
                    Collection<Message<?>> delayMessagesList
                            = simpleMessageStore().getMessagesForGroup(GROUP_ID_DELAY);
                    delayMessagesList.forEach(m -> {
                        m.getHeaders().entrySet().forEach(entry -> {
                            System.out.println("[header ] " + entry.getKey() + " = " + entry.getValue());
                        });
                        System.out.println("[payload] " + m.getPayload());
                    });
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow checkMessageStoreFlow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in04"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                // 3秒待機する
                .delay(GROUP_ID_DELAY
                        , (DelayerEndpointSpec e) -> e.defaultDelay(3000)
                                .messageStore(simpleMessageStore()))
                .log()
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .channel(nullChannel)
                .get();
    }

}
  • checkMessageStoreFlow は in04 ディレクトリにファイルが置かれる→3秒待機→ファイル削除する、Flow です。MessageGroupStore のデータを確認したいので .messageStore(simpleMessageStore()) で指定しています。
  • printMessageStoreFlow は1秒毎に MessageGroupStore をチェックしてデータがあれば出力する Flow です。checkMessageStoreFlow とは別に並行して処理が実行されます。

■最終形

package ksbysample.eipapp.delayer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.DelayerEndpointSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.support.Transformers;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import java.io.File;
import java.util.Collection;
import java.util.Map;

@Slf4j
@Configuration
public class FlowConfig {

    private static final String GROUP_ID_DELAY = "DELAYFLOW_DELAY";

    private final NullChannel nullChannel;

    public FlowConfig(NullChannel nullChannel) {
        this.nullChannel = nullChannel;
    }

    /**
     * 正常処理時に payload にセットされた File クラスのファイルを削除する RequestHandlerAdvice
     *
     * @return
     */
    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice fileDeleteAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice requestHandlerAdvice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        requestHandlerAdvice.setOnSuccessExpression("payload.delete()");
        return requestHandlerAdvice;
    }

    @Bean
    public IntegrationFlow delayFlow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in01"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                // 3秒待機する
                .delay(GROUP_ID_DELAY
                        , (DelayerEndpointSpec e) -> e.defaultDelay(3000))
                .log()
                // 6秒待機する
                .delay(GROUP_ID_DELAY, "6000")
                .log()
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .channel(nullChannel)
                .get();
    }

    @Bean
    public IntegrationFlow delayByMessageFlow() {
        return IntegrationFlows
                // C:/eipapp/ksbysample-eipapp-delayer/in02 には待機秒数を記述したファイルを置く
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in02"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // payload にセットされた File クラスを headers.originalPayload へセットする
                .enrichHeaders(h -> h.headerExpression("originalPayload", "payload"))
                // ファイルの内容を読み込んで payload にセットする
                // ※payload のクラスが File クラス --> String クラスに変わる
                .transform(Transformers.fileToString())
                .log()
                // payload にセットされた秒数待機する
                .delay(GROUP_ID_DELAY, "payload", (DelayerEndpointSpec e) -> e.defaultDelay(8000))
                .log()
                // headers.delay にセットされた秒数待機する
                .enrichHeaders(h -> h.headerExpression("delay", "payload"))
                .delay(GROUP_ID_DELAY, "headers.delay", (DelayerEndpointSpec e) -> e.defaultDelay(2000))
                .log()
                // headers.originalPayload にセットされている File クラスを payload へ戻す
                .transform("headers.originalPayload")
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .channel(nullChannel)
                .get();
    }

    @Bean
    public IntegrationFlow delayDynamicFlow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in03"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // 待機秒数を計算するための delayInit, delayCount の初期値を header にセットする
                .enrichHeaders(h -> h
                        .header("delayInit", 3000)
                        .header("delayCount", 0))
                .log()
                // delayCount が 3 になるまで処理をループさせたいので、別の Flow にする
                // return f -> f.~ で書いた場合、Bean名 + ".input" という MessageChannel
                // が自動生成されて、ここに Message を送信すると処理が開始される
                .channel("loopCountFlow.input")
                .get();
    }

    @Bean
    public IntegrationFlow loopCountFlow() {
        return f -> f
                // delayInit + delayCount * 1000 のミリ秒数待機する
                .delay(GROUP_ID_DELAY, m -> {
                    return ((int) m.getHeaders().get("delayInit"))
                            + (((int) m.getHeaders().get("delayCount")) * 1000);
                })
                .log()
                // headers.delayCount を +1 する
                .enrichHeaders(h -> h.headerFunction("delayCount"
                        , m -> ((int) m.getHeaders().get("delayCount")) + 1
                        , true))
                // delayCount が3未満なら loopCountFlow の最初に戻る
                // そうでなければ次の処理へ
                .routeToRecipients(r -> r
                        .recipientFlow("headers.delayCount < 3"
                                , sf -> sf.channel("loopCountFlow.input"))
                        .defaultOutputToParentFlow())
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .handle((p, h) -> {
                    System.out.println("ファイルを削除しました");
                    return null;
                });
    }

    @Bean
    public MessageGroupStore simpleMessageStore() {
        return new SimpleMessageStore();
    }

    /**
     * "" の String の Message を返す MessageSource
     *
     * @return
     */
    @Bean
    public MessageSource<String> stringMessageSource() {
        return () -> MessageBuilder.withPayload("").build();
    }

    @Bean
    public IntegrationFlow printMessageStoreFlow() {
        return IntegrationFlows
                // 1秒毎に処理を実行したいので、1秒毎に Message を送信させる
                .from(stringMessageSource()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // MessageStore に格納された Message の headers, payload を出力する
                .handle((p, h) -> {
                    Collection<Message<?>> delayMessagesList
                            = simpleMessageStore().getMessagesForGroup(GROUP_ID_DELAY);
                    delayMessagesList.forEach(m -> {
                        m.getHeaders().entrySet().forEach(entry -> {
                            System.out.println("[header ] " + entry.getKey() + " = " + entry.getValue());
                        });
                        System.out.println("[payload] " + m.getPayload());
                    });
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow checkMessageStoreFlow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in04"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                // 3秒待機する
                .delay(GROUP_ID_DELAY
                        , (DelayerEndpointSpec e) -> e.defaultDelay(3000)
                                .messageStore(simpleMessageStore()))
                .log()
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .channel(nullChannel)
                .get();
    }

}

履歴

2017/01/29
初版発行。

IntelliJ IDEA 2016.3 の新機能 Semantic highlighting がとても気に入りました!

共有ライブラリを管理するために Sonatype の Nexus Repository Manager OSS を使用する ( 番外編 )( IntelliJ IDEA 2016.3 の新機能を試してみる ) でいくつか 2016.3 の新機能を使う設定にして IntelliJ IDEA を使い続けていましたが、設定当初はあまり期待していなかった Semantic highlighting が使っていくうちに一番気に入りました。ソースが非常に読みやすくなります!

標準では有効になっていないので、設定していない人は是非設定することをオススメします。白い画面だと Semantic highlighting は見にくいので、Scheme は Darcula にしましょう。

以下が画面サンプルです。

Scheme: Default、Semantic highlighting 無効 f:id:ksby:20170128224943p:plain

Scheme: Default、Semantic highlighting 有効 f:id:ksby:20170128225206p:plain

Scheme: Darcula、Semantic highlighting 無効 f:id:ksby:20170128225417p:plain

Scheme: Darcula、Semantic highlighting 有効 f:id:ksby:20170128225625p:plain

また他の機能ですが、

  • 「Enable font ligatures」はソースによってあまりに表示・編集が重くなりすぎるので、無効にしました。
  • フォントも「Enable font ligatures」を有効にしないなら Fira Code font はあまり好きなフォントではなかったので、Source Code Pro に戻して Size も 12pt に戻しています。
  • Parameter hints は Semantic highlighting 程ではありませんが、気に入っています。変数名が出ると便利です。

Java SE を 8u112 → 8u121 へ、IntelliJ IDEA を 2016.3.2 → 2016.3.3 へ、Git for Windows を 2.11.0 → 2.11.0(3) へバージョンアップ

Java SE を 8u112 → 8u121 へバージョンアップする

  1. OracleJava SE Downloads を見ると 8u121 がダウンロードできるようになっていました。以下のページに説明があります。

    8u121 へバージョンアップします。

  2. jdk-8u121-windows-x64.exe をダウンロードして C:\Java\jdk1.8.0_121 へインストールした後、環境変数 JAVA_HOME のパスを C:\Java\jdk1.8.0_121 へ変更します。

    コマンドプロンプトから java -version を実行し、1.8.0_121 に変更されていることを確認します。

    f:id:ksby:20170128192340p:plain

  3. IntelliJ IDEA を再起動した後、プロジェクトで使用する Java SE を 8u121 へ変更します。

  4. 開いているプロジェクトを閉じて「Welcome to IntelliJ IDEA」ダイアログを表示します。

  5. ダイアログ下部の「Configure」-「Project Defaults」-「Project Structure」を選択します。

    f:id:ksby:20170128192957p:plain

  6. 「Default Project Structure」ダイアログが表示されます。画面左側で「Project Settings」-「Project」を選択後、画面右側の「Project SDK」の「New…」ボタンをクリックし、表示されるメニューから「JDK」を選択します。

    f:id:ksby:20170128193323p:plain

  7. 「Select Home Directory for JDK」ダイアログが表示されます。環境変数 JAVA_HOME のディレクトリが選択された状態で表示されますので、そのまま「OK」ボタンをクリックします。

    f:id:ksby:20170128193928p:plain

  8. 「Default Project Structure」ダイアログに戻るので、今度は「Project SDK」の「Edit」ボタンをクリックします。

    f:id:ksby:20170128194842p:plain

  9. 画面左側で「Platform Settings」-「SDKs」が選択された状態になるので、画面右上の入力フィールドで “1.8” → “1.8.0_121” へ変更します。

    f:id:ksby:20170128195507p:plain

  10. 次に中央のリストから「1.8.0_112」を選択した後、リストの上の「-」ボタンをクリックして削除します。

    f:id:ksby:20170128195720p:plain

  11. 「OK」ボタンをクリックして「Default Project Structure」ダイアログを閉じます。

  12. 「Welcome to IntelliJ IDEA」ダイアログに戻ったら、ksbysample-library-depend-nospring プロジェクトを開きます。

  13. IntelliJ IDEA のメイン画面が開いたら、メニューから「File」-「Project Structure…」を選択します。

  14. 「Project Structure」ダイアログが表示されます。以下の画像の状態になっているので、

    f:id:ksby:20170128200107p:plain

    「Project SDK」と「Project language level」を選択し直します。

    f:id:ksby:20170128200300p:plain

  15. 「OK」ボタンをクリックして「Project Structure」ダイアログを閉じます。

  16. メイン画面に戻ると画面右下に「Indexing…」の表示が出るので、終了するまで待ちます。

    f:id:ksby:20170128200514p:plain

  17. Build 及びテストで問題がないか確認します。Gradle projects View から clean タスクの実行→「Rebuild Project」メニューの実行→build タスクの実行を行い、"BUILD SUCCESSFUL" のメッセージが出力されることを確認します。

    f:id:ksby:20170128201033p:plain

  18. 特に問題は発生しませんでした。8u121 で開発を進めます。

IntelliJ IDEA を 2016.3.2 → 2016.3.3 へバージョンアップする

IntelliJ IDEA の 2016.3.3 がリリースされたのでバージョンアップします。

※上の Java SE のバージョンアップからの続きで ksbysample-nexus-repomng/ksbysample-library-depend-nospring プロジェクトを開いた状態でバージョンアップしています。

  1. IntelliJ IDEA のメインメニューから「Help」-「Check for Updates…」を選択します。

  2. 「Platform and Plugin Updates」ダイアログが表示されます。左下に「Update and Restart」ボタンが表示されていますので、「Update and Restart」ボタンをクリックします。

    f:id:ksby:20170128202000p:plain

  3. JRebel for IntelliJ と Lombok Plugin の update も表示されたので、チェックしたまま「Update and Restart」ボタンをクリックします。

    f:id:ksby:20170128202133p:plain

  4. Patch がダウンロードされて IntelliJ IDEA が再起動します。今回は再起動時に以下の画像の「Update」ダイアログが表示されたので、「Proceed」ボタンをクリックします。

    f:id:ksby:20170128203827p:plain

  5. メイン画面が表示されると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。

    f:id:ksby:20170128204111p:plain

  6. 処理が終了すると Gradle Tool Window のツリーの表示が other グループしかない初期の状態に戻っていますので、左上の「Refresh all Gradle projects」ボタンをクリックして更新します。更新が完了すると build グループ等が表示されます。

  7. clean タスク実行 → Rebuild Project 実行をした後、build タスクを実行して “BUILD SUCCESSFUL” のメッセージが表示されることを確認します。

    f:id:ksby:20170128204529p:plain

  8. Project Tool Window で src/test を選択した後、コンテキストメニューを表示して「Run ‘All Tests’ with Coverage」を選択し、テストが全て成功することを確認します。

    f:id:ksby:20170128204732p:plain

Git for Windows を 2.11.0 → 2.11.0(3) へバージョンアップする

Git for Windows の 2.11.0(3) がリリースされていたのでバージョンアップします。実は前回バージョンアップした時に気づかなかったのですが、git-cmd.exe を起動した画面で日本語が表示・入力できなくなっていました。

f:id:ksby:20170128205402p:plain

  1. https://git-for-windows.github.io/ の「Download」ボタンをクリックして Git-2.11.0.3-64-bit.exe をダウンロードします。

  2. Git-2.11.0.3-64-bit.exe を実行します。

  3. 「Git 2.11.0.3 Setup」ダイアログが表示されます。[Next >]ボタンをクリックします。

  4. 「Select Components」画面が表示されます。全てのチェックが外れたままであることを確認した後、[Next >]ボタンをクリックします。

  5. 「Adjusting your PATH environment」画面が表示されます。中央の「Use Git from the Windows Command Prompt」が選択されていることを確認後、[Next >]ボタンをクリックします。

  6. 「Configuring the line ending conversions」画面が表示されます。「Checkout Windows-style, commit Unix-style line endings」が選択されていることを確認した後、[Next >]ボタンをクリックします。

  7. 「Configuring the terminal emulator to use with Git Bash」画面が表示されます。「Use Windows'default console window」が選択されていることを確認した後、[Next >]ボタンをクリックします。

  8. 「Configuring extra options」画面が表示されます。「Enable file system caching」だけがチェックされていることを確認した後、[Next >]ボタンをクリックします。

  9. 「Configuring experimental options」画面が表示されます。全てのチェックが外れたままであることを確認した後、[Install]ボタンをクリックします。

  10. インストールが完了すると「Completing the Git Setup Wizard」のメッセージが表示された画面が表示されます。中央の「View ReleaseNotes.html」のチェックを外した後、「Finish」ボタンをクリックしてインストーラーを終了します。

  11. コマンドプロンプトを起動して git のバージョンが git version 2.11.0.windows.3 になっていることを確認します。

    f:id:ksby:20170128221500p:plain

  12. git-cmd.exe を起動して日本語の表示・入力が問題ないかを確認します。

    f:id:ksby:20170128221826p:plain

  13. 特に問題はないようですので、2.11.0(3) で作業を進めたいと思います。

Spring Boot + Spring Integration でいろいろ試してみる ( その13 )( FTP サーバからファイルをダウンロードして SFTP サーバへアップロードする2 )

概要

記事一覧はこちらです。

参照したサイト・書籍

目次

  1. メモ書き
    1. FtpInboundChannelAdapter ( s -> s.ftp(this.ftpSessionFactory).~ ) はファイルを FTP ダウンロードしていなくても in ディレクトリにファイルがあれば Message を次に送信する?
    2. .localFilter(new AcceptAllFileListFilter<>()) を指定しないとどうなるのか?
    3. Pollers.maxMessagesPerPoll(...) を指定しないとどうなるのか?
    4. マルチスレッドで並列処理しても spring-retry のリトライ回数はスレッドの処理毎に維持されるのか?

手順

メモ書き

FtpInboundChannelAdapter ( s -> s.ftp(this.ftpSessionFactory).~ ) はファイルを FTP ダウンロードしていなくても in ディレクトリにファイルがあれば Message を次に送信する?

FtpInboundChannelAdapter を MessageSource に指定している場合 FTP サーバからファイルをダウンロードした時だけ次に Message を送信するものと思っていたのですが、実は .localDirectory(...) に指定したディレクトリにファイルを置いても次に Message を送信します。

分かりやすくするために FlowConfig.java の ftp2SftpFlow Bean の .from(...) の後に .log() を追加します。

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
                                .localFilter(new AcceptAllFileListFilter<>())
                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
                                .maxMessagesPerPoll(100)))
                .log()
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))

まずは bootRun で起動した後 FTP サーバの C:\Xlight\ftproot\recv01\out に 2014.txt を置くと以下のログが出力されます。Message が送信されて .log() により “GenericMessage [payload=…” のログが出力されています。

f:id:ksby:20170127092504p:plain

今度は FTP サーバ側ではなく .localDirectory(...) に指定している C:\eipapp\ksbysample-eipapp-ftp2sftp\recv に直接 2015.txt を置いてみます。

以下のログが出力されます。FTP ダウンロードはしていないのに Message が送信されます。

f:id:ksby:20170127093145p:plain

動作状況を見ている感じだと FtpInboundChannelAdapter とは FTP ダウンロード機能が付いた FileInboundAdaper で、FTP ダウンロードできなくてもローカルディレクトリにファイルがあれば次に Message が送信されます。

.localFilter(new AcceptAllFileListFilter<>()) を指定しないとどうなるのか?

指定しないと AcceptOnceFileListFilter<File> だけが適用されます。

org.springframework.integration.file.remote.synchronizer の AbstractInboundFileSynchronizingMessageSource.java を見ると private volatile FileListFilter<File> localFileListFilter = new AcceptOnceFileListFilter<File>(); の記述があります。

上で変更した FlowConfig.java の ftp2SftpFlow Bean の .from(...) の後に .log() を追加したものに、.localFilter(...) の記述を全てコメントアウトして動作確認してみます。

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
//                                .localFilter(new AcceptAllFileListFilter<>())
//                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
                                .maxMessagesPerPoll(100)))
                .log()
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))

bootRun で起動していたら一度停止した後、bootRun で起動します。

まずは FTP サーバの C:\Xlight\ftproot\recv01\out に 2014.txt を置くと以下のログが出力されます。

f:id:ksby:20170127095404p:plain

次に再度 FTP サーバの C:\Xlight\ftproot\recv01\out に 2014.txt を置いてみます。

f:id:ksby:20170127095638p:plain

今度は FTP ダウンロードは行われますが、既に1度処理しているので AcceptOnceFileListFilter により次に Message は送信されません。ダウンロードされた 2014.txt は C:\eipapp\ksbysample-eipapp-ftp2sftp\recv に残ったままでした。

また .localFilter(new AcceptAllFileListFilter<>()) を記述して localDirectory(...) に指定したディレクトリからファイルを移動しないとどうなるのかも見てみます。以下のようにソースを変更します。

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
                                .localFilter(new AcceptAllFileListFilter<>())
                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
                                .maxMessagesPerPoll(100)))
                .log()
                .channel((nullChannel))
                .get();

bootRun で起動した後 FTP サーバの C:\Xlight\ftproot\recv01\out に 2014.txt を置くと、以下のように FTP ダウンロードされたファイルが次々と Message で送信され続けます。

f:id:ksby:20170127100748p:plain

.localFilter(new AcceptAllFileListFilter<>()) を記述するなら、同じスレッドの処理で localDirectory(...) に指定したディレクトリからファイルを移動・削除する必要があります。

Pollers.maxMessagesPerPoll(...) を指定しないとどうなるのか?

Message を送信する処理で、1度に送信するのが1ファイルだけになります。例えば2ファイルある場合、最初に1つ目のファイルの Message を送信したら、次は poller で指定した時間が経過した後に2つ目のファイルの Message が送信されます。

.maxMessagesPerPoll(100)コメントアウトして動作確認してみます。

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
                                .localFilter(new AcceptAllFileListFilter<>())
                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
//                                .maxMessagesPerPoll(100)))
                        ))
                .log()
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))

bootRun で起動した後 FTP サーバの C:\Xlight\ftproot\recv01\out に 2014.txt, 2015.txt, 2016.txt の3ファイルを置くと以下のようにログが出力されました。

f:id:ksby:20170127103404p:plain

  • ファイルは最初に3ファイル全て FTP ダウンロードされます。
  • その後 Message を送信する処理のところで、5秒間隔で1ファイルずつ送信されます。

マルチスレッドで並列処理しても spring-retry のリトライ回数はスレッドの処理毎に維持されるのか?

org.springframework.retry.support の RetrySynchronizationManager.java をみると private static final ThreadLocal<RetryContext> context = new ThreadLocal<RetryContext>(); と RetryContext に ThreadLocal を利用しているので維持されそうです。確認してみます。

リトライ処理を行う SFTP アップロードの処理をマルチスレッドで並列に処理されるようにしたいので、.channel(c -> c.executor(Executors.newCachedThreadPool())) を追加します。また recv ディレクトリに残っているファイルが何度も Message で送信されてこないよう .filter(...) の2行をコメントアウトします。

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
//                                .localFilter(new AcceptAllFileListFilter<>())
//                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
                                .maxMessagesPerPoll(100)))
                // 以降の処理をマルチスレッドで並列処理する
                .channel(c -> c.executor(Executors.newCachedThreadPool()))
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))
                // SFTPサーバにファイルをアップロードする
                .handle(this.sftpUploadMessageHandler)
                // SFTP アップロードのエラーチェック用 header ( sftpUploadError ) をチェックし、
                // false ならば send ディレクトリへ、true ならば error ディレクトリへファイルを移動する
                .routeToRecipients(r -> r

SFTP サーバを停止した後、bootRun で起動します。起動後、C:\Xlight\ftproot\recv01\out に 2014.txt を配置 → 2回リトライのログが出る → 2015.txt を配置 → 1回リトライのログが出る → 2016.txt を配置、の順にファイルを置きます。

以下のようにログが出力されました。

f:id:ksby:20170128005309p:plain

  • 2014.txt は pool-1-thread-1、2015.txt は pool-1-thread-2、2016.txt は pool-1-thread-3 とファイル毎に異なるスレッドで処理されています。
  • リトライ回数はスレッド毎に 1, 2, 3, 4 と増えており、特に回数がおかしくなることはありませんでした。

履歴

2017/01/28
初版発行。

Spring Boot + Spring Integration でいろいろ試してみる ( その12 )( FTP サーバからファイルをダウンロードして SFTP サーバへアップロードする )

概要

記事一覧はこちらです。

  • Spring Integration DSL のサンプルを作成します。
  • 以下の処理を行う常駐型アプリケーションを作成します。
    • FTPサーバに recv01 ユーザでログインし /out ディレクトリにファイルがあるかチェックします。ファイルがあれば D:\eipapp\ksbysample-eipapp-ftp2sftp\recv ディレクトリにダウンロードします。
    • ダウンロードしたファイルを SFTP サーバにアップロードします。SFTP サーバには send01 ユーザでログインし /in ディレクトリにアップロードします。ファイルはアップロードが成功したら send ディレクトリに、失敗したら error ディレクトリに移動します。
  • Spring Cloud Sleuth による zipkin 連携は今回も入れます。
  • リトライ処理に spring-retry を使用してみます。

参照したサイト・書籍

  1. Xlight ftp server
    https://www.xlightftpd.com/

    • Windows で利用可能な FTP/SFTP サーバです。「Personal edition is free for personal use.」とのことなので今回はこれを利用します。
    • Spring Integration の FTP/FTPS Adapters でダウンロード、アップロードしても特に問題は発生しませんでした。
  2. spring-integration-java-dsl/build.gradle
    https://github.com/spring-projects/spring-integration-java-dsl/blob/master/build.gradle

  3. javac - Java プログラミング言語コンパイラ
    http://docs.oracle.com/javase/jp/7/technotes/tools/solaris/javac.html

  4. Spring Integration Reference Manual - 15. FTP/FTPS Adapters http://docs.spring.io/spring-integration/reference/html/ftp.html

  5. Spring Integration Reference Manual - 27. SFTP Adapters http://docs.spring.io/spring-integration/reference/html/sftp.html

  6. Spring-Retry - シンプルで本質的なコードを汚さないリトライ処理フレームワーク http://tech.atware.co.jp/spring-retry/

  7. spring-projects/spring-retry
    https://github.com/spring-projects/spring-retry

目次

  1. Windows の FTP/SFTP サーバとして Xlight ftp server をインストール・設定する
    1. Xlight ftp server をインストールする
    2. FTP サーバと recv01 ユーザを作成して FTP サーバを起動する
    3. SFTP サーバと send01 ユーザを作成して SFTP サーバを起動する
  2. recv, send, error ディレクトリを作成する
  3. ksbysample-eipapp-ftp2sftp プロジェクトを作成する
  4. FTP サーバからファイルをダウンロードする処理を作成する
  5. SFTP サーバにファイルをアップロードする処理を作成する
  6. アップロード成功時には send ディレクリへ、失敗時には error ディレクトリへ移動する処理を作成する
  7. 動作確認

手順

WindowsFTP/SFTP サーバとして Xlight ftp server をインストール・設定する

Spring Boot + Spring Integration でいろいろ試してみる ( その1 )( SFTP でファイルアップロードするバッチを作成する )freeFTPd をインストールしましたが、Spring Integration の 15. FTP/FTPS Adapters でダウンロードすると ダウンロード中であることを示す ~.writing がファイルの末尾に付いたままダウンロードが完了しない問題がありました。

他に Windows で利用可能な FTP サーバを探したところ Xlight ftp server が personal use ならば free とのことなので、今回はこれをインストールして FTP/SFTP サーバとして使用します。

Xlight ftp server をインストールする

  1. ダウンロードのページ から「Setup with installer - 64-bit」の Download Link をクリックして setup-x64.exe をダウンロードします。

    f:id:ksby:20170125135603p:plain

  2. setup-x64.exe を実行します。

  3. 「Setup - Xlight FTP Server」ダイアログが表示されます。「Next >」ボタンをクリックします。

  4. 「License Agreement」画面が表示されます。「I accept the agreement」を選択した後、「Next >」ボタンをクリックします。

  5. 「Select Destination Location」画面が表示されます。インストール先を “C:\Xlight” へ変更した後、「Next >」ボタンをクリックします。

  6. 「Select Start Menu Folder」画面が表示されます。「Next >」ボタンをクリックします。

  7. 「Select Additional Tasks」画面が表示されます。「Create desktop icon」のチェックを外した後、「Next >」ボタンをクリックします。

  8. 「Ready to Install」画面が表示されます。「Install」ボタンをクリックします。

  9. インストールが実行されます。

  10. インストールが完了すると「Completing the Xlight FTP Server Setup Wizard」画面が表示されます。「Launch application」のチェックを外した後、「Finish」ボタンをクリックします。

FTP サーバと recv01 ユーザを作成して FTP サーバを起動する

  1. C:\Xlight\xlight.exe を実行します。

  2. 「Xlight FTP Server」画面が表示されます。「New Virtual Server」ボタンをクリックします。

    f:id:ksby:20170125141928p:plain

  3. 「New Virtual Server」ダイアログが表示されます。何も変更せずに「OK」ボタンをクリックします。

    f:id:ksby:20170125142438p:plain

  4. 元の画面に戻り FTP サーバが追加されていることが確認できます。次に「User List」ボタンをクリックします。

    f:id:ksby:20170125142647p:plain

  5. 「User List」ダイアログが表示されます。「Add」ボタンをクリックします。

    f:id:ksby:20170125143421p:plain

  6. ユーザ登録用のダイアログが表示されます。以下の値を入力した後、「OK」ボタンをクリックします。

    • 「Username」「Password」に “recv01” を入力します。
    • 「Home Directory」に “C:\Xlight\ftproot\recv01” を入力します。

    f:id:ksby:20170125144137p:plain

  7. 「User List」ダイアログに戻り recv01 ユーザが登録されていることが確認できます。recv01 を選択した後、「Edit」ボタンをクリックします。

    f:id:ksby:20170125144435p:plain

  8. 「Username: recv01」ダイアログが表示されます。画面左側の「User Path」を選択した後、画面右側のリストから Virtual Path = “/” のデータを選択して「Edit」ボタンをクリックします。

    f:id:ksby:20170125145016p:plain

  9. 「Virtual Path」ダイアログが表示されます。全ての操作を行えるようにしたいので、以下の画像の赤枠内のチェックボックスをチェックした後、「OK」ボタンをクリックします。

    f:id:ksby:20170125145521p:plain

    「Username: recv01」ダイアログに戻ると Permission に “-” の部分がなくなっています。「OK」ボタンをクリックしてダイアログを閉じます。

    f:id:ksby:20170125145846p:plain

  10. C:\Xlight\ftproot\recv01\out ディレクトリを作成します。

  11. 「Xlight FTP Server」画面に戻ったら Port = 21 のサーバを選択した後、「Start Server」ボタンをクリックしてサーバを起動します。起動すると「Status」に “Running” と表示されます。

    f:id:ksby:20170125153551p:plain

SFTP サーバと send01 ユーザを作成して SFTP サーバを起動する

基本的には FTP サーバの時と同じです。異なる部分のみ記載します。

  1. Virtual Server 作成時に「Protocol」を “SSH2” にします。Port も 22 に変わります。

    f:id:ksby:20170125165244p:plain

  2. 以下のユーザを作成します。ディレクトリの操作権限は recv01 の時と同様に全てチェックします。

    • 「Username」「Password」に “send01” を入力します。
    • 「Home Directory」に “C:\Xlight\sftproot\send01” を入力します。

    f:id:ksby:20170125170413p:plain

  3. C:\Xlight\sftproot\send01\in ディレクトリを作成します。

  4. 最後に「Xlight FTP Server」画面で Port = 22 のサーバを選択した後、「Start Server」ボタンをクリックしてサーバを起動します。起動すると「Status」に “Running” と表示されます。

    f:id:ksby:20170125171552p:plain

C:\Xlight のディレクトリ構成は以下のようになります。

C:\Xlight
├ ftproot
│ └ recv01
│    └ out
└ sftproot
   └ send01
      └ in

recv, send, error ディレクトリを作成する

常駐型アプリケーション用として以下の構成のディレクトリを作成します。

C:\eipapp\ksbysample-eipapp-ftp2sftp
├ error
├ recv
└ send

ksbysample-eipapp-ftp2sftp プロジェクトを作成する

  1. IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。

  2. src/main/java の下に ksbysample.eipapp.ftp2sftp パッケージを作成します。

  3. src/main/java/ksbysample/eipapp/ftp2sftp の下に Application.java を作成し、リンク先の内容 を記述します。

  4. src/main/resources の下に application.properties を作成し、リンク先の内容 を記述します。

  5. src/main/resources の下に logback-spring.xml を作成し、リンク先の内容 を記述します。

FTP サーバからファイルをダウンロードする処理を作成する

  1. src/main/java/ksbysample/eipapp/ftp2sftp の下に ApplicationConfig.java を作成し、リンク先のその1の内容 を記述します。

  2. src/main/java/ksbysample/eipapp/ftp2sftp の下に FlowConfig.java を作成し、リンク先のその1の内容 を記述します。

SFTP サーバにファイルをアップロードする処理を作成する

  1. src/main/java/ksbysample/eipapp/ftp2sftp の下の ApplicationConfig.javaリンク先のその2の内容 に変更します。

  2. src/main/java/ksbysample/eipapp/ftp2sftp の下に SftpUploadMessageHandler.java を作成し、リンク先の内容 を記述します。

  3. src/main/java/ksbysample/eipapp/ftp2sftp の下の FlowConfig.javaリンク先のその2の内容 に変更します。

アップロード成功時には send ディレクリへ、失敗時には error ディレクトリへ移動する処理を作成する

  1. src/main/java/ksbysample/eipapp/ftp2sftp の下の FlowConfig.javaリンク先のその3の内容 に変更します。

動作確認

  1. clean タスク実行 → Rebuild Project 実行をした後、build タスクを実行して “BUILD SUCCESSFUL” のメッセージが表示されることを確認します。

    f:id:ksby:20170127004240p:plain

  2. テストでは Spring Boot + Spring Integration でいろいろ試してみる ( その10 )( URL一覧のファイルが置かれたらアクセス可能かチェックして結果ファイルに出力する ) で作成した 2014.txt, 2015.txt, 2016.txt を使用します。

  3. まずは FTP サーバ、SFTP サーバを起動して正常に処理される場合を確認します。ディレクトリにファイルがないことを確認してから bootRun タスクを実行します。

  4. C:\Xlight\ftproot\recv01\out に 2014.txt を入れます。少しすると C:\Xlight\ftproot\recv01\out からファイルがなくなり、 C:\eipapp\ksbysample-eipapp-ftp2sftp\send と C:\Xlight\sftproot\send01\in に 2014.txt が作成されました。

    ログは以下のように出力されていました。

    f:id:ksby:20170127005635p:plain

  5. send, in のディレクトリのファイルを全て削除してから C:\Xlight\ftproot\recv01\out に 2014.txt, 2015.txt, 2016.txt の3ファイルを入れてみます。少しすると C:\Xlight\ftproot\recv01\out からファイルがなくなり、C:\eipapp\ksbysample-eipapp-ftp2sftp\send と C:\Xlight\sftproot\send01\in に 2014.txt, 2015.txt, 2016.txt が作成されました。

    ログは以下のように出力されていました。

    f:id:ksby:20170127010005p:plain

    • 最初に 2014.txt, 2015.txt, 2016.txt の全てのファイルをダウンロードしています。
    • その後で1ファイルずつ Message 送信 → SFTP アップロード が行われています。
  6. 今度は SFTP サーバを停止してアップロードに失敗する場合を確認します。

  7. send, in のディレクトリのファイルを全て削除してから C:\Xlight\ftproot\recv01\out に 2014.txt を入れてみます。少しすると C:\Xlight\ftproot\recv01\out からファイルがなくなり、C:\eipapp\ksbysample-eipapp-ftp2sftp\error に 2014.txt が作成されました。

    ログは以下のように出力されていました。

    f:id:ksby:20170127010946p:plain

    • リトライは4回 ( 初回と合わせて実行されたのは計5回 ) 行われています。

    Zipkin のグラフを見てもリトライしている状況は分かりませんでした。途中で途切れたりはせず、単に時間がかかっているので横に長いグラフが出るだけのようです。

    f:id:ksby:20170127011600p:plain

  8. error のディレクトリのファイルを全て削除してから C:\Xlight\ftproot\recv01\out に 2014.txt, 2015.txt, 2016.txt を入れてみます。少しすると C:\Xlight\ftproot\recv01\out からファイルがなくなり、C:\eipapp\ksbysample-eipapp-ftp2sftp\error に 2014.txt, 2015.txt, 2016.txt が作成されました。

    ログは以下のように出力されていました。

    f:id:ksby:20170127011953p:plain

    • 並列処理にはしていないので、3ファイルダウンロードした後、1ファイルずつ Message 送信 → SFTP アップロードを5回リトライ → エラーとなり error ディレクトリへ移動 が行われています。
  9. 最後に SFTP サーバを停止した状態で out ディレクトリにファイルを置いた後、リトライの途中で SFTP サーバを起動して正常に処理される場合を確認します。

  10. error のディレクトリのファイルを全て削除してから C:\Xlight\ftproot\recv01\out に 2014.txt を入れてみます。少しすると C:\Xlight\ftproot\recv01\out からファイルがなくなりリトライが実行されますので、途中で SFTP サーバを起動します。更に少しすると C:\eipapp\ksbysample-eipapp-ftp2sftp\send と C:\Xlight\sftproot\send01\in に 2014.txt が作成されました。

    ログは以下のように出力されていました。

    f:id:ksby:20170127012807p:plain

    • リトライ中に SFTP サーバが起動すれば正常に処理が進みました。

長くなったので、メモ書きは次に書きます。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

[compileJava, compileTestGroovy, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options,-processing']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    mavenCentral()
    maven { url "http://repo.spring.io/repo/" }
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
        mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.integration:spring-integration-ftp")
    compile("org.springframework.integration:spring-integration-sftp")
    compile("org.springframework.retry:spring-retry")
    compile("org.codehaus.janino:janino")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの
    // http://projects.spring.io/spring-cloud/ の「Release Trains」参照
    compile("org.springframework.cloud:spring-cloud-starter-zipkin") {
        exclude module: 'spring-boot-starter-web'
    }

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.2")
}
  • compileJava.options.compilerArgs 等の compileArgs の記述を spring-integration-java-dsl/build.gradle をまねて1行にして、-options,-processing を追加してみました。
  • Spring Integration の FTP/FTPS Adapters, SFTP Adapters を使用するので以下の2行を記述します。
    • compile("org.springframework.integration:spring-integration-ftp")
    • compile("org.springframework.integration:spring-integration-sftp")
  • 今回 spring-retry を使用するので以下の1行を記述します。
    • compile("org.springframework.retry:spring-retry")

Application.java

package ksbysample.eipapp.ftp2sftp;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.retry.annotation.EnableRetry;

@SpringBootApplication
@EnableRetry(proxyTargetClass = true)
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}
  • 今回は spring-retry を使用するので @EnableRetry(proxyTargetClass = true) を記述します。

application.properties

spring.application.name=ftp2sftp
spring.zipkin.base-url=http://localhost:9411/
spring.sleuth.sampler.percentage=1.0

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>

    <springProperty scope="context" name="springAppName" source="spring.application.name"/>
    <property name="CONSOLE_LOG_PATTERN"
              value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${level:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
    <logger name="org.springframework.integration.expression.ExpressionUtils" level="ERROR"/>
    <logger name="com.jcraft.jsch" level="ERROR"/>
</configuration>

ApplicationConfig.java

■その1

package ksbysample.eipapp.ftp2sftp;

import org.apache.commons.net.ftp.FTPFile;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;

@Configuration
public class ApplicationConfig {

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
        factory.setHost("localhost");
        factory.setPort(21);
        factory.setUsername("recv01");
        factory.setPassword("recv01");
        return new CachingSessionFactory<>(factory);
    }

}

■その2

@Configuration
public class ApplicationConfig {

    ..........

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22);
        factory.setUser("send01");
        factory.setPassword("send01");
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }

}
  • @Bean public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() { ... } を追加します。

■完成形

package ksbysample.eipapp.ftp2sftp;

import com.jcraft.jsch.ChannelSftp;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;

@Configuration
public class ApplicationConfig {

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
        factory.setHost("localhost");
        factory.setPort(21);
        factory.setUsername("recv01");
        factory.setPassword("recv01");
        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22);
        factory.setUser("send01");
        factory.setPassword("send01");
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }

}

FlowConfig.java

■その1

package ksbysample.eipapp.ftp2sftp;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.file.filters.AcceptAllFileListFilter;
import org.springframework.integration.file.filters.IgnoreHiddenFileListFilter;
import org.springframework.integration.file.remote.session.SessionFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

@Slf4j
@Configuration
public class FlowConfig {

    private final SessionFactory<FTPFile> ftpSessionFactory;

    public FlowConfig(SessionFactory<FTPFile> ftpSessionFactory) {
        this.ftpSessionFactory = ftpSessionFactory;
    }

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
                                .localFilter(new AcceptAllFileListFilter<>())
                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
                                .maxMessagesPerPoll(100)))
                .handle((p, h) -> {
                    // C:/eipapp/ksbysample-eipapp-ftp2sftp/recv にダウンロードされたファイルを削除しないと
                    // .localFilter(new AcceptAllFileListFilter<>()) を記述しているために毎回同じファイルが
                    // Message で流れてくるので、動作確認するためにここで削除する
                    try {
                        Files.delete(Paths.get(p.toString()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                })
                .get();
    }

}

■その2

@Slf4j
@Configuration
public class FlowConfig {

    ..........

    private final SftpUploadMessageHandler sftpUploadMessageHandler;

    public FlowConfig(SessionFactory<FTPFile> ftpSessionFactory
            , SftpUploadMessageHandler sftpUploadMessageHandler) {
        this.ftpSessionFactory = ftpSessionFactory;
        this.sftpUploadMessageHandler = sftpUploadMessageHandler;
    }

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(...)
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))
                // SFTPサーバにファイルをアップロードする
                .handle(this.sftpUploadMessageHandler)
                .handle((p, h) -> {
                    // C:/eipapp/ksbysample-eipapp-ftp2sftp/recv にダウンロードされたファイルを削除しないと
                    // .localFilter(new AcceptAllFileListFilter<>()) を記述しているために毎回同じファイルが
                    // Message で流れてくるので、動作確認するためにここで削除する
                    try {
                        Files.delete(Paths.get(p.toString()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                })
                .get();
    }

}
  • .from(...) の次に以下の処理を追加します。
    • .enrichHeaders(h -> h.header("sftpUploadError", false))
    • .handle(this.sftpUploadMessageHandler)

■その3

@Slf4j
@Configuration
public class FlowConfig {

    ..........

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(...)
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))
                // SFTPサーバにファイルをアップロードする
                .handle(this.sftpUploadMessageHandler)
                // SFTP アップロードのエラーチェック用 header ( sftpUploadError ) をチェックし、
                // false ならば send ディレクトリへ、true ならば error ディレクトリへファイルを移動する
                .routeToRecipients(r -> r
                        .recipientFlow("headers['sftpUploadError'] == false"
                                , f -> f.handle((p, h) -> {
                                    try {
                                        Path src = Paths.get(((File) p).getAbsolutePath());
                                        Files.move(src
                                                , Paths.get("C:/eipapp/ksbysample-eipapp-ftp2sftp/send/" + src.getFileName())
                                                , StandardCopyOption.REPLACE_EXISTING);
                                        log.info("FTP --> SFTP 正常終了 ( {} )", src.toAbsolutePath());
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }
                                    return null;
                                }))
                        .recipientFlow("headers['sftpUploadError'] == true"
                                , f -> f.handle((p, h) -> {
                                    try {
                                        Path src = Paths.get(((File) p).getAbsolutePath());
                                        Files.move(src
                                                , Paths.get("C:/eipapp/ksbysample-eipapp-ftp2sftp/error/" + src.getFileName())
                                                , StandardCopyOption.REPLACE_EXISTING);
                                        log.info("FTP --> SFTP エラー ( {} )", src.toAbsolutePath());
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }
                                    return null;
                                })))
                .get();
    }

}
  • .routeToRecipients(...) の処理を追加します。

■完成形

package ksbysample.eipapp.ftp2sftp;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.file.filters.AcceptAllFileListFilter;
import org.springframework.integration.file.filters.IgnoreHiddenFileListFilter;
import org.springframework.integration.file.remote.session.SessionFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

@Slf4j
@Configuration
public class FlowConfig {

    private final SessionFactory<FTPFile> ftpSessionFactory;

    private final SftpUploadMessageHandler sftpUploadMessageHandler;

    public FlowConfig(SessionFactory<FTPFile> ftpSessionFactory
            , SftpUploadMessageHandler sftpUploadMessageHandler) {
        this.ftpSessionFactory = ftpSessionFactory;
        this.sftpUploadMessageHandler = sftpUploadMessageHandler;
    }

    @Bean
    public IntegrationFlow ftp2SftpFlow() {
        return IntegrationFlows
                // FTP サーバを5秒間隔でチェックしてファイルがあればダウンロードする
                .from(s -> s.ftp(this.ftpSessionFactory)
                                // ファイルの更新日時を保持する(はず)。ただし Windows 上の FTPサーバ(Xlight ftp server)
                                // --> Windows のローカルディスクにダウンロードした時は保持されたのは更新日だけで
                                // 更新時刻は 0:00 だった。
                                .preserveTimestamp(true)
                                // ダウンロードしたら FTPサーバからファイルを削除する
                                .deleteRemoteFiles(true)
                                .remoteDirectory("/out")
                                .localDirectory(new File("C:/eipapp/ksbysample-eipapp-ftp2sftp/recv"))
                                // .localFilter(new AcceptAllFileListFilter<>()) を指定しないと1度ダウンロードされた
                                // ファイルは次の処理に渡されない
                                .localFilter(new AcceptAllFileListFilter<>())
                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                // FTP サーバに存在するファイルは1度で全てダウンロードされるが、次の処理に Message
                                // として渡すのは .maxMessagesPerPoll(100) で指定された数だけである
                                .maxMessagesPerPoll(100)))
                // SFTP アップロードのエラーチェック用 header を追加する
                .enrichHeaders(h -> h.header("sftpUploadError", false))
                // SFTPサーバにファイルをアップロードする
                .handle(this.sftpUploadMessageHandler)
                // SFTP アップロードのエラーチェック用 header ( sftpUploadError ) をチェックし、
                // false ならば send ディレクトリへ、true ならば error ディレクトリへファイルを移動する
                .routeToRecipients(r -> r
                        .recipientFlow("headers['sftpUploadError'] == false"
                                , f -> f.handle((p, h) -> {
                                    try {
                                        Path src = Paths.get(((File) p).getAbsolutePath());
                                        Files.move(src
                                                , Paths.get("C:/eipapp/ksbysample-eipapp-ftp2sftp/send/" + src.getFileName())
                                                , StandardCopyOption.REPLACE_EXISTING);
                                        log.info("FTP --> SFTP 正常終了 ( {} )", src.toAbsolutePath());
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }
                                    return null;
                                }))
                        .recipientFlow("headers['sftpUploadError'] == true"
                                , f -> f.handle((p, h) -> {
                                    try {
                                        Path src = Paths.get(((File) p).getAbsolutePath());
                                        Files.move(src
                                                , Paths.get("C:/eipapp/ksbysample-eipapp-ftp2sftp/error/" + src.getFileName())
                                                , StandardCopyOption.REPLACE_EXISTING);
                                        log.info("FTP --> SFTP エラー ( {} )", src.toAbsolutePath());
                                    } catch (IOException e) {
                                        throw new RuntimeException(e);
                                    }
                                    return null;
                                })))
                .get();
    }

}

SftpUploadMessageHandler.java

package ksbysample.eipapp.ftp2sftp;

import com.jcraft.jsch.ChannelSftp;
import lombok.extern.slf4j.Slf4j;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.retry.RetryContext;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable;
import org.springframework.retry.support.RetrySynchronizationManager;

import java.io.File;
import java.util.Map;

@Slf4j
@MessageEndpoint
public class SftpUploadMessageHandler implements GenericHandler<File> {

    private final SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory;

    public SftpUploadMessageHandler(SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory) {
        this.sftpSessionFactory = sftpSessionFactory;
    }

    @Override
    @Retryable(value = {Exception.class}, maxAttempts = 5, backoff = @Backoff(delay = 10000))
    public Object handle(File payload, Map<String, Object> headers) {
        // リトライした場合にはリトライ回数をログに出力する
        RetryContext retryContext = RetrySynchronizationManager.getContext();
        if (retryContext.getRetryCount() > 0) {
            log.info("リトライ回数 = {}", retryContext.getRetryCount());
        }

        SftpRemoteFileTemplate sftpClient = new SftpRemoteFileTemplate(sftpSessionFactory);
        sftpClient.setRemoteDirectoryExpression(new LiteralExpression("/in"));
        sftpClient.send(MessageBuilder.withPayload(payload).build(), FileExistsMode.REPLACE);
        return payload;
    }

    @Recover
    public Object recoverException(Exception e, File payload, Map<String, Object> headers) {
        log.error("SFTPサーバにアップロードできませんでした ( {} )", payload.getAbsolutePath());
        return MessageBuilder.withPayload(payload)
                .setHeader("sftpUploadError", true)
                .build();
    }

}

履歴

2017/01/27
初版発行。

Spring Boot + Spring Integration でいろいろ試してみる ( その11 )( Spring Cloud Sleuth を使用して処理状況を Zipkin で表示する )

概要

記事一覧はこちらです。

参照したサイト・書籍

  1. Zipkin
    http://zipkin.io/

  2. Tracing Spring Integration Flow With Spring Cloud Sleuth
    https://dzone.com/articles/tracing-spring-integration-flow-with-spring-cloud

  3. LINE Engineers' Blog - LINEのマイクロサービス環境における分散トレーシング
    http://developers.linecorp.com/blog/ja/?p=3392

  4. Spring CloudとZipkinを利用した分散トレーシング
    http://www.slideshare.net/rakutentech/spring-cloudzipkin

  5. Spring Cloud Sleuth
    http://cloud.spring.io/spring-cloud-static/spring-cloud-sleuth/1.1.1.RELEASE/

  6. Spring Cloud
    http://projects.spring.io/spring-cloud/

  7. The logback manual - Chapter 6: Layouts
    http://logback.qos.ch/manual/layouts.html

目次

  1. org.springframework.cloud:spring-cloud-dependencies の BOM で適用される spring-cloud-sleuth と spring-boot のバージョンは?
  2. Zipkin サーバを起動して Spring Integration の処理状況を表示する
    1. Zipkin の jar ファイルをダウンロードして起動する
    2. build.gradle を変更する
    3. application.properties を作成して必要な設定を記述する
    4. logback-spring.xml でログのレイアウトを変更する
    5. 動作確認
  3. Zipkin 上に表示される channel は Spring Integration DSL で書いたどの処理に該当するのか?
  4. urlCheckChannel, writeFileChannel, deleteFileChannel を全て QueueChannel に変更しても traceId は同じになるのか?
  5. 最後に

手順

org.springframework.cloud:spring-cloud-dependencies の BOM で適用される spring-cloud-sleuth と spring-boot のバージョンは?

Spring Cloud のページを見ると Spring Cloud を利用する時は Spring IO Platform とは別の BOM を利用するようです。Spring IO Platform では io.spring.platform:platform-bom:Athens-SR2 ですが、Spring Cloud の場合には org.springframework.cloud:spring-cloud-dependencies:Camden.SR4 と書かれています。

ただし Spring Cloud のページと Spring Cloud Sleuth のページで書かれている BOM が異なっており、Spring Cloud のページでは org.springframework.cloud:spring-cloud-dependencies:Camden.SR4Spring Cloud Sleuth のページでは org.springframework.cloud:spring-cloud-dependencies:Brixton.RELEASE でした。

Spring Cloud のページに BOM と適用されるバージョンが記述されています。常駐型アプリケーションでは Spring Boot の 1.4.3.RELEASE を使用していて Camden.SR4 では1つ古い 1.4.2.RELEASE が記述されていますが、同じ 1.4 系なので Camden.SR4 を使用して試してみることにします。

BOM spring-cloud-sleuth spring-boot
Camden.SR4 1.1.1.RELEASE 1.4.2.RELEASE
Brixton.SR7 1.0.11.RELEASE 1.3.8.RELEASE

。。。と書きましたが、BOM の末尾は Camden.SR4 ではなく Camden.RELEASE のように .RELEASE と書かないと正常に動作しません。Camden.SR4 と書くと Zipkin のグラフがなぜか左揃えで表示されますし、数百 ms で終了するはずの処理がなぜか 4 秒もかかるようになりました。

f:id:ksby:20170120002738p:plain

ちなみに Camden.RELEASE と書くと spring-cloud-sleuth は 1.0.9.RELEASE が使用されます。今回は以下のバージョンになります。

BOM spring-cloud-sleuth spring-boot
Camden.RELEASE 1.0.9.RELEASE 1.4.3.RELEASE

Zipkin サーバを起動して Spring Integration の処理状況を表示する

Zipkin の jar ファイルをダウンロードして起動する

  1. Zipkin のページにアクセスします。ページ左側の「Quickstart」リンクをクリックした後、「Java」のところの「latest release」リンクをクリックして zipkin-server-1.19.2-exec.jar をダウンロードします。

    f:id:ksby:20170118180355p:plain

  2. C:\zipkin ディレクトリを作成し、その下に zipkin-server-1.19.2-exec.jar を保存します。

  3. コマンドプロンプトを起動し、java -jar zipkin-server-1.19.2-exec.jar コマンドを実行して Zipkin サーバを起動します。

    f:id:ksby:20170118182420p:plain

    Zipkin って Sprint Boot で作られているんですね。

    f:id:ksby:20170118182615p:plain

    Started ZipkinServer in ... のメッセージが出たら起動完了です。

  4. ブラウザで http://localhost:9411/ にアクセスすると Zipkin の画面が表示されました。

    f:id:ksby:20170118183151p:plain

build.gradle を変更する

IntelliJ IDEA で ksbysample-eipapp-urlchecker プロジェクトを開いた後、build.gradle を リンク先のその1の内容 に変更します。変更後、Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして build.gradle を反映します。

gradlew dependencies で依存関係を確認してみます。

compile - Dependencies for source set 'main'.
+--- org.springframework.boot:spring-boot-starter-integration: -> 1.4.3.RELEASE
|    +--- org.springframework.boot:spring-boot-starter:1.4.3.RELEASE
|    |    +--- org.springframework.boot:spring-boot:1.4.3.RELEASE
|    |    |    +--- org.springframework:spring-core:4.3.5.RELEASE
|    |    |    \--- org.springframework:spring-context:4.3.5.RELEASE
|    |    |         +--- org.springframework:spring-aop:4.3.5.RELEASE
|    |    |         |    +--- org.springframework:spring-beans:4.3.5.RELEASE
|    |    |         |    |    \--- org.springframework:spring-core:4.3.5.RELEASE
|    |    |         |    \--- org.springframework:spring-core:4.3.5.RELEASE
|    |    |         +--- org.springframework:spring-beans:4.3.5.RELEASE (*)
|    |    |         +--- org.springframework:spring-core:4.3.5.RELEASE
|    |    |         \--- org.springframework:spring-expression:4.3.5.RELEASE
|    |    |              \--- org.springframework:spring-core:4.3.5.RELEASE
|    |    +--- org.springframework.boot:spring-boot-autoconfigure:1.4.3.RELEASE
|    |    |    \--- org.springframework.boot:spring-boot:1.4.3.RELEASE (*)
|    |    +--- org.springframework.boot:spring-boot-starter-logging:1.4.3.RELEASE
|    |    |    +--- ch.qos.logback:logback-classic:1.1.8
|    |    |    |    +--- ch.qos.logback:logback-core:1.1.8
|    |    |    |    \--- org.slf4j:slf4j-api:1.7.21 -> 1.7.22
|    |    |    +--- org.slf4j:jcl-over-slf4j:1.7.22
|    |    |    |    \--- org.slf4j:slf4j-api:1.7.22
|    |    |    +--- org.slf4j:jul-to-slf4j:1.7.22
|    |    |    |    \--- org.slf4j:slf4j-api:1.7.22
|    |    |    \--- org.slf4j:log4j-over-slf4j:1.7.22
|    |    |         \--- org.slf4j:slf4j-api:1.7.22
|    |    +--- org.springframework:spring-core:4.3.5.RELEASE
|    |    \--- org.yaml:snakeyaml:1.17
|    +--- org.springframework.boot:spring-boot-starter-aop:1.4.3.RELEASE
|    |    +--- org.springframework.boot:spring-boot-starter:1.4.3.RELEASE (*)
|    |    +--- org.springframework:spring-aop:4.3.5.RELEASE (*)
|    |    \--- org.aspectj:aspectjweaver:1.8.9
|    +--- org.springframework.integration:spring-integration-core:4.3.6.RELEASE
|    |    +--- org.springframework:spring-core:4.3.5.RELEASE
|    |    +--- org.springframework:spring-aop:4.3.5.RELEASE (*)
|    |    +--- org.springframework:spring-context:4.3.5.RELEASE (*)
|    |    +--- org.springframework:spring-messaging:4.3.5.RELEASE
|    |    |    +--- org.springframework:spring-beans:4.3.5.RELEASE (*)
|    |    |    +--- org.springframework:spring-context:4.3.5.RELEASE (*)
|    |    |    \--- org.springframework:spring-core:4.3.5.RELEASE
|    |    +--- org.springframework:spring-tx:4.3.5.RELEASE
|    |    |    +--- org.springframework:spring-beans:4.3.5.RELEASE (*)
|    |    |    \--- org.springframework:spring-core:4.3.5.RELEASE
|    |    \--- org.springframework.retry:spring-retry:1.1.3.RELEASE -> 1.1.5.RELEASE
|    +--- org.springframework.integration:spring-integration-java-dsl:1.1.4.RELEASE -> 1.2.1.RELEASE
|    |    +--- org.springframework.integration:spring-integration-core:4.3.5.RELEASE -> 4.3.6.RELEASE (*)
|    |    \--- org.reactivestreams:reactive-streams:1.0.0
|    \--- org.springframework.integration:spring-integration-jmx:4.3.6.RELEASE
|         \--- org.springframework.integration:spring-integration-core:4.3.6.RELEASE (*)
+--- org.springframework.integration:spring-integration-file: -> 4.3.6.RELEASE
|    +--- org.springframework.integration:spring-integration-core:4.3.6.RELEASE (*)
|    \--- commons-io:commons-io:2.4
+--- org.springframework.integration:spring-integration-http: -> 4.3.6.RELEASE
|    +--- org.springframework.integration:spring-integration-core:4.3.6.RELEASE (*)
|    \--- org.springframework:spring-webmvc:4.3.5.RELEASE
|         +--- org.springframework:spring-aop:4.3.5.RELEASE (*)
|         +--- org.springframework:spring-beans:4.3.5.RELEASE (*)
|         +--- org.springframework:spring-context:4.3.5.RELEASE (*)
|         +--- org.springframework:spring-core:4.3.5.RELEASE
|         +--- org.springframework:spring-expression:4.3.5.RELEASE (*)
|         \--- org.springframework:spring-web:4.3.5.RELEASE
|              +--- org.springframework:spring-aop:4.3.5.RELEASE (*)
|              +--- org.springframework:spring-beans:4.3.5.RELEASE (*)
|              +--- org.springframework:spring-context:4.3.5.RELEASE (*)
|              \--- org.springframework:spring-core:4.3.5.RELEASE
+--- org.codehaus.janino:janino: -> 2.7.8
|    \--- org.codehaus.janino:commons-compiler:2.7.8
+--- org.springframework.cloud:spring-cloud-starter-zipkin: -> 1.0.9.RELEASE
|    +--- org.springframework.cloud:spring-cloud-starter-sleuth:1.0.9.RELEASE
|    |    +--- org.springframework.cloud:spring-cloud-starter:1.1.3.RELEASE
|    |    |    +--- org.springframework.boot:spring-boot-starter:1.3.7.RELEASE -> 1.4.3.RELEASE (*)
|    |    |    +--- org.springframework.cloud:spring-cloud-context:1.1.3.RELEASE
|    |    |    |    \--- org.springframework.security:spring-security-crypto:4.0.4.RELEASE -> 4.1.4.RELEASE
|    |    |    +--- org.springframework.cloud:spring-cloud-commons:1.1.3.RELEASE
|    |    |    |    \--- org.springframework.security:spring-security-crypto:4.0.4.RELEASE -> 4.1.4.RELEASE
|    |    |    \--- org.springframework.security:spring-security-rsa:1.0.3.RELEASE
|    |    |         \--- org.bouncycastle:bcpkix-jdk15on:1.55 -> 1.54
|    |    |              \--- org.bouncycastle:bcprov-jdk15on:1.54
|    |    +--- org.springframework.boot:spring-boot-starter-web:1.3.7.RELEASE -> 1.4.3.RELEASE
|    |    |    +--- org.springframework.boot:spring-boot-starter:1.4.3.RELEASE (*)
|    |    |    +--- org.springframework.boot:spring-boot-starter-tomcat:1.4.3.RELEASE
|    |    |    |    +--- org.apache.tomcat.embed:tomcat-embed-core:8.5.6
|    |    |    |    +--- org.apache.tomcat.embed:tomcat-embed-el:8.5.6
|    |    |    |    \--- org.apache.tomcat.embed:tomcat-embed-websocket:8.5.6
|    |    |    |         \--- org.apache.tomcat.embed:tomcat-embed-core:8.5.6
|    |    |    +--- org.hibernate:hibernate-validator:5.2.4.Final
|    |    |    |    +--- javax.validation:validation-api:1.1.0.Final
|    |    |    |    +--- org.jboss.logging:jboss-logging:3.2.1.Final -> 3.3.0.Final
|    |    |    |    \--- com.fasterxml:classmate:1.1.0 -> 1.3.3
|    |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.8.5
|    |    |    |    +--- com.fasterxml.jackson.core:jackson-annotations:2.8.0 -> 2.8.5
|    |    |    |    \--- com.fasterxml.jackson.core:jackson-core:2.8.5
|    |    |    +--- org.springframework:spring-web:4.3.5.RELEASE (*)
|    |    |    \--- org.springframework:spring-webmvc:4.3.5.RELEASE (*)
|    |    +--- org.springframework.boot:spring-boot-starter-actuator:1.3.7.RELEASE -> 1.4.3.RELEASE
|    |    |    +--- org.springframework.boot:spring-boot-starter:1.4.3.RELEASE (*)
|    |    |    \--- org.springframework.boot:spring-boot-actuator:1.4.3.RELEASE
|    |    |         +--- org.springframework.boot:spring-boot:1.4.3.RELEASE (*)
|    |    |         +--- org.springframework.boot:spring-boot-autoconfigure:1.4.3.RELEASE (*)
|    |    |         +--- com.fasterxml.jackson.core:jackson-databind:2.8.5 (*)
|    |    |         +--- org.springframework:spring-core:4.3.5.RELEASE
|    |    |         \--- org.springframework:spring-context:4.3.5.RELEASE (*)
|    |    +--- org.springframework.boot:spring-boot-starter-aop:1.3.7.RELEASE -> 1.4.3.RELEASE (*)
|    |    \--- org.springframework.cloud:spring-cloud-sleuth-core:1.0.9.RELEASE
|    |         +--- org.springframework:spring-context:4.2.7.RELEASE -> 4.3.5.RELEASE (*)
|    |         \--- org.aspectj:aspectjrt:1.8.9
|    \--- org.springframework.cloud:spring-cloud-sleuth-zipkin:1.0.9.RELEASE
|         +--- org.springframework.cloud:spring-cloud-sleuth-core:1.0.9.RELEASE (*)
|         +--- io.zipkin.java:zipkin:1.11.1
|         +--- io.zipkin.reporter:zipkin-reporter:0.5.0
|         |    \--- io.zipkin.java:zipkin:1.11.1
|         \--- io.zipkin.reporter:zipkin-sender-urlconnection:0.5.0
|              +--- io.zipkin.reporter:zipkin-reporter:0.5.0 (*)
|              \--- io.zipkin.java:zipkin:1.11.1
+--- org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE (*)
\--- org.projectlombok:lombok:1.16.12

Camden の Spring Boot のバージョンは 1.4 系なのかと思ったら org.springframework.boot:spring-boot-starter:1.3.7.RELEASE -> 1.4.3.RELEASE (*) と 1.3 系が表示されますね ( 1.4 系に変更されていますが )。

また org.springframework.boot:spring-boot-starter-web が入っています。これがあると Tomcat が起動してしまうので、依存関係から除外します。build.gradle を リンク先のその2の内容 に変更した後、Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして build.gradle を反映します。

application.properties を作成して必要な設定を記述する

  1. src/main/resources の下に application.properties を新規作成した後、リンク先の内容 を記述します。

logback-spring.xml でログのレイアウトを変更する

  1. src/main/resources の下の logback-spring.xmlリンク先の内容 に変更します。この変更を行うことで、ログに “boostrap” ではなく application.properties の spring.application.name に設定した文字列が出力されるようになります。

    f:id:ksby:20170118203648p:plain

    f:id:ksby:20170118204456p:plain

動作確認

  1. bootRun タスクを実行し、アプリケーションを起動します。

  2. in フォルダに 2014.txt を置きます。処理が実行されてコンソールにログが出力されます。"urlchecker" の次が traceId で1回の処理で全て同じ値が出力されています。

    f:id:ksby:20170120010113p:plain

    Zipkin の画面には今回のデータが表示され、

    f:id:ksby:20170120010315p:plain

    クリックすると詳細な状況が表示されます。並列で処理されていることが分かります。

    f:id:ksby:20170120010524p:plain

    その中のバーの1つをクリックすると payload の情報や traceId, spanId が掲載されたダイアログが表示されます。

    f:id:ksby:20170120010655p:plain

  3. Zipkin のグラフを見た感想ですが、

    • 内部的に urllistfilepollerflow.channel#1urllistfilepollerflow.channel#2 といった channel が作成されていることに初めて気づきました。でもどれが何の処理なのか全然分かりませんね。。。
    • IntegrationFlows の .from(...).handle(...) 等のメソッド間のデータはどうも chennel 経由でやり取りされているようです。

Zipkin 上に表示される channel は Spring Integration DSL で書いたどの処理に該当するのか?

起動時のログに channel と割り当てられている処理が出力されていますので、それを見れば分かります。

f:id:ksby:20170120012800p:plain

例えば urllistfilepollerflow.channel#1urllistfilepollerflow.channel#2 は以下の処理が該当します。

channel 処理
urllistfilepollerflow.channel#1 .split(new FileSplitter())
urlListFilePollerFlow.channel#2 .headerFilter(MESSAGE_HEADER_SEQUENCE_SIZE, false)

urlCheckChannel, writeFileChannel, deleteFileChannel を全て QueueChannel に変更しても traceId は同じになるのか?

  1. DirectChannel ではなく QueueChannel でデータをやり取りするよう src/main/java/ksbysample/eipapp/urlchecker の下の FlowConfig.javaリンク先の内容 に変更します。

  2. bootRun タスクを実行し、アプリケーションを起動します。

  3. in フォルダに 2014.txt を置きます。処理が実行されてコンソールにログが出力されます。traceId は QueueChannel の時でも同じ値でした。

    f:id:ksby:20170120021319p:plain

    グラフは少し遅延が途中に入る程度のように見えます。

    f:id:ksby:20170120021543p:plain

Message の header を見ると traceId 等の情報が埋め込まれています。

GenericMessage [
payload=http://ksby.hatenablog.com/entry/2014/12/27/233427
, headers={
    sequenceNumber=1
    , file_name=2014.txt
    , sequenceSize=2
    , X-B3-ParentSpanId=553e9dfb0d0b90b3
    , X-Message-Sent=true
    , messageSent=true
    , file_originalFile=C:\eipapp\ksbysample-eipapp-urlchecker\in\2014.txt
    , spanName=message:urlListFilePollerFlow.channel#3
    , lines.size=2
    , spanTraceId=13b92684b6371460
    , spanId=4f65cf944c245ba
    , spanParentSpanId=553e9dfb0d0b90b3
    , X-Span-Name=message:urlListFilePollerFlow.channel#3
    , X-B3-SpanId=4f65cf944c245ba
    , currentSpan=[
        Trace: 13b92684b6371460
        , Span: 4f65cf944c245ba
        , Parent: 553e9dfb0d0b90b3
        , exportable:true
    ]
    , X-B3-Sampled=1
    , X-B3-TraceId=13b92684b6371460
    , correlationId=43f57730-e42f-74a8-dcbe-e28d549cd0f3
    , id=5db6324f-313f-df80-4d5b-61b93ef48096
    , X-Current-Span=[
        Trace: 13b92684b6371460
        , Span: 4f65cf944c245ba
        , Parent: 553e9dfb0d0b90b3
        , exportable:true
    ]
    , spanSampled=1
    , timestamp=1484848137184
}]

最後に

並列処理の状況がこんな簡単に可視化できるとは、Spring Cloud Sleuth+Zipkin いいですね! Spring Cloud で分散処理させている時に可視化するのに便利なソフトウェアとは聞いていましたが、Spring Integration の処理状況も表示させることができるとは意外でした。

ソースコード

build.gradle

■その1

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
        mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.integration:spring-integration-file")
    compile("org.springframework.integration:spring-integration-http")
    compile("org.codehaus.janino:janino")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの
    // http://projects.spring.io/spring-cloud/ の「Release Trains」参照
    compile("org.springframework.cloud:spring-cloud-starter-zipkin")

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.1")
}
  • dependencyManagement に mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE' を追加します。
  • dependencies に compile("org.springframework.cloud:spring-cloud-starter-zipkin") を追加します。

■その2

    // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの
    // http://projects.spring.io/spring-cloud/ の「Release Trains」参照
    compile("org.springframework.cloud:spring-cloud-starter-zipkin") {
        exclude module: 'spring-boot-starter-web'
    }
  • dependencies の compile("org.springframework.cloud:spring-cloud-starter-zipkin") の後に { exclude module: "spring-boot-starter-web" } を追加します。

application.properties

spring.application.name=urlchecker
spring.zipkin.base-url=http://localhost:9411/
spring.sleuth.sampler.percentage=1.0
  • 上の3行を追加します。spring.application.name に設定した文字列が Zipkin の画面に表示されます。

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>

    <springProperty scope="context" name="springAppName" source="spring.application.name"/>
    <property name="CONSOLE_LOG_PATTERN"
              value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${level:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>
  • <springProperty scope="context" name="springAppName" source="spring.application.name"/> を追加します。
  • <property name="CONSOLE_LOG_PATTERN" value="..."/> を追加します。JSON Logback with Logstash の「Logback setup」には %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr([${springAppName:-},... と書かれているのですが、これだと traceId が2ヶ所出力されてしまうので %clr(${LOG_LEVEL_PATTERN:-%5p})%clr(${level:-%5p}) へ変更しています。また左側に出力される情報が多く、ログの右側の文字列が見えなくなるため ,%X{X-B3-ParentSpanId:-} は削除しました。

FlowConfig.java

package ksbysample.eipapp.urlchecker;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestTemplate;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Slf4j
@Configuration
public class FlowConfig {

    private final static String URLLISTFILE_IN_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/in";
    private final static String URLLISTFILE_EXT_PATTERN = "*.txt";
    private final static String RESULTFILE_OUT_DIR = "C:/eipapp/ksbysample-eipapp-urlchecker/out";

    private final static String MESSAGE_HEADER_LINES_SIZE = "lines.size";
    private final static String MESSAGE_HEADER_SEQUENCE_SIZE = "sequenceSize";
    private final static String MESSAGE_HEADER_HTTP_STATUS = "httpStatus";
    private final static String MESSAGE_HEADER_FILE_NAME = "file_name";
    private final static String MESSAGE_HEADER_FILE_ORIGINALFILE = "file_originalFile";

    @Bean
    public MessageChannel urlCheckChannel() {
        return new QueueChannel();
    }

    @Bean
    public MessageChannel writeFileChannel() {
        return new QueueChannel();
    }

    @Bean
    public MessageChannel deleteFileChannel() {
        return new QueueChannel();
    }

    @Bean
    public Executor taskExecutor() {
        return Executors.newCachedThreadPool();
    }

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    @Bean
    public IntegrationFlow urlListFilePollerFlow() {
        return IntegrationFlows
                // in ディレクトリに拡張子が .txt のファイルが存在するか 1秒間隔でチェックする
                .from(s -> s.file(new File(URLLISTFILE_IN_DIR)).patternFilter(URLLISTFILE_EXT_PATTERN)
                        , c -> c.poller(Pollers.fixedDelay(1000)))
                // スレッドを生成して .enrichHeaders 以降の処理を .from のファイルチェック処理とは別のスレッドで実行する
                .channel(c -> c.executor(taskExecutor()))
                // 見つかったファイルの行数をカウントし、Message の header に "lines.size" というキーでセットする
                // この .enrichHeaders から .enrichHeaders までの処理は writeFileFlow の .resequence 及び .aggregate
                // の処理のために "sequenceSize" header に正しい行数をセットするためのものである
                .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_LINES_SIZE
                        , m -> {
                            List<String> lines;
                            try {
                                lines = Files.readAllLines(Paths.get(m.getPayload().toString()), StandardCharsets.UTF_8);
                            } catch (IOException e) {
                                throw new RuntimeException(e);
                            }
                            return lines.size();
                        }))
                // FileSplitter クラスを利用して、ファイルを行毎に分解する
                .split(new FileSplitter())
                // スレッドを生成して .headerFilter 以降の処理を更に別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の header から "sequenceSize" というキーの header を削除する
                .headerFilter(MESSAGE_HEADER_SEQUENCE_SIZE, false)
                // Message の header に "sequenceSize" というキーの header を追加し、"originalSequenceSize"
                // というキーの header の値をセットする
                .enrichHeaders(h -> h.headerFunction(MESSAGE_HEADER_SEQUENCE_SIZE
                        , m -> m.getHeaders().get(MESSAGE_HEADER_LINES_SIZE)))
                // Message の内容をログに出力する
                .log()
                // Message の payload に格納された URL 文字列の値をチェックし、"http://" から始まる場合には urlCheckChannel へ、
                // そうでない場合には writeFileChannel へ Message を送信する
                .<String, Boolean>route(p -> p.startsWith("http://")
                        , r -> r.subFlowMapping(true, sf -> sf.channel(urlCheckChannel()))
                                .subFlowMapping(false, sf -> sf.channel(writeFileChannel())))
                .get();
    }

    @Bean
    public IntegrationFlow urlCheckFlow() {
        return IntegrationFlows.from(urlCheckChannel())
                .handle((GenericHandler<Object>) (p, h) -> {
                    return p;
                }, e -> e.poller(Pollers.fixedDelay(1000)))
                // スレッドを生成して、以降の処理を別のスレッドで並行処理する
                .channel(c -> c.executor(taskExecutor()))
                // Message の payload に格納された URL にアクセスし、HTTP ステータスコードを取得する
                // 取得した HTTP ステータスコードは Message の header に "httpStatus" というキーでセットする
                .handle((p, h) -> {
                    String statusCode;
                    try {
                        ResponseEntity<String> response = restTemplate().getForEntity(p.toString(), String.class);
                        statusCode = response.getStatusCode().toString();
                    } catch (HttpClientErrorException e) {
                        statusCode = e.getStatusCode().toString();
                    } catch (Exception e) {
                        statusCode = e.getMessage();
                    }
                    log.info(statusCode + " : " + p.toString());
                    return MessageBuilder.withPayload(p)
                            .setHeader(MESSAGE_HEADER_HTTP_STATUS, statusCode)
                            .build();
                })
                // writeFileChannel へ Message を送信する
                .channel(writeFileChannel())
                .get();
    }

    @Bean
    public IntegrationFlow writeFileFlow() {
        return IntegrationFlows.from(writeFileChannel())
                // Message の payload のデータを URL だけから URL,HTTPステータスコード に変更する
                .handle((GenericHandler<Object>) (p, h) -> MessageBuilder.withPayload(p + "," + h.get(MESSAGE_HEADER_HTTP_STATUS)).build()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // Message が流れる順番をファイルに書かれている順番にする
                // スレッドを生成して並行処理させていたため、.resequence() を呼ぶ前は順不同になっている
                .resequence()
                .log()
                // 1つの URL につき 1つの Message 、かつ複数 Message になっているのを、
                // 1つの List に集約して 1 Message に変更する
                .aggregate()
                .log()
                // out ディレクトリに結果ファイルを出力する
                // 結果ファイルには URL と HTTP ステータスコード を出力する
                .handle((p, h) -> {
                    Path outPath = Paths.get(RESULTFILE_OUT_DIR, h.get(MESSAGE_HEADER_FILE_NAME).toString());
                    @SuppressWarnings("unchecked")
                    List<String> lines = (List<String>) p;
                    try {
                        Files.write(outPath, lines, StandardCharsets.UTF_8
                                , StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return p;
                })
                // deleteFileChannel へ Message を送信する
                .channel(deleteFileChannel())
                .get();
    }

    @Bean
    public IntegrationFlow deleteFileFlow() {
        return IntegrationFlows.from(deleteFileChannel())
                // in ディレクトリのファイルを削除する
                .handle((GenericHandler<Object>) (p, h) -> {
                    try {
                        Files.delete(Paths.get(h.get(MESSAGE_HEADER_FILE_ORIGINALFILE).toString()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    // ここで処理が終了するので null を返す
                    return null;
                }, e -> e.poller(Pollers.fixedDelay(1000)))
                .get();
    }

}
  • urlCheckChannel Bean, writeFileChannel Bean, deleteFileChannel Bean を全て return new DirectChannel();return new QueueChannel(); へ変更します。
  • urlCheckFlow メソッドの .from(urlCheckChannel()) の後に .handle((GenericHandler<Object>) (p, h) -> { return p; }, e -> e.poller(Pollers.fixedDelay(1000))) を追加します。
  • writeFileFlow メソッドの最初の .handle(...) の第2引数に , e -> e.poller(Pollers.fixedDelay(1000)) を追加します。
  • deleteFileFlow メソッドの最初の .handle(...) の第2引数に , e -> e.poller(Pollers.fixedDelay(1000)) を追加します。

履歴

2017/01/20
初版発行。