かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その17 )( @MessagingGateway でメソッド呼び出しのインターフェースで MessageChannel へ Message を送信する )

概要

記事一覧はこちらです。

  • Spring Integration DSL のサンプルを作成します。今回は長くなったため2回に分けています。
  • 以下の処理を行う常駐型アプリケーションを作成します。
    • SFTPサーバに send01 ユーザでログインし /in ディレクトリにファイルがあるかチェックします。ファイルがあれば C:\eipapp\ksbysample-eipapp-messaginggateway\recv ディレクトリにダウンロードします。
    • ダウンロードしたファイルの内容をメール本文とするメールを送信します。From は system@sample.com、To は download@test.co.jp、Subject はファイル名にします。
    • メールした時の To, Subject, メール本文を出力したファイルを C:\eipapp\ksbysample-eipapp-messaginggateway\send ディレクトリに生成して FTP サーバへアップロードします。FTP サーバには recv01 ユーザでログインし /out ディレクトリにアップロードします。
    • 今回は ErrorChannel の処理を実装します。ErrorChannel に送信されたエラーメッセージをメール本文とするメールを送信します。From は system@sample.com、To は alert@test.co.jp、Subject は “エラーが発生しました” にします。
    • メール送信処理は共通の IntegrationFlow として作成します。
  • FTP サーバは Xlight ftp server を、SFTP サーバは FreeFTPd を使用します。今回この記事を書いている途中で Xlight ftp server の試用期間が終了したのですが、「Personal edition is free for personal use.」と書かれていたので問題なく使えるものと思っていたら SFTP サーバは使用できなくなりました。。。 ( https://www.xlightftpd.com/purchase.htm に記載されていました )
  • SMTPサーバは smtp4dev を使用します。

参照したサイト・書籍

  1. Spring Integration Reference Manual - 8.4 Messaging Gateways
    http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#gateway

  2. How to access custom headers in Spring Integration after receiving http error
    http://stackoverflow.com/questions/38482418/how-to-access-custom-headers-in-spring-integration-after-receiving-http-error

    • タイトルとは全然関係なくて、.transform(...) の中で header にアクセスする方法を調べた時に参考にしました。

目次

  1. @MessagingGateway とは?
  2. recv, send ディレクトリを作成する
  3. ksbysample-eipapp-messaginggateway プロジェクトを作成する
  4. メール送信用の共通処理を作成する
  5. SFTP サーバからファイルをダウンロードする処理を作成する
  6. ファイルの内容をメール送信する処理を作成する
  7. メールの内容をファイルに出力する処理を作成する
  8. FTP サーバにアップロードする処理を作成する
  9. 続く。。。

手順

@MessagingGateway とは?

Spring Integration の Message 送信・受信を意識させずに、通常のメソッドを呼び出す方式で Message を送信・受信できるようにする仕組みです。

例えば以下の IntegrationFlow に Message を送信するには、

    @Bean
    public IntegrationFlow printFlow() {
        return f -> f
                .handle((p, h) -> {
                    System.out.println(p);
                    return null;
                });
    }

以下のように MessageChannel#send で Message を送信する必要があります。

    @Autowired
    @Qualifier("printFlow.input")
    private MessageChannel printFlowInput;

    @Test
    public void printFlow() throws Exception {
        this.printFlowInput.send(new GenericMessage<>("テストです"));
    }

これを以下のように @MessagingGateway アノテーションを付加した interface を用意することで、

    @MessagingGateway
    public interface PrintHelper {

        @Gateway(requestChannel = "printFlow.input")
        void print(String payload);

    }

    @Bean
    public IntegrationFlow printFlow() {
        return f -> f
                .handle((p, h) -> {
                    System.out.println(p);
                    return null;
                });
    }

以下のように通常のメソッドのように呼び出すことができるようになります。

    @Autowired
    private PrintHelper printHelper;

    @Test
    public void printFlow() throws Exception {
        this.printHelper.print("テストです");
    }

また別ファイルに定義した return f -> f.~ 形式の IntegrationFlow へ Message を送信しようとした時に、実行時に “~.input” の MessageChannel をうまく DI 出来ずエラーになる場合があります。この時も @MessagingGateway で inteface を定義すれば回避できます。

1点注意です。Spring Boot の 1.4 までは @IntegrationComponentScan を付加しないと ( 例えば @SpringBootApplication の下に記述します ) @MessagingGateway を付加した interface が Bean の生成対象として認識されません。1.5 からは省略できるようになりました。

recv, send ディレクトリを作成する

サンプルアプリケーションを作成します。常駐型アプリケーション用として以下の構成のディレクトリを作成します。

C:\eipapp\ksbysample-eipapp-messaginggateway
├ recv
└ send

ksbysample-eipapp-messaginggateway プロジェクトを作成する

  1. IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。

  2. ksbysample-eipapp-messaginggateway プロジェクトのルート直下に config/checkstyle, config/findbugs ディレクトリを作成します。

  3. config/checkstyle の下に Spring Boot 1.3.x の Web アプリを 1.4.x へバージョンアップする ( その8 )( build.gradle への checkstyle, findbugs の導入+CheckStyle-IDEA, FindBugs-IDEA Plugin の導入 ) で作成した google_checks.xml を置きます。

  4. config/findbugs の下に findbugs-exclude.xml を新規作成し、リンク先の内容 の内容に変更します。

  5. src/main/java の下に ksbysample.eipapp.messaginggateway パッケージを作成します。

  6. src/main/java/ksbysample/eipapp/messaginggateway の下に Application.java を作成し、リンク先の内容 を記述します。

  7. src/main/resources の下に application.properties を作成し、リンク先の内容 を記述します。

  8. src/main/resources の下に logback-spring.xml を作成し、リンク先の内容 を記述します。

メール送信用の共通処理を作成する

  1. src/main/java/ksbysample/eipapp/messaginggateway の下に MailHelperConfig.java を新規作成し、リンク先の内容 を記述します。

SFTP サーバからファイルをダウンロードする処理を作成する

  1. src/main/java/ksbysample/eipapp/messaginggateway の下に FlowConfig.java を新規作成し、リンク先のその1の内容 を記述します。

ファイルの内容をメール送信する処理を作成する

  1. src/main/java/ksbysample/eipapp/messaginggateway の下の FlowConfig.javaリンク先のその2の内容 に変更します。

メールの内容をファイルに出力する処理を作成する

  1. src/main/java/ksbysample/eipapp/messaginggateway の下の FlowConfig.javaリンク先のその3の内容 に変更します。

FTP サーバにアップロードする処理を作成する

  1. src/main/java/ksbysample/eipapp/messaginggateway の下の FlowConfig.javaリンク先のその4の内容 に変更します。

続く。。。

長くなったので続きます。次回は以下の予定です。

  • ErrorChannel の処理を実装します。
  • 動作確認します。
  • FlowConfig.java の完成形は次回載せます。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.4.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
        maven { url "https://plugins.gradle.org/m2/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
        // for Error Prone ( http://errorprone.info/ )
        classpath("net.ltgt.gradle:gradle-errorprone-plugin:0.0.9")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'
apply plugin: 'net.ltgt.errorprone'
apply plugin: 'checkstyle'
apply plugin: 'findbugs'

sourceCompatibility = 1.8
targetCompatibility = 1.8

[compileJava, compileTestGroovy, compileTestJava]*.options*.compilerArgs = ['-Xlint:all,-options,-processing,-path']

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

configurations {
    // for Error Prone ( http://errorprone.info/ )
    errorprone {
        resolutionStrategy.force 'com.google.errorprone:error_prone_core:2.0.15'
    }
}

checkstyle {
    configFile = file("${rootProject.projectDir}/config/checkstyle/google_checks.xml")
    toolVersion = '7.5.1'
    sourceSets = [project.sourceSets.main]
}

findbugs {
    toolVersion = '3.0.1'
    sourceSets = [project.sourceSets.main]
    ignoreFailures = true
    effort = "max"
    excludeFilter = file("${rootProject.projectDir}/config/findbugs/findbugs-exclude.xml")
}

tasks.withType(FindBugs) {
    reports {
        xml.enabled = false
        html.enabled = true
    }
}

repositories {
    mavenCentral()
    maven { url "http://repo.spring.io/repo/" }
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR3'
        mavenBom 'org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile("org.springframework.boot:spring-boot-starter-integration")
    compile("org.springframework.boot:spring-boot-starter-mail")
    compile("org.springframework.integration:spring-integration-ftp")
    compile("org.springframework.integration:spring-integration-mail")
    compile("org.springframework.integration:spring-integration-sftp")
    compile("org.codehaus.janino:janino")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // org.springframework.cloud:spring-cloud-dependencies によりバージョン番号が自動で設定されるもの
    // http://projects.spring.io/spring-cloud/ の「Release Trains」参照
    compile("org.springframework.cloud:spring-cloud-starter-zipkin") {
        exclude module: 'spring-boot-starter-web'
    }

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.springframework.integration:spring-integration-java-dsl:1.2.1.RELEASE")
    compile("org.projectlombok:lombok:1.16.14")
    testCompile("org.assertj:assertj-core:3.6.2")
}

findbugs-exclude.xml

<?xml version="1.0" encoding="UTF-8"?>
<FindBugsFilter>
</FindBugsFilter>

Application.java

package ksbysample.eipapp.messaginggateway;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.integration.annotation.IntegrationComponentScan;

@SpringBootApplication
@IntegrationComponentScan
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

application.properties

spring.application.name=messaginggateway
spring.zipkin.base-url=http://localhost:9411/
spring.sleuth.sampler.percentage=1.0

logback-spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>

    <springProperty scope="context" name="springAppName" source="spring.application.name"/>
    <property name="CONSOLE_LOG_PATTERN"
              value="%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${level:-%5p}) %clr([${springAppName:-},%X{X-B3-TraceId:-},%X{X-B3-SpanId:-},%X{X-Span-Export:-}]){yellow} %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}"/>

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE"/>
    </root>
    <logger name="org.springframework.integration.expression.ExpressionUtils" level="ERROR"/>
    <logger name="com.jcraft.jsch" level="ERROR"/>
</configuration>

MailHelperConfig.java

package ksbysample.eipapp.messaginggateway;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.mail.Mail;

import java.util.Map;

@Configuration
public class MailHelperConfig {

    @Value("${spring.mail.default-encoding:UTF-8}")
    private String mailDefaultEncoding;

    @Value("${spring.mail.host:localhost}")
    private String mailHost;

    @Value("${spring.mail.port:25}")
    private int mailPort;

    @Value("${spring.mail.protocol:smtp}")
    private String mailProtocol;

    @MessagingGateway
    public interface MailHelper {

        @Gateway(requestChannel = "sendMailFlow.input")
        void send(String payload, Map<Object, Object> headers);

    }

    /**
     * メールを送信する
     * メールの From, To, Subject 等は Message の header に、メール本文は payload にセットする
     * ヘッダーにセットする時の key 文字列は org.springframework.integration.mail.MailHeaders クラス参照
     *
     * @return IntegrationFlow オブジェクト
     */
    @Bean
    public IntegrationFlow sendMailFlow() {
        return f -> f
                .handle(Mail.outboundAdapter(this.mailHost)
                        .port(this.mailPort)
                        .protocol(this.mailProtocol)
                        .defaultEncoding(this.mailDefaultEncoding)
                        .javaMailProperties(p -> p.put("mail.debug", "true")));
    }

}

FlowConfig.java

■その1

package ksbysample.eipapp.messaginggateway;

import com.jcraft.jsch.ChannelSftp;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.file.filters.AcceptAllFileListFilter;
import org.springframework.integration.file.filters.IgnoreHiddenFileListFilter;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;

import java.io.File;

@Configuration
public class FlowConfig {

    private static final String PATH_SFTP_DOWNLOAD_DIR = "/in";
    private static final String PATH_LOCAL_DOWNLOAD_DIR = "C:/eipapp/ksbysample-eipapp-messaginggateway/recv";

    /**
     * SFTP サーバに接続するための SessionFactory オブジェクトを生成する
     *
     * @return SFTP サーバ接続用の SessionFactory オブジェクト
     */
    @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(22);
        factory.setUser("send01");
        factory.setPassword("send01");
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<>(factory);
    }

    /**
     * SFTP サーバにあるファイルをダウンロードした後、ファイルの内容をメールで送信して、
     * 送信したメールの内容をファイルに出力して FTP サーバにアップロードする
     *
     * @return IntegrationFlow オブジェクト
     */
    @Bean
    public IntegrationFlow sftpToMailToFtpFlow() {
        return IntegrationFlows
                // SFTP サーバの /in ディレクトリにファイルがあるか 5秒間隔でチェックする
                .from(s -> s.sftp(sftpSessionFactory())
                                .preserveTimestamp(true)
                                .deleteRemoteFiles(true)
                                .remoteDirectory(PATH_SFTP_DOWNLOAD_DIR)
                                .localDirectory(new File(PATH_LOCAL_DOWNLOAD_DIR))
                                .localFilter(new AcceptAllFileListFilter<>())
                                .localFilter(new IgnoreHiddenFileListFilter())
                        , e -> e.poller(Pollers.fixedDelay(5000)
                                .maxMessagesPerPoll(100)))
                .log()
                .<File>handle((p, h) -> {
                    p.delete();
                    return null;
                })
                .get();
    }

}

■その2

@Configuration
public class FlowConfig {

    ..........

    private static final String DOWNLOADFILEMAIL_FROM = "system@sample.com";
    private static final String DOWNLOADFILEMAIL_TO = "download@test.co.jp";

    private final MailHelperConfig.MailHelper mailHelper;

    public FlowConfig(MailHelperConfig.MailHelper mailHelper) {
        this.mailHelper = mailHelper;
    }

    ..........

    /**
     * SFTP サーバにあるファイルをダウンロードした後、ファイルの内容をメールで送信して、
     * 送信したメールの内容をファイルに出力して FTP サーバにアップロードする
     *
     * @return IntegrationFlow オブジェクト
     */
    @Bean
    public IntegrationFlow sftpToMailToFtpFlow() {
        return IntegrationFlows
                ..........
                // ファイル名とファイルの絶対パスを Message の header にセットする
                .enrichHeaders(h -> h
                        .headerExpression(FileHeaders.FILENAME, "payload.name")
                        .headerExpression(FileHeaders.ORIGINAL_FILE, "payload.absolutePath"))
                // File の内容を読み込んで payload へセットする
                .transform(Transformers.fileToString())
                // ファイルの内容をメール本文とするメールを送信する
                .<String>handle((p, h) -> {
                    this.mailHelper.send(p
                            , new MapBuilder<>()
                                    .put(MailHeaders.FROM, DOWNLOADFILEMAIL_FROM)
                                    .put(MailHeaders.TO, DOWNLOADFILEMAIL_TO)
                                    .put(MailHeaders.SUBJECT, h.get(FileHeaders.FILENAME))
                                    .get());

                    return MessageBuilder.withPayload(new File((String) h.get(FileHeaders.ORIGINAL_FILE)))
                            .setHeader(MailHeaders.TO, DOWNLOADFILEMAIL_TO)
                            .setHeader(MailHeaders.SUBJECT, h.get(FileHeaders.FILENAME))
                            .build();
                })
                .log()
                ..........
                .get();
    }

}
  • MapBuilder は org.springframework.integration.dsl.support パッケージにあるクラスです。

■その3

@Configuration
public class FlowConfig {

    ..........
    private static final String PATH_LOCAL_UPLOAD_DIR = "C:/eipapp/ksbysample-eipapp-messaginggateway/send";

    ..........

    private static final String CRLF = "\r\n";

    ..........

    /**
     * SFTP サーバにあるファイルをダウンロードした後、ファイルの内容をメールで送信して、
     * 送信したメールの内容をファイルに出力して FTP サーバにアップロードする
     *
     * @return IntegrationFlow オブジェクト
     */
    @Bean
    public IntegrationFlow sftpToMailToFtpFlow() {
        return IntegrationFlows
                ..........
                // メール送信した内容を payload にセットする
                .<String>handle((p, h) -> {
                    StringBuilder sb = new StringBuilder();
                    sb.append("To: " + h.get(MailHeaders.TO) + CRLF);
                    sb.append("Subject: " + h.get(MailHeaders.SUBJECT) + CRLF);
                    sb.append(CRLF);
                    sb.append(p);

                    return MessageBuilder.withPayload(sb.toString())
                            .build();
                })
                // /send ディレクトリの下に payload の内容を出力したファイルを生成する
                .handleWithAdapter(a -> a.fileGateway(new File(PATH_LOCAL_UPLOAD_DIR)))
                .log()
                // /recv ディレクトリの下のファイルを削除し、payload には /send ディレクトリの下の
                // ファイルを指す File オブジェクトをセットする
                .handle((p, h) -> {
                    try {
                        Files.delete(Paths.get((String) h.get(FileHeaders.ORIGINAL_FILE)));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }

                    return MessageBuilder
                            .withPayload(Paths.get(PATH_LOCAL_UPLOAD_DIR
                                    , (String) h.get(FileHeaders.FILENAME)).toFile())
                            .build();
                })
                .channel("nullChannel")
                .get();
    }

}

■その4

@Configuration
public class FlowConfig {

    ..........
    private static final String PATH_FTP_UPLOAD_DIR = "/out";

    ..........

    /**
     * FTP サーバに接続するための SessionFactory オブジェクトを生成する
     *
     * @return FTP サーバ接続用の SessionFactory オブジェクト
     */
    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
        factory.setHost("localhost");
        factory.setPort(21);
        factory.setUsername("recv01");
        factory.setPassword("recv01");
        return new CachingSessionFactory<>(factory);
    }

    /**
     * SFTP サーバにあるファイルをダウンロードした後、ファイルの内容をメールで送信して、
     * 送信したメールの内容をファイルに出力して FTP サーバにアップロードする
     *
     * @return IntegrationFlow オブジェクト
     */
    @Bean
    public IntegrationFlow sftpToMailToFtpFlow() {
        return IntegrationFlows
                ..........
                // /send ディレクトリの下に作成したファイルを FTP サーバにアップロードする
                .bridge(e -> e.advice(ftpUploadAdvice()))
                .log()
                // /send ディレクトリの下に作成したファイルを削除する
                .<File>handle((p, h) -> {
                    try {
                        Files.delete(Paths.get(p.getAbsolutePath()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }

                    return null;
                })
                .log()
                .get();
    }

    /**
     * Message の payload にセットされた File オブジェクトが指し示すファイルを FTP サーバにアップロードする
     *
     * @return IntegrationFlow オブジェクト
     */
    @Bean
    public IntegrationFlow ftpUploadFlow() {
        return f -> f
                .handleWithAdapter(a -> a.ftp(ftpSessionFactory()).remoteDirectory(PATH_FTP_UPLOAD_DIR));
    }

    /**
     * ftpUploadFlow へ Message を送信する ExpressionEvaluatingRequestHandlerAdvice Bean
     *
     * @return ExpressionEvaluatingRequestHandlerAdvice オブジェクト
     */
    @Bean
    public Advice ftpUploadAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice
                = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpressionString("payload");
        advice.setSuccessChannelName("ftpUploadFlow.input");
        return advice;
    }

}

履歴

2017/02/25
初版発行。