かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その26 )( MessageChannel の capacity を超えたメッセージを送信しようとするとどうなるのか? )

概要

記事一覧はこちらです。

MessageChannel には capacity (MessageChannel に格納できる Message の最大数)を設定できますが、Inbound Channel Adapter から取得した Message 数より capacity の数値が低い場合にどのような動作になるのか気になったので確認します。

参照したサイト・書籍

目次

  1. ksbysample-eipapp-channelcapacity プロジェクトを作成する
  2. 仕様を決める
  3. Application クラスを変更する
  4. Flyway 用の SQL ファイルを作成する
  5. application.properties に DB 接続用の設定を記述する
  6. ApplicationConfig クラスを作成する
  7. QueueSourceDto クラスを作成する
  8. FlowConfig クラスを作成する
  9. 動作確認
  10. 【検証】.transactional(this.transactionManager) が有効か確認する

手順

ksbysample-eipapp-channelcapacity プロジェクトを作成する

Spring Initializr でプロジェクトの雛形を作成した後、build.gradle を以下のように変更します。

buildscript {
    ext {
        springBootVersion = '2.0.4.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'ksbysample'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    def lombokVersion = "1.18.2"

    compile('org.springframework.boot:spring-boot-starter-integration')
    implementation('org.springframework.boot:spring-boot-starter-jdbc')
    implementation('org.springframework.integration:spring-integration-jdbc')
    testCompile('org.springframework.boot:spring-boot-starter-test')

    implementation("com.h2database:h2:1.4.192")
    implementation("org.flywaydb:flyway-core:5.1.4")

    // for lombok
    annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
    compileOnly("org.projectlombok:lombok:${lombokVersion}")
}
  • Spring Integration の 19. JDBC Support の Inbound Channel Adapter の機能を使うので、以下の行を追加します。
    • implementation('org.springframework.integration:spring-integration-jdbc')
  • DataSource や PlatformTransactionManager も利用するので以下の行を追加します。
    • implementation('org.springframework.boot:spring-boot-starter-jdbc')
  • JDBC Inbound Channel Adapter のデータ取得元のテーブルを自動生成するために Flyway を入れます。Flyway は Appendix F. Dependency versions に記載があるのでバージョン番号を指定しなくても良いのですが、最新バージョンを使用したいので指定します。DB には H2 を使用します。
    • implementation("com.h2database:h2:1.4.192")
    • implementation("org.flywaydb:flyway-core:5.1.4")
  • DTO クラスを作成する時に @Data アノテーションを使いたいんで lombok も入れます。

仕様を決める

今回作成するサンプルは以下の仕様にします。

  • JDBC Inbound Channel Adapter から取得する Message の数を最大 5 にし、取得した後に送信する MessageChannel の capacity の数を 3 にする。
  • JDBC Inbound Channel Adapter からは 1秒毎に Message を取得する。
  • 送信先の MessageChannel からは 3秒毎に Message を取得する。取得する時の Message の最大数は 1 とする。
  • JDBC Inbound Channel Adapter の取得元のテーブル名を QUEUE_SOURCE とし、テーブルには 10レコード入れておく。

これで取得した5つの Message 全てを次の MessageChannel に送信できないという状況になります。

Application クラスを変更する

src/main/java/ksbysample/eipapp/channelcapacity/Application.java の以下の点を変更します。

package ksbysample.eipapp.channelcapacity;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.jpa.JpaRepositoriesAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication(exclude = {JpaRepositoriesAutoConfiguration.class, HibernateJpaAutoConfiguration.class})
@EnableScheduling
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}
  • MessageChannel に格納された Message の数を定期的にログに出力するのにスケジューリングの機能を利用したいので、@EnableScheduling アノテーションを付与します。
  • @SpringBootApplication@SpringBootApplication(exclude = {JpaRepositoriesAutoConfiguration.class, HibernateJpaAutoConfiguration.class}) に変更します。

Flyway 用の SQL ファイルを作成する

src/main/resources の下に db/migration ディレクトリを作成します。

src/main/resources/db/migration の下に V1__init.sql を新規作成し、以下の内容を記述します。

CREATE TABLE QUEUE_SOURCE
(
  seq INT NOT NULL,
  status INT NOT NULL,
  value VARCHAR(64) NOT NULL,
  CONSTRAINT PK_QUEUE_SOURCE PRIMARY KEY (seq)
);

INSERT INTO QUEUE_SOURCE VALUES (1, 0, 'その1');
INSERT INTO QUEUE_SOURCE VALUES (2, 0, 'その2');
INSERT INTO QUEUE_SOURCE VALUES (3, 0, 'その3');
INSERT INTO QUEUE_SOURCE VALUES (4, 0, 'その4');
INSERT INTO QUEUE_SOURCE VALUES (5, 0, 'その5');
INSERT INTO QUEUE_SOURCE VALUES (6, 0, 'その6');
INSERT INTO QUEUE_SOURCE VALUES (7, 0, 'その7');
INSERT INTO QUEUE_SOURCE VALUES (8, 0, 'その8');
INSERT INTO QUEUE_SOURCE VALUES (9, 0, 'その9');
INSERT INTO QUEUE_SOURCE VALUES (10, 0, 'その10');

application.properties に DB 接続用の設定を記述する

src/main/resources/application.properties に以下の内容を記述します。

spring.datasource.hikari.jdbc-url=jdbc:h2:mem:channelcapacitydb
spring.datasource.hikari.username=sa
spring.datasource.hikari.password=
spring.datasource.hikari.driver-class-name=org.h2.Driver

ここでアプリケーションを1度起動してエラーが出ないことを確認しておきます。

f:id:ksby:20180901000732p:plain

Flyway.setCallbacks(FlywayCallback) has been deprecated and will be removed in Flyway 6.0. Use Flyway.setCallbacks(Callback) instead. というメッセージの WARN ログが出ていますが、stackoverflow に Flyway deprecation message logged when using Spring Boot 2 という記事がありました。FlywayAutoConfiguration のソースで Flyway.setCallbacks(FlywayCallback) が使用されていることが原因のようです。

ApplicationConfig クラスを作成する

src/main/java/ksbysampleveipapp/channelcapacity の下に ApplicationConfig.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.channelcapacity;

import com.zaxxer.hikari.HikariDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableMBeanExport;
import org.springframework.jmx.support.RegistrationPolicy;

import javax.sql.DataSource;

@Configuration
@EnableMBeanExport(registration = RegistrationPolicy.IGNORE_EXISTING)
public class ApplicationConfig {

    @Bean
    @ConfigurationProperties("spring.datasource.hikari")
    public DataSource dataSource() {
        return DataSourceBuilder.create()
                .type(HikariDataSource.class)
                .build();
    }

}

QueueSourceDto クラスを作成する

src/main/java/ksbysampleveipapp/channelcapacity の下に QueueSourceDto.java を新規作成し、以下の内容を記述します。

package ksbysample.eipapp.channelcapacity;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class QueueSourceDto {

    private int seq;

    private int status;

    private String value;

}

FlowConfig クラスを作成する

src/main/java/ksbysampleveipapp/channelcapacity の下に FlowConfig.java を新規作成し、以下の内容を記述します。

jdbcMessageSource → selectDbFlow → dstChannel → getDstChannelFlow の順で Message が流れます。

package ksbysample.eipapp.channelcapacity;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Slf4j
@Configuration
public class FlowConfig {

    private final DataSource dataSource;

    private final PlatformTransactionManager transactionManager;

    private final NullChannel nullChannel;

    public FlowConfig(DataSource dataSource
            , PlatformTransactionManager transactionManager
            , NullChannel nullChannel) {
        this.dataSource = dataSource;
        this.transactionManager = transactionManager;
        this.nullChannel = nullChannel;
    }

    @Bean
    public MessageSource<Object> jdbcMessageSource() {
        JdbcPollingChannelAdapter adapter
                = new JdbcPollingChannelAdapter(this.dataSource
                , "select * from QUEUE_SOURCE where status = 0");
        adapter.setRowMapper(new BeanPropertyRowMapper<>(QueueSourceDto.class));
        adapter.setUpdateSql("update QUEUE_SOURCE set status = 1 where seq in (:seq)");
        return adapter;
    }

    @Bean
    public IntegrationFlow selectDbFlow() {
        return IntegrationFlows.from(jdbcMessageSource()
                , e -> e.poller(Pollers
                        .fixedDelay(1000)
                        .maxMessagesPerPoll(5)
                        .transactional(this.transactionManager)))
                // 取得したデータは List 形式で全件 payload にセットされているので、split で1payload1データに分割する
                .split()
                .<QueueSourceDto>log(LoggingHandler.Level.WARN, m -> "☆☆☆ " + m.getPayload().getSeq())
                .channel(dstChannel())
                .get();
    }

    @Bean
    public QueueChannel dstChannel() {
        return MessageChannels.queue(3).get();
    }

    @Bean
    public IntegrationFlow getDstChannelFlow() {
        return IntegrationFlows.from(dstChannel())
                .bridge(e -> e.poller(Pollers
                        .fixedDelay(3000)
                        .maxMessagesPerPoll(1)))
                .<QueueSourceDto>log(LoggingHandler.Level.ERROR, m -> "★★★ " + m.getPayload().getSeq())
                .channel(nullChannel)
                .get();
    }

    @Scheduled(initialDelay = 1000, fixedDelay = 1000)
    public void checkDstChannel() {
        log.info("dstChannel().getQueueSize() = " + dstChannel().getQueueSize());
    }

}

動作確認

アプリケーションを実行してみます。

f:id:ksby:20180901055429p:plain

取得したログを見ると以下のように処理されるようです。

  • dstChannel に capacity 分の Message が溜まっていると送信する部分の処理(.channel(dstChannel()))で止まり、JDBC Inbound Channel Adapter からデータを取りません。
  • JDBC Inbound Channel Adapter から最初は .maxMessagesPerPoll(5) で指定した5件取得しますが、その後は5件から減った分の1件しか取得しません。常に5件ずつ取得する訳ではありませんでした。

【検証】.transactional(this.transactionManager) が有効か確認する

※ここから先のコードはコミットしません。

JDBC Inbound Channel Adapter からデータを取得する時に e -> e.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(5).transactional(this.transactionManager)).transactional(this.transactionManager) を付けてトランザクションを有効にしていますが、本当に効いているのか確認してみます。

dstChannel に Message を送信する時にタイムアウト時間を設定して、タイムアウトした時には RuntimeException を throw するようにします。src/main/java/ksbysampleveipapp/channelcapacity/FlowConfig.java を以下のように変更します。

    @Bean
    public IntegrationFlow selectDbFlow() {
        return IntegrationFlows.from(jdbcMessageSource()
                , e -> e.poller(Pollers
                        .fixedDelay(1000)
                        .maxMessagesPerPoll(5)
                        .transactional(this.transactionManager)))
                // 取得したデータは List 形式で全件 payload にセットされているので、split で1payload1データに分割する
                .split()
                .<QueueSourceDto>log(LoggingHandler.Level.WARN, m -> "☆☆☆ " + m.getPayload().getSeq())
                // .channel(dstChannel())
                .handle((p, h) -> {
                    dstChannel().send(new GenericMessage<>(p, h), 2000);
                    return null;
                })
                .get();
    }

    static class DstChannelInterceptor implements ChannelInterceptor {
        @Override
        public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
            // postSend メソッドは MessageChannel に Message を送信した後に呼び出される
            // 引数の sent は Message の送信に成功すると true が、失敗すると false がセットされるので、
            // false の時に RuntimeException を throw させる
            if (!sent) {
                throw new RuntimeException("Can't send message.");
            }
        }
    }

    @Bean
    public QueueChannel dstChannel() {
        QueueChannel queueChannel = MessageChannels.queue(3).get();
        queueChannel.addInterceptor(new DstChannelInterceptor());
        return queueChannel;
    }
  • ChannelInterceptor インターフェースの実装クラス DstChannelInterceptor を追加します。
  • dstChannel Bean に queueChannel.addInterceptor(new DstChannelInterceptor()); を追加します。
  • selectDbFlow Bean の dstChannel へのメッセージを送信処理を .channel(dstChannel()).handle((p, h) -> { dstChannel().send(new GenericMessage<>(p, h), 2000); return null; }) に変更して、取得される時間(3秒)より短い時間(2秒)でタイムアウトさせます。

アプリケーションを実行してみます。

f:id:ksby:20180901092115p:plain f:id:ksby:20180901092236p:plain

最低でも dstChannel で1件目が処理されていますが、RuntimeException が発生すれると1件目から取り直されていました。adapter.setUpdateSql("update QUEUE_SOURCE set status = 1 where seq in (:seq)"); で update されたのが commit されていないようです。どのような SQL が実行されているか確認してみます。

log4jdbc を入れるので、build.gradle に implementation("com.integralblue:log4jdbc-spring-boot-starter:1.0.2") を追加します。

dependencies {
    def lombokVersion = "1.18.2"

    compile('org.springframework.boot:spring-boot-starter-integration')
    implementation('org.springframework.boot:spring-boot-starter-jdbc')
    implementation('org.springframework.integration:spring-integration-jdbc')
    testCompile('org.springframework.boot:spring-boot-starter-test')

    implementation("com.h2database:h2:1.4.192")
    implementation("org.flywaydb:flyway-core:5.1.4")
    implementation("com.integralblue:log4jdbc-spring-boot-starter:1.0.2")

    // for lombok
    annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
    compileOnly("org.projectlombok:lombok:${lombokVersion}")
}

src/main/properties/application.properties に log4jdbc の設定を追加します。

spring.datasource.hikari.jdbc-url=jdbc:h2:mem:channelcapacitydb
spring.datasource.hikari.username=sa
spring.datasource.hikari.password=
spring.datasource.hikari.driver-class-name=org.h2.Driver
spring.datasource.hikari.register-mbeans=true

# log4jdbc-log4j2
logging.level.jdbc.sqlonly=DEBUG
logging.level.jdbc.sqltiming=INFO
logging.level.jdbc.audit=INFO
logging.level.jdbc.resultset=ERROR
logging.level.jdbc.resultsettable=ERROR
logging.level.jdbc.connection=DEBUG

アプリケーションを実行してみます。

f:id:ksby:20180901094620p:plain

.maxMessagesPerPoll(5) を指定しているので5件しか update していないと思っていましたが、テーブル内にある10件全て取得して update されていました。なるほどそんな動作をするのか。。。

また RuntimeException が発生しているので rollback もされていました。.transactional(this.transactionManager) は有効でした。

dstChannel で処理した分は commit して処理し直されないようにしてみます。src/main/java/ksbysampleveipapp/channelcapacity/FlowConfig.java を以下のように変更します。

    @Bean
    public MessageSource<Object> jdbcMessageSource() {
        JdbcPollingChannelAdapter adapter
                = new JdbcPollingChannelAdapter(this.dataSource
                , "select * from QUEUE_SOURCE where status = 0");
        adapter.setRowMapper(new BeanPropertyRowMapper<>(QueueSourceDto.class));
        adapter.setUpdateSql("update QUEUE_SOURCE set status = 1 where seq in (:seq)");
        adapter.setMaxRowsPerPoll(1);
        return adapter;
    }

    @Bean
    public IntegrationFlow selectDbFlow() {
        return IntegrationFlows.from(jdbcMessageSource()
                , e -> e.poller(Pollers
                        .fixedDelay(1000)
                        .maxMessagesPerPoll(1)
                        .transactional(this.transactionManager)))
                // 取得したデータは List 形式で全件 payload にセットされているので、split で1payload1データに分割する
                .split()
                .<QueueSourceDto>log(LoggingHandler.Level.WARN, m -> "☆☆☆ " + m.getPayload().getSeq())
                // .channel(dstChannel())
                .handle((p, h) -> {
                    dstChannel().send(new GenericMessage<>(p, h), 2000);
                    return null;
                })
                .get();
    }
  • jdbcMessageSource Bean の処理に adapter.setMaxRowsPerPoll(1); を追加します。
  • selectDbFlow Bean の処理で、.maxMessagesPerPoll(5).maxMessagesPerPoll(1) に変更します。

これで1件毎に commit されるはずです。アプリケーションを実行してみます。

f:id:ksby:20180901100612p:plain

1件目は dstChannel に Message を送信したら commit されていました。

f:id:ksby:20180901100840p:plain f:id:ksby:20180901101157p:plain

7件目でタイムアウトが発生しましたが、次に取り直されたのは7件目でした。

結論としては、以下のような考慮が必要になるものと思われます。

  • JdbcPollingChannelAdapter のデータ取得元のテーブルに件数がある場合には、JdbcPollingChannelAdapter#setMaxRowsPerPoll による取得件数の設定は必須。
  • 取得した後の処理ですぐに split しない方が多分良さそうです。
  • rollback が発生した時のことを考慮して(どこから処理をやり直してよいのか?)、MessageSource から取得するデータ件数、送信先の MessageChannel の capacity や MessageChannel から取得する件数等を考える必要があります。

履歴

2018/09/01
初版発行。