Spring Boot + Spring Integration でいろいろ試してみる ( その9 )( Pollers.fixedRate で待機時間を指定しても意味がない場合がある? )
概要
記事一覧はこちらです。
- Spring Integration DSL の Pollers.fixedRate で次の処理までの待機時間をミリ秒で指定できますが、単純に「処理をする」→「指定されたミリ秒待機する」→「処理をする」→。。。、という処理だと漠然と思っていたら、QueueChannel からデータを取得する場合には少し違っていたというお話です。
参照したサイト・書籍
目次
手順
ksbysample-eipapp-pollertest プロジェクトを作成する
まずは動作確認をするためのプロジェクトを作成します。
poller の動作確認をするための実装をする
以下の仕様のプログラムを実装します。
- MessageSource として呼び出されたら必ず Person オブジェクトの ArrayList を返すものを作成します。
- getFlow では 1秒毎に MessageSource からデータを取得し、ログを出力した後 nextChannel にデータを渡します。
- nextChannel は次の nextFlow でもポーリングをしたいので、QueueChannel として作成します。
- nextFlow では nextChannel から 5秒毎にデータを取得してログを出力します。
src/main/java の下に ksbysample.eipapp.pollertest パッケージを作成します。
src/main/java/ksbysample/eipapp/pollertest の下に Application.java を作成し、リンク先の内容 を記述します。
src/main/java/ksbysample/eipapp/pollertest の下に Person.java を作成し、リンク先の内容 を記述します。
src/main/java/ksbysample/eipapp/pollertest の下に FlowConfig.java を作成し、リンク先のその1の内容 を記述します。
動作確認
bootRun タスクを実行します。
- nextFlow の poller の initialDelay (第2引数) に設定した 10秒後に nextFlow の処理が実行されて、getFlow が nextChannel に渡したデータ 11件が出力されます。
- ただしその後は getFlow, nextFlow がほぼ同時に出力され続けており、nextFlow の poller の period (第1引数) に設定した 10秒がどうも機能していません???
起動したアプリケーションを停止します。
src/main/java/ksbysample/eipapp/pollertest の下の FlowConfig.java を リンク先のその2の内容 に変更します。
再度 bootRun タスクを実行します。
- 今度は nextFlow はひたすら実行されたりはせずに、だいたい指定された 8秒毎に実行されているようです。
結局どのように処理されているのか?
- MessageSource からデータを取得する場合には Pollers.fixedRate で指定された待機時間がそのまま機能する。
- QueueChannel から取得する場合、QueueChannel をチェックしてデータがなくなって初めて待機するようになり、Pollers.fixedRate で指定された待機時間後にまた処理が実行される。QueueChannel にデータが次々と流れてくる状況では Pollers.fixedRate で待機時間を指定しても処理は待機することなく次々と実行されて実質意味がない。
ソースコード
build.gradle
group 'ksbysample' version '1.0.0-RELEASE' buildscript { ext { springBootVersion = '1.4.3.RELEASE' } repositories { mavenCentral() maven { url "http://repo.spring.io/repo/" } } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE") } } apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'idea' apply plugin: 'org.springframework.boot' apply plugin: 'io.spring.dependency-management' apply plugin: 'groovy' sourceCompatibility = 1.8 targetCompatibility = 1.8 compileJava.options.compilerArgs = ['-Xlint:all'] compileTestGroovy.options.compilerArgs = ['-Xlint:all'] compileTestJava.options.compilerArgs = ['-Xlint:all'] eclipse { classpath { containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER') containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8' } } idea { module { inheritOutputDirs = false outputDir = file("$buildDir/classes/main/") } } repositories { jcenter() } dependencyManagement { imports { mavenBom 'io.spring.platform:platform-bom:Athens-SR2' } } dependencies { // dependency-management-plugin によりバージョン番号が自動で設定されるもの // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照 compile('org.springframework.boot:spring-boot-starter-integration') compile('org.springframework.integration:spring-integration-java-dsl') testCompile("org.springframework.boot:spring-boot-starter-test") testCompile("org.spockframework:spock-core") testCompile("org.spockframework:spock-spring") // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの compile("org.projectlombok:lombok:1.16.12") testCompile("org.assertj:assertj-core:3.6.1") }
Application.java
package ksbysample.eipapp.pollertest; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
Person.java
package ksbysample.eipapp.pollertest; import lombok.AllArgsConstructor; import lombok.Data; @Data @AllArgsConstructor public class Person { private String name; private int age; }
FlowConfig.java
■その1
package ksbysample.eipapp.pollertest; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.core.MessageSource; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.core.Pollers; import org.springframework.integration.dsl.support.GenericHandler; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import java.util.ArrayList; import java.util.List; @Slf4j @Configuration public class FlowConfig { @Bean public MessageSource<List<Person>> personListMessageSource() { return () -> { List<Person> personList = new ArrayList<>(); personList.add(new Person("tanaka", 35)); personList.add(new Person("suzuki", 28)); personList.add(new Person("kimura", 41)); return MessageBuilder.withPayload(personList).build(); }; } @Bean public MessageChannel nextChannel() { return new QueueChannel(100); } @Bean public IntegrationFlow getFlow() { // 指定された時間毎に personListMessageSource からデータを取得して // ログを出力した後、nextChannel にデータを渡す return IntegrationFlows.from(personListMessageSource() , c -> c.poller(Pollers.fixedRate(1000))) .handle((p, h) -> { log.info("★★★ getFlow"); return p; }) .channel(nextChannel()) .get(); } @Bean public IntegrationFlow nextFlow() { // 指定された時間毎に nextChannel からデータを取得してログを出力する return IntegrationFlows.from(nextChannel()) .handle((GenericHandler<Object>) (p, h) -> { log.info("■■■ nextFlow"); return null; }, c -> c.poller(Pollers.fixedRate(5000, 5000))) .get(); } }
■その2
@Bean public IntegrationFlow getFlow() { // 指定された時間毎に personListMessageSource からデータを取得して // ログを出力した後、nextChannel にデータを渡す return IntegrationFlows.from(personListMessageSource() , c -> c.poller(Pollers.fixedRate(2000))) .handle((p, h) -> { log.info("★★★ getFlow"); return p; }) .channel(nextChannel()) .get(); } @Bean public IntegrationFlow nextFlow() { // 指定された時間毎に nextChannel からデータを取得してログを出力する return IntegrationFlows.from(nextChannel()) .handle((GenericHandler<Object>) (p, h) -> { log.info("■■■ nextFlow"); return null; }, c -> c.poller(Pollers.fixedRate(8000, 5000))) .get(); }
- getFlow 内の Pollers.fixedRate に指定するミリ秒数を 1000 → 2000 に変更します。
- nextFlow 内の Pollers.fixedRate に指定するミリ秒数を 5000 → 8000 に変更します。
履歴
2017/01/13
初版発行。