かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その18 )( @MessagingGateway でメソッド呼び出しのインターフェースで MessageChannel へ Message を送信する2 )

概要

記事一覧はこちらです。

参照したサイト・書籍

目次

  1. FTP アップロード処理にリトライ処理を追加する
  2. ErrorChannel の処理を作成する
  3. 動作確認
    1. 事前準備
    2. 1ファイルを SFTP サーバの /in ディレクトリに置いてみる
    3. 2ファイルを SFTP サーバの /in ディレクトリに置いてみる
    4. FTP サーバを停止して errorChannel に Message が送信されるようにしてみる
  4. メモ書き
    1. .handle(Mail.outboundAdapter(...).~).handleWithAdapter(a -> a.mail(...).~) は同じです&使い方の説明や感想など
    2. .handle(Mail.outboundAdapter(...).~) のメール送信処理は spring.mail.~ の設定が反映されない。。。と思って調べたら MailSendingMessageHandler って何?

手順

FTP アップロード処理にリトライ処理を追加する

前回作成した FTP サーバにアップロードする処理にリトライ処理を入れるのを忘れていたので、追加します。

  1. src/main/java/ksbysample/eipapp/messaginggateway の下の FlowConfig.javaリンク先のその1の内容 に変更します。

ErrorChannel の処理を作成する

Flow の処理中に例外が throw されると “errorChannel” という名前の MessageChannel に throw された例外がセットされた Message が送信されます。

“errorChannel” の MessageChannel は Spring Integration が自動的に生成します。生成しているのは org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor #postProcessBeanFactory です。

  1. まず throw された例外の stacktrace をログに出力されるのと同じ適宜インデントされたフォーマットの文字列で取得したいので、Guava の Throwables#getStackTraceAsString を使用できるようにします。build.gradle を リンク先の文字列 に変更します。

    変更後、Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして更新します。

  2. src/main/java/ksbysample/eipapp/messaginggateway の下の FlowConfig.javaリンク先のその2の内容 に変更します。

動作確認

事前準備

まずは clean タスク → Rebuild Project → build タスクを実行して正常終了することを確認します。

f:id:ksby:20170226105416p:plain

次に Xlight FTP Server, FreeFTPd, smtp4dev, zipkin を起動します。zipkin は最新の zipkin-server-1.20.1-exec.jar をダウンロードしています。

f:id:ksby:20170226101713p:plain f:id:ksby:20170226101803p:plain f:id:ksby:20170226101843p:plain f:id:ksby:20170226105630p:plain

起動していて vagrant か docker で必要なサーバが起動する仮想サーバを作った方がいいな、と思いました。そのうち気が向いたらやってみます。

ログが見にくくなるので JavaMail の debug 機能を一旦 OFF にします。src/main/java/ksbysample/eipapp/messaginggateway の下の MailHelperConfig.java の sendMailFlow メソッドを以下のように変更します。

    @Bean
    public IntegrationFlow sendMailFlow() {
        return f -> f
                .handle(Mail.outboundAdapter(this.mailHost)
                        .port(this.mailPort)
                        .protocol(this.mailProtocol)
                        .defaultEncoding(this.mailDefaultEncoding)
                        .javaMailProperties(p -> p.put("mail.debug", "false")));
    }
  • .javaMailProperties(p -> p.put("mail.debug", "true")));.javaMailProperties(p -> p.put("mail.debug", "false"))); に変更します。

bootRun を実行して ksbysample-eipapp-messaginggateway を起動します。

1ファイルを SFTP サーバの /in ディレクトリに置いてみる

まずは testdata01.txt を SFTP サーバの send01 ユーザの /in ディレクトリに置いてみます。ファイルの文字コードUTF-8、改行コードは CRLF で以下の内容です。

これはテストです。
今日は晴れています。

f:id:ksby:20170226104932p:plain

SFTP サーバからファイルがダウンロードされて処理が行われ、FTP サーバの recv01 ユーザの /out ディレクトリにファイルがアップロードされました。

f:id:ksby:20170226110035p:plain

アップロードされた testdata01.txt の内容は以下のようになっていました。文字コードUTF-8、改行コードは CRLF です。

To: download@test.co.jp
Subject: testdata01.txt

これはテストです。
今日は晴れています。

IntelliJ IDEA のコンソールに出力されたログは以下のようになっており、特にエラーは出ていません。

f:id:ksby:20170226110420p:plain

smtp4dev に送信されたメールは以下の内容でした。こちらも特に問題ありませんでした。

f:id:ksby:20170226110520p:plain f:id:ksby:20170226111029p:plain f:id:ksby:20170226111120p:plain

Zipkin で処理状況を見ると以下のようになっていました。

f:id:ksby:20170226111659p:plain f:id:ksby:20170226111805p:plain

2ファイルを SFTP サーバの /in ディレクトリに置いてみる

次は testdata01.txt, testdata02.txt の2ファイルを SFTP サーバの send01 ユーザの /in ディレクトリに置いてみます。testdata02.txt の内容は以下のものです。

緊急警報です。

明日は、
雨ですね。

f:id:ksby:20170226112605p:plain

2ファイルとも SFTP サーバからファイルがダウンロードされて処理が行われ、FTP サーバの recv01 ユーザの /out ディレクトリにファイルがアップロードされました。

f:id:ksby:20170226112817p:plain

アップロードされた testdata01.txt, testdata02.txt の内容は以下のようになっていました。

■testdata01.txt

To: download@test.co.jp
Subject: testdata01.txt

これはテストです。
今日は晴れています。

■testdata02.txt

To: download@test.co.jp
Subject: testdata02.txt

緊急警報です。

明日は、
雨ですね。

IntelliJ IDEA のコンソールに出力されたログは以下のようになっており、特にエラーは出ていません。traceId は1ファイル毎に 1 ID 発行されています。

f:id:ksby:20170226113148p:plain

smtp4dev に送信されたメールは以下の内容でした。こちらも特に問題ありませんでした。

f:id:ksby:20170226113402p:plain f:id:ksby:20170226113454p:plain f:id:ksby:20170226113551p:plain

FTP サーバを停止して errorChannel に Message が送信されるようにしてみる

Xlight FTP Server を停止します。

f:id:ksby:20170226114000p:plain

testdata01.txt を SFTP サーバの send01 ユーザの /in ディレクトリに置きます。

今度は FTP アップロードできなかったため、C:\eipapp\ksbysample-eipapp-messaginggateway\send ディレクトリにファイルが残ったままになりました。

f:id:ksby:20170226114237p:plain

IntelliJ IDEA のコンソールに出力されたログは以下のようになっていました。例外が throw されています。

f:id:ksby:20170226114502p:plain

smtp4dev に送信されたメールは以下の内容でした。今回は「エラーが発生しました」メールが送信されており、送信されたメールの本文を確認すると例外の内容が出力されています。

f:id:ksby:20170226115052p:plain f:id:ksby:20170226115306p:plain

想定通りの動作になっていたので、動作確認を終わります。

メモ書き

.handle(Mail.outboundAdapter(...).~).handleWithAdapter(a -> a.mail(...).~) は同じです&使い方の説明や感想など

src/main/java/ksbysample/eipapp/messaginggateway の MailHelperConfig.java の中で以下のように記述していますが、

    @Bean
    public IntegrationFlow sendMailFlow() {
        return f -> f
                .handle(Mail.outboundAdapter(this.mailHost)
                        .port(this.mailPort)
                        .protocol(this.mailProtocol)
                        .defaultEncoding(this.mailDefaultEncoding)
                        .javaMailProperties(p -> p.put("mail.debug", "false")));
    }

これは以下と同じです。

    @Bean
    public IntegrationFlow sendMailFlow() {
        return f -> f
                .handleWithAdapter(a -> a.mail(this.mailHost)
                        .port(this.mailPort)
                        .protocol(this.mailProtocol)
                        .defaultEncoding(this.mailDefaultEncoding)
                        .javaMailProperties(p -> p.put("mail.debug", "false")));
    }

Mail 以外には Files, Ftp 等があり、以下の Web ページに説明があります。

Spring Integration Java DSL Reference - Using Protocol Adapters https://github.com/spring-projects/spring-integration-java-dsl/wiki/spring-integration-java-dsl-reference#using-protocol-adapters

.handleWithAdapter(a -> a.~) の “a” は “adapters” の略です。.handleWithAdapter(a -> a.~) の書き方の方が IDE の補完により使える Adapter が一覧表示されるので使いやすいと思います。

f:id:ksby:20170226142909p:plain

使い方ですが、Message に特定の header をセットして送信すると、その header と payload のデータを利用してメール送信したりファイル出力したりしてくれます。

例えば .handle(Mail.outboundAdapter(...).~) あるいは .handleWithAdapter(a -> a.mail(...).~) の場合ですが、

  • メール本文は payload にセットする。
  • From は header に “mail_from” をキーにしてセットする。
  • To は header に “mail_to” をキーにしてセットする。
  • Subject は header に “mail_subject” をキーにしてセットする。
  • “mail_from”, “mail_to”, “mail_subject” 等は org.springframework.integration.mail の MailHeaders クラスに定義されている。通常は MailHeaders.FROM, MailHeaders.TO, MailHeaders.SUBJECT を使用する。

という Message を送信すると header にセットされた From, To, Subject で payload の内容でメール送信します。

どのような header が使用できるのかは、

Spring Integration Reference Manual
http://docs.spring.io/spring-integration/reference/html/

の中の「V. Integration Endpoints」の下にある 21. Mail Support のような「~ Adapters」「~ Support」というタイトルのページの中に記述があります(たぶん)。

また .handleWithAdapter(a -> a.file(...).~).handleWithAdapter(a -> a.fileGateway(...).~) のように “Gateway” という文字列が付くものと付かないものが表示される Adapter がありますが、これはその次の処理に Message を送信するか否かで分けます。"Gateway" は次に Message を送信しますが、付かないものは送信しません ( outboundAdapter なので次に Message は送信されません )。

付かないものは .handle(...) の中で return null; を返しているのと同じです。

    .handle((p, h) -> {
        .....(ここに処理を記述する).....
        return null;
    })

Adapter 関連は使いこなせれば便利なのかもしれませんが、まだ以下のような感想を抱いていて使いこなせていない感があります。もう少しいろいろ強制的に使ってみて慣れないとダメかな。

  • outboundAdapter だとそこで処理が止まってしまうが、その後に処理は続けたい場合にどうしたらよいのか分かりづらい。
  • outboundGateway にすればよいのかもしれないが、outboundAdapter の処理だけやって次に処理を流してくれればよいだけなのに outboundGateway だと追加で指定をしないといけなかったりして何か使いずらい印象がある。gateway の考え方にまだ慣れていないだけなのか?

.handle(Mail.outboundAdapter(...).~) のメール送信処理は spring.mail.~ の設定が反映されない。。。と思って調べたら MailSendingMessageHandler って何?

今回メール送信用の共通処理では以下のように実装していますが、これは Spring Integration Java DSL Reference のサンプルを見て書いています。

                .handle(Mail.outboundAdapter(this.mailHost)
                        .port(this.mailPort)
                        .protocol(this.mailProtocol)
                        .defaultEncoding(this.mailDefaultEncoding)
                        .javaMailProperties(p -> p.put("mail.debug", "false")));

Spring Boot を使っていて、かつ compile("org.springframework.boot:spring-boot-starter-mail") を入れているのだから、自動生成されている javaMailSender Bean が利用されていて application.properties の spring.mail.~ の設定が反映されないのかな? と思い、

  • 上の .defaultEncoding(this.mailDefaultEncoding) の部分を削除する。
  • application.properties に spring.mail.default-encoding=UTF-8 を追加する。

としてみたのですが全く設定が反映されません。ソースを追ってみると、org.springframework.integration.dsl.mail.MailSendingMessageHandlerSpec の中で private final JavaMailSenderImpl sender = new JavaMailSenderImpl(); と実装されていて javaMailSender Bean は利用されていませんでした。

MailSendingMessageHandlerSpec クラスは MessageHandlerSpec<MailSendingMessageHandlerSpec, MailSendingMessageHandler> を継承していて、MailSendingMessageHandler クラスを見るとこちらはフィールドに private final MailSender mailSender; と書かれていて、かつコンストラクタに渡された MailSender インターフェースの実装クラスのインスタンスをセットしています。MailSendingMessageHandler クラスは javaMailSender Bean を DI しているんじゃ。。。と思いましたが、よく見たら MailSendingMessageHandler クラスには @Component アノテーションは付いていませんでした。こちらも javaMailSender Bean は使っていませんね。

なんか不便だな。。。 と思いましたが、javaMailSender Bean をコンストラクタに渡して MailSendingMessageHandler Bean を作成すればよいのでは?と思い、以下のテストコードを作成して動かしてみるとメールを送信することができました。

package ksbysample.eipapp.messaginggateway;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.mail.MailSendingMessageHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;

import javax.mail.internet.MimeMessage;

@ContextConfiguration
@RunWith(SpringRunner.class)
@DirtiesContext
@TestPropertySource(properties = {
        "spring.mail.host=localhost"
        , "spring.mail.default-encoding=UTF-8"
})
public class MailHelperConfigTest {

    @Autowired
    private JavaMailSender mailSender;

    @Autowired
    @Qualifier("testFlow.input")
    private MessageChannel testFlowInput;

    @Test
    public void sendMailFlow() throws Exception {
        MimeMessage mimeMessage = this.mailSender.createMimeMessage();
        MimeMessageHelper message = new MimeMessageHelper(mimeMessage, false, "UTF-8");
        message.setFrom("from@test.co.jp");
        message.setTo("to@sample.com");
        message.setSubject("これはテストです");
        message.setText("本文です。\r\n改行してみます。");

        this.testFlowInput.send(MessageBuilder.withPayload(message.getMimeMessage()).build());
    }

    @Configuration
    @EnableIntegration
    @ComponentScan
    public static class ContextConfiguration {

        @Autowired
        private JavaMailSender mailSender;

        @Bean
        public MailSendingMessageHandler mailSendingMessageHandler() {
            return new MailSendingMessageHandler(this.mailSender);
        }

        @Bean
        public IntegrationFlow testFlow() {
            return f -> f
                    .handle(mailSendingMessageHandler());
        }

    }

}

f:id:ksby:20170226200224p:plain f:id:ksby:20170226200325p:plain

メール送信には Mail.outboundAdapter(...) だけでなく MailSendingMessageHandler クラスが使えますね。発見です。このクラスだと SimpleMailMessage や MimeMailMessage クラスのインスタンスを payload にセットして Message を送信すればメール送信してくれるので添付ファイルがあるメールでも送れそうです。でもこのクラスって Spring Integration Reference Manual には記載されていないので、Spring Integration のソースを見ていないとさすがに分かりません。

またおそらくこれと同じように “~MessageHandler” というクラス名の AbstractMessageHandler インターフェースの実装クラスが他にも存在して、たぶん知っているとかなり便利なのでは?という気がしてきました。いつか調べてみたいと思います。

ソースコード

FlowConfig.java

■その1

@Configuration
public class FlowConfig {

    ..........

    /**
     * Message の payload にセットされた File オブジェクトが指し示すファイルを FTP サーバにアップロードする
     *
     * @return IntegrationFlow オブジェクト
     */
    @Bean
    public IntegrationFlow ftpUploadFlow() {
        return f -> f
                .handleWithAdapter(a -> a.ftp(ftpSessionFactory()).remoteDirectory(PATH_FTP_UPLOAD_DIR)
                        , e -> e.advice(ftpUploadRetryAdvice()));
    }

    ..........

    /**
     * リトライは最大5回 ( SimpleRetryPolicy で指定 )、
     * リトライ間隔は初期値2秒、最大10秒、倍数2.0 ( ExponentialBackOffPolicy で指定 )
     * の RequestHandlerRetryAdvice オブジェクトを生成する
     *
     * @return RequestHandlerRetryAdvice オブジェクト
     */
    @Bean
    public Advice ftpUploadRetryAdvice() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(
                new SimpleRetryPolicy(5, singletonMap(Exception.class, true)));
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(2000);
        backOffPolicy.setMaxInterval(10000);
        backOffPolicy.setMultiplier(2.0);
        retryTemplate.setBackOffPolicy(backOffPolicy);

        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        advice.setRetryTemplate(retryTemplate);

        return advice;
    }

}
  • ftpUploadRetryAdvice メソッドを追加します。
  • ftpUploadFlow メソッドで , e -> e.advice(ftpUploadRetryAdvice()) を追加します。

■その2

@Configuration
public class FlowConfig {

    ..........

    private static final String ERRORMAIL_FROM = "system@sample.com";
    private static final String ERRORMAIL_TO = "alert@test.co.jp";
    private static final String ERRORMAIL_SUBJECT = "エラーが発生しました";

    ..........

    /**
     * errorChannel に送信された Message からエラーメッセージを取得してメールする
     *
     * @return IntegrationFlow オブジェクト
     */
    @Bean
    public IntegrationFlow errorChannelFlow() {
        return IntegrationFlows.from("errorChannel")
                .<Exception>handle((p, h) -> {
                    String stacktrace = Throwables.getStackTraceAsString(p);
                    this.mailHelper.send(stacktrace
                            , new MapBuilder<>()
                                    .put(MailHeaders.FROM, ERRORMAIL_FROM)
                                    .put(MailHeaders.TO, ERRORMAIL_TO)
                                    .put(MailHeaders.SUBJECT, ERRORMAIL_SUBJECT)
                                    .get());

                    return null;
                })
                .get();
    }

}
  • errorChannelFlow メソッドを追加します。

■完成形

package ksbysample.eipapp.messaginggateway;

import com.google.common.base.Throwables;
import com.jcraft.jsch.ChannelSftp;
import lombok.extern.slf4j.Slf4j;
import org.aopalliance.aop.Advice;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.support.MapBuilder;
import org.springframework.integration.dsl.support.Transformers;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.file.filters.AcceptAllFileListFilter;
import org.springframework.integration.file.filters.IgnoreHiddenFileListFilter;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.integration.mail.MailHeaders;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import static java.util.Collections.singletonMap;

@Slf4j
@Configuration
public class FlowConfig {

    private static final String PATH_SFTP_DOWNLOAD_DIR = "/in";
    private static final String PATH_LOCAL_DOWNLOAD_DIR = "C:/eipapp/ksbysample-eipapp-messaginggateway/recv";
    private static final String PATH_LOCAL_UPLOAD_DIR = "C:/eipapp/ksbysample-eipapp-messaginggateway/send";
    private static final String PATH_FTP_UPLOAD_DIR = "/out";

    private static final String DOWNLOADFILEMAIL_FROM = "system@sample.com";
    private static final String DOWNLOADFILEMAIL_TO = "download@test.co.jp";

    private static final String ERRORMAIL_FROM = "system@sample.com";
    private static final String ERRORMAIL_TO = "alert@test.co.jp";
    private static final String ERRORMAIL_SUBJECT = "エラーが発生しました";

    private static final String CRLF = "\r\n";

    private final MailHelperConfig.MailHelper mailHelper;

    public FlowConfig(MailHelperConfig.MailHelper mailHelper) {
        this.mailHelper = mailHelper;
    }

    /**
     * SFTP サーバに接続するための SessionFactory オブジェクトを生成する
     *
     * @return SFTP サーバ接続用の SessionFactory オブジェクト
     */
    @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22);
        factory.setUser("send01");
        factory.setPassword("send01");
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }

    /**
     * FTP サーバに接続するための SessionFactory オブジェクトを生成する
     *
     * @return FTP サーバ接続用の SessionFactory オブジェクト
     */
    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
        factory.setHost("localhost");
        factory.setPort(21);
        factory.setUsername("recv01");
        factory.setPassword("recv01");
        return new CachingSessionFactory<>(factory);
    }

    /**
     * SFTP サーバにあるファイルをダウンロードした後、ファイルの内容をメールで送信して、
     * 送信したメールの内容をファイルに出力して FTP サーバにアップロードする
     *
     * @return IntegrationFlow オブジェクト
     */
    @Bean
    public IntegrationFlow sftpToMailToFtpFlow() {
        return IntegrationFlows
                // SFTP サーバの /in ディレクトリにファイルがあるか 5秒間隔でチェックする
                .from(s -> s.sftp(sftpSessionFactory())
                                .preserveTimestamp(true)
                                .deleteRemoteFiles(true)
                                .remoteDirectory(PATH_SFTP_DOWNLOAD_DIR)
                                .localDirectory(new File(PATH_LOCAL_DOWNLOAD_DIR))
                                .localFilter(new AcceptAllFileListFilter<>())
                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                .maxMessagesPerPoll(100)))
                .log()
                // ファイル名とファイルの絶対パスを Message の header にセットする
                .enrichHeaders(h -> h
                        .headerExpression(FileHeaders.FILENAME, "payload.name")
                        .headerExpression(FileHeaders.ORIGINAL_FILE, "payload.absolutePath"))
                // File の内容を読み込んで payload へセットする
                .transform(Transformers.fileToString())
                // ファイルの内容をメール本文とするメールを送信する
                .<String>handle((p, h) -> {
                    this.mailHelper.send(p
                            , new MapBuilder<>()
                                    .put(MailHeaders.FROM, DOWNLOADFILEMAIL_FROM)
                                    .put(MailHeaders.TO, DOWNLOADFILEMAIL_TO)
                                    .put(MailHeaders.SUBJECT, h.get(FileHeaders.FILENAME))
                                    .get());

                    return MessageBuilder.withPayload(p)
                            .setHeader(MailHeaders.TO, DOWNLOADFILEMAIL_TO)
                            .setHeader(MailHeaders.SUBJECT, h.get(FileHeaders.FILENAME))
                            .build();
                })
                .log()
                // メール送信した内容を payload にセットする
                .<String>handle((p, h) -> {
                    StringBuilder sb = new StringBuilder();
                    sb.append("To: " + h.get(MailHeaders.TO) + CRLF);
                    sb.append("Subject: " + h.get(MailHeaders.SUBJECT) + CRLF);
                    sb.append(CRLF);
                    sb.append(p);

                    return MessageBuilder.withPayload(sb.toString())
                            .build();
                })
                // /send ディレクトリの下に payload の内容を出力したファイルを生成する
                .handleWithAdapter(a -> a.fileGateway(new File(PATH_LOCAL_UPLOAD_DIR)))
                .log()
                // /recv ディレクトリの下のファイルを削除し、payload には /send ディレクトリの下の
                // ファイルを指す File オブジェクトをセットする
                .handle((p, h) -> {
                    try {
                        Files.delete(Paths.get((String) h.get(FileHeaders.ORIGINAL_FILE)));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }

                    return MessageBuilder
                            .withPayload(Paths.get(PATH_LOCAL_UPLOAD_DIR
                                    , (String) h.get(FileHeaders.FILENAME)).toFile())
                            .build();
                })
                // /send ディレクトリの下に作成したファイルを FTP サーバにアップロードする
                .bridge(e -> e.advice(ftpUploadAdvice()))
                .log()
                // /send ディレクトリの下に作成したファイルを削除する
                .<File>handle((p, h) -> {
                    try {
                        Files.delete(Paths.get(p.getAbsolutePath()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }

                    return null;
                })
                .log()
                .get();
    }

    /**
     * Message の payload にセットされた File オブジェクトが指し示すファイルを FTP サーバにアップロードする
     *
     * @return IntegrationFlow オブジェクト
     */
    @Bean
    public IntegrationFlow ftpUploadFlow() {
        return f -> f
                .handleWithAdapter(a -> a.ftp(ftpSessionFactory()).remoteDirectory(PATH_FTP_UPLOAD_DIR)
                        , e -> e.advice(ftpUploadRetryAdvice()));
    }

    /**
     * ftpUploadFlow へ Message を送信する ExpressionEvaluatingRequestHandlerAdvice Bean
     *
     * @return ExpressionEvaluatingRequestHandlerAdvice オブジェクト
     */
    @Bean
    public Advice ftpUploadAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload");
        advice.setSuccessChannelName("ftpUploadFlow.input");
        return advice;
    }

    /**
     * リトライは最大5回 ( SimpleRetryPolicy で指定 )、
     * リトライ間隔は初期値2秒、最大10秒、倍数2.0 ( ExponentialBackOffPolicy で指定 )
     * の RequestHandlerRetryAdvice オブジェクトを生成する
     *
     * @return RequestHandlerRetryAdvice オブジェクト
     */
    @Bean
    public Advice ftpUploadRetryAdvice() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(
                new SimpleRetryPolicy(5, singletonMap(Exception.class, true)));
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(2000);
        backOffPolicy.setMaxInterval(10000);
        backOffPolicy.setMultiplier(2.0);
        retryTemplate.setBackOffPolicy(backOffPolicy);

        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        advice.setRetryTemplate(retryTemplate);

        return advice;
    }

    /**
     * errorChannel に送信された Message からエラーメッセージを取得してメールする
     *
     * @return IntegrationFlow オブジェクト
     */
    @Bean
    public IntegrationFlow errorChannelFlow() {
        return IntegrationFlows.from("errorChannel")
                .<Exception>handle((p, h) -> {
                    String stacktrace = Throwables.getStackTraceAsString(p);
                    this.mailHelper.send(stacktrace
                            , new MapBuilder<>()
                                    .put(MailHeaders.FROM, ERRORMAIL_FROM)
                                    .put(MailHeaders.TO, ERRORMAIL_TO)
                                    .put(MailHeaders.SUBJECT, ERRORMAIL_SUBJECT)
                                    .get());

                    return null;
                })
                .get();
    }

}

build.gradle

dependencies {
    ..........

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    compile("org.projectlombok:lombok:1.16.14")
    compile("com.google.guava:guava:21.0")
    testCompile("org.assertj:assertj-core:3.6.2")
}
  • dependencies に compile("com.google.guava:guava:21.0") を追加します。

履歴

2017/02/26
初版発行。