かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その9 )( Pollers.fixedRate で待機時間を指定しても意味がない場合がある? )

概要

記事一覧はこちらです。

  • Spring Integration DSL の Pollers.fixedRate で次の処理までの待機時間をミリ秒で指定できますが、単純に「処理をする」→「指定されたミリ秒待機する」→「処理をする」→。。。、という処理だと漠然と思っていたら、QueueChannel からデータを取得する場合には少し違っていたというお話です。

参照したサイト・書籍

目次

  1. ksbysample-eipapp-pollertest プロジェクトを作成する
  2. poller の動作確認をするための実装をする
  3. 動作確認
  4. 結局どのように処理されているのか?

手順

ksbysample-eipapp-pollertest プロジェクトを作成する

まずは動作確認をするためのプロジェクトを作成します。

  1. IntelliJ IDEA で Gradle プロジェクトを作成し、build.gradle を リンク先の内容 に変更します。

poller の動作確認をするための実装をする

  1. 以下の仕様のプログラムを実装します。

    f:id:ksby:20170113001933p:plain

    • MessageSource として呼び出されたら必ず Person オブジェクトの ArrayList を返すものを作成します。
    • getFlow では 1秒毎に MessageSource からデータを取得し、ログを出力した後 nextChannel にデータを渡します。
    • nextChannel は次の nextFlow でもポーリングをしたいので、QueueChannel として作成します。
    • nextFlow では nextChannel から 5秒毎にデータを取得してログを出力します。
  2. src/main/java の下に ksbysample.eipapp.pollertest パッケージを作成します。

  3. src/main/java/ksbysample/eipapp/pollertest の下に Application.java を作成し、リンク先の内容 を記述します。

  4. src/main/java/ksbysample/eipapp/pollertest の下に Person.java を作成し、リンク先の内容 を記述します。

  5. src/main/java/ksbysample/eipapp/pollertest の下に FlowConfig.java を作成し、リンク先のその1の内容 を記述します。

動作確認

  1. bootRun タスクを実行します。

    f:id:ksby:20170113011005p:plain

    • nextFlow の poller の initialDelay (第2引数) に設定した 10秒後に nextFlow の処理が実行されて、getFlow が nextChannel に渡したデータ 11件が出力されます。
    • ただしその後は getFlow, nextFlow がほぼ同時に出力され続けており、nextFlow の poller の period (第1引数) に設定した 10秒がどうも機能していません???
  2. 起動したアプリケーションを停止します。

  3. src/main/java/ksbysample/eipapp/pollertest の下の FlowConfig.javaリンク先のその2の内容 に変更します。

  4. 再度 bootRun タスクを実行します。

    f:id:ksby:20170113012830p:plain

    • 今度は nextFlow はひたすら実行されたりはせずに、だいたい指定された 8秒毎に実行されているようです。

結局どのように処理されているのか?

  • MessageSource からデータを取得する場合には Pollers.fixedRate で指定された待機時間がそのまま機能する。
  • QueueChannel から取得する場合、QueueChannel をチェックしてデータがなくなって初めて待機するようになり、Pollers.fixedRate で指定された待機時間後にまた処理が実行される。QueueChannel にデータが次々と流れてくる状況では Pollers.fixedRate で待機時間を指定しても処理は待機することなく次々と実行されて実質意味がない。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

compileJava.options.compilerArgs = ['-Xlint:all']
compileTestGroovy.options.compilerArgs = ['-Xlint:all']
compileTestJava.options.compilerArgs = ['-Xlint:all']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    jcenter()
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
    }
}

dependencies {
    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile('org.springframework.boot:spring-boot-starter-integration')
    compile('org.springframework.integration:spring-integration-java-dsl')
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.1")
}

Application.java

package ksbysample.eipapp.pollertest;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

Person.java

package ksbysample.eipapp.pollertest;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class Person {

    private String name;

    private int age;

}

FlowConfig.java

■その1

package ksbysample.eipapp.pollertest;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

import java.util.ArrayList;
import java.util.List;

@Slf4j
@Configuration
public class FlowConfig {

    @Bean
    public MessageSource<List<Person>> personListMessageSource() {
        return () -> {
            List<Person> personList = new ArrayList<>();
            personList.add(new Person("tanaka", 35));
            personList.add(new Person("suzuki", 28));
            personList.add(new Person("kimura", 41));
            return MessageBuilder.withPayload(personList).build();
        };
    }

    @Bean
    public MessageChannel nextChannel() {
        return new QueueChannel(100);
    }

    @Bean
    public IntegrationFlow getFlow() {
        // 指定された時間毎に personListMessageSource からデータを取得して
        // ログを出力した後、nextChannel にデータを渡す
        return IntegrationFlows.from(personListMessageSource()
                , c -> c.poller(Pollers.fixedRate(1000)))
                .handle((p, h) -> {
                    log.info("★★★ getFlow");
                    return p;
                })
                .channel(nextChannel())
                .get();
    }

    @Bean
    public IntegrationFlow nextFlow() {
        // 指定された時間毎に nextChannel からデータを取得してログを出力する
        return IntegrationFlows.from(nextChannel())
                .handle((GenericHandler<Object>) (p, h) -> {
                    log.info("■■■ nextFlow");
                    return null;
                }, c -> c.poller(Pollers.fixedRate(5000, 5000)))
                .get();
    }

}

■その2

    @Bean
    public IntegrationFlow getFlow() {
        // 指定された時間毎に personListMessageSource からデータを取得して
        // ログを出力した後、nextChannel にデータを渡す
        return IntegrationFlows.from(personListMessageSource()
                , c -> c.poller(Pollers.fixedRate(2000)))
                .handle((p, h) -> {
                    log.info("★★★ getFlow");
                    return p;
                })
                .channel(nextChannel())
                .get();
    }

    @Bean
    public IntegrationFlow nextFlow() {
        // 指定された時間毎に nextChannel からデータを取得してログを出力する
        return IntegrationFlows.from(nextChannel())
                .handle((GenericHandler<Object>) (p, h) -> {
                    log.info("■■■ nextFlow");
                    return null;
                }, c -> c.poller(Pollers.fixedRate(8000, 5000)))
                .get();
    }
  • getFlow 内の Pollers.fixedRate に指定するミリ秒数を 1000 → 2000 に変更します。
  • nextFlow 内の Pollers.fixedRate に指定するミリ秒数を 5000 → 8000 に変更します。

履歴

2017/01/13
初版発行。