かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その14 )( delayer のサンプルを作ってみる )

概要

記事一覧はこちらです。

  • Spring Integration DSL8.6 Delayer を使用したサンプルを作成します。
  • Delayer は指定した時間 Message を待機させる機能です。
  • RequestHandlerAdvice、.bridge(…)、MessageStore も初めて使っていますが今回説明は入れていません。RequestHandlerAdvice、MessageStore は慣れておきたいので、その内調べてまとめておきたいですね。。。

参照したサイト・書籍

  1. Spring Integration Reference Manual - 8.6 Delayer
    http://docs.spring.io/spring-integration/reference/htmlsingle/#delayer

  2. handling file backup/archive on success and retry on failure
    http://stackoverflow.com/questions/24016152/handling-file-backup-archive-on-success-and-retry-on-failure

    • ExpressionEvaluatingRequestHandlerAdvice の使い方の参考にしました。

目次

  1. ksbysample-eipapp-delayer プロジェクトを作成する
  2. C:\eipapp\ksbysample-eipapp-delayer ディレクトリを作成する
  3. まずはシンプルに時間を指定して待機させてみる
  4. Message の header, payload に指定した時間で待機させてみる
  5. 待機する時間を動的に変えてみる
  6. .delay(...) の第1引数には GROUP_ID を指定するが、何かを MessageGroupStore に保存するのか?

手順

ksbysample-eipapp-delayer プロジェクトを作成する

  1. IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。

  2. src/main/java の下に ksbysample.eipapp.delayer パッケージを作成します。

  3. src/main/java/ksbysample/eipapp/delayer の下に Application.java を作成し、リンク先の内容 を記述します。

C:\eipapp\ksbysample-eipapp-delayer ディレクトリを作成する

以下の構成のディレクトリを作成します。

C:\eipapp\ksbysample-eipapp-delayer
├ in01
├ in02
├ in03
└ in04

処理開始用のファイルには Spring Boot + Spring Integration でいろいろ試してみる ( その10 )( URL一覧のファイルが置かれたらアクセス可能かチェックして結果ファイルに出力する ) で作成した 2014.txt と、必要に応じて作成したもの使用します。

まずはシンプルに時間を指定して待機させてみる

  1. src/main/java/ksbysample/eipapp/delayer の下に FlowConfig.java を作成し、リンク先のその1の内容 を記述します。

  2. bootRun タスクを実行した後、C:\eipapp\ksbysample-eipapp-delayer\in01 に 2014.txt を置きます。

    以下のログが出力されて、実装通り3秒待機→6秒待機していることが確認できます。

    f:id:ksby:20170129023442p:plain

  3. bootRun タスクを停止します。

Message の header, payload に指定した時間で待機させてみる

  1. src/main/java/ksbysample/eipapp/delayer の下の FlowConfig.javaリンク先のその2の内容 に変更します。

  2. 動作確認用に、中が空の empty.txt と、中に “10000” と記述した 10000.txt を作成します。

  3. bootRun タスクを実行した後、最初に C:\eipapp\ksbysample-eipapp-delayer\in02 に empty.txt を置きます。

    以下のログが出力されて、(DelayerEndpointSpec e) -> e.defaultDelay(...) で指定されているデフォルトの待機秒数が使用されて、8秒待機→2秒待機していることが確認できます。

    f:id:ksby:20170129024605p:plain

  4. 次に C:\eipapp\ksbysample-eipapp-delayer\in02 に 10000.txt を置きます。

    以下のログが出力されて、ファイルに記述した待機秒数が使用されて、10秒待機→10秒待機していることが確認できます。

    f:id:ksby:20170129025132p:plain

  5. bootRun タスクを停止します。

待機する時間を動的に変えてみる

Message の headers にセットされた delayInit, delayCount の2つを参照して delayInit + delayCount * 1000 ミリ秒待機する、というサンプルを作成します。

  1. src/main/java/ksbysample/eipapp/delayer の下の FlowConfig.javaリンク先のその3の内容 に変更します。

  2. bootRun タスクを実行した後、C:\eipapp\ksbysample-eipapp-delayer\in03 に 2014.txt を置きます。

    以下のログが出力されて、待機秒数が3秒→4秒→5秒と増えていることが確認できます。

    f:id:ksby:20170129094149p:plain

  3. bootRun タスクを停止します。

.delay(...) の第1引数には GROUP_ID を指定するが、何かを MessageGroupStore に保存するのか?

org.springframework.integration.handler の DelayHandler.java の doInit メソッドを見ると messageStore が指定されていない場合には this.messageStore = new SimpleMessageStore(); と実装されているので、デフォルトでは delay 中は SimpleMessageStore に何かのデータを入れており、その時に GROUP_ID を使用するようです。

SimpleMessageStore のクラス図を IntelliJ IDEA で作成してみると MessageGroupStore インターフェースを実装していることが分かります。

f:id:ksby:20170129165355p:plain

何を MessageGroupStore に入れるのか確認してみます。

  1. src/main/java/ksbysample/eipapp/delayer の下の FlowConfig.javaリンク先のその4の内容 に変更します。

  2. bootRun タスクを実行した後、C:\eipapp\ksbysample-eipapp-delayer\in04 に 2014.txt を置きます。

    以下のログが出力されます。

    f:id:ksby:20170129165607p:plain

    • delay が開始されると指定した simpleMessageStore にデータがセットされて、delay が終了すると削除されることが確認できます。
    • header は id, timestamp のみでした。カスタムの header が追加されてはいないようです。
    • payload は org.springframework.integration.handler の DelayHandler クラス内で定義されている DelayedMessageWrapper クラスのデータがセットされていました。DelayedMessageWrapper クラスは処理開始時点の日時がセットされる long requestDate と、オリジナルの Message がセットされる Message<?> original をフィールドに持つクラスでした。
  3. bootRun タスクを停止します。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

[compileJava, compileTestGroovy, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options,-processing']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    mavenCentral()
    maven { url "http://repo.spring.io/repo/" }
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
        mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.integration:spring-integration-file")
    compile("org.codehaus.janino:janino")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.2")
}

Application.java

package ksbysample.eipapp.delayer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

FlowConfig.java

■その1

package ksbysample.eipapp.delayer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.dsl.DelayerEndpointSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;

import java.io.File;

@Slf4j
@Configuration
public class FlowConfig {

    private static final String GROUP_ID_DELAY = "DELAYFLOW_DELAY";

    private final NullChannel nullChannel;

    public FlowConfig(NullChannel nullChannel) {
        this.nullChannel = nullChannel;
    }

    /**
     * 正常処理時に payload にセットされた File クラスのファイルを削除する RequestHandlerAdvice
     *
     * @return
     */
    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice fileDeleteAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice requestHandlerAdvice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        requestHandlerAdvice.setOnSuccessExpression("payload.delete()");
        return requestHandlerAdvice;
    }


    @Bean
    public IntegrationFlow delayFlow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in01"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                // 3秒待機する
                .delay(GROUP_ID_DELAY, (DelayerEndpointSpec e) -> e.defaultDelay(3000))
                .log()
                // 6秒待機する
                .delay(GROUP_ID_DELAY, "6000")
                .log()
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .channel(nullChannel)
                .get();
    }

}

■その2

@Slf4j
@Configuration
public class FlowConfig {

    ..........

    @Bean
    public IntegrationFlow delayByMessageFlow() {
        return IntegrationFlows
                // C:/eipapp/ksbysample-eipapp-delayer/in02 には待機秒数を記述したファイルを置く
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in02"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // payload にセットされた File クラスを headers.originalPayload へセットする
                .enrichHeaders(h -> h.headerExpression("originalPayload", "payload"))
                // ファイルの内容を読み込んで payload にセットする
                // ※payload のクラスが File クラス --> String クラスに変わる
                .transform(Transformers.fileToString())
                .log()
                // payload にセットされた秒数待機する
                .delay(GROUP_ID_DELAY, "payload", (DelayerEndpointSpec e) -> e.defaultDelay(8000))
                .log()
                // headers.delay にセットされた秒数待機する
                .enrichHeaders(h -> h.headerExpression("delay", "payload"))
                .delay(GROUP_ID_DELAY, "headers.delay", (DelayerEndpointSpec e) -> e.defaultDelay(2000))
                .log()
                // headers.originalPayload にセットされている File クラスを payload へ戻す
                .transform("headers.originalPayload")
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .channel(nullChannel)
                .get();
    }

}

■その3

@Slf4j
@Configuration
public class FlowConfig {

    ..........


    @Bean
    public IntegrationFlow delayDynamicFlow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in03"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // 待機秒数を計算するための delayInit, delayCount の初期値を header にセットする
                .enrichHeaders(h -> h
                        .header("delayInit", 3000)
                        .header("delayCount", 0))
                .log()
                // delayCount が 3 になるまで処理をループさせたいので、別の Flow にする
                // return f -> f.~ で書いた場合、Bean名 + ".input" という MessageChannel
                // が自動生成されて、ここに Message を送信すると処理が開始される
                .channel("loopCountFlow.input")
                .get();
    }

    @Bean
    public IntegrationFlow loopCountFlow() {
        return f -> f
                // delayInit + delayCount * 1000 のミリ秒数待機する
                .delay(GROUP_ID_DELAY, m -> {
                    return ((int) m.getHeaders().get("delayInit"))
                            + (((int) m.getHeaders().get("delayCount")) * 1000);
                })
                .log()
                // headers.delayCount を +1 する
                .enrichHeaders(h -> h.headerFunction("delayCount"
                        , m -> ((int) m.getHeaders().get("delayCount")) + 1
                        , true))
                // delayCount が3未満なら loopCountFlow の最初に戻る
                // そうでなければ次の処理へ
                .routeToRecipients(r -> r
                        .recipientFlow("headers.delayCount < 3"
                                , sf -> sf.channel("loopCountFlow.input"))
                        .defaultOutputToParentFlow())
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .handle((p, h) -> {
                    System.out.println("ファイルを削除しました");
                    return null;
                });
    }

}

■その4

@Slf4j
@Configuration
public class FlowConfig {

    ..........

    @Bean
    public MessageGroupStore simpleMessageStore() {
        return new SimpleMessageStore();
    }

    /**
     * "" の String の Message を返す MessageSource
     *
     * @return
     */
    @Bean
    public MessageSource<String> stringMessageSource() {
        return () -> MessageBuilder.withPayload("").build();
    }

    @Bean
    public IntegrationFlow printMessageStoreFlow() {
        return IntegrationFlows
                // 1秒毎に処理を実行したいので、1秒毎に Message を送信させる
                .from(stringMessageSource()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // MessageStore に格納された Message の headers, payload を出力する
                .handle((p, h) -> {
                    Collection<Message<?>> delayMessagesList
                            = simpleMessageStore().getMessagesForGroup(GROUP_ID_DELAY);
                    delayMessagesList.forEach(m -> {
                        m.getHeaders().entrySet().forEach(entry -> {
                            System.out.println("[header ] " + entry.getKey() + " = " + entry.getValue());
                        });
                        System.out.println("[payload] " + m.getPayload());
                    });
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow checkMessageStoreFlow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in04"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                // 3秒待機する
                .delay(GROUP_ID_DELAY
                        , (DelayerEndpointSpec e) -> e.defaultDelay(3000)
                                .messageStore(simpleMessageStore()))
                .log()
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .channel(nullChannel)
                .get();
    }

}
  • checkMessageStoreFlow は in04 ディレクトリにファイルが置かれる→3秒待機→ファイル削除する、Flow です。MessageGroupStore のデータを確認したいので .messageStore(simpleMessageStore()) で指定しています。
  • printMessageStoreFlow は1秒毎に MessageGroupStore をチェックしてデータがあれば出力する Flow です。checkMessageStoreFlow とは別に並行して処理が実行されます。

■最終形

package ksbysample.eipapp.delayer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.DelayerEndpointSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.support.Transformers;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.store.SimpleMessageStore;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import java.io.File;
import java.util.Collection;
import java.util.Map;

@Slf4j
@Configuration
public class FlowConfig {

    private static final String GROUP_ID_DELAY = "DELAYFLOW_DELAY";

    private final NullChannel nullChannel;

    public FlowConfig(NullChannel nullChannel) {
        this.nullChannel = nullChannel;
    }

    /**
     * 正常処理時に payload にセットされた File クラスのファイルを削除する RequestHandlerAdvice
     *
     * @return
     */
    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice fileDeleteAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice requestHandlerAdvice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        requestHandlerAdvice.setOnSuccessExpression("payload.delete()");
        return requestHandlerAdvice;
    }

    @Bean
    public IntegrationFlow delayFlow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in01"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                // 3秒待機する
                .delay(GROUP_ID_DELAY
                        , (DelayerEndpointSpec e) -> e.defaultDelay(3000))
                .log()
                // 6秒待機する
                .delay(GROUP_ID_DELAY, "6000")
                .log()
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .channel(nullChannel)
                .get();
    }

    @Bean
    public IntegrationFlow delayByMessageFlow() {
        return IntegrationFlows
                // C:/eipapp/ksbysample-eipapp-delayer/in02 には待機秒数を記述したファイルを置く
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in02"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // payload にセットされた File クラスを headers.originalPayload へセットする
                .enrichHeaders(h -> h.headerExpression("originalPayload", "payload"))
                // ファイルの内容を読み込んで payload にセットする
                // ※payload のクラスが File クラス --> String クラスに変わる
                .transform(Transformers.fileToString())
                .log()
                // payload にセットされた秒数待機する
                .delay(GROUP_ID_DELAY, "payload", (DelayerEndpointSpec e) -> e.defaultDelay(8000))
                .log()
                // headers.delay にセットされた秒数待機する
                .enrichHeaders(h -> h.headerExpression("delay", "payload"))
                .delay(GROUP_ID_DELAY, "headers.delay", (DelayerEndpointSpec e) -> e.defaultDelay(2000))
                .log()
                // headers.originalPayload にセットされている File クラスを payload へ戻す
                .transform("headers.originalPayload")
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .channel(nullChannel)
                .get();
    }

    @Bean
    public IntegrationFlow delayDynamicFlow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in03"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // 待機秒数を計算するための delayInit, delayCount の初期値を header にセットする
                .enrichHeaders(h -> h
                        .header("delayInit", 3000)
                        .header("delayCount", 0))
                .log()
                // delayCount が 3 になるまで処理をループさせたいので、別の Flow にする
                // return f -> f.~ で書いた場合、Bean名 + ".input" という MessageChannel
                // が自動生成されて、ここに Message を送信すると処理が開始される
                .channel("loopCountFlow.input")
                .get();
    }

    @Bean
    public IntegrationFlow loopCountFlow() {
        return f -> f
                // delayInit + delayCount * 1000 のミリ秒数待機する
                .delay(GROUP_ID_DELAY, m -> {
                    return ((int) m.getHeaders().get("delayInit"))
                            + (((int) m.getHeaders().get("delayCount")) * 1000);
                })
                .log()
                // headers.delayCount を +1 する
                .enrichHeaders(h -> h.headerFunction("delayCount"
                        , m -> ((int) m.getHeaders().get("delayCount")) + 1
                        , true))
                // delayCount が3未満なら loopCountFlow の最初に戻る
                // そうでなければ次の処理へ
                .routeToRecipients(r -> r
                        .recipientFlow("headers.delayCount < 3"
                                , sf -> sf.channel("loopCountFlow.input"))
                        .defaultOutputToParentFlow())
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .handle((p, h) -> {
                    System.out.println("ファイルを削除しました");
                    return null;
                });
    }

    @Bean
    public MessageGroupStore simpleMessageStore() {
        return new SimpleMessageStore();
    }

    /**
     * "" の String の Message を返す MessageSource
     *
     * @return
     */
    @Bean
    public MessageSource<String> stringMessageSource() {
        return () -> MessageBuilder.withPayload("").build();
    }

    @Bean
    public IntegrationFlow printMessageStoreFlow() {
        return IntegrationFlows
                // 1秒毎に処理を実行したいので、1秒毎に Message を送信させる
                .from(stringMessageSource()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // MessageStore に格納された Message の headers, payload を出力する
                .handle((p, h) -> {
                    Collection<Message<?>> delayMessagesList
                            = simpleMessageStore().getMessagesForGroup(GROUP_ID_DELAY);
                    delayMessagesList.forEach(m -> {
                        m.getHeaders().entrySet().forEach(entry -> {
                            System.out.println("[header ] " + entry.getKey() + " = " + entry.getValue());
                        });
                        System.out.println("[payload] " + m.getPayload());
                    });
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow checkMessageStoreFlow() {
        return IntegrationFlows
                .from(s -> s.file(new File("C:/eipapp/ksbysample-eipapp-delayer/in04"))
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log()
                // 3秒待機する
                .delay(GROUP_ID_DELAY
                        , (DelayerEndpointSpec e) -> e.defaultDelay(3000)
                                .messageStore(simpleMessageStore()))
                .log()
                // ファイルを削除する
                .bridge(e -> e.advice(fileDeleteAdvice()))
                .channel(nullChannel)
                .get();
    }

}

履歴

2017/01/29
初版発行。