Spring Boot + Spring Integration でいろいろ試してみる ( その8 )( MySQL のテーブルのデータを取得して PostgreSQL のテーブルへ登録する常駐型アプリケーションを作成する )
概要
記事一覧はこちらです。
- Spring Integration の 18. JDBC Support の機能を利用して、以下の処理を行う常駐型アプリケーションを作成してみます。
- MySQL の orders テーブルにデータを登録します。データには status を持たせ、登録時には status = ‘00’ にします。
- 登録されたデータを常駐型アプリケーションで取得します。取得時に status = ‘01’ に更新します。
- 取得したデータを PostgreSQL の orders テーブルに登録します。
- 登録できたデータを MySQL の orders テーブルから削除します。
- Spring Integration のフロー図で描くと以下の構成になります。
- 今回 Spring Integration DSL も使用します。XML ファイルは使用しません。
- MySQL、PostgreSQL は以前インストールした 5.6、9.4 を使用します。
- Spring Boot は 1.4.3 を、Spring Integration は 4.3.6 を使用します。どちらも Spring IO Platform の Athens-SR2 を利用して指定します。
参照したサイト・書籍
Spring Integration Reference Manual - 18. JDBC Support
http://docs.spring.io/spring-integration/docs/4.3.6.RELEASE/reference/html/jdbc.htmlspring-integration/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageHandlerIntegrationTests.java
https://github.com/spring-projects/spring-integration/blob/master/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageHandlerIntegrationTests.java- JdbcMessageHandler の使い方を参考にしました。
Spring Integration Java DSL Reference
https://github.com/spring-projects/spring-integration-java-dsl/wiki/spring-integration-java-dsl-referenceSpring Integration Java DSL: Line by line tutorial
https://spring.io/blog/2014/11/25/spring-integration-java-dsl-line-by-line-tutorial- Spring Integration DSL の tutorial です。
Spring Integration Java DSL sample - further simplification with Jms namespace factories
http://www.java-allandsundry.com/2014/06/spring-integration-java-dsl-sample.htmlSpring Integration Java DSL sample
https://dzone.com/articles/spring-integration-java-dslJDBC Spring Integration with Annotations
http://stackoverflow.com/questions/27247013/jdbc-spring-integration-with-annotations
目次
- ksbysample-eipapp-datacopy プロジェクトを作成する
- MySQL と PostgreSQL に orders テーブルを作成する
- データベースの設定を記述し、DataSource 等の Bean を定義する
- copyChannel, delChannel を作成する
- getFlow を作成する
- copyFlow を作成する
- delFlow を作成する
- 動作確認
- 感想
手順
ksbysample-eipapp-datacopy プロジェクトを作成する
feature/6-issue ブランチを作成します。
IntelliJ IDEA の「Welcome to IntelliJ IDEA」ダイアログを表示した後、「Create New Project」をクリックします。
「New Project」ダイアログが表示されます。画面左側で「Gradle」を選択した後、画面右側は何も変更せずに「Next」ボタンをクリックします。
GroupId、ArtifactId を入力する画面が表示されます。以下の画像の文字列を入力した後、「Next」ボタンをクリックします。
次の画面が表示されます。「Create directories for empty content roots automatically」をチェックした後、「Next」ボタンをクリックします。
Project name、Project location を入力する画面が表示されます。以下の画像の文字列を入力した後、「Finish」ボタンをクリックします。
build.gradle を リンク先の内容 に変更します。
Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして build.gradle を反映します。
src/main/java の下に ksbysample.eipapp.datacopy パッケージを作成します。
src/main/java/ksbysample/eipapp/datacopy の下に Application.java を新規作成します。作成後、リンク先の内容 に変更します。
以下のディレクトリの下に .gitkeep ファイルを作成します。
- src/main/groovy
- src/main/resources
- src/test/groovy
- src/test/java
- src/test/resources
この時点で Project Tool Window は以下の状態になります。
※この時点ではまだアプリケーションは起動できません。起動しようとしてもデータベースの接続設定を記述していないためエラーになります。
MySQL と PostgreSQL に orders テーブルを作成する
IntelliJ IDEA の Database Tool Window から MySQL の world データベース、PostgreSQL の ksbylending データベースにアクセスできるように設定します。
まず MySQL の world データベースに接続する設定を追加します。Database Tool Window の左上から「New」-「Data Source」-「MySQL」を選択します。
「Data Sources and Drivers」ダイアログが表示されます。以下の画像の値を入力後、「OK」ボタンをクリックします。
次に PostgreSQL の ksbylending データベースに接続する設定を追加します。Database Tool Window の左上から「New」-「Data Source」-「PostgreSQL」を選択します。
「Data Sources and Drivers」ダイアログが表示されます。以下の画像の値を入力後、「OK」ボタンをクリックします。
この時点で Database Tool Window は以下の状態です。
MySQL の world データベースに orders テーブルを作成します。Database Tool Window で world データベースを選択してコンテキストメニューを表示した後、「New」-「Table」を選択します。
「Create New Table」ダイアログが表示されます。以下の画像のデータを入力後、「Execute」ボタンをクリックします。
PostgreSQL の ksbylending データベースの方でも「Create New Table」ダイアログを表示し、以下の画像のデータを入力後、「Execute」ボタンをクリックします。
orders テーブル作成後は以下の状態になります。
データベースの設定を記述し、DataSource 等の Bean を定義する
src/main/resources の下に application.properties を作成し、リンク先の内容 を記述します。
src/main/java/ksbysample/eipapp/datacopy の下に config パッケージを作成します。
src/main/java/ksbysample/eipapp/datacopy/config の下に ApplicationConfig.java を作成し、リンク先の内容 を記述します。
src/main/resources/.gitkeep を削除します。
データベースの接続設定を記述しましたのでアプリケーションを起動してみます。Gradle Tool Window から bootRun タスクを実行してアプリケーションが起動してエラーが出ないことを確認します。
アプリケーションはエラーが出ずに起動しましたが、Channel も何も作成していないとすぐに終了するんですね。。。
copyChannel, delChannel を作成する
src/main/java/ksbysample/eipapp/datacopy の下に integration.channel パッケージを作成します。
src/main/java/ksbysample/eipapp/datacopy/integration/channel の下に ChannelConfig.java を作成し、リンク先の内容 を記述します。
getFlow を作成する
src/main/java/ksbysample/eipapp/datacopy の下に dto パッケージを作成します。
src/main/java/ksbysample/eipapp/datacopy/dto の下に OrdersDto.java を作成し、リンク先の内容 を記述します。
src/main/java/ksbysample/eipapp/datacopy/dto の下に OrdersDtoRowMapper.java を作成し、リンク先の内容 を記述します。
src/main/java/ksbysample/eipapp/datacopy/integration の下に flow パッケージを作成します。
src/main/java/ksbysample/eipapp/datacopy/integration/flow の下に FlowConfig.java を作成し、リンク先のその1の内容 を記述します。
copyFlow を作成する
src/main/java/ksbysample/eipapp/datacopy/integration/flow の下に CopyOrdersMessageHandler.java を作成し、リンク先の内容 を記述します。
src/main/java/ksbysample/eipapp/datacopy/integration/flow の下の FlowConfig.java を リンク先のその2の内容 に変更します。
delFlow を作成する
src/main/java/ksbysample/eipapp/datacopy/integration/flow の下に DelOrdersMessageHandler.java を作成し、リンク先の内容 を記述します。
src/main/java/ksbysample/eipapp/datacopy/integration/flow の下の FlowConfig.java を リンク先のその3の内容 に変更します。
動作確認
bootRun タスクを実行してアプリケーションを起動します。各データベースの orders テーブルには何も登録されていません。
MySQL の world データベースの orders テーブルにデータを登録して commit してみます。
status が ‘01’ に更新されます。
PostgreSQL の ksbylending データベースの orders テーブルにデータが登録されます。
world データベースの orders テーブルのデータが削除されます。
登録済の order_id = ‘00000001’ のデータを再度登録して commit してみます。
status = ‘01’ に更新されますが、ksbylending データベースの orders テーブルには変化はありません。
ログには一意性制約違反のエラーが出力されます。
bootRun で起動したアプリケーションを停止します。
commit して、develop, master ブランチへマージします。
感想
- Spring Integration DSL が使いやすいです。Web に出ているサンプルを見ていた時には分かりにくい印象があったのですが、実際に使ってみると書きやすいですね。XML ファイルを使わないなら DSL を使うことをお勧めします。poller や transaction も DSL を使わない場合と比較するとかなり定義しやすいです。
- JDBC Support の機能も簡単な処理なら実装しやすいです。ただし使い方を調べるのにサンプルが少なくて手こずりました。
ソースコード
build.gradle
group 'ksbysample' version '1.0.0-RELEASE' buildscript { ext { springBootVersion = '1.4.3.RELEASE' } repositories { mavenCentral() maven { url "http://repo.spring.io/repo/" } } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") classpath("io.spring.gradle:dependency-management-plugin:0.6.1.RELEASE") } } apply plugin: 'java' apply plugin: 'eclipse' apply plugin: 'idea' apply plugin: 'org.springframework.boot' apply plugin: 'io.spring.dependency-management' apply plugin: 'groovy' sourceCompatibility = 1.8 targetCompatibility = 1.8 compileJava.options.compilerArgs = ['-Xlint:all'] compileTestGroovy.options.compilerArgs = ['-Xlint:all'] compileTestJava.options.compilerArgs = ['-Xlint:all'] eclipse { classpath { containers.remove('org.eclipse.jdt.launching.JRE_CONTAINER') containers 'org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8' } } idea { module { inheritOutputDirs = false outputDir = file("$buildDir/classes/main/") } } repositories { jcenter() } dependencyManagement { imports { mavenBom 'io.spring.platform:platform-bom:Athens-SR2' } } dependencies { def jdbcDriverMySQL = "mysql:mysql-connector-java:6.0.5" def jdbcDriverPgSQL = "org.postgresql:postgresql:9.4.1212" // dependency-management-plugin によりバージョン番号が自動で設定されるもの // Appendix A. Dependency versions ( http://docs.spring.io/platform/docs/current/reference/htmlsingle/#appendix-dependency-versions ) 参照 compile('org.springframework.boot:spring-boot-starter-integration') compile('org.springframework.integration:spring-integration-jdbc') compile('org.springframework.integration:spring-integration-java-dsl') compile('org.springframework.boot:spring-boot-starter-jdbc') testCompile("org.springframework.boot:spring-boot-starter-test") testCompile("org.spockframework:spock-core") testCompile("org.spockframework:spock-spring") // dependency-management-plugin によりバージョン番号が自動で設定されないもの、あるいは最新バージョンを指定したいもの runtime("${jdbcDriverMySQL}") runtime("${jdbcDriverPgSQL}") compile("org.projectlombok:lombok:1.16.12") testCompile("org.assertj:assertj-core:3.6.1") }
- Spring Integration の JDBC Support を利用するので
compile('org.springframework.integration:spring-integration-jdbc')
を記述します。また Spring Data JDBC も利用するのでcompile('org.springframework.boot:spring-boot-starter-jdbc')
を記述します。 - Spring Integration DSL を使用するので
compile('org.springframework.integration:spring-integration-java-dsl')
を記述します。 - MySQL、PostgreSQL の両方のデータベースにアクセスするので、それぞれの JDBC Driver を記述します。
Application.java
package ksbysample.eipapp.datacopy; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
application.properties
spring.datasource.world.url=jdbc:mysql://localhost/world?serverTimezone=JST spring.datasource.world.username=root spring.datasource.world.password=xxxxxxxx spring.datasource.world.driverClassName=com.mysql.cj.jdbc.Driver spring.datasource.ksbylending.url=jdbc:postgresql://localhost/ksbylending spring.datasource.ksbylending.username=ksbylending_user spring.datasource.ksbylending.password=xxxxxxxx spring.datasource.ksbylending.driverClassName=org.postgresql.Driver
- MySQL の world データベース、PostgreSQL の ksbylending データーベースへの接続情報を記述します。
ApplicationConfig.java
package ksbysample.eipapp.datacopy.config; import org.springframework.boot.autoconfigure.jdbc.DataSourceBuilder; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.transaction.PlatformTransactionManager; import javax.sql.DataSource; @Configuration public class ApplicationConfig { @Primary @Bean @ConfigurationProperties("spring.datasource.world") public DataSource dataSourceWorld() { return DataSourceBuilder.create().build(); } @Primary @Bean public PlatformTransactionManager transactionManagerWorld() { return new DataSourceTransactionManager(dataSourceWorld()); } @Bean @ConfigurationProperties("spring.datasource.ksbylending") public DataSource dataSourceKsbylending() { return DataSourceBuilder.create().build(); } @Bean public PlatformTransactionManager transactionManagerKsbylending() { return new DataSourceTransactionManager(dataSourceKsbylending()); } }
- world データベース、ksbylending データベースの DataSource, PlatformTransactionManager Bean を定義します。
ChannelConfig.java
package ksbysample.eipapp.datacopy.integration.channel; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.channel.QueueChannel; import org.springframework.messaging.MessageChannel; @Configuration public class ChannelConfig { @Bean public MessageChannel copyChannel() { return new QueueChannel(100); } @Bean public MessageChannel delChannel() { return new QueueChannel(100); } }
- DirectChannel で作成すると MessageChannel の前後の処理が1トランザクションとして処理されてしまい、getFlow でデータを取得して copyChannel にデータを渡したら status をその時点で ‘01’ に更新したくても copyFlow の処理が正常終了しないと更新されなくなるので、QueueChannel で作成し getFlow だけでトランザクションが終了するようにします。
- delChannel の方も copyFlow の処理だけでトランザクションが終了するようにしたいので、同様に QueueChannel で作成します。
OrdersDto.java
package ksbysample.eipapp.datacopy.dto; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor public class OrdersDto { private String orderId; private String status; }
OrdersDtoRowMapper.java
package ksbysample.eipapp.datacopy.dto; import org.springframework.jdbc.core.RowMapper; import java.sql.ResultSet; import java.sql.SQLException; public class OrdersDtoRowMapper implements RowMapper<OrdersDto> { @Override public OrdersDto mapRow(ResultSet rs, int rowNum) throws SQLException { OrdersDto ordersDto = new OrdersDto(); ordersDto.setOrderId(rs.getString("order_id")); ordersDto.setStatus(rs.getString("status")); return ordersDto; } }
FlowConfig.java
■その1
package ksbysample.eipapp.datacopy.integration.flow; import ksbysample.eipapp.datacopy.dto.OrdersRowMapper; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.core.MessageSource; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.core.Pollers; import org.springframework.integration.jdbc.JdbcPollingChannelAdapter; import org.springframework.messaging.MessageChannel; import org.springframework.transaction.PlatformTransactionManager; import javax.sql.DataSource; @Configuration public class FlowConfig { private final DataSource dataSourceWorld; private final PlatformTransactionManager transactionManagerWorld; private final MessageChannel copyChannel; public FlowConfig(@Qualifier("dataSourceWorld") DataSource dataSourceWorld , @Qualifier("transactionManagerWorld") PlatformTransactionManager transactionManagerWorld , MessageChannel copyChannel) { this.dataSourceWorld = dataSourceWorld; this.transactionManagerWorld = transactionManagerWorld; this.copyChannel = copyChannel; } @Bean public MessageSource<Object> ordersJdbcMessageSource() { JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(this.dataSourceWorld, "select * from orders where status = '00'"); adapter.setRowMapper(new OrdersRowMapper()); adapter.setUpdateSql("update orders set status = '01' where order_id in (:orderId)"); return adapter; } @Bean public IntegrationFlow getFlow() { // MySQL の world データベースの orders テーブルから status = '00' のデータを // 1秒間隔で取得して copyChannel にデータを渡す。取得したデータの status カラム // は '00'→'01' に更新する。 return IntegrationFlows.from(ordersJdbcMessageSource(), c -> c.poller(Pollers.fixedRate(1000) .transactional(this.transactionManagerWorld))) .channel(this.copyChannel) .get(); } }
■その2
@Configuration public class FlowConfig { .......... private final MessageChannel delChannel; private final CopyOrdersMessageHandler copyOrdersMessageHandler; public FlowConfig(@Qualifier("dataSourceWorld") DataSource dataSourceWorld , @Qualifier("transactionManagerWorld") PlatformTransactionManager transactionManagerWorld , MessageChannel copyChannel , MessageChannel delChannel , CopyOrdersMessageHandler copyOrdersMessageHandler) { this.dataSourceWorld = dataSourceWorld; this.transactionManagerWorld = transactionManagerWorld; this.copyChannel = copyChannel; this.delChannel = delChannel; this.copyOrdersMessageHandler = copyOrdersMessageHandler; } .......... @Bean public IntegrationFlow copyFlow() { // copyChannel を 1秒間隔でチェックして、データがある場合には PostgreSQL // の ksbylending データベースの orders テーブルに insert する。insert に成功した // 場合にはデータをそのまま delChannel に渡す。 return IntegrationFlows.from(this.copyChannel) .handle(this.copyOrdersMessageHandler , c -> c.poller(Pollers.fixedRate(1000))) .channel(this.delChannel) .get(); } }
■その3
@Configuration public class FlowConfig { .......... private final DelOrdersMessageHandler delOrdersMessageHandler; public FlowConfig(@Qualifier("dataSourceWorld") DataSource dataSourceWorld , @Qualifier("transactionManagerWorld") PlatformTransactionManager transactionManagerWorld , MessageChannel copyChannel , MessageChannel delChannel , CopyOrdersMessageHandler copyOrdersMessageHandler , DelOrdersMessageHandler delOrdersMessageHandler) { this.dataSourceWorld = dataSourceWorld; this.transactionManagerWorld = transactionManagerWorld; this.copyChannel = copyChannel; this.delChannel = delChannel; this.copyOrdersMessageHandler = copyOrdersMessageHandler; this.delOrdersMessageHandler = delOrdersMessageHandler; } .......... @Bean public IntegrationFlow delFlow() { // delChannel を 1秒間隔でチェックして、データがある場合には MySQL の // world データベースの orders テーブルのデータを delete する。 return IntegrationFlows.from(this.delChannel) .handle(this.delOrdersMessageHandler , c -> c.poller(Pollers.fixedRate(1000))) .get(); } }
■完成版
package ksbysample.eipapp.datacopy.integration.flow; import ksbysample.eipapp.datacopy.dto.OrdersDtoRowMapper; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.core.MessageSource; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.core.Pollers; import org.springframework.integration.jdbc.JdbcPollingChannelAdapter; import org.springframework.messaging.MessageChannel; import org.springframework.transaction.PlatformTransactionManager; import javax.sql.DataSource; @Configuration public class FlowConfig { private final DataSource dataSourceWorld; private final PlatformTransactionManager transactionManagerWorld; private final MessageChannel copyChannel; private final MessageChannel delChannel; private final CopyOrdersMessageHandler copyOrdersMessageHandler; private final DelOrdersMessageHandler delOrdersMessageHandler; public FlowConfig(@Qualifier("dataSourceWorld") DataSource dataSourceWorld , @Qualifier("transactionManagerWorld") PlatformTransactionManager transactionManagerWorld , MessageChannel copyChannel , MessageChannel delChannel , CopyOrdersMessageHandler copyOrdersMessageHandler , DelOrdersMessageHandler delOrdersMessageHandler) { this.dataSourceWorld = dataSourceWorld; this.transactionManagerWorld = transactionManagerWorld; this.copyChannel = copyChannel; this.delChannel = delChannel; this.copyOrdersMessageHandler = copyOrdersMessageHandler; this.delOrdersMessageHandler = delOrdersMessageHandler; } @Bean public MessageSource<Object> ordersJdbcMessageSource() { JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(this.dataSourceWorld, "select * from orders where status = '00'"); adapter.setRowMapper(new OrdersDtoRowMapper()); adapter.setUpdateSql("update orders set status = '01' where order_id in (:orderId)"); return adapter; } @Bean public IntegrationFlow getFlow() { // MySQL の world データベースの orders テーブルから status = '00' のデータを // 1秒間隔で取得して copyChannel にデータを渡す。取得したデータの status カラム // は '00'→'01' に更新する。 return IntegrationFlows.from(ordersJdbcMessageSource(), c -> c.poller(Pollers.fixedRate(1000) .transactional(this.transactionManagerWorld))) .channel(this.copyChannel) .get(); } @Bean public IntegrationFlow copyFlow() { // copyChannel を 1秒間隔でチェックして、データがある場合には PostgreSQL // の ksbylending データベースの orders テーブルに insert する。insert に成功した // 場合にはデータをそのまま delChannel に渡す。 return IntegrationFlows.from(this.copyChannel) .handle(this.copyOrdersMessageHandler , c -> c.poller(Pollers.fixedRate(1000))) .channel(this.delChannel) .get(); } @Bean public IntegrationFlow delFlow() { // delChannel を 1秒間隔でチェックして、データがある場合には MySQL の // world データベースの orders テーブルのデータを delete する。 return IntegrationFlows.from(this.delChannel) .handle(this.delOrdersMessageHandler , c -> c.poller(Pollers.fixedRate(1000))) .get(); } }
CopyOrdersMessageHandler.java
package ksbysample.eipapp.datacopy.integration.flow; import ksbysample.eipapp.datacopy.dto.OrdersDto; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.dsl.support.GenericHandler; import org.springframework.integration.jdbc.JdbcMessageHandler; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; import org.springframework.transaction.annotation.Transactional; import javax.sql.DataSource; import java.util.List; import java.util.Map; @MessageEndpoint public class CopyOrdersMessageHandler implements GenericHandler<List<OrdersDto>> { private final DataSource dataSource; public CopyOrdersMessageHandler(@Qualifier("dataSourceKsbylending") DataSource dataSource) { this.dataSource = dataSource; } @Override @Transactional(transactionManager = "transactionManagerKsbylending") public Object handle(List<OrdersDto> payload, Map<String, Object> headers) { JdbcMessageHandler insertHandler = new JdbcMessageHandler(this.dataSource, "insert into orders (order_id) values (:payload.orderId)"); insertHandler.afterPropertiesSet(); payload.stream() .forEach(dto -> { Message<OrdersDto> orderxMessage = MessageBuilder.withPayload(dto).build(); insertHandler.handleMessage(orderxMessage); }); return payload; } }
DelOrdersMessageHandler.java
package ksbysample.eipapp.datacopy.integration.flow; import ksbysample.eipapp.datacopy.dto.OrdersDto; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.dsl.support.GenericHandler; import org.springframework.integration.jdbc.JdbcMessageHandler; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; import org.springframework.transaction.annotation.Transactional; import javax.sql.DataSource; import java.util.List; import java.util.Map; @MessageEndpoint public class DelOrdersMessageHandler implements GenericHandler<List<OrdersDto>> { private final DataSource dataSource; public DelOrdersMessageHandler(@Qualifier("dataSourceWorld") DataSource dataSource) { this.dataSource = dataSource; } @Override @Transactional(transactionManager = "transactionManagerWorld") public Object handle(List<OrdersDto> payload, Map<String, Object> headers) { JdbcMessageHandler deleteHandler = new JdbcMessageHandler(this.dataSource , "delete from orders where order_id = :payload.orderId and status = '01'"); deleteHandler.afterPropertiesSet(); payload.stream() .forEach(dto -> { Message<OrdersDto> orderxMessage = MessageBuilder.withPayload(dto).build(); deleteHandler.handleMessage(orderxMessage); }); return null; } }
履歴
2017/01/11
初版発行。