かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その33 )( 5.1 からの新機能 Java Functions Improvements を試してみる )

概要

記事一覧はこちらです。

参照したサイト・書籍

  1. Spring Integration 5.1 goes GA! - Java Functions Improvements
    https://spring.io/blog/2018/10/29/spring-integration-5-1-goes-ga#java-functions-improvements

  2. spring-integration/src/reference/asciidoc/configuration.adoc
    https://github.com/spring-projects/spring-integration/blob/master/src/reference/asciidoc/configuration.adoc

目次

  1. ksbysample-eipapp-functions プロジェクトを作成する
  2. Supplier&Function&Consumer の簡単なサンプルを作成する
  3. Supplier は Supplier<Message<?>> と書くことで Message オブジェクトを返すことが可能
  4. Function#apply で IntegrationFlow Bean の処理を呼び出す

手順

ksbysample-eipapp-functions プロジェクトを作成する

Spring Initializr でプロジェクトの雛形を作成した後、build.gradle を以下のように変更します。

plugins {
    id "org.springframework.boot" version "2.1.6.RELEASE"
    id "java"
    id "idea"
}

apply plugin: "io.spring.dependency-management"

group = "ksbysample.eipapp"
version = "0.0.1-SNAPSHOT"

sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11

idea {
    module {
        inheritOutputDirs = false
        outputDir = file("$buildDir/classes/main/")
    }
}

configurations {
    // annotationProcessor と testAnnotationProcessor、compileOnly と testCompileOnly を併記不要にする
    testAnnotationProcessor.extendsFrom annotationProcessor
    testImplementation.extendsFrom compileOnly
}

repositories {
    mavenCentral()
}

dependencies {
    def lombokVersion = "1.18.6"

    implementation("org.springframework.boot:spring-boot-starter-integration")
    testImplementation("org.springframework.boot:spring-boot-starter-test")

    // for lombok
    // testAnnotationProcessor、testCompileOnly を併記しなくてよいよう configurations で設定している
    annotationProcessor("org.projectlombok:lombok:${lombokVersion}")
    compileOnly("org.projectlombok:lombok:${lombokVersion}")
}

ディレクトリ構成は以下のようにしています。

f:id:ksby:20190630230404p:plain

Supplier&Function&Consumer の簡単なサンプルを作成する

package ksbysample.eipapp.functions;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

@Slf4j
@Configuration
public class SupplierAndFunctionAndConsumerFlow {

    private AtomicInteger count = new AtomicInteger(0);

    // Supplier<?> は ? 型のオブジェクトが payload にセットされた Message を返す
    // MessageSource として利用できる
    @Bean
    public Supplier<Integer> countSupplier() {
        return () -> count.addAndGet(1);
    }

    @Bean
    public IntegrationFlow countDisplayFlow() {
        return IntegrationFlows
                .from(countSupplier()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .transform(DoubleAndToStrFunc())
                .handle(addDotFunc())
                .handle(printFunc())
                .get();
    }

    // Function<?, ?> は .transform(...) で利用したり、
    public Function<Integer, String> DoubleAndToStrFunc() {
        return v -> String.valueOf(v * 2);
    }

    // .handle(...) で利用できる
    public Function<String, String> addDotFunc() {
        return s -> s + "...";
    }

    // Consumer<?> は .handle(...) で利用できる
    public Consumer<String> printFunc() {
        return s -> log.warn(s);
    }

}

上のサンプルを実行すると "2..."、"4..."、"6..." と順に出力されます。

f:id:ksby:20190703010213p:plain

同じ処理を Supplier は MessageSource で置き換えて、Function と Consumer は .transform(...)、.handle(...) に直接記述すると以下のようになります。

package ksbysample.eipapp.functions;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.messaging.support.MessageBuilder;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

@Slf4j
@Configuration
public class MessageSourceAndLambdaFlow {

    private AtomicInteger count = new AtomicInteger(0);

    @Bean
    public MessageSource<Integer> countMessageSource() {
        return () -> MessageBuilder
                .withPayload(count.addAndGet(1))
                .build();
    }

    @Bean
    public IntegrationFlow countDisplayFlow() {
        return IntegrationFlows
                .from(countMessageSource()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .<Integer, String>transform(v -> String.valueOf(v * 2))
                .handle((Function<String, String>) s -> s + "...")
                .handle((Consumer<String>) s -> log.warn(s))
                .get();
    }

}

f:id:ksby:20190703012428p:plain

lambda 式を使っているのに (Function<String, String>)(Consumer<String>) とキャストしないと build 時にエラーが出るのがなんか今ひとつな気がします。。。

Supplier は Supplier<Message<?>> と書くことで Message オブジェクトを返すことが可能

package ksbysample.eipapp.functions;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

@Configuration
public class SupplierAndFunction2Flow {

    @Autowired
    private NullChannel nullChannel;

    private AtomicInteger count = new AtomicInteger(0);

    // Supplier<Message<?>> という記述にして Message オブジェクトを返すようにすることも出来る
    // この場合 MessageSource の時のように header を追加することが可能。
    @Bean
    public Supplier<Message<Integer>> countSupplier() {
        return () -> MessageBuilder
                .withPayload(count.addAndGet(1))
                .setHeader("random", ThreadLocalRandom.current().nextInt(100))
                .build();
    }

    @Bean
    public IntegrationFlow countDisplayFlow() {
        return IntegrationFlows
                .from(countSupplier()
                        , e -> e.poller(Pollers.fixedDelay(1000)))
                .log(LoggingHandler.Level.WARN)
                .channel(nullChannel)
                .get();
    }

}

実行すると headers の中に random というデータが追加されています。

f:id:ksby:20190703013814p:plain

Function や Consumer でも Function<Message<Integer>, Message<String>>Consumer<Message<String>> のように試してみたのですが、こちらは実行時にエラーになりました。この書き方が出来るのは Supplier だけのようです。

Function#apply で IntegrationFlow Bean の処理を呼び出す

以前 @MessagingGateway アノテーションを付加した interface を用意して IntegrationFlow の処理を呼び出す方法を記載しましたが、Function<?, ?> 型のフィールドを用意して Function#apply を呼び出すと IntegrationFlow の処理を呼び出すことができます。

package ksbysample.eipapp.functions;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.handler.GenericHandler;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetrySynchronizationManager;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Component;

import java.util.function.Function;

import static java.util.Collections.singletonMap;

@Slf4j
@Component
public class FunctionSample implements CommandLineRunner {

    // IntegrationFlow で定義した処理を呼び出すには、
    // * IntegrationFlows.from(Function.class)... と定義する
    // * Function<?, ?> 型の変数をフィールドに定義し 
    //   @Autowired, @Qualifier("<IntegrationFlow名>.gateway"), @Lazy アノテーションを付与する  
    @Autowired
    @Qualifier("throwExceptionWithRetryFlow.gateway")
    @Lazy
    private Function<String, String> throwExceptionWithRetryFlowFunc;

    @Override
    public void run(String... args) throws Exception {
        System.out.println(throwExceptionWithRetryFlowFunc.apply("success???"));
    }

    @Configuration
    static class FlowConfig {

        /**
         * リトライ回数は最大5回、リトライ時は2秒待機する RetryTemplate を生成する
         *
         * @return {@link RetryTemplate} object
         */
        @Bean
        public RetryTemplate retryTemplate() {
            RetryTemplate retryTemplate = new RetryTemplate();

            // SimpleRetryPolicy
            retryTemplate.setRetryPolicy(
                    new SimpleRetryPolicy(5, singletonMap(Exception.class, true)));

            // FixedBackOffPolicy
            FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
            fixedBackOffPolicy.setBackOffPeriod(2000);
            retryTemplate.setBackOffPolicy(fixedBackOffPolicy);

            return retryTemplate;
        }

        @Bean
        public RequestHandlerRetryAdvice retryAdvice() {
            RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
            retryAdvice.setRetryTemplate(retryTemplate());
            // リトライしても処理が成功しなかった場合には
            // MessageHandlingException#getCause#getMessage で取得したメッセージを payload
            // にセットした Message オブジェクトが次の処理に渡されるようにする
            retryAdvice.setRecoveryCallback(context -> {
                MessageHandlingException e = (MessageHandlingException) context.getLastThrowable();
                String errMsg = e.getCause().getMessage();
                log.error(errMsg);
                return errMsg;
            });
            return retryAdvice;
        }

        @Bean
        public IntegrationFlow throwExceptionWithRetryFlow() {

            return IntegrationFlows
                    .from(Function.class)
                    .handle((GenericHandler<Object>) (p, h) -> {
                        RetryContext retryContext = RetrySynchronizationManager.getContext();
                        log.warn("★★★ リトライ回数 = " + retryContext.getRetryCount());

                        // リトライ処理をさせたいので強制的に RuntimeException を throw する
                        if (true) {
                            throw new RuntimeException("error!!");
                        }
                        return p;
                    }, e -> e.advice(retryAdvice()))
                    .get();
        }

    }

}

実行すると5回リトライして、最後に RuntimeException に渡した "error!!" のメッセージが表示されました。

f:id:ksby:20190703235126p:plain

started throwExceptionWithRetryFlow.gatewaystopped throwExceptionWithRetryFlow.gateway のメッセージが何度も出力されるのが気になります。

複数出力される理由を調べてみると、まず org.springframework.integration.gateway.GatewayProxyFactoryBean#onInit で7個の method 分 gatewayMap に gateway(throwExceptionWithRetryFlow.gateway)が put されて、

f:id:ksby:20190705004210p:plain

org.springframework.integration.gateway.GatewayProxyFactoryBean の doStart、doStop メソッドで gateway.start();gateway.stop(); が呼び出されるからでした。

f:id:ksby:20190705004550p:plain

1つ呼び出す仕組みをつくるだけで gateway が7個 start するとは。。。 あまり多用する仕組みではないのかもしれません。

履歴

2019/07/05
新規作成。