Spring Boot + Spring Integration でいろいろ試してみる ( その3 )( ディレクトリを監視してファイルが置かれたら処理→削除/移動する常駐型アプリケーションを作成する )
概要
記事一覧はこちらです。
- Spring Integration の 14. File Support の機能を利用して、以下の処理を行う常駐型アプリケーションを作成してみます。
- ファイルが置かれた時の処理は段階的に機能追加する予定ですが、今回はファイルの絶対パス名を出力するだけにします。
- 今回は Channel や Endpoint を作成し、本来の Spring Integration のアプリケーションとして作成します。Spring Boot の CommandLineRunner/ApplicationRunner インターフェースは使用しません。
- Spring Integration 関連の実装は XML ファイルは使用せず、全て Java Config で行います。
- 実行可能 jar を作成しての動作確認は行いません。動作確認は IntelliJ IDEA 上からのみ行います。
参照したサイト・書籍
Spring Integration Reference Manual - 14. File Support
http://docs.spring.io/spring-integration/docs/4.3.1.RELEASE/reference/html/files.htmlSpring Integration Reference Manual - 4. Messaging Channels
http://docs.spring.io/spring-integration/reference/html/messaging-channels-section.htmlSpring Integration: Jdbc-inbound-adapter is it transactional when using adice-chain
http://stackoverflow.com/questions/22996733/spring-integration-jdbc-inbound-adapter-is-it-transactional-when-using-adice-ch- Transaction を使用する場合には QueueChannel は使用できないとのこと ( メッセージが Queue に格納されたらすぐに commit されるため )。
Spring Boot と Spring Integration を使用して RSS を取得するサンプル
http://qiita.com/sunny4381/items/10525adc08b13178c057Spring Boot と Spring Integration を使用したクローラ
http://qiita.com/sunny4381/items/94d3ddec57ec88cd7af3- 上の2つは Spring Boot + Spring Integration で Java Config で実装する時の参考にさせていただきました。
Spring Integration Reference Manual - Appendix C. Transaction Support
http://docs.spring.io/spring-integration/reference/html/transactions.htmlDeleting file after FileReadingMessageSource consumed it
http://stackoverflow.com/questions/30363210/deleting-file-after-filereadingmessagesource-consumed-it- 処理終了後にファイルを削除する方法を参考にしました。
SourcePollingChannelAdapter with Transaction
http://stackoverflow.com/questions/19423160/sourcepollingchanneladapter-with-transaction- int:transaction-synchronization-factory の after-commit の expression を Java Config で実装する方法を参考にしました。
JDBC Spring Integration with Annotations
http://stackoverflow.com/questions/27247013/jdbc-spring-integration-with-annotations- poller に transactional の設定を Java Config で登録する方法を参考にしました。
目次
- 作成する常駐型アプリケーションの仕様
- ksbysample-eipapp-dirchecker プロジェクトを作成する
- in、error ディレクトリを作成する
- Channel を作成する
- ディレクトリを監視する Inbound Channel Adapter を作成する
- ファイルの絶対パス名を出力する Service Activator を作成する
- ここまでの実装で動作確認する
- 正常/異常終了時の削除/移動処理を実装する
- 動作確認する
- 感想
手順
作成する常駐型アプリケーションの仕様
- Spring Integration の Channel や Endpoint を利用した常駐型アプリケーションとして実装します。
- アプリケーション名は ksbysample-eipapp-dirchecker にします。
- 実行するディレクトリの構成は以下の想定です。
C:\eipapp\ksbysample-eipapp-dirchecker └ data ├ in ← このディレクトリにファイルが配置されたかを監視します └ error ← 異常終了時はこのディレクトリにファイルを移動します
- 以下の処理で実装します。
- ファイルの絶対パス名を標準出力に出力するだけではさすがに異常終了することはないので、異常終了はソース内で例外を throw して試します。
ksbysample-eipapp-dirchecker プロジェクトを作成する
feature/4-issue ブランチを作成します。
IntelliJ IDEA の「Welcome to IntelliJ IDEA」ダイアログを表示した後、「Create New Project」をクリックします。
「New Project」ダイアログが表示されます。画面左側で「Gradle」を選択した後、画面右側は何も変更せずに「Next」ボタンをクリックします。
GroupId、ArtifactId を入力する画面が表示されます。以下の画像の文字列を入力した後、「Next」ボタンをクリックします。
次の画面が表示されます。「Create directories for empty content roots automatically」をチェックした後、「Next」ボタンをクリックします。
Project name、Project location を入力する画面が表示されます。以下の画像の文字列を入力した後、「Finish」ボタンをクリックします。
build.gradle を リンク先の内容 に変更します。
Gradle projects View の左上にある「Refresh all Gradle projects」ボタンをクリックして build.gradle を反映します。
src/main/java の下に ksbysample.eipapp.dirchecker パッケージを作成します。
src/main/java/ksbysample/eipapp/dirchecker の下に Application.java を新規作成します。作成後、リンク先の内容 に変更します。
以下のディレクトリの下に .gitkeep ファイルを作成します。
- src/main/groovy
- src/main/resources
- src/test/groovy
- src/test/java
- src/test/resources
この時点で Project View は以下の状態になります。
commit&push します。
in、error ディレクトリを作成する
以下の階層のディレクトリを作成します。
C:\eipapp\ksbysample-eipapp-dirchecker └ data ├ in └ error
Channel を作成する
src/main/java/ksbysample/eipapp/dirchecker の下に eip.channel パッケージを作成します。
ksbysample/eipapp/dirchecker/config の下に ChannelConfig.java を作成します。作成後、リンク先の内容 の内容に変更します。
ディレクトリを監視する Inbound Channel Adapter を作成する
src/main/java/ksbysample/eipapp/dirchecker/eip の下に endpoint パッケージを作成します。
src/main/java/ksbysample/eipapp/dirchecker/eip/endpoint の下に InDirChecker.java を作成します。作成後、リンク先のその1の内容 の内容に変更します。
ファイルの絶対パス名を出力する Service Activator を作成する
- src/main/java/ksbysample/eipapp/dirchecker/eip/endpoint の下に FileProcessor.java を作成します。作成後、リンク先の内容 の内容に変更します。
ここまでの実装で動作確認する
ここまでの実装で in ディレクトリにファイルを置いたらファイルの絶対パス名が標準出力に出力されるようになっていますので、確認します。
Gradle projects View から bootRun を実行します。
C:\eipapp\ksbysample-eipapp-dirchecker\data\in に「テストファイル1.txt」というファイルを配置します。
↓↓↓
「テストファイル1.txt」の絶対パス名が出力されることが確認できます。
C:\eipapp\ksbysample-eipapp-dirchecker\data\in に「テストファイル2.txt」というファイルを配置します。
「テストファイル2.txt」の絶対パス名が出力されることが確認できます。また「テストファイル1.txt」は in ディレクトリに置いたままですが、AcceptOnceFileListFilter の効果により1度しか出力されていません。
Ctrl+F2 を押して Tomcat を停止し、C:\eipapp\ksbysample-eipapp-dirchecker\data\in に置いたファイルは全て削除します。
ここまでで一旦 commit&push します。
正常/異常終了時の削除/移動処理を実装する
src/main/java/ksbysample/eipapp/dirchecker/eip の下に config パッケージを作成します。
ksbysample/eipapp/dirchecker/eip/config の下に TransactionConfig.java を作成します。作成後、リンク先の内容 に変更します。
src/main/java/ksbysample/eipapp/dirchecker/eip/endpoint の下の InDirChecker.java を リンク先のその2の内容 の内容に変更します。
動作確認する
正常終了時にはファイルが削除され、異常終了時にはファイルが error ディレクトリに移動することを確認します。まずは正常終了時の場合です。
Gradle projects View から bootRun を実行します。
C:\eipapp\ksbysample-eipapp-dirchecker\data\in に「テストファイル1.txt」というファイルを配置すると、すぐにファイルがなくなりました。
↓↓↓ ↓↓↓
コンソールに「テストファイル1.txt」の絶対パス名が出力されることが確認できます。
C:\eipapp\ksbysample-eipapp-dirchecker\data\in に「テストファイル2.txt」というファイルを配置すると、こちらもすぐにファイルがなくなりました。
↓↓↓
コンソールには「テストファイル2.txt」の絶対パス名が出力されることが確認できます。
Ctrl+F2 を押して Tomcat を停止します。
次に異常終了時の場合です。FileProcessor クラスを以下のように変更します。
@MessageEndpoint public class FileProcessor { @ServiceActivator(inputChannel = "inChannel") public void process(Message<File> message) throws Exception { File file = message.getPayload(); if (true) { throw new RuntimeException("Error !!"); } System.out.println(file.getAbsolutePath()); } }
Gradle projects View から bootRun を実行します。
C:\eipapp\ksbysample-eipapp-dirchecker\data\in に「テストファイル1.txt」というファイルを配置すると、error ディレクトリに移動しました。
↓↓↓
コンソールを見ると Error !! の文字が出力されていることが確認できます。
次に C:\eipapp\ksbysample-eipapp-dirchecker\data\in に「テストファイル2.txt」というファイルを配置すると、こちらも error ディレクトリに移動しました。
↓↓↓
Ctrl+F2 を押して Tomcat を停止します。
FileProcessor クラスの修正内容を元に戻します。
ここまでで commit&push します。
感想
- Spring Integration は Channel, Endpoint や Spring Integration が提供する File Support 等のライブラリが理解できてくると、いろいろ面白いことが出来て何気に楽しいです。
- 日本語のドキュメントはほとんどありません。今回もほぼ英語のドキュメントか英語版の stackoverflow を見ていました。
- 英語のドキュメントも XML を利用する例が圧倒的に多くて、まだ Java Config で実装する例は少なそうなんですよね。Java Config でやろうとしたので、かなり苦労しました。今回実装したような内容だと XML では簡単に書けるので、何度 XML でいいんじゃないかなと思ったことか。。。
- Spring Integration をやると SpEL を利用する箇所がよく出てくるのですが、自分はまだ SpEL をよく理解していないなということを実感させられます。
payload.delete()
とか stackoverflow の例を見て書いていますが、実はよく分かっていないです ( たぶん応用ができません )。
ソースコード
build.gradle
group 'ksbysample' version '1.0.0-RELEASE' buildscript { ext { springBootVersion = '1.3.7.RELEASE' } repositories { jcenter() maven { url "http://repo.spring.io/repo/" } } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") classpath("io.spring.gradle:dependency-management-plugin:0.6.0.RELEASE") } } apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'idea' apply plugin: 'spring-boot' apply plugin: 'io.spring.dependency-management' apply plugin: 'groovy' sourceCompatibility = 1.8 targetCompatibility = 1.8 compileJava.options.compilerArgs = ['-Xlint:all'] compileTestGroovy.options.compilerArgs = ['-Xlint:all'] compileTestJava.options.compilerArgs = ['-Xlint:all'] eclipse { classpath { containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER') containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8' } } idea { module { inheritOutputDirs = false outputDir = file("$buildDir/classes/main/") } } repositories { jcenter() } dependencyManagement { imports { mavenBom 'io.spring.platform:platform-bom:2.0.7.RELEASE' } } dependencies { // dependency-management-plugin によりバージョン番号が自動で設定されるもの // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照 compile('org.springframework.boot:spring-boot-starter-integration') compile('org.springframework.integration:spring-integration-file') testCompile("org.springframework.boot:spring-boot-starter-test") testCompile("org.spockframework:spock-core") { exclude module: "groovy-all" } testCompile("org.spockframework:spock-spring") { exclude module: "groovy-all" } // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの compile("org.projectlombok:lombok:1.16.10") compile("org.apache.commons:commons-lang3:3.4") }
- Spring Boot で Spring Integration を利用するために
compile('org.springframework.boot:spring-boot-starter-integration')
を追加します。 - Spring Integration の File Support 機能を利用するために
compile('org.springframework.integration:spring-integration-file')
を追加します。
Application.java
package ksbysample.eipapp.dirchecker; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
ChannelConfig.java
package ksbysample.eipapp.dirchecker.eip.channel; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.channel.DirectChannel; import org.springframework.messaging.MessageChannel; @Configuration public class ChannelConfig { @Bean public MessageChannel inChannel() { return new DirectChannel(); } }
- “inChannel” という名前の Channel を作成します。Channel 名をメソッド名とする Bean を作成します。
- Inbound Channel Adapter –> Service Activator へデータを渡すだけなので、タイプは DirectChannel にします。inChannel メソッド内で DirectChannel クラスのインスタンスを生成します。
- 今回は Inbound Channel Adapter に設定する poller で Transaction を有効にするため、QueueChannel のようなキューイングするタイプの Channel は使用できません ( Spring Integration: Jdbc-inbound-adapter is it transactional when using adice-chain 参照 )。
- Channel の説明については、4. Messaging Channels を参照してください。
InDirChecker.java
■その1
package ksbysample.eipapp.dirchecker.eip.endpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.Poller; import org.springframework.integration.file.FileReadingMessageSource; import org.springframework.integration.file.filters.AcceptOnceFileListFilter; import org.springframework.integration.file.filters.CompositeFileListFilter; import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.messaging.Message; import org.springframework.scheduling.support.PeriodicTrigger; import java.io.File; import java.nio.file.Paths; @MessageEndpoint public class InDirChecker { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private static final String IN_DIR_PATH = "C:\\eipapp\\ksbysample-eipapp-dirchecker\\data\\in"; @Autowired private FileReadingMessageSource inDirFileReadingMessageSource; @Bean public FileReadingMessageSource inDirFileReadingMessageSource() { FileReadingMessageSource source = new FileReadingMessageSource(); source.setDirectory(Paths.get(IN_DIR_PATH).toFile()); // 動作確認用に一時的に AcceptOnceFileListFilter をセットする // ※AcceptOnceFileListFilter は同一のファイル名、タイムスタンプのファイルを // 1度のみ処理するための Filter である CompositeFileListFilter<File> filter = new CompositeFileListFilter<>(); filter.addFilter(new AcceptOnceFileListFilter<File>()); source.setFilter(filter); return source; } @Bean public PollerMetadata checkFilePoller() { PeriodicTrigger trigger = new PeriodicTrigger(1000); trigger.setFixedRate(true); PollerMetadata poller = new PollerMetadata(); poller.setTrigger(trigger); return poller; } @InboundChannelAdapter(value = "inChannel", poller = @Poller("checkFilePoller")) public Message<File> checkFile() { return inDirFileReadingMessageSource.receive(); } }
- クラスに @MessageEndpoint アノテーションを付加します。
- in ディレクトリにファイルが配置された時にファイルを読み込むための inDirFileReadingMessageSource Bean を作成します。
- Inbound Channel Adapter でポーリング処理を行うための checkFilePoller Bean を作成します。
- 最後に @InboundChannelAdapter アノテーションを付加した checkFile メソッドを作成します。
- 以上の実装で 1秒間隔で in ディレクトリをチェックし、ファイルがあれば inChannel に Message
型のデータを生成して送信するようになります。
■その2
package ksbysample.eipapp.dirchecker.eip.endpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.expression.ExpressionParser; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.Poller; import org.springframework.integration.channel.NullChannel; import org.springframework.integration.file.FileReadingMessageSource; import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.integration.transaction.DefaultTransactionSynchronizationFactory; import org.springframework.integration.transaction.ExpressionEvaluatingTransactionSynchronizationProcessor; import org.springframework.integration.transaction.PseudoTransactionManager; import org.springframework.integration.transaction.TransactionSynchronizationFactory; import org.springframework.messaging.Message; import org.springframework.scheduling.support.PeriodicTrigger; import org.springframework.transaction.interceptor.DefaultTransactionAttribute; import org.springframework.transaction.interceptor.MatchAlwaysTransactionAttributeSource; import org.springframework.transaction.interceptor.TransactionInterceptor; import java.io.File; import java.nio.file.Paths; import java.util.Collections; @MessageEndpoint public class InDirChecker { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private static final String DATA_DIR_PATH = "C:\\eipapp\\ksbysample-eipapp-dirchecker\\data"; private static final String IN_DIR_PATH = DATA_DIR_PATH + "\\in"; private static final String ERROR_DIR_PATH = DATA_DIR_PATH + "\\error"; @Autowired private ApplicationContext applicationContext; @Autowired private NullChannel nullChannel; @Autowired private PseudoTransactionManager pseudoTransactionManager; @Autowired private FileReadingMessageSource inDirFileReadingMessageSource; @Bean public TransactionSynchronizationFactory checkFilePollerSyncFactory() { ExpressionParser parser = new SpelExpressionParser(); ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor = new ExpressionEvaluatingTransactionSynchronizationProcessor(); syncProcessor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory()); syncProcessor.setAfterCommitExpression(parser.parseExpression("payload.delete()")); syncProcessor.setAfterCommitChannel(nullChannel); syncProcessor.setAfterRollbackExpression( parser.parseExpression("payload.renameTo('" + ERROR_DIR_PATH + "\\' + payload.name)")); syncProcessor.setBeforeCommitChannel(nullChannel); return new DefaultTransactionSynchronizationFactory(syncProcessor); } @Bean public FileReadingMessageSource inDirFileReadingMessageSource() { FileReadingMessageSource source = new FileReadingMessageSource(); source.setDirectory(Paths.get(IN_DIR_PATH).toFile()); return source; } @Bean public PollerMetadata checkFilePoller() { PeriodicTrigger trigger = new PeriodicTrigger(1000); trigger.setFixedRate(true); PollerMetadata poller = new PollerMetadata(); poller.setTrigger(trigger); poller.setTransactionSynchronizationFactory(checkFilePollerSyncFactory()); MatchAlwaysTransactionAttributeSource matchAlwaysTransactionAttributeSource = new MatchAlwaysTransactionAttributeSource(); matchAlwaysTransactionAttributeSource.setTransactionAttribute(new DefaultTransactionAttribute()); TransactionInterceptor txAdvice = new TransactionInterceptor(pseudoTransactionManager, matchAlwaysTransactionAttributeSource); poller.setAdviceChain(Collections.singletonList(txAdvice)); return poller; } @InboundChannelAdapter(value = "inChannel", poller = @Poller("checkFilePoller")) public Message<File> checkFile() { return inDirFileReadingMessageSource.receive(); } }
- 以下の @Autowired アノテーション付きのフィールドを追加します。
private ApplicationContext applicationContext;
private NullChannel nullChannel;
private PseudoTransactionManager pseudoTransactionManager;
- checkFilePollerSyncFactory Bean を作成します。正常終了時の処理 ( AfterCommit )、異状終了時の処理 ( AfterRollback ) を SpEL で定義します。
- inDirFileReadingMessageSource Bean 内で AcceptOnceFileListFilter を設定していた部分を削除します。今回の実装をすると処理されたファイルは削除か移動されるので、AcceptOnceFileListFilter は必要なくなります。
- checkFilePoller Bean に以下の処理を追加します。
poller.setTransactionSynchronizationFactory(checkFilePollerSyncFactory());
を追加します。- pseudoTransactionManager を
poller.setAdviceChain(...)
で設定する処理を追加します。
FileProcessor.java
package ksbysample.eipapp.dirchecker.eip.endpoint; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.Message; import java.io.File; @MessageEndpoint public class FileProcessor { @ServiceActivator(inputChannel = "inChannel") public void process(Message<File> message) throws Exception { File file = message.getPayload(); System.out.println(file.getAbsolutePath()); } }
- クラスに @MessageEndpoint アノテーションを付加します。
- @ServiceActivator アノテーションを付加した process メソッドを作成します。@ServiceActivator アノテーションの inputChannel 属性には読み込む元の Channel を指定します。指定した Channel にデータが来ると process メソッドが実行されます。
TransactionConfig.java
package ksbysample.eipapp.dirchecker.eip.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.transaction.PseudoTransactionManager; @Configuration public class TransactionConfig { @Bean public PseudoTransactionManager pseudoTransactionManager() { return new PseudoTransactionManager(); } }
- FileReadingMessageSource を利用した Inbound Channel Adapter で正常/異常終了時の処理を実装するためには PseudoTransactionManager を使用します。上記のように pseudoTransactionManager Bean を作成します。Appendix C. Transaction Support 参照。
履歴
2016/08/23
初版発行。