かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その19 )( Flow の途中で一時的に別の Flow を実行したいなら wireTap! )

概要

記事一覧はこちらです。

参照したサイト・書籍

  1. Spring Integration Reference Manual - Wire Tap
    http://docs.spring.io/spring-integration/docs/4.3.8.RELEASE/reference/html/messaging-channels-section.html#channel-wiretap

目次

  1. wireTap になぜ気づいたのか?
  2. in, out ディレクトリを作成する
  3. ksbysample-eipapp-wiretap プロジェクトを作成する
  4. サンプルの Flow を作成する
  5. 動作確認
  6. 最後に

手順

wireTap になぜ気づいたのか?

  • まず Spring Integration Reference Manual を初めに読んだ時には全く頭に残っていませんでした。
  • Spring Integration DSL に興味を持つようになって、DSL が書いてあるところはざっと見直したのですが、Wire TapDSL のサンプルは以下のようなコードで、MessageChannel 絡みの何かかな?、必要に感じたらまた見直そう、程度にしか思っていなかったはず。
@Bean
public PollableChannel myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input")
            .get();
}
  • Spring Integration Java DSL Reference に書かれているものは以下のコードで、これでは .wireTap(...) というメソッドがあることは全然分かりません。
@Bean
public MessageChannel priorityChannel() {
   return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
                        .interceptor(wireTap())
                        .get();
}
  • Spring Integration Java DSL: Line by line tutorial のサンプルでも wireTap は全然出てきません。そうすると Spring Integration を習得する上で覚えるべきものと認識されません。サンプルに出てくるメソッドからまず覚えようとします。

こんな感じで、wireTap は全く気にもしていませんでした。それがなぜ気づいたのかというと、

  • Java で Enterprise Integration Pattern 用のフレームワークとして Spring Integration 以外に Apache Camel があります。
  • Apache Camel には大量の Component が用意されていて、AWS 関連のコンポーネントとかが使えるとやれることが増えそうだな、と思いました。
  • Component の中に Spring Integration Component という Component を見かけて、Spring Integration –> Apache Camel 連携は普通にできそうな感じが。
  • でも Spring Support 等を見て少し使ってみようとしましたが、さっぱり分からず。。。 Spring Integration DSL が少しは使えるようになったので Apache Camel も分かるだろう、と思っていましたが、なんというか概要・全体像的なところは似ているのですが、詳細のところでどうしてよいのかがさっぱり分かりません。
  • Web を見ていても分からなそうだったので、英語の書籍がないか探して以下の2冊を kindle 版で購入。
    Apache Camel Developer's Cookbook (Solve Common Integration Tasks With Over 100 Easily Accessible Apache Camel Recipes)

    Apache Camel Developer's Cookbook (Solve Common Integration Tasks With Over 100 Easily Accessible Apache Camel Recipes)

    Mastering Apache Camel

    Mastering Apache Camel

  • Mastering Apache Camel を読んでいたところ、「Chaper 2.Message Routing」に “Wire Tap - sending a copy of the message elsewhere” の文章が! メッセージのコピーを他に送れるのかな?と思って本の Wire Tap の章を読んでみましたが、確かにメイン Flow のメッセージはそのままで、途中で別のフローにメッセージのコピーを送信できる仕組みでした。
  • Spring Integration でも WireTap ないのかな?と思って Spring Integration Reference Manual を見ると Wire Tap の記述が!
  • IntegrationFlow でも書けるのか?と思って試してみたところ、IntegrationFlowDefinition#wireTap がちゃんとありました。メイン Flow の Message は変更せずに、途中に別の Flow を実行できる仕組みでした。

すっごい遠回りでした。Spring Integration のマニュアルや記事だけ見ていると wireTap の使い方って分からないのでは。。。

in, out ディレクトリを作成する

サンプルアプリケーションを作成します。常駐型アプリケーション用として以下の構成のディレクトリを作成します。

C:\eipapp\ksbysample-eipapp-wiretap
├ in
└ out

ksbysample-eipapp-wiretap プロジェクトを作成する

  1. IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。

  2. ksbysample-eipapp-wiretap プロジェクトのルート直下に config/checkstyle, config/findbugs ディレクトリを作成します。

  3. config/checkstyle の下に ksbysample-eipapp-messaginggateway プロジェクトの config/checkstyle の下にある google_checks.xml をコピーします。

  4. config/findbugs の下に findbugs-exclude.xml を新規作成し、リンク先の内容 の内容に変更します。

  5. src/main/java の下に ksbysample.eipapp.wiretap パッケージを作成します。

  6. src/main/java/ksbysample/eipapp/wiretap の下に Application.java を作成し、リンク先の内容 を記述します。

  7. src/main/resources の下に application.properties を作成し、リンク先の内容 を記述します。

  8. src/main/resources の下に logback-spring.xml を作成し、リンク先の内容 を記述します。

サンプルの Flow を作成する

  1. サンプルの Flow は以下の仕様で作成します。

    • /in ディレクトリにファイルがあるかチェックします。
    • ファイルの内容をメールします。
    • /out ディレクトリに To, Subject, メール本文を出力したファイルを作成します。
    • /out ディレクトリに作成したファイルを SFTPサーバにアップロードします。
  2. src/main/java/ksbysample/eipapp/wiretap の下に FlowConfig.java を新規作成し、リンク先の内容 を記述します。

動作確認

動作確認します。テストには以下のファイルを使用します。

■2014.txt

http://ksby.hatenablog.com/entry/2014/12/27/233427
http://ksby.hatenablog.com/entry/2014/12/29/175849
  1. smtp4dev, freeFTPd を起動します。

    f:id:ksby:20170311231446p:plain f:id:ksby:20170311231600p:plain

  2. ZipkinServer を起動します。今回は起動するだけです。

  3. bootRun を実行して ksbysample-eipapp-wiretap を起動します。

  4. C:\eipapp\ksbysample-eipapp-wiretap\in の下に 2014.txt を置きます。

    f:id:ksby:20170311233054p:plain

  5. SFTP サーバに 2014.txt がアップロードされて /in ディレクトリからはファイルが削除されます。

    f:id:ksby:20170311233527p:plain

    /out ディレクトリにもファイルは残っていません。

    f:id:ksby:20170311233743p:plain

  6. メールも届いており、ファイルの中身がメール本文になっています。

    f:id:ksby:20170311234105p:plain f:id:ksby:20170311234216p:plain f:id:ksby:20170311234311p:plain

  7. SFTP サーバにアップロードされた 2014.txt は以下の内容です。メールで送信した To, Subject が出力されています。

    f:id:ksby:20170311234551p:plain

  8. 起動したサーバと ksbysample-eipapp-wiretap を停止します。

最後に

  • Spring Integration Java DSL Reference の Using Protocol Adapters に記載されている Adapter を使う前なら気にしなかったのですが、使うようになったら wireTap を覚えると便利です。個人的には OutboundAdapter って何か使い勝手悪いなあ、と思っていたのが解消されました。

  • Spring Integration というか Enterprise Integration Patterns ですが、最初何のためにそんな機能があるのかよく分からなくて、慣れてきてやっと使い方が分かるものがあります。もう少し具体的なサンプルで、そういうふうに使えばいいんだと分かるものがあるといいんですけどね。

    Spring Integratin にも Mastering Apache Camel のように簡単な例がいくつも載っている本があるといいな、とは思いました。できれば Spring Integration DSL ベースで。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.5.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
        maven { url "https://plugins.gradle.org/m2/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
        // for Error Prone ( http://errorprone.info/ )
        classpath("net.ltgt.gradle:gradle-errorprone-plugin:0.0.9")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'
apply plugin: 'net.ltgt.errorprone'
apply plugin: 'checkstyle'
apply plugin: 'findbugs'

sourceCompatibility = 1.8
targetCompatibility = 1.8

[compileJava, compileTestGroovy, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options,-processing,-path']
compileJava.options.compilerArgs += ['-Xep:RemoveUnusedImports:WARN']

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

checkstyle {
    configFile = file("${rootProject.projectDir}/config/checkstyle/google_checks.xml")
    toolVersion = '7.6'
    sourceSets = [project.sourceSets.main]
}

findbugs {
    toolVersion = '3.0.1'
    sourceSets = [project.sourceSets.main]
    ignoreFailures = true
    effort = "max"
    excludeFilter = file("${rootProject.projectDir}/config/findbugs/findbugs-exclude.xml")
}

tasks.withType(FindBugs) {
    reports {
        xml.enabled = false
        html.enabled = true
    }
}

repositories {
    mavenCentral()
    maven { url "http://repo.spring.io/repo/" }
}

dependencyManagement {
    imports {
        mavenBom("io.spring.platform:platform-bom:Athens-SR4") {
            bomProperty 'guava.version', '21.0'
        }
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE")
    }
}

dependencies {
    def spockVersion = "1.1-groovy-2.4-rc-3"
    def lombokVersion = "1.16.12"
    def errorproneVersion = '2.0.15'

    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.boot:spring-boot-starter-mail")
    compile("org.springframework.integration:spring-integration-mail")
    compile("org.springframework.integration:spring-integration-sftp")
    compile("org.codehaus.janino:janino")
    compile("com.google.guava:guava")
    testCompile("org.springframework.boot:spring-boot-starter-test")

    // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの
    // http://projects.spring.io/spring-cloud/ の「Release Trains」参照
    compile("org.springframework.cloud:spring-cloud-starter-zipkin") {
        exclude module: 'spring-boot-starter-web'
    }

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    testCompile("org.assertj:assertj-core:3.6.2")
    testCompile("org.spockframework:spock-core:${spockVersion}")
    testCompile("org.spockframework:spock-spring:${spockVersion}")

    // for lombok
    compileOnly("org.projectlombok:lombok:${lombokVersion}")
    testCompileOnly("org.projectlombok:lombok:${lombokVersion}")

    // for Error Prone ( http://errorprone.info/ )
    errorprone("com.google.errorprone:error_prone_core:${errorproneVersion}")
    compileOnly("com.google.errorprone:error_prone_annotations:${errorproneVersion}")
}

findbugs-exclude.xml

<?xml version="1.0" encoding="UTF-8"?>
<FindBugsFilter>
</FindBugsFilter>

Application.java

package ksbysample.eipapp.wiretap;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.integration.annotation.IntegrationComponentScan;

@SpringBootApplication
@IntegrationComponentScan
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

application.properties

spring.application.name=eipapp
spring.zipkin.base-url=http://localhost:9411/
spring.sleuth.sampler.percentage=1.0

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>

    <springProperty scope="context" name="springAppName" source="spring.application.name"/>
    <property name="CONSOLE_LOG_PATTERN"
              value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${level:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
    <logger name="org.springframework.integration.expression.ExpressionUtils" level="ERROR"/>
    <logger name="com.jcraft.jsch" level="ERROR"/>
</configuration>

FlowConfig.java

package ksbysample.eipapp.wiretap;

import com.jcraft.jsch.ChannelSftp;
import org.aopalliance.aop.Advice;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.support.Transformers;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.filters.AcceptAllFileListFilter;
import org.springframework.integration.file.filters.IgnoreHiddenFileListFilter;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.integration.mail.MailHeaders;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import static java.util.Collections.singletonMap;

@Configuration
public class FlowConfig {

    private static final String ROOT_DIR = "C:/eipapp/ksbysample-eipapp-wiretap";
    private static final String IN_DIR = ROOT_DIR + "/in";
    private static final String OUT_DIR = ROOT_DIR + "/out";

    private static final String MAIL_FROM = "system@sample.com";
    private static final String MAIL_TO = "download@test.co.jp";

    private static final String SFTP_UPLOAD_DIR = "/in";

    private static final String CRLF = "\r\n";

    @Value("${spring.mail.host:localhost}")
    private String mailHost;

    @Value("${spring.mail.port:25}")
    private int mailPort;

    @Value("${spring.mail.protocol:smtp}")
    private String mailProtocol;

    @Value("${spring.mail.default-encoding:UTF-8}")
    private String mailDefaultEncoding;

    /**
     * SFTP サーバに接続するための SessionFactory オブジェクトを生成する
     * 今回は CachingSessionFactory を使用せず処理毎に接続・切断されるようにする
     *
     * @return SFTP サーバ接続用の SessionFactory オブジェクト
     */
    @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22);
        factory.setUser("send01");
        factory.setPassword("send01");
        factory.setAllowUnknownKeys(true);
        return factory;
    }

    /**
     * EIP の1つ wireTap のサンプル Flow
     *
     * @return IntegrationFlow オブジェクト
     */
    @Bean
    public IntegrationFlow wiretapSampleFlow() {
        return IntegrationFlows
                // C:/eipapp/ksbysample-eipapp-wiretap/in にファイルが作成されたか 1秒間隔でチェックする
                .from(s -> s.file(new File(IN_DIR))
                                // 同じファイルが置かれても処理する
                                .filter(new AcceptAllFileListFilter<>())
                                .filter(new IgnoreHiddenFileListFilter())
                                // ファイルが新規作成された時だけ Message を送信する
                                // これを入れないとファイルが存在する限り何度も Message が送信され続ける
                                .useWatchService(true)
                                .watchEvents(FileReadingMessageSource.WatchEventType.CREATE)
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                // ファイル名とファイルの絶対パス、メール送信用の From, To, Subject を Message の header にセットする
                .enrichHeaders(h -> h
                        .headerExpression(FileHeaders.FILENAME, "payload.name")
                        .headerExpression(FileHeaders.ORIGINAL_FILE, "payload.absolutePath")
                        .header(MailHeaders.FROM, MAIL_FROM)
                        .header(MailHeaders.TO, MAIL_TO)
                        .headerExpression(MailHeaders.SUBJECT, "payload.name"))
                .wireTap(f -> f
                        // File の内容を読み込んで payload へセットする
                        .transform(Transformers.fileToString())
                        .wireTap(sf -> sf
                                // メールを送信する
                                .handleWithAdapter(a -> a.mail(this.mailHost)
                                        .port(this.mailPort)
                                        .protocol(this.mailProtocol)
                                        .defaultEncoding(this.mailDefaultEncoding)))
                        .wireTap(sf -> sf
                                // payload の内容をファイルに出力する内容に変更する
                                .handle((p, h) -> {
                                    StringBuilder sb = new StringBuilder();
                                    sb.append("To: " + h.get(MailHeaders.TO) + CRLF);
                                    sb.append("Subject: " + h.get(MailHeaders.SUBJECT) + CRLF);
                                    sb.append(CRLF);
                                    sb.append(p);

                                    return MessageBuilder.withPayload(sb.toString())
                                            .build();
                                })
                                // /out ディレクトリにファイルを生成する
                                // ファイル名は header 内の FileHeaders.FILENAME のキー名の文字列が使用される
                                .handleWithAdapter(a -> a.file(new File(OUT_DIR))))
                        .channel("nullChannel"))
                .wireTap(f -> f
                        // payload の File クラスを /out ディレクトリのファイルに変更する
                        .handle((p, h) -> Paths.get(OUT_DIR, (String) h.get(FileHeaders.FILENAME)).toFile())
                        // SFTP サーバにファイルをアップロードする
                        .handleWithAdapter(a -> a.sftp(sftpSessionFactory())
                                        .remoteDirectory(SFTP_UPLOAD_DIR)
                                , e -> e.advice(sftpUploadRetryAdvice())))
                // /in, /out ディレクトリのファイルを削除する
                .<File>handle((p, h) -> {
                    try {
                        Files.delete(Paths.get(p.getAbsolutePath()));
                        Files.delete(Paths.get(OUT_DIR, p.getName()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                    return null;
                })
                .get();
    }

    /**
     * リトライは最大5回 ( SimpleRetryPolicy で指定 )、
     * リトライ間隔は初期値2秒、最大10秒、倍数2.0 ( ExponentialBackOffPolicy で指定 )
     * の RequestHandlerRetryAdvice オブジェクトを生成する
     *
     * @return RequestHandlerRetryAdvice オブジェクト
     */
    @Bean
    public Advice sftpUploadRetryAdvice() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(
                new SimpleRetryPolicy(5, singletonMap(Exception.class, true)));
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(2000);
        backOffPolicy.setMaxInterval(10000);
        backOffPolicy.setMultiplier(2.0);
        retryTemplate.setBackOffPolicy(backOffPolicy);

        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        advice.setRetryTemplate(retryTemplate);

        return advice;
    }

}

履歴

2017/03/12
初版発行。