かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その3 )( ディレクトリを監視してファイルが置かれたら処理→削除/移動する常駐型アプリケーションを作成する )

概要

記事一覧はこちらです。

  • Spring Integration の 14. File Support の機能を利用して、以下の処理を行う常駐型アプリケーションを作成してみます。
    • in ディレクトリを監視する。
    • in ディレクトリにファイルが置かれたら処理をする。
    • 処理の結果が正常終了ならば ( 例外が throw されなければ ) ファイルを削除し、異常終了ならば ( 例外が throw されたら ) error ディレクトリにファイルを移動する。
  • ファイルが置かれた時の処理は段階的に機能追加する予定ですが、今回はファイルの絶対パス名を出力するだけにします。
  • 今回は Channel や Endpoint を作成し、本来の Spring Integration のアプリケーションとして作成します。Spring Boot の CommandLineRunner/ApplicationRunner インターフェースは使用しません。
  • Spring Integration 関連の実装は XML ファイルは使用せず、全て Java Config で行います。
  • 実行可能 jar を作成しての動作確認は行いません。動作確認は IntelliJ IDEA 上からのみ行います。

参照したサイト・書籍

  1. Spring Integration Reference Manual - 14. File Support
    http://docs.spring.io/spring-integration/docs/4.3.1.RELEASE/reference/html/files.html

  2. Spring Integration Reference Manual - 4. Messaging Channels
    http://docs.spring.io/spring-integration/reference/html/messaging-channels-section.html

  3. Spring Integration: Jdbc-inbound-adapter is it transactional when using adice-chain
    http://stackoverflow.com/questions/22996733/spring-integration-jdbc-inbound-adapter-is-it-transactional-when-using-adice-ch

    • Transaction を使用する場合には QueueChannel は使用できないとのこと ( メッセージが Queue に格納されたらすぐに commit されるため )。
  4. Spring Boot と Spring Integration を使用して RSS を取得するサンプル
    http://qiita.com/sunny4381/items/10525adc08b13178c057

  5. Spring Boot と Spring Integration を使用したクローラ
    http://qiita.com/sunny4381/items/94d3ddec57ec88cd7af3

    • 上の2つは Spring Boot + Spring Integration で Java Config で実装する時の参考にさせていただきました。
  6. Spring Integration Reference Manual - Appendix C. Transaction Support
    http://docs.spring.io/spring-integration/reference/html/transactions.html

  7. Deleting file after FileReadingMessageSource consumed it
    http://stackoverflow.com/questions/30363210/deleting-file-after-filereadingmessagesource-consumed-it

    • 処理終了後にファイルを削除する方法を参考にしました。
  8. SourcePollingChannelAdapter with Transaction
    http://stackoverflow.com/questions/19423160/sourcepollingchanneladapter-with-transaction

    • int:transaction-synchronization-factory の after-commit の expression を Java Config で実装する方法を参考にしました。
  9. JDBC Spring Integration with Annotations
    http://stackoverflow.com/questions/27247013/jdbc-spring-integration-with-annotations

    • poller に transactional の設定を Java Config で登録する方法を参考にしました。

目次

  1. 作成する常駐型アプリケーションの仕様
  2. ksbysample-eipapp-dirchecker プロジェクトを作成する
  3. in、error ディレクトリを作成する
  4. Channel を作成する
  5. ディレクトリを監視する Inbound Channel Adapter を作成する
  6. ファイルの絶対パス名を出力する Service Activator を作成する
  7. ここまでの実装で動作確認する
  8. 正常/異常終了時の削除/移動処理を実装する
  9. 動作確認する
  10. 感想

手順

作成する常駐型アプリケーションの仕様

  • Spring Integration の Channel や Endpoint を利用した常駐型アプリケーションとして実装します。
  • アプリケーション名は ksbysample-eipapp-dirchecker にします。
  • 実行するディレクトリの構成は以下の想定です。
C:\eipapp\ksbysample-eipapp-dirchecker
└ data
   ├ in     ← このディレクトリにファイルが配置されたかを監視します
   └ error  ← 異常終了時はこのディレクトリにファイルを移動します
  • 以下の処理で実装します。
    1. Inbound Channel Adapter により 1秒間隔で in ディレクトリを監視します。
    2. ファイルが置かれたら inChannel にファイルを送ります。
    3. inChannel にファイルが置かれたら Service Activator でファイルの絶対パス名を標準出力に出力します。
    4. 正常終了したらファイルを削除し、異常終了したらファイルを error ディレクトリに移動します。
  • ファイルの絶対パス名を標準出力に出力するだけではさすがに異常終了することはないので、異常終了はソース内で例外を throw して試します。

ksbysample-eipapp-dirchecker プロジェクトを作成する

  1. feature/4-issue ブランチを作成します。

  2. IntelliJ IDEA の「Welcome to IntelliJ IDEA」ダイアログを表示した後、「Create New Project」をクリックします。

    f:id:ksby:20160821173404p:plain

  3. 「New Project」ダイアログが表示されます。画面左側で「Gradle」を選択した後、画面右側は何も変更せずに「Next」ボタンをクリックします。

    f:id:ksby:20160821174432p:plain

  4. GroupId、ArtifactId を入力する画面が表示されます。以下の画像の文字列を入力した後、「Next」ボタンをクリックします。

    f:id:ksby:20160821174523p:plain

  5. 次の画面が表示されます。「Create directories for empty content roots automatically」をチェックした後、「Next」ボタンをクリックします。

    f:id:ksby:20160821174611p:plain

  6. Project name、Project location を入力する画面が表示されます。以下の画像の文字列を入力した後、「Finish」ボタンをクリックします。

    f:id:ksby:20160821174727p:plain

  7. build.gradle を リンク先の内容 に変更します。

  8. Gradle projects View の左上にある「Refresh all Gradle projects」ボタンをクリックして build.gradle を反映します。

  9. src/main/java の下に ksbysample.eipapp.dirchecker パッケージを作成します。

  10. src/main/java/ksbysample/eipapp/dirchecker の下に Application.java を新規作成します。作成後、リンク先の内容 に変更します。

  11. 以下のディレクトリの下に .gitkeep ファイルを作成します。

    • src/main/groovy
    • src/main/resources
    • src/test/groovy
    • src/test/java
    • src/test/resources
  12. この時点で Project View は以下の状態になります。

    f:id:ksby:20160821181147p:plain

  13. commit&push します。

in、error ディレクトリを作成する

以下の階層のディレクトリを作成します。

C:\eipapp\ksbysample-eipapp-dirchecker
└ data
   ├ in
   └ error

Channel を作成する

  1. src/main/java/ksbysample/eipapp/dirchecker の下に eip.channel パッケージを作成します。

  2. ksbysample/eipapp/dirchecker/config の下に ChannelConfig.java を作成します。作成後、リンク先の内容 の内容に変更します。

ディレクトリを監視する Inbound Channel Adapter を作成する

  1. src/main/java/ksbysample/eipapp/dirchecker/eip の下に endpoint パッケージを作成します。

  2. src/main/java/ksbysample/eipapp/dirchecker/eip/endpoint の下に InDirChecker.java を作成します。作成後、リンク先のその1の内容 の内容に変更します。

ファイルの絶対パス名を出力する Service Activator を作成する

  1. src/main/java/ksbysample/eipapp/dirchecker/eip/endpoint の下に FileProcessor.java を作成します。作成後、リンク先の内容 の内容に変更します。

ここまでの実装で動作確認する

ここまでの実装で in ディレクトリにファイルを置いたらファイルの絶対パス名が標準出力に出力されるようになっていますので、確認します。

  1. Gradle projects View から bootRun を実行します。

    f:id:ksby:20160822035458p:plain

  2. C:\eipapp\ksbysample-eipapp-dirchecker\data\in に「テストファイル1.txt」というファイルを配置します。

    f:id:ksby:20160822035707p:plain ↓↓↓ f:id:ksby:20160822035826p:plain

  3. 「テストファイル1.txt」の絶対パス名が出力されることが確認できます。

    f:id:ksby:20160822040026p:plain

  4. C:\eipapp\ksbysample-eipapp-dirchecker\data\in に「テストファイル2.txt」というファイルを配置します。

    f:id:ksby:20160822040319p:plain

  5. 「テストファイル2.txt」の絶対パス名が出力されることが確認できます。また「テストファイル1.txt」は in ディレクトリに置いたままですが、AcceptOnceFileListFilter の効果により1度しか出力されていません。

    f:id:ksby:20160822040447p:plain

  6. Ctrl+F2 を押して Tomcat を停止し、C:\eipapp\ksbysample-eipapp-dirchecker\data\in に置いたファイルは全て削除します。

  7. ここまでで一旦 commit&push します。

正常/異常終了時の削除/移動処理を実装する

  1. src/main/java/ksbysample/eipapp/dirchecker/eip の下に config パッケージを作成します。

  2. ksbysample/eipapp/dirchecker/eip/config の下に TransactionConfig.java を作成します。作成後、リンク先の内容 に変更します。

  3. src/main/java/ksbysample/eipapp/dirchecker/eip/endpoint の下の InDirChecker.javaリンク先のその2の内容 の内容に変更します。

動作確認する

正常終了時にはファイルが削除され、異常終了時にはファイルが error ディレクトリに移動することを確認します。まずは正常終了時の場合です。

  1. Gradle projects View から bootRun を実行します。

    f:id:ksby:20160823005650p:plain

  2. C:\eipapp\ksbysample-eipapp-dirchecker\data\in に「テストファイル1.txt」というファイルを配置すると、すぐにファイルがなくなりました。

    f:id:ksby:20160823004519p:plain ↓↓↓ f:id:ksby:20160823004622p:plain ↓↓↓ f:id:ksby:20160823004809p:plain

  3. コンソールに「テストファイル1.txt」の絶対パス名が出力されることが確認できます。

    f:id:ksby:20160823010030p:plain

  4. C:\eipapp\ksbysample-eipapp-dirchecker\data\in に「テストファイル2.txt」というファイルを配置すると、こちらもすぐにファイルがなくなりました。

    f:id:ksby:20160823005156p:plain ↓↓↓ f:id:ksby:20160823010754p:plain

  5. コンソールには「テストファイル2.txt」の絶対パス名が出力されることが確認できます。

    f:id:ksby:20160823010922p:plain

  6. Ctrl+F2 を押して Tomcat を停止します。

次に異常終了時の場合です。FileProcessor クラスを以下のように変更します。

@MessageEndpoint
public class FileProcessor {

    @ServiceActivator(inputChannel = "inChannel")
    public void process(Message<File> message) throws Exception {
        File file = message.getPayload();
if (true) {
    throw new RuntimeException("Error !!");
}
        System.out.println(file.getAbsolutePath());
    }

}
  1. Gradle projects View から bootRun を実行します。

  2. C:\eipapp\ksbysample-eipapp-dirchecker\data\in に「テストファイル1.txt」というファイルを配置すると、error ディレクトリに移動しました。

    f:id:ksby:20160823012842p:plain ↓↓↓ f:id:ksby:20160823012947p:plain

  3. コンソールを見ると Error !! の文字が出力されていることが確認できます。

    f:id:ksby:20160823013111p:plain

  4. 次に C:\eipapp\ksbysample-eipapp-dirchecker\data\in に「テストファイル2.txt」というファイルを配置すると、こちらも error ディレクトリに移動しました。

    f:id:ksby:20160823013258p:plain ↓↓↓ f:id:ksby:20160823013355p:plain

  5. Ctrl+F2 を押して Tomcat を停止します。

  6. FileProcessor クラスの修正内容を元に戻します。

  7. ここまでで commit&push します。

感想

  • Spring Integration は Channel, Endpoint や Spring Integration が提供する File Support 等のライブラリが理解できてくると、いろいろ面白いことが出来て何気に楽しいです。
  • 日本語のドキュメントはほとんどありません。今回もほぼ英語のドキュメントか英語版の stackoverflow を見ていました。
  • 英語のドキュメントも XML を利用する例が圧倒的に多くて、まだ Java Config で実装する例は少なそうなんですよね。Java Config でやろうとしたので、かなり苦労しました。今回実装したような内容だと XML では簡単に書けるので、何度 XML でいいんじゃないかなと思ったことか。。。 
  • Spring Integration をやると SpEL を利用する箇所がよく出てくるのですが、自分はまだ SpEL をよく理解していないなということを実感させられます。payload.delete() とか stackoverflow の例を見て書いていますが、実はよく分かっていないです ( たぶん応用ができません )。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.3.7.RELEASE'
    }
    repositories {
        jcenter()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.0.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'spring-boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

compileJava.options.compilerArgs = ['-Xlint:all']
compileTestGroovy.options.compilerArgs = ['-Xlint:all']
compileTestJava.options.compilerArgs = ['-Xlint:all']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    jcenter()
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:2.0.7.RELEASE'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile('org.springframework.boot:spring-boot-starter-integration')
    compile('org.springframework.integration:spring-integration-file')
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core") { exclude module: "groovy-all" }
    testCompile("org.spockframework:spock-spring") { exclude module: "groovy-all" }

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.projectlombok:lombok:1.16.10")
    compile("org.apache.commons:commons-lang3:3.4")
}
  • Spring Boot で Spring Integration を利用するために compile('org.springframework.boot:spring-boot-starter-integration') を追加します。
  • Spring Integration の File Support 機能を利用するために compile('org.springframework.integration:spring-integration-file') を追加します。

Application.java

package ksbysample.eipapp.dirchecker;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

ChannelConfig.java

package ksbysample.eipapp.dirchecker.eip.channel;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;

@Configuration
public class ChannelConfig {

    @Bean
    public MessageChannel inChannel() {
        return new DirectChannel();
    }

}
  • “inChannel” という名前の Channel を作成します。Channel 名をメソッド名とする Bean を作成します。
  • Inbound Channel Adapter –> Service Activator へデータを渡すだけなので、タイプは DirectChannel にします。inChannel メソッド内で DirectChannel クラスのインスタンスを生成します。
  • 今回は Inbound Channel Adapter に設定する poller で Transaction を有効にするため、QueueChannel のようなキューイングするタイプの Channel は使用できません ( Spring Integration: Jdbc-inbound-adapter is it transactional when using adice-chain 参照 )。
  • Channel の説明については、4. Messaging Channels を参照してください。

InDirChecker.java

■その1

package ksbysample.eipapp.dirchecker.eip.endpoint;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.CompositeFileListFilter;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.Message;
import org.springframework.scheduling.support.PeriodicTrigger;

import java.io.File;
import java.nio.file.Paths;

@MessageEndpoint
public class InDirChecker {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private static final String IN_DIR_PATH = "C:\\eipapp\\ksbysample-eipapp-dirchecker\\data\\in";

    @Autowired
    private FileReadingMessageSource inDirFileReadingMessageSource;

    @Bean
    public FileReadingMessageSource inDirFileReadingMessageSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(Paths.get(IN_DIR_PATH).toFile());

        // 動作確認用に一時的に AcceptOnceFileListFilter をセットする
        // ※AcceptOnceFileListFilter は同一のファイル名、タイムスタンプのファイルを
        //   1度のみ処理するための Filter である
        CompositeFileListFilter<File> filter = new CompositeFileListFilter<>();
        filter.addFilter(new AcceptOnceFileListFilter<File>());
        source.setFilter(filter);

        return source;
    }

    @Bean
    public PollerMetadata checkFilePoller() {
        PeriodicTrigger trigger = new PeriodicTrigger(1000);
        trigger.setFixedRate(true);
        PollerMetadata poller = new PollerMetadata();
        poller.setTrigger(trigger);
        return poller;
    }

    @InboundChannelAdapter(value = "inChannel", poller = @Poller("checkFilePoller"))
    public Message<File> checkFile() {
        return inDirFileReadingMessageSource.receive();
    }

}
  • クラスに @MessageEndpoint アノテーションを付加します。
  • in ディレクトリにファイルが配置された時にファイルを読み込むための inDirFileReadingMessageSource Bean を作成します。
    • FileReadingMessageSource クラスのインスタンスを生成し、処理対象のディレクトリをセットします。
    • 「正常処理時にファイルを削除する処理」を実装する前に1度動作確認を行いたいので、AcceptOnceFileListFilter をセットしておきます。これをセットしておかないと、配置されたファイルをずっと inChannel に送信し続けます。
  • Inbound Channel Adapter でポーリング処理を行うための checkFilePoller Bean を作成します。
    • 基本は PollerMetadata クラスのインスタンスを生成して返す処理です。
    • 1秒間隔で実行するために、生成した PollerMetadata クラスのインスタンスに PeriodicTrigger クラスのインスタンスを生成してセットします。
  • 最後に @InboundChannelAdapter アノテーションを付加した checkFile メソッドを作成します。
    • @InboundChannelAdapter アノテーションの value 属性に読み込んだデータを送信する Channel を ( 今回は inChannel )、poller 属性にポーリング処理を行うための設定を指定します。poller 属性については checkFilePoller Bean を生成しているので、@Poller("Bean名") の形式で指定します。
    • メソッド内では inDirFileReadingMessageSource.receive() を呼び出します。
  • 以上の実装で 1秒間隔で in ディレクトリをチェックし、ファイルがあれば inChannel に Message 型のデータを生成して送信するようになります。

■その2

package ksbysample.eipapp.dirchecker.eip.endpoint;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.transaction.DefaultTransactionSynchronizationFactory;
import org.springframework.integration.transaction.ExpressionEvaluatingTransactionSynchronizationProcessor;
import org.springframework.integration.transaction.PseudoTransactionManager;
import org.springframework.integration.transaction.TransactionSynchronizationFactory;
import org.springframework.messaging.Message;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
import org.springframework.transaction.interceptor.MatchAlwaysTransactionAttributeSource;
import org.springframework.transaction.interceptor.TransactionInterceptor;

import java.io.File;
import java.nio.file.Paths;
import java.util.Collections;

@MessageEndpoint
public class InDirChecker {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private static final String DATA_DIR_PATH = "C:\\eipapp\\ksbysample-eipapp-dirchecker\\data";
    private static final String IN_DIR_PATH = DATA_DIR_PATH + "\\in";
    private static final String ERROR_DIR_PATH = DATA_DIR_PATH + "\\error";

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private NullChannel nullChannel;

    @Autowired
    private PseudoTransactionManager pseudoTransactionManager;

    @Autowired
    private FileReadingMessageSource inDirFileReadingMessageSource;

    @Bean
    public TransactionSynchronizationFactory checkFilePollerSyncFactory() {
        ExpressionParser parser = new SpelExpressionParser();
        ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor
                = new ExpressionEvaluatingTransactionSynchronizationProcessor();
        syncProcessor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
        syncProcessor.setAfterCommitExpression(parser.parseExpression("payload.delete()"));
        syncProcessor.setAfterCommitChannel(nullChannel);
        syncProcessor.setAfterRollbackExpression(
                parser.parseExpression("payload.renameTo('" + ERROR_DIR_PATH + "\\' + payload.name)"));
        syncProcessor.setBeforeCommitChannel(nullChannel);
        return new DefaultTransactionSynchronizationFactory(syncProcessor);
    }

    @Bean
    public FileReadingMessageSource inDirFileReadingMessageSource() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(Paths.get(IN_DIR_PATH).toFile());
        return source;
    }

    @Bean
    public PollerMetadata checkFilePoller() {
        PeriodicTrigger trigger = new PeriodicTrigger(1000);
        trigger.setFixedRate(true);
        PollerMetadata poller = new PollerMetadata();
        poller.setTrigger(trigger);
        poller.setTransactionSynchronizationFactory(checkFilePollerSyncFactory());

        MatchAlwaysTransactionAttributeSource matchAlwaysTransactionAttributeSource = new MatchAlwaysTransactionAttributeSource();
        matchAlwaysTransactionAttributeSource.setTransactionAttribute(new DefaultTransactionAttribute());
        TransactionInterceptor txAdvice =
                new TransactionInterceptor(pseudoTransactionManager, matchAlwaysTransactionAttributeSource);
        poller.setAdviceChain(Collections.singletonList(txAdvice));

        return poller;
    }

    @InboundChannelAdapter(value = "inChannel", poller = @Poller("checkFilePoller"))
    public Message<File> checkFile() {
        return inDirFileReadingMessageSource.receive();
    }

}
  • 以下の @Autowired アノテーション付きのフィールドを追加します。
    • private ApplicationContext applicationContext;
    • private NullChannel nullChannel;
    • private PseudoTransactionManager pseudoTransactionManager;
  • checkFilePollerSyncFactory Bean を作成します。正常終了時の処理 ( AfterCommit )、異状終了時の処理 ( AfterRollback ) を SpEL で定義します。
  • inDirFileReadingMessageSource Bean 内で AcceptOnceFileListFilter を設定していた部分を削除します。今回の実装をすると処理されたファイルは削除か移動されるので、AcceptOnceFileListFilter は必要なくなります。
  • checkFilePoller Bean に以下の処理を追加します。
    • poller.setTransactionSynchronizationFactory(checkFilePollerSyncFactory()); を追加します。
    • pseudoTransactionManager を poller.setAdviceChain(...) で設定する処理を追加します。

FileProcessor.java

package ksbysample.eipapp.dirchecker.eip.endpoint;

import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;

import java.io.File;

@MessageEndpoint
public class FileProcessor {

    @ServiceActivator(inputChannel = "inChannel")
    public void process(Message<File> message) throws Exception {
        File file = message.getPayload();
        System.out.println(file.getAbsolutePath());
    }

}
  • クラスに @MessageEndpoint アノテーションを付加します。
  • @ServiceActivator アノテーションを付加した process メソッドを作成します。@ServiceActivator アノテーションの inputChannel 属性には読み込む元の Channel を指定します。指定した Channel にデータが来ると process メソッドが実行されます。

TransactionConfig.java

package ksbysample.eipapp.dirchecker.eip.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.transaction.PseudoTransactionManager;

@Configuration
public class TransactionConfig {

    @Bean
    public PseudoTransactionManager pseudoTransactionManager() {
        return new PseudoTransactionManager();
    }

}
  • FileReadingMessageSource を利用した Inbound Channel Adapter で正常/異常終了時の処理を実装するためには PseudoTransactionManager を使用します。上記のように pseudoTransactionManager Bean を作成します。Appendix C. Transaction Support 参照。

履歴

2016/08/23
初版発行。