Spring Boot + Spring Integration でいろいろ試してみる ( その19 )( Flow の途中で一時的に別の Flow を実行したいなら wireTap! )
概要
記事一覧はこちらです。
- Spring Boot + Spring Integration でいろいろ試してみる ( その17 )( @MessagingGateway でメソッド呼び出しのインターフェースで MessageChannel へ Message を送信する ) で、Flow の途中で FTP アップロードするために
.handleWithAdapter(a -> a.ftp(...))
を呼び出す方法として ExpressionEvaluatingRequestHandlerAdvice +.bridge(e -> e.advice(...))
の組み合わせで実装していたのですが、Flow の途中で別の Flow を実行するための.wireTap(...)
というメソッドが用意されていることに気付いたので、今回は.wireTap(...)
のサンプルを作成します。
参照したサイト・書籍
- Spring Integration Reference Manual - Wire Tap
http://docs.spring.io/spring-integration/docs/4.3.8.RELEASE/reference/html/messaging-channels-section.html#channel-wiretap
目次
- wireTap になぜ気づいたのか?
- in, out ディレクトリを作成する
- ksbysample-eipapp-wiretap プロジェクトを作成する
- サンプルの Flow を作成する
- 動作確認
- 最後に
手順
wireTap になぜ気づいたのか?
- まず Spring Integration Reference Manual を初めに読んだ時には全く頭に残っていませんでした。
- Spring Integration DSL に興味を持つようになって、DSL が書いてあるところはざっと見直したのですが、Wire Tap の DSL のサンプルは以下のようなコードで、MessageChannel 絡みの何かかな?、必要に感じたらまた見直そう、程度にしか思っていなかったはず。
@Bean public PollableChannel myChannel() { return MessageChannels.queue() .wireTap("loggingFlow.input") .get(); }
- Spring Integration Java DSL Reference に書かれているものは以下のコードで、これでは
.wireTap(...)
というメソッドがあることは全然分かりません。
@Bean public MessageChannel priorityChannel() { return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup") .interceptor(wireTap()) .get(); }
- Spring Integration Java DSL: Line by line tutorial のサンプルでも wireTap は全然出てきません。そうすると Spring Integration を習得する上で覚えるべきものと認識されません。サンプルに出てくるメソッドからまず覚えようとします。
こんな感じで、wireTap は全く気にもしていませんでした。それがなぜ気づいたのかというと、
- Java で Enterprise Integration Pattern 用のフレームワークとして Spring Integration 以外に Apache Camel があります。
- Apache Camel には大量の Component が用意されていて、AWS 関連のコンポーネントとかが使えるとやれることが増えそうだな、と思いました。
- Component の中に Spring Integration Component という Component を見かけて、Spring Integration –> Apache Camel 連携は普通にできそうな感じが。
- でも Spring Support 等を見て少し使ってみようとしましたが、さっぱり分からず。。。 Spring Integration DSL が少しは使えるようになったので Apache Camel も分かるだろう、と思っていましたが、なんというか概要・全体像的なところは似ているのですが、詳細のところでどうしてよいのかがさっぱり分かりません。
- Web を見ていても分からなそうだったので、英語の書籍がないか探して以下の2冊を kindle 版で購入。
- 作者: Scott Cranton,Jakub Korab
- 出版社/メーカー: Packt Publishing
- 発売日: 2013/12/26
- メディア: Kindle版
- この商品を含むブログを見る
- 作者: Jean-Baptiste Onofré
- 出版社/メーカー: Packt Publishing
- 発売日: 2015/06/30
- メディア: Kindle版
- この商品を含むブログを見る
- Mastering Apache Camel を読んでいたところ、「Chaper 2.Message Routing」に “Wire Tap - sending a copy of the message elsewhere” の文章が! メッセージのコピーを他に送れるのかな?と思って本の Wire Tap の章を読んでみましたが、確かにメイン Flow のメッセージはそのままで、途中で別のフローにメッセージのコピーを送信できる仕組みでした。
- Spring Integration でも WireTap ないのかな?と思って Spring Integration Reference Manual を見ると Wire Tap の記述が!
- IntegrationFlow でも書けるのか?と思って試してみたところ、IntegrationFlowDefinition#wireTap がちゃんとありました。メイン Flow の Message は変更せずに、途中に別の Flow を実行できる仕組みでした。
すっごい遠回りでした。Spring Integration のマニュアルや記事だけ見ていると wireTap の使い方って分からないのでは。。。
in, out ディレクトリを作成する
サンプルアプリケーションを作成します。常駐型アプリケーション用として以下の構成のディレクトリを作成します。
C:\eipapp\ksbysample-eipapp-wiretap ├ in └ out
ksbysample-eipapp-wiretap プロジェクトを作成する
IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。
ksbysample-eipapp-wiretap プロジェクトのルート直下に config/checkstyle, config/findbugs ディレクトリを作成します。
config/checkstyle の下に ksbysample-eipapp-messaginggateway プロジェクトの config/checkstyle の下にある google_checks.xml をコピーします。
config/findbugs の下に findbugs-exclude.xml を新規作成し、リンク先の内容 の内容に変更します。
src/main/java の下に ksbysample.eipapp.wiretap パッケージを作成します。
src/main/java/ksbysample/eipapp/wiretap の下に Application.java を作成し、リンク先の内容 を記述します。
src/main/resources の下に application.properties を作成し、リンク先の内容 を記述します。
src/main/resources の下に logback-spring.xml を作成し、リンク先の内容 を記述します。
サンプルの Flow を作成する
サンプルの Flow は以下の仕様で作成します。
src/main/java/ksbysample/eipapp/wiretap の下に FlowConfig.java を新規作成し、リンク先の内容 を記述します。
動作確認
動作確認します。テストには以下のファイルを使用します。
■2014.txt
http://ksby.hatenablog.com/entry/2014/12/27/233427 http://ksby.hatenablog.com/entry/2014/12/29/175849
smtp4dev, freeFTPd を起動します。
ZipkinServer を起動します。今回は起動するだけです。
bootRun を実行して ksbysample-eipapp-wiretap を起動します。
C:\eipapp\ksbysample-eipapp-wiretap\in の下に 2014.txt を置きます。
SFTP サーバに 2014.txt がアップロードされて /in ディレクトリからはファイルが削除されます。
/out ディレクトリにもファイルは残っていません。
メールも届いており、ファイルの中身がメール本文になっています。
SFTP サーバにアップロードされた 2014.txt は以下の内容です。メールで送信した To, Subject が出力されています。
起動したサーバと ksbysample-eipapp-wiretap を停止します。
最後に
Spring Integration Java DSL Reference の Using Protocol Adapters に記載されている Adapter を使う前なら気にしなかったのですが、使うようになったら wireTap を覚えると便利です。個人的には OutboundAdapter って何か使い勝手悪いなあ、と思っていたのが解消されました。
Spring Integration というか Enterprise Integration Patterns ですが、最初何のためにそんな機能があるのかよく分からなくて、慣れてきてやっと使い方が分かるものがあります。もう少し具体的なサンプルで、そういうふうに使えばいいんだと分かるものがあるといいんですけどね。
Spring Integratin にも Mastering Apache Camel のように簡単な例がいくつも載っている本があるといいな、とは思いました。できれば Spring Integration DSL ベースで。
ソースコード
build.gradle
group 'ksbysample' version '1.0.0-RELEASE' buildscript { ext { springBootVersion = '1.4.5.RELEASE' } repositories { mavenCentral() maven { url "http://repo.spring.io/repo/" } maven { url "https://plugins.gradle.org/m2/" } } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE") // for Error Prone ( http://errorprone.info/ ) classpath("net.ltgt.gradle:gradle-errorprone-plugin:0.0.9") } } apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'idea' apply plugin: 'org.springframework.boot' apply plugin: 'io.spring.dependency-management' apply plugin: 'groovy' apply plugin: 'net.ltgt.errorprone' apply plugin: 'checkstyle' apply plugin: 'findbugs' sourceCompatibility = 1.8 targetCompatibility = 1.8 [compileJava, compileTestGroovy, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options,-processing,-path'] compileJava.options.compilerArgs += ['-Xep:RemoveUnusedImports:WARN'] idea { module { inheritOutputDirs = false outputDir = file("$buildDir/classes/main/") } } checkstyle { configFile = file("${rootProject.projectDir}/config/checkstyle/google_checks.xml") toolVersion = '7.6' sourceSets = [project.sourceSets.main] } findbugs { toolVersion = '3.0.1' sourceSets = [project.sourceSets.main] ignoreFailures = true effort = "max" excludeFilter = file("${rootProject.projectDir}/config/findbugs/findbugs-exclude.xml") } tasks.withType(FindBugs) { reports { xml.enabled = false html.enabled = true } } repositories { mavenCentral() maven { url "http://repo.spring.io/repo/" } } dependencyManagement { imports { mavenBom("io.spring.platform:platform-bom:Athens-SR4") { bomProperty 'guava.version', '21.0' } mavenBom("org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE") } } dependencies { def spockVersion = "1.1-groovy-2.4-rc-3" def lombokVersion = "1.16.12" def errorproneVersion = '2.0.15' // dependency-management-plugin によりバージョン番号が自動で設定されるもの // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照 compile("org.springframework.boot:spring-boot-starter-integration") compile("org.springframework.boot:spring-boot-starter-mail") compile("org.springframework.integration:spring-integration-mail") compile("org.springframework.integration:spring-integration-sftp") compile("org.codehaus.janino:janino") compile("com.google.guava:guava") testCompile("org.springframework.boot:spring-boot-starter-test") // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの // http://projects.spring.io/spring-cloud/ の「Release Trains」参照 compile("org.springframework.cloud:spring-cloud-starter-zipkin") { exclude module: 'spring-boot-starter-web' } // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE") testCompile("org.assertj:assertj-core:3.6.2") testCompile("org.spockframework:spock-core:${spockVersion}") testCompile("org.spockframework:spock-spring:${spockVersion}") // for lombok compileOnly("org.projectlombok:lombok:${lombokVersion}") testCompileOnly("org.projectlombok:lombok:${lombokVersion}") // for Error Prone ( http://errorprone.info/ ) errorprone("com.google.errorprone:error_prone_core:${errorproneVersion}") compileOnly("com.google.errorprone:error_prone_annotations:${errorproneVersion}") }
findbugs-exclude.xml
<?xml version="1.0" encoding="UTF-8"?> <FindBugsFilter> </FindBugsFilter>
Application.java
package ksbysample.eipapp.wiretap; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.integration.annotation.IntegrationComponentScan; @SpringBootApplication @IntegrationComponentScan public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
application.properties
spring.application.name=eipapp spring.zipkin.base-url=http://localhost:9411/ spring.sleuth.sampler.percentage=1.0
logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration> <include resource="org/springframework/boot/logging/logback/defaults.xml"/> <springProperty scope="context" name="springAppName" source="spring.application.name"/> <property name="CONSOLE_LOG_PATTERN" value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${level:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>${CONSOLE_LOG_PATTERN}</pattern> <charset>UTF-8</charset> </encoder> </appender> <root level="INFO"> <appender-ref ref="CONSOLE"/> </root> <logger name="org.springframework.integration.expression.ExpressionUtils" level="ERROR"/> <logger name="com.jcraft.jsch" level="ERROR"/> </configuration>
FlowConfig.java
package ksbysample.eipapp.wiretap; import com.jcraft.jsch.ChannelSftp; import org.aopalliance.aop.Advice; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.core.Pollers; import org.springframework.integration.dsl.support.Transformers; import org.springframework.integration.file.FileHeaders; import org.springframework.integration.file.FileReadingMessageSource; import org.springframework.integration.file.filters.AcceptAllFileListFilter; import org.springframework.integration.file.filters.IgnoreHiddenFileListFilter; import org.springframework.integration.file.remote.session.SessionFactory; import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice; import org.springframework.integration.mail.MailHeaders; import org.springframework.integration.sftp.session.DefaultSftpSessionFactory; import org.springframework.integration.support.MessageBuilder; import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.policy.SimpleRetryPolicy; import org.springframework.retry.support.RetryTemplate; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import static java.util.Collections.singletonMap; @Configuration public class FlowConfig { private static final String ROOT_DIR = "C:/eipapp/ksbysample-eipapp-wiretap"; private static final String IN_DIR = ROOT_DIR + "/in"; private static final String OUT_DIR = ROOT_DIR + "/out"; private static final String MAIL_FROM = "system@sample.com"; private static final String MAIL_TO = "download@test.co.jp"; private static final String SFTP_UPLOAD_DIR = "/in"; private static final String CRLF = "\r\n"; @Value("${spring.mail.host:localhost}") private String mailHost; @Value("${spring.mail.port:25}") private int mailPort; @Value("${spring.mail.protocol:smtp}") private String mailProtocol; @Value("${spring.mail.default-encoding:UTF-8}") private String mailDefaultEncoding; /** * SFTP サーバに接続するための SessionFactory オブジェクトを生成する * 今回は CachingSessionFactory を使用せず処理毎に接続・切断されるようにする * * @return SFTP サーバ接続用の SessionFactory オブジェクト */ @Bean public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() { DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true); factory.setHost("localhost"); factory.setPort(22); factory.setUser("send01"); factory.setPassword("send01"); factory.setAllowUnknownKeys(true); return factory; } /** * EIP の1つ wireTap のサンプル Flow * * @return IntegrationFlow オブジェクト */ @Bean public IntegrationFlow wiretapSampleFlow() { return IntegrationFlows // C:/eipapp/ksbysample-eipapp-wiretap/in にファイルが作成されたか 1秒間隔でチェックする .from(s -> s.file(new File(IN_DIR)) // 同じファイルが置かれても処理する .filter(new AcceptAllFileListFilter<>()) .filter(new IgnoreHiddenFileListFilter()) // ファイルが新規作成された時だけ Message を送信する // これを入れないとファイルが存在する限り何度も Message が送信され続ける .useWatchService(true) .watchEvents(FileReadingMessageSource.WatchEventType.CREATE) , e -> e.poller(Pollers.fixedDelay(1000))) // ファイル名とファイルの絶対パス、メール送信用の From, To, Subject を Message の header にセットする .enrichHeaders(h -> h .headerExpression(FileHeaders.FILENAME, "payload.name") .headerExpression(FileHeaders.ORIGINAL_FILE, "payload.absolutePath") .header(MailHeaders.FROM, MAIL_FROM) .header(MailHeaders.TO, MAIL_TO) .headerExpression(MailHeaders.SUBJECT, "payload.name")) .wireTap(f -> f // File の内容を読み込んで payload へセットする .transform(Transformers.fileToString()) .wireTap(sf -> sf // メールを送信する .handleWithAdapter(a -> a.mail(this.mailHost) .port(this.mailPort) .protocol(this.mailProtocol) .defaultEncoding(this.mailDefaultEncoding))) .wireTap(sf -> sf // payload の内容をファイルに出力する内容に変更する .handle((p, h) -> { StringBuilder sb = new StringBuilder(); sb.append("To: " + h.get(MailHeaders.TO) + CRLF); sb.append("Subject: " + h.get(MailHeaders.SUBJECT) + CRLF); sb.append(CRLF); sb.append(p); return MessageBuilder.withPayload(sb.toString()) .build(); }) // /out ディレクトリにファイルを生成する // ファイル名は header 内の FileHeaders.FILENAME のキー名の文字列が使用される .handleWithAdapter(a -> a.file(new File(OUT_DIR)))) .channel("nullChannel")) .wireTap(f -> f // payload の File クラスを /out ディレクトリのファイルに変更する .handle((p, h) -> Paths.get(OUT_DIR, (String) h.get(FileHeaders.FILENAME)).toFile()) // SFTP サーバにファイルをアップロードする .handleWithAdapter(a -> a.sftp(sftpSessionFactory()) .remoteDirectory(SFTP_UPLOAD_DIR) , e -> e.advice(sftpUploadRetryAdvice()))) // /in, /out ディレクトリのファイルを削除する .<File>handle((p, h) -> { try { Files.delete(Paths.get(p.getAbsolutePath())); Files.delete(Paths.get(OUT_DIR, p.getName())); } catch (IOException e) { throw new RuntimeException(e); } return null; }) .get(); } /** * リトライは最大5回 ( SimpleRetryPolicy で指定 )、 * リトライ間隔は初期値2秒、最大10秒、倍数2.0 ( ExponentialBackOffPolicy で指定 ) * の RequestHandlerRetryAdvice オブジェクトを生成する * * @return RequestHandlerRetryAdvice オブジェクト */ @Bean public Advice sftpUploadRetryAdvice() { RetryTemplate retryTemplate = new RetryTemplate(); retryTemplate.setRetryPolicy( new SimpleRetryPolicy(5, singletonMap(Exception.class, true))); ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); backOffPolicy.setInitialInterval(2000); backOffPolicy.setMaxInterval(10000); backOffPolicy.setMultiplier(2.0); retryTemplate.setBackOffPolicy(backOffPolicy); RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice(); advice.setRetryTemplate(retryTemplate); return advice; } }
履歴
2017/03/12
初版発行。