読者です 読者をやめる 読者になる 読者になる

かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その8 )( MySQL のテーブルのデータを取得して PostgreSQL のテーブルへ登録する常駐型アプリケーションを作成する )

概要

記事一覧はこちらです。

  • Spring Integration の 18. JDBC Support の機能を利用して、以下の処理を行う常駐型アプリケーションを作成してみます。
    1. MySQL の orders テーブルにデータを登録します。データには status を持たせ、登録時には status = ‘00’ にします。
    2. 登録されたデータを常駐型アプリケーションで取得します。取得時に status = ‘01’ に更新します。
    3. 取得したデータを PostgreSQL の orders テーブルに登録します。
    4. 登録できたデータを MySQL の orders テーブルから削除します。
  • Spring Integration のフロー図で描くと以下の構成になります。 f:id:ksby:20170109102124p:plain
  • 今回 Spring Integration DSL も使用します。XML ファイルは使用しません。
  • MySQLPostgreSQL は以前インストールした 5.6、9.4 を使用します。
  • Spring Boot は 1.4.3 を、Spring Integration は 4.3.6 を使用します。どちらも Spring IO Platform の Athens-SR2 を利用して指定します。

参照したサイト・書籍

  1. Spring Integration Reference Manual - 18. JDBC Support
    http://docs.spring.io/spring-integration/docs/4.3.6.RELEASE/reference/html/jdbc.html

  2. spring-integration/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageHandlerIntegrationTests.java
    https://github.com/spring-projects/spring-integration/blob/master/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageHandlerIntegrationTests.java

    • JdbcMessageHandler の使い方を参考にしました。
  3. Spring Integration Java DSL Reference
    https://github.com/spring-projects/spring-integration-java-dsl/wiki/spring-integration-java-dsl-reference

  4. Spring Integration Java DSL: Line by line tutorial
    https://spring.io/blog/2014/11/25/spring-integration-java-dsl-line-by-line-tutorial

    • Spring Integration DSL の tutorial です。
  5. Spring Integration Java DSL sample - further simplification with Jms namespace factories
    http://www.java-allandsundry.com/2014/06/spring-integration-java-dsl-sample.html

  6. Spring Integration Java DSL sample
    https://dzone.com/articles/spring-integration-java-dsl

  7. JDBC Spring Integration with Annotations
    http://stackoverflow.com/questions/27247013/jdbc-spring-integration-with-annotations

目次

  1. ksbysample-eipapp-datacopy プロジェクトを作成する
  2. MySQL と PostgreSQL に orders テーブルを作成する
  3. データベースの設定を記述し、DataSource 等の Bean を定義する
  4. copyChannel, delChannel を作成する
  5. getFlow を作成する
  6. copyFlow を作成する
  7. delFlow を作成する
  8. 動作確認
  9. 感想

手順

ksbysample-eipapp-datacopy プロジェクトを作成する

  1. feature/6-issue ブランチを作成します。

  2. IntelliJ IDEA の「Welcome to IntelliJ IDEA」ダイアログを表示した後、「Create New Project」をクリックします。

    f:id:ksby:20170109165105p:plain

  3. 「New Project」ダイアログが表示されます。画面左側で「Gradle」を選択した後、画面右側は何も変更せずに「Next」ボタンをクリックします。

    f:id:ksby:20170109165235p:plain

  4. GroupId、ArtifactId を入力する画面が表示されます。以下の画像の文字列を入力した後、「Next」ボタンをクリックします。

    f:id:ksby:20170109165833p:plain

  5. 次の画面が表示されます。「Create directories for empty content roots automatically」をチェックした後、「Next」ボタンをクリックします。

    f:id:ksby:20170109170709p:plain

  6. Project name、Project location を入力する画面が表示されます。以下の画像の文字列を入力した後、「Finish」ボタンをクリックします。

    f:id:ksby:20170109171103p:plain

  7. build.gradle を リンク先の内容 に変更します。

  8. Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして build.gradle を反映します。

  9. src/main/java の下に ksbysample.eipapp.datacopy パッケージを作成します。

  10. src/main/java/ksbysample/eipapp/datacopy の下に Application.java を新規作成します。作成後、リンク先の内容 に変更します。

  11. 以下のディレクトリの下に .gitkeep ファイルを作成します。

    • src/main/groovy
    • src/main/resources
    • src/test/groovy
    • src/test/java
    • src/test/resources
  12. この時点で Project Tool Window は以下の状態になります。

    f:id:ksby:20170109182746p:plain

※この時点ではまだアプリケーションは起動できません。起動しようとしてもデータベースの接続設定を記述していないためエラーになります。

MySQLPostgreSQL に orders テーブルを作成する

  1. IntelliJ IDEA の Database Tool Window から MySQL の world データベース、PostgreSQL の ksbylending データベースにアクセスできるように設定します。

  2. まず MySQL の world データベースに接続する設定を追加します。Database Tool Window の左上から「New」-「Data Source」-「MySQL」を選択します。

    f:id:ksby:20170109190150p:plain

  3. 「Data Sources and Drivers」ダイアログが表示されます。以下の画像の値を入力後、「OK」ボタンをクリックします。

    f:id:ksby:20170109190831p:plain

  4. 次に PostgreSQL の ksbylending データベースに接続する設定を追加します。Database Tool Window の左上から「New」-「Data Source」-「PostgreSQL」を選択します。

    f:id:ksby:20170109191218p:plain

  5. 「Data Sources and Drivers」ダイアログが表示されます。以下の画像の値を入力後、「OK」ボタンをクリックします。

    f:id:ksby:20170109191501p:plain

  6. この時点で Database Tool Window は以下の状態です。

    f:id:ksby:20170109193208p:plain

  7. MySQL の world データベースに orders テーブルを作成します。Database Tool Window で world データベースを選択してコンテキストメニューを表示した後、「New」-「Table」を選択します。

    f:id:ksby:20170109200601p:plain

  8. 「Create New Table」ダイアログが表示されます。以下の画像のデータを入力後、「Execute」ボタンをクリックします。

    f:id:ksby:20170109202312p:plain

  9. PostgreSQL の ksbylending データベースの方でも「Create New Table」ダイアログを表示し、以下の画像のデータを入力後、「Execute」ボタンをクリックします。

    f:id:ksby:20170109203836p:plain

  10. orders テーブル作成後は以下の状態になります。

    f:id:ksby:20170109204800p:plain

データベースの設定を記述し、DataSource 等の Bean を定義する

  1. src/main/resources の下に application.properties を作成し、リンク先の内容 を記述します。

  2. src/main/java/ksbysample/eipapp/datacopy の下に config パッケージを作成します。

  3. src/main/java/ksbysample/eipapp/datacopy/config の下に ApplicationConfig.java を作成し、リンク先の内容 を記述します。

  4. src/main/resources/.gitkeep を削除します。

  5. データベースの接続設定を記述しましたのでアプリケーションを起動してみます。Gradle Tool Window から bootRun タスクを実行してアプリケーションが起動してエラーが出ないことを確認します。

    f:id:ksby:20170109213305p:plain

    アプリケーションはエラーが出ずに起動しましたが、Channel も何も作成していないとすぐに終了するんですね。。。

copyChannel, delChannel を作成する

  1. src/main/java/ksbysample/eipapp/datacopy の下に integration.channel パッケージを作成します。

  2. src/main/java/ksbysample/eipapp/datacopy/integration/channel の下に ChannelConfig.java を作成し、リンク先の内容 を記述します。

getFlow を作成する

  1. src/main/java/ksbysample/eipapp/datacopy の下に dto パッケージを作成します。

  2. src/main/java/ksbysample/eipapp/datacopy/dto の下に OrdersDto.java を作成し、リンク先の内容 を記述します。

  3. src/main/java/ksbysample/eipapp/datacopy/dto の下に OrdersDtoRowMapper.java を作成し、リンク先の内容 を記述します。

  4. src/main/java/ksbysample/eipapp/datacopy/integration の下に flow パッケージを作成します。

  5. src/main/java/ksbysample/eipapp/datacopy/integration/flow の下に FlowConfig.java を作成し、リンク先のその1の内容 を記述します。

copyFlow を作成する

  1. src/main/java/ksbysample/eipapp/datacopy/integration/flow の下に CopyOrdersMessageHandler.java を作成し、リンク先の内容 を記述します。

  2. src/main/java/ksbysample/eipapp/datacopy/integration/flow の下の FlowConfig.javaリンク先のその2の内容 に変更します。

delFlow を作成する

  1. src/main/java/ksbysample/eipapp/datacopy/integration/flow の下に DelOrdersMessageHandler.java を作成し、リンク先の内容 を記述します。

  2. src/main/java/ksbysample/eipapp/datacopy/integration/flow の下の FlowConfig.javaリンク先のその3の内容 に変更します。

動作確認

  1. bootRun タスクを実行してアプリケーションを起動します。各データベースの orders テーブルには何も登録されていません。

    f:id:ksby:20170110224012p:plain

  2. MySQL の world データベースの orders テーブルにデータを登録して commit してみます。

    f:id:ksby:20170110224226p:plain

    status が ‘01’ に更新されます。

    f:id:ksby:20170110224345p:plain

    PostgreSQL の ksbylending データベースの orders テーブルにデータが登録されます。

    f:id:ksby:20170110224502p:plain

    world データベースの orders テーブルのデータが削除されます。

    f:id:ksby:20170110224651p:plain

  3. 登録済の order_id = ‘00000001’ のデータを再度登録して commit してみます。

    f:id:ksby:20170110225011p:plain

    status = ‘01’ に更新されますが、ksbylending データベースの orders テーブルには変化はありません。

    f:id:ksby:20170110225136p:plain

    ログには一意性制約違反のエラーが出力されます。

    f:id:ksby:20170110225538p:plain

  4. bootRun で起動したアプリケーションを停止します。

  5. commit して、develop, master ブランチへマージします。

感想

  • Spring Integration DSL が使いやすいです。Web に出ているサンプルを見ていた時には分かりにくい印象があったのですが、実際に使ってみると書きやすいですね。XML ファイルを使わないなら DSL を使うことをお勧めします。poller や transaction も DSL を使わない場合と比較するとかなり定義しやすいです。
  • JDBC Support の機能も簡単な処理なら実装しやすいです。ただし使い方を調べるのにサンプルが少なくて手こずりました。

ソースコード

build.gradle

group 'ksbysample'
version '1.0.0-RELEASE'

buildscript {
    ext {
        springBootVersion = '1.4.3.RELEASE'
    }
    repositories {
        mavenCentral()
        maven { url "http://repo.spring.io/repo/" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'groovy'

sourceCompatibility = 1.8
targetCompatibility = 1.8

compileJava.options.compilerArgs = ['-Xlint:all']
compileTestGroovy.options.compilerArgs = ['-Xlint:all']
compileTestJava.options.compilerArgs = ['-Xlint:all']

eclipse {
    classpath {
        containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER')
        containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8'
    }
}

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

repositories {
    jcenter()
}

dependencyManagement {
    imports {
        mavenBom 'io.spring.platform:platform-bom:Athens-SR2'
    }
}

dependencies {
    def jdbcDriverMySQL = "mysql:mysql-connector-java:6.0.5"
    def jdbcDriverPgSQL = "org.postgresql:postgresql:9.4.1212"

    // dependency-management-plugin によりバージョン番号が自動で設定されるもの
    // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照
    compile('org.springframework.boot:spring-boot-starter-integration')
    compile('org.springframework.integration:spring-integration-jdbc')
    compile('org.springframework.integration:spring-integration-java-dsl')
    compile('org.springframework.boot:spring-boot-starter-jdbc')
    testCompile("org.springframework.boot:spring-boot-starter-test")
    testCompile("org.spockframework:spock-core")
    testCompile("org.spockframework:spock-spring")

    // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの
    runtime("${jdbcDriverMySQL}")
    runtime("${jdbcDriverPgSQL}")
    compile("org.projectlombok:lombok:1.16.12")
    testCompile("org.assertj:assertj-core:3.6.1")
}
  • Spring Integration の JDBC Support を利用するので compile('org.springframework.integration:spring-integration-jdbc') を記述します。また Spring Data JDBC も利用するので compile('org.springframework.boot:spring-boot-starter-jdbc') を記述します。
  • Spring Integration DSL を使用するので compile('org.springframework.integration:spring-integration-java-dsl') を記述します。
  • MySQLPostgreSQL の両方のデータベースにアクセスするので、それぞれの JDBC Driver を記述します。

Application.java

package ksbysample.eipapp.datacopy;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

application.properties

spring.datasource.world.url=jdbc:mysql://localhost/world?serverTimezone=JST
spring.datasource.world.username=root
spring.datasource.world.password=xxxxxxxx
spring.datasource.world.driverClassName=com.mysql.cj.jdbc.Driver

spring.datasource.ksbylending.url=jdbc:postgresql://localhost/ksbylending
spring.datasource.ksbylending.username=ksbylending_user
spring.datasource.ksbylending.password=xxxxxxxx
spring.datasource.ksbylending.driverClassName=org.postgresql.Driver
  • MySQL の world データベース、PostgreSQL の ksbylending データーベースへの接続情報を記述します。

ApplicationConfig.java

package ksbysample.eipapp.datacopy.config;

import org.springframework.boot.autoconfigure.jdbc.DataSourceBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
public class ApplicationConfig {

    @Primary
    @Bean
    @ConfigurationProperties("spring.datasource.world")
    public DataSource dataSourceWorld() {
        return DataSourceBuilder.create().build();
    }

    @Primary
    @Bean
    public PlatformTransactionManager transactionManagerWorld() {
        return new DataSourceTransactionManager(dataSourceWorld());
    }

    @Bean
    @ConfigurationProperties("spring.datasource.ksbylending")
    public DataSource dataSourceKsbylending() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    public PlatformTransactionManager transactionManagerKsbylending() {
        return new DataSourceTransactionManager(dataSourceKsbylending());
    }

}
  • world データベース、ksbylending データベースの DataSource, PlatformTransactionManager Bean を定義します。

ChannelConfig.java

package ksbysample.eipapp.datacopy.integration.channel;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.MessageChannel;

@Configuration
public class ChannelConfig {

    @Bean
    public MessageChannel copyChannel() {
        return new QueueChannel(100);
    }

    @Bean
    public MessageChannel delChannel() {
        return new QueueChannel(100);
    }

}
  • DirectChannel で作成すると MessageChannel の前後の処理が1トランザクションとして処理されてしまい、getFlow でデータを取得して copyChannel にデータを渡したら status をその時点で ‘01’ に更新したくても copyFlow の処理が正常終了しないと更新されなくなるので、QueueChannel で作成し getFlow だけでトランザクションが終了するようにします。
  • delChannel の方も copyFlow の処理だけでトランザクションが終了するようにしたいので、同様に QueueChannel で作成します。

OrdersDto.java

package ksbysample.eipapp.datacopy.dto;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class OrdersDto {

    private String orderId;

    private String status;

}

OrdersDtoRowMapper.java

package ksbysample.eipapp.datacopy.dto;

import org.springframework.jdbc.core.RowMapper;

import java.sql.ResultSet;
import java.sql.SQLException;

public class OrdersDtoRowMapper implements RowMapper<OrdersDto> {

    @Override
    public OrdersDto mapRow(ResultSet rs, int rowNum) throws SQLException {
        OrdersDto ordersDto = new OrdersDto();
        ordersDto.setOrderId(rs.getString("order_id"));
        ordersDto.setStatus(rs.getString("status"));
        return ordersDto;
    }

}

FlowConfig.java

■その1

package ksbysample.eipapp.datacopy.integration.flow;

import ksbysample.eipapp.datacopy.dto.OrdersRowMapper;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.messaging.MessageChannel;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
public class FlowConfig {

    private final DataSource dataSourceWorld;

    private final PlatformTransactionManager transactionManagerWorld;

    private final MessageChannel copyChannel;

    public FlowConfig(@Qualifier("dataSourceWorld") DataSource dataSourceWorld
            , @Qualifier("transactionManagerWorld") PlatformTransactionManager transactionManagerWorld
            , MessageChannel copyChannel) {
        this.dataSourceWorld = dataSourceWorld;
        this.transactionManagerWorld = transactionManagerWorld;
        this.copyChannel = copyChannel;
    }

    @Bean
    public MessageSource<Object> ordersJdbcMessageSource() {
        JdbcPollingChannelAdapter adapter
                = new JdbcPollingChannelAdapter(this.dataSourceWorld, "select * from orders where status = '00'");
        adapter.setRowMapper(new OrdersRowMapper());
        adapter.setUpdateSql("update orders set status = '01' where order_id in (:orderId)");
        return adapter;
    }

    @Bean
    public IntegrationFlow getFlow() {
        // MySQL の world データベースの orders テーブルから status = '00' のデータを
        // 1秒間隔で取得して copyChannel にデータを渡す。取得したデータの status カラム
        // は '00'→'01' に更新する。
        return IntegrationFlows.from(ordersJdbcMessageSource(),
                c -> c.poller(Pollers.fixedRate(1000)
                        .transactional(this.transactionManagerWorld)))
                .channel(this.copyChannel)
                .get();
    }

}

■その2

@Configuration
public class FlowConfig {

    ..........

    private final MessageChannel delChannel;

    private final CopyOrdersMessageHandler copyOrdersMessageHandler;

    public FlowConfig(@Qualifier("dataSourceWorld") DataSource dataSourceWorld
            , @Qualifier("transactionManagerWorld") PlatformTransactionManager transactionManagerWorld
            , MessageChannel copyChannel
            , MessageChannel delChannel
            , CopyOrdersMessageHandler copyOrdersMessageHandler) {
        this.dataSourceWorld = dataSourceWorld;
        this.transactionManagerWorld = transactionManagerWorld;
        this.copyChannel = copyChannel;
        this.delChannel = delChannel;
        this.copyOrdersMessageHandler = copyOrdersMessageHandler;
    }


    ..........

    @Bean
    public IntegrationFlow copyFlow() {
        // copyChannel を 1秒間隔でチェックして、データがある場合には PostgreSQL
        // の ksbylending データベースの orders テーブルに insert する。insert に成功した
        // 場合にはデータをそのまま delChannel に渡す。
        return IntegrationFlows.from(this.copyChannel)
                .handle(this.copyOrdersMessageHandler
                        , c -> c.poller(Pollers.fixedRate(1000)))
                .channel(this.delChannel)
                .get();
    }

}

■その3

@Configuration
public class FlowConfig {

    ..........

    private final DelOrdersMessageHandler delOrdersMessageHandler;

    public FlowConfig(@Qualifier("dataSourceWorld") DataSource dataSourceWorld
            , @Qualifier("transactionManagerWorld") PlatformTransactionManager transactionManagerWorld
            , MessageChannel copyChannel
            , MessageChannel delChannel
            , CopyOrdersMessageHandler copyOrdersMessageHandler
            , DelOrdersMessageHandler delOrdersMessageHandler) {
        this.dataSourceWorld = dataSourceWorld;
        this.transactionManagerWorld = transactionManagerWorld;
        this.copyChannel = copyChannel;
        this.delChannel = delChannel;
        this.copyOrdersMessageHandler = copyOrdersMessageHandler;
        this.delOrdersMessageHandler = delOrdersMessageHandler;
    }

    ..........

    @Bean
    public IntegrationFlow delFlow() {
        // delChannel を 1秒間隔でチェックして、データがある場合には MySQL の
        // world データベースの orders テーブルのデータを delete する。
        return IntegrationFlows.from(this.delChannel)
                .handle(this.delOrdersMessageHandler
                        , c -> c.poller(Pollers.fixedRate(1000)))
                .get();
    }

}

■完成版

package ksbysample.eipapp.datacopy.integration.flow;

import ksbysample.eipapp.datacopy.dto.OrdersDtoRowMapper;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.messaging.MessageChannel;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

@Configuration
public class FlowConfig {

    private final DataSource dataSourceWorld;

    private final PlatformTransactionManager transactionManagerWorld;

    private final MessageChannel copyChannel;

    private final MessageChannel delChannel;

    private final CopyOrdersMessageHandler copyOrdersMessageHandler;

    private final DelOrdersMessageHandler delOrdersMessageHandler;

    public FlowConfig(@Qualifier("dataSourceWorld") DataSource dataSourceWorld
            , @Qualifier("transactionManagerWorld") PlatformTransactionManager transactionManagerWorld
            , MessageChannel copyChannel
            , MessageChannel delChannel
            , CopyOrdersMessageHandler copyOrdersMessageHandler
            , DelOrdersMessageHandler delOrdersMessageHandler) {
        this.dataSourceWorld = dataSourceWorld;
        this.transactionManagerWorld = transactionManagerWorld;
        this.copyChannel = copyChannel;
        this.delChannel = delChannel;
        this.copyOrdersMessageHandler = copyOrdersMessageHandler;
        this.delOrdersMessageHandler = delOrdersMessageHandler;
    }

    @Bean
    public MessageSource<Object> ordersJdbcMessageSource() {
        JdbcPollingChannelAdapter adapter
                = new JdbcPollingChannelAdapter(this.dataSourceWorld, "select * from orders where status = '00'");
        adapter.setRowMapper(new OrdersDtoRowMapper());
        adapter.setUpdateSql("update orders set status = '01' where order_id in (:orderId)");
        return adapter;
    }

    @Bean
    public IntegrationFlow getFlow() {
        // MySQL の world データベースの orders テーブルから status = '00' のデータを
        // 1秒間隔で取得して copyChannel にデータを渡す。取得したデータの status カラム
        // は '00'→'01' に更新する。
        return IntegrationFlows.from(ordersJdbcMessageSource(),
                c -> c.poller(Pollers.fixedRate(1000)
                        .transactional(this.transactionManagerWorld)))
                .channel(this.copyChannel)
                .get();
    }

    @Bean
    public IntegrationFlow copyFlow() {
        // copyChannel を 1秒間隔でチェックして、データがある場合には PostgreSQL
        // の ksbylending データベースの orders テーブルに insert する。insert に成功した
        // 場合にはデータをそのまま delChannel に渡す。
        return IntegrationFlows.from(this.copyChannel)
                .handle(this.copyOrdersMessageHandler
                        , c -> c.poller(Pollers.fixedRate(1000)))
                .channel(this.delChannel)
                .get();
    }

    @Bean
    public IntegrationFlow delFlow() {
        // delChannel を 1秒間隔でチェックして、データがある場合には MySQL の
        // world データベースの orders テーブルのデータを delete する。
        return IntegrationFlows.from(this.delChannel)
                .handle(this.delOrdersMessageHandler
                        , c -> c.poller(Pollers.fixedRate(1000)))
                .get();
    }

}

CopyOrdersMessageHandler.java

package ksbysample.eipapp.datacopy.integration.flow;

import ksbysample.eipapp.datacopy.dto.OrdersDto;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.transaction.annotation.Transactional;

import javax.sql.DataSource;
import java.util.List;
import java.util.Map;

@MessageEndpoint
public class CopyOrdersMessageHandler implements GenericHandler<List<OrdersDto>> {

    private final DataSource dataSource;

    public CopyOrdersMessageHandler(@Qualifier("dataSourceKsbylending") DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    @Transactional(transactionManager = "transactionManagerKsbylending")
    public Object handle(List<OrdersDto> payload, Map<String, Object> headers) {
        JdbcMessageHandler insertHandler
                = new JdbcMessageHandler(this.dataSource, "insert into orders (order_id) values (:payload.orderId)");
        insertHandler.afterPropertiesSet();
        payload.stream()
                .forEach(dto -> {
                    Message<OrdersDto> orderxMessage = MessageBuilder.withPayload(dto).build();
                    insertHandler.handleMessage(orderxMessage);
                });
        return payload;
    }
}

DelOrdersMessageHandler.java

package ksbysample.eipapp.datacopy.integration.flow;

import ksbysample.eipapp.datacopy.dto.OrdersDto;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.jdbc.JdbcMessageHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.transaction.annotation.Transactional;

import javax.sql.DataSource;
import java.util.List;
import java.util.Map;

@MessageEndpoint
public class DelOrdersMessageHandler implements GenericHandler<List<OrdersDto>> {

    private final DataSource dataSource;

    public DelOrdersMessageHandler(@Qualifier("dataSourceWorld") DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    @Transactional(transactionManager = "transactionManagerWorld")
    public Object handle(List<OrdersDto> payload, Map<String, Object> headers) {
        JdbcMessageHandler deleteHandler
                = new JdbcMessageHandler(this.dataSource
                , "delete from orders where order_id = :payload.orderId and status = '01'");
        deleteHandler.afterPropertiesSet();
        payload.stream()
                .forEach(dto -> {
                    Message<OrdersDto> orderxMessage = MessageBuilder.withPayload(dto).build();
                    deleteHandler.handleMessage(orderxMessage);
                });
        return null;
    }
}

履歴

2017/01/11
初版発行。