かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

Spring Boot + Spring Integration でいろいろ試してみる ( その21 )( MessageChannel に Redis を使用する2 )

概要

記事一覧はこちらです。

参照したサイト・書籍

目次

  1. org.springframework.integration.redis package の Diagram を作成してみる
  2. Redis へ保存する時のフォーマットを JSON にしてみる
  3. 1プロセス内で受信・出力側をマルチスレッドにしてみる
  4. メモ書き
    1. build.gradle に記述する compile("org.springframework.integration:spring-integration-...") に何があるかは、どこを見ると分かるのか?
    2. Spring IO Platform の BOM の後に Spring Cloud の BOM を記述すると Gradle の checkstyle plugin に適用される Guava のバージョンが 21.0 にならない

手順

org.springframework.integration.redis package の Diagram を作成してみる

Spring Integration Reference Manual - 24. Redis Support の説明は XML ベースで、Spring Integration が Redis 用に用意しているクラスがよく分からないので、IntelliJ IDEA で org.springframework.integration.redis package の Diagram を生成してみます。

RedisChannelMessageStore クラスのソースを表示した後、import 文の “redis” の部分にカーソルを移動してから、コンテキストメニューを表示して「Diagrams」-「Show Diagram…」を選択します。

f:id:ksby:20170319173618p:plain

Diagram が作成されますが package しか表示されていないので展開します。Ctrl+A で全て選択した後、コンテキストメニューを表示して「Expand Nodes」を選択します。

f:id:ksby:20170319180552p:plain

クラスやインターフェースが表示されます。

f:id:ksby:20170319181053p:plain f:id:ksby:20170319181201p:plain

Redis へ保存する時のフォーマットを JSON にしてみる

Redis に保存される時のフォーマットが今のままでは見にくかったので、別のフォーマットにしてみます。

org.springframework.integration.redis.store.RedisChannelMessageStore のソースを見ると、以下のように実装されていました。

  • Key, Value で異なる Serializer がセットされている。
  • Key は RedisTemplate#setKeySerializer で StringRedisSerializer クラスがセットされている。ただし RedisChannelMessageStore には KeySerializer を変更するメソッドは用意されていない。
  • Value は RedisTemplate#setValueSerializer で JdkSerializationRedisSerializer クラスがセットされている。RedisChannelMessageStore にも RedisChannelMessageStore#setValueSerializer が用意されている。

~RedisSerializer クラスが何種類かあるようなので、org.springframework.data.redis.serializer package を Diagram で表示してみます。

f:id:ksby:20170319183944p:plain

Jackson2JsonRedisSerializer というクラスがあったので、これを使用して Value を JSON フォーマットで保存してみます。

src/main/java/ksbysample/eipapp/redisqueue/FlowConfig.java を以下のように変更します。

    @Bean
    public RedisChannelMessageStore redisMessageStore() {
        RedisChannelMessageStore messageStore = new RedisChannelMessageStore(this.redisConnectionFactory);
        messageStore.setValueSerializer(new Jackson2JsonRedisSerializer<>(SampleData.class));
        return messageStore;
    }

clean タスク → Rebuild Project → build タスクを実行した後 bootRun を実行すると、以下の例外が出力されました。

2017-03-19 19:38:10.014  INFO [redisqueue,,,] 2864 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=FlowConfig.SampleData(id=0, createDateTime=2017-03-19T19:38:09.963, message=idGenerator = 02017/03/19 19:38:09 に生成されました), headers={id=439e6e05-e1e5-7db0-f2e1-70e3f0d78b80, timestamp=1489919890012}]
2017-03-19 19:38:10.296 ERROR [redisqueue,de805a358b6b6e2b,de805a358b6b6e2b,true] 2864 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler   : org.springframework.data.redis.serializer.SerializationException: Could not read JSON: Unrecognized field "payload" (class ksbysample.eipapp.redisqueue.FlowConfig$SampleData), not marked as ignorable (3 known properties: "id", "createDateTime", "message"])
 at [Source: [B@59d3beef; line: 1, column: 1374] (through reference chain: ksbysample.eipapp.redisqueue.FlowConfig$SampleData["payload"]); nested exception is com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "payload" (class ksbysample.eipapp.redisqueue.FlowConfig$SampleData), not marked as ignorable (3 known properties: "id", "createDateTime", "message"])
 at [Source: [B@59d3beef; line: 1, column: 1374] (through reference chain: ksbysample.eipapp.redisqueue.FlowConfig$SampleData["payload"])
    at org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer.deserialize(Jackson2JsonRedisSerializer.java:73)
    at org.springframework.data.redis.core.AbstractOperations.deserializeValue(AbstractOperations.java:315)
    at org.springframework.data.redis.core.AbstractOperations$ValueDeserializingRedisCallback.doInRedis(AbstractOperations.java:55)
    at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:204)
    at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:166)
    at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:88)
    at org.springframework.data.redis.core.DefaultListOperations.rightPop(DefaultListOperations.java:161)
    at org.springframework.data.redis.core.DefaultBoundListOperations.rightPop(DefaultBoundListOperations.java:88)
    at org.springframework.integration.redis.store.RedisChannelMessageStore.pollMessageFromGroup(RedisChannelMessageStore.java:138)
    at org.springframework.integration.store.MessageGroupQueue.doPoll(MessageGroupQueue.java:318)
    at org.springframework.integration.store.MessageGroupQueue.poll(MessageGroupQueue.java:162)
    at org.springframework.integration.store.MessageGroupQueue.poll(MessageGroupQueue.java:49)
    at org.springframework.integration.channel.QueueChannel.doReceive(QueueChannel.java:116)
    at org.springframework.integration.channel.AbstractPollableChannel.receive(AbstractPollableChannel.java:105)
    at org.springframework.integration.endpoint.PollingConsumer.receiveMessage(PollingConsumer.java:192)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:245)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "payload" (class ksbysample.eipapp.redisqueue.FlowConfig$SampleData), not marked as ignorable (3 known properties: "id", "createDateTime", "message"])
 at [Source: [B@59d3beef; line: 1, column: 1374] (through reference chain: ksbysample.eipapp.redisqueue.FlowConfig$SampleData["payload"])
    at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:62)
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:834)
    at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1093)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1485)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1439)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:487)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1198)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:314)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:148)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3798)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2967)
    at org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer.deserialize(Jackson2JsonRedisSerializer.java:71)
    ... 32 more

Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "payload" (class ksbysample.eipapp.redisqueue.FlowConfig$SampleData), not marked as ignorable (3 known properties: "id", "createDateTime", "message"]) というエラーメッセージが出ています。

Redis を見ると Message は入っていました。

f:id:ksby:20170319194436p:plain

Redis に格納されている Message を以下の操作を行って整形してみます。

  • コマンドプロンプトから文字列をコピーする。
  • 不要な改行コードを削除して 1行にする。
  • \"" へ置換する。
  • 一番最初と最後の " を削除する。
  • IntelliJ IDEA で拡張子が .json のファイルを作成し、そこに編集した文字列をペーストした後、Ctrl+Alt+L を押して整形する。
{
  "payload": {
    "id": 58,
    "createDateTime": {
      "dayOfMonth": 19,
      "dayOfWeek": "SUNDAY",
      "dayOfYear": 78,
      "month": "MARCH",
      "monthValue": 3,
      "year": 2017,
      "hour": 19,
      "minute": 38,
      "nano": 243000000,
      "second": 22,
      "chronology": {
        "id": "ISO",
        "calendarType": "iso8601"
      }
    },
    "message": "idGenerator = 58 \xe3\x81\xaf 2017/03/19 19:38:22 \xe3\x81\xab\xe7\x94\x9f\xe6\x88\x90\xe3\x81\x95\xe3\x82\x8c\xe3\x81\xbe\xe3\x81\x97\xe3\x81\x9f"
  },
  "headers": {
    "X-B3-ParentSpanId": "59580c3007f1dfc4",
    "X-Message-Sent": true,
    "messageSent": true,
    "spanName": "message:redisQueue",
    "spanTraceId": "59580c3007f1dfc4",
    "spanId": "ed3685c62127a187",
    "spanParentSpanId": "59580c3007f1dfc4",
    "X-Span-Name": "message:redisQueue",
    "X-B3-SpanId": "ed3685c62127a187",
    "currentSpan": {
      "begin": 1489919902245,
      "name": "message:redisQueue",
      "traceId": 6437909067757379524,
      "parents": [
        6437909067757379524
      ],
      "spanId": -1353747551971991161,
      "exportable": true,
      "tags": {
        "message/payload-type": "ksbysample.eipapp.redisqueue.FlowConfig.SampleData"
      },
      "logs": [
        {
          "timestamp": 1489919902245,
          "event": "sr"
        }
      ]
    },
    "X-B3-Sampled": "1",
    "X-B3-TraceId": "59580c3007f1dfc4",
    "id": "45f64e56-471e-a2c2-2fe5-09bffc92670a",
    "X-Current-Span": {
      "begin": 1489919902245,
      "name": "message:redisQueue",
      "traceId": 6437909067757379524,
      "parents": [
        6437909067757379524
      ],
      "spanId": -1353747551971991161,
      "exportable": true,
      "tags": {
        "message/payload-type": "ksbysample.eipapp.redisqueue.FlowConfig.SampleData"
      },
      "logs": [
        {
          "timestamp": 1489919902245,
          "event": "sr"
        }
      ]
    },
    "spanSampled": "1",
    "timestamp": 1489919902245
  }
}
  • Redis には入っていることから送信はできるが受信ができていない。
  • JdkSerializationRedisSerializer クラスの場合には問題なかったが、Jackson2JsonRedisSerializer クラスに変えただけだとこの問題が発生する。

ということでしょうか。。。

そしていろいろ調べた結果、分かったことは。。。簡単にやるのは無理!ということでした。

  • Jackson2JsonRedisSerializer クラスは Spring Data Redis の機能として用意されているクラスであり、Spring Integration の Redis Support 用に作られたものではない。
  • serializer に Jackson2JsonRedisSerializer クラスを指定した場合、Message の serialize は行われるが deserialize で引っかかる。いろいろ試してみたら payload はまだ何とかなるが、MessageHeaders クラスの deserialize がうまくいかない。Jackson 用に専用の converter か何かを作り込めば何とかなるのかもしれないが、そこまではやりたくないな。。。

とりあえず現時点での結論は、Redis に Message を入れる時はデフォルトでセットされている JdkSerializationRedisSerializer クラスをそのまま使いましょう、ということにします。

1プロセス内で受信・出力側をマルチスレッドにしてみる

マルチプロセスでの受信・出力は問題なかったので、今度は1つのプロセス内でマルチスレッドで受信・出力してみます。

上で org.springframework.integration.redis package の Diagram を作成した時に RedisQueueMessageDrivenEndpoint クラスというのを見つけました。コードを見てみると MessageProducerSupport の継承クラスでしたので、MessageProducer として使えそうです。今回はこれを使用してみます。

src/main/java/ksbysample/eipapp/redisqueue/FlowConfig.java を以下のように変更します。

@Slf4j
@Configuration
public class FlowConfig {

    private static final String REDIS_LISTS_NAME = "REDIS_QUEUE";

    ..........

    @Bean
    public QueueChannel redisQueue() {
        return MessageChannels.queue("redisQueue", redisMessageStore(), REDIS_LISTS_NAME).get();
    }

    @Bean
    public MessageProducerSupport redisMessageProducer() {
        RedisQueueMessageDrivenEndpoint messageProducer
                = new RedisQueueMessageDrivenEndpoint(REDIS_LISTS_NAME, this.redisConnectionFactory);
        messageProducer.setTaskExecutor(Executors.newFixedThreadPool(5));
        messageProducer.setRecoveryInterval(1000);
        // setExpectMessage(true) を入れないと Redis Lists から取得したデータ ( headers + SampleData オブジェクトの payload )
        // が全て payload にセットされる
        messageProducer.setExpectMessage(true);
        return messageProducer;
    }

    ..........

    @Bean
    public IntegrationFlow printFlow() {
        return IntegrationFlows
                .from(redisMessageProducer())
                .<SampleData>handle((p, h) -> {
                    log.info("★★★ " + p.getMessage());
                    return null;
                })
                .get();
    }

}
  • クラスに @Slf4j アノテーションを付加します。
  • redisQueue メソッドと redisMessageProducer メソッドで同じ MessageChannel を指定するので、private static final String REDIS_LISTS_NAME = "REDIS_QUEUE"; を追加します。
  • redisQueue メソッド内で "REDIS_QUEUE"REDIS_LISTS_NAME へ変更します。
  • redisMessageProducer メソッドを追加します。メソッド内で messageProducer.setTaskExecutor(Executors.newFixedThreadPool(5)); を記述して5スレッドでメッセージを受信するようにします。
  • printFlow メソッドの以下の点を変更します。
    • .from(redisQueue()).from(redisMessageProducer()) へ変更します。
    • .bridge(e -> e.poller(Pollers.fixedDelay(1000))) を削除します。
    • System.out.println("★★★ " + p.getMessage());log.info("★★★ " + p.getMessage()); へ変更し、スレッド名がログに出力されるようにします。

clean タスク → Rebuild Project → build タスクを実行した後 bootRun を実行します。

f:id:ksby:20170320081003p:plain

Message は受信できていますが、全然マルチスレッドになっていませんね。。。 全て [pool-1-thread-1] でした。

この後送受信の時間を変更したりもしてみましたが、結果は同じでマルチスレッドにはなりませんでした。RedisQueueMessageDrivenEndpoint#setTaskExecutor に Executors.newFixedThreadPool(5) とかをセットしてもマルチスレッドにはならないようです。

src/main/java/ksbysample/eipapp/redisqueue/FlowConfig.java を以下のように変更すれば、Message の受信は1スレッドですが、その後の処理はマルチスレッドにはなります。

    @Bean
    public MessageProducerSupport redisMessageProducer() {
        RedisQueueMessageDrivenEndpoint messageProducer
                = new RedisQueueMessageDrivenEndpoint(REDIS_LISTS_NAME, this.redisConnectionFactory);
        messageProducer.setRecoveryInterval(1000);
        // setExpectMessage(true) を入れないと Redis Lists から取得したデータ ( headers + SampleData オブジェクトの payload )
        // が全て payload にセットされる
        messageProducer.setExpectMessage(true);
        return messageProducer;
    }

    ..........

    @Bean
    public IntegrationFlow printFlow() {
        return IntegrationFlows
                .from(redisMessageProducer())
                .log()
                .channel(c -> c.executor(Executors.newFixedThreadPool(5)))
                .<SampleData>handle((p, h) -> {
                    log.info("★★★ " + p.getMessage());
                    return null;
                })
                .get();
    }
  • redisMessageProducer メソッドから messageProducer.setTaskExecutor(Executors.newFixedThreadPool(5)); を削除します。
  • printFlow メソッドの以下の点を変更します。
    • .from(...) の直後に .log().channel(c -> c.executor(Executors.newFixedThreadPool(5))) を追加します。

bootRun を実行してみます。

f:id:ksby:20170320082154p:plain

メッセージの受信は全て [hannel-adapter1] ですが、その後の処理は全て異なるスレッドになりました。

MessageChannel からの受信処理をマルチスレッドにする方法が調べてもよく分かりませんでした。上の結果も当初の想定とは異なりますが、今回はこれで終了にします。

メモ書き

build.gradle に記述する compile("org.springframework.integration:spring-integration-...") に何があるかは、どこを見ると分かるのか?

どこかに一覧があるのか探してみたところ、spring-projects/spring-integration-java-dslbuild.gradle の dependencies に記載がありました。通常はこれを見るのが早そうです。

また、この build.gradle の dependencies ですが、[ ... ].each { compile( ... ) } という書き方が出来ることを初めて知りました。それ以外にも、

  • apply from: "${rootProject.projectDir}/publish-maven.gradle" という記述があり、publish-maven.gradle も見た感じでは Maven への公開を Gradle で書く方法のようです。
  • Spring Integration の 8.7 Scripting support の関連だと思いますが、org.jruby:jrubyorg.python:jython-standalone が書いてありました。これを入れると Ruty や Python で書いたものが実行できるのでしょうか?

Spring IO Platform の BOM の後に Spring Cloud の BOM を記述すると Gradle の checkstyle plugin に適用される Guava のバージョンが 21.0 にならない

最初 build.gradle の dependencyManagement に以下のように記述していたのですが、

dependencyManagement {
    imports {
        mavenBom("io.spring.platform:platform-bom:Athens-SR4") {
            bomProperty 'commons-lang3.version', '3.5'
            bomProperty 'guava.version', '21.0'
        }
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE")
    }
}

この状態で gradlew dependencies を実行してみたところ、checkstyle plugin の Guava のバージョンが 21.0 ではなく 18.0 になっていました。bomProperty 'guava.version', '21.0' の変更がなぜか反映されなくなっています。

checkstyle - The Checkstyle libraries to be used for this project.
\--- com.puppycrawl.tools:checkstyle:7.6
     +--- antlr:antlr:2.7.7
     +--- org.antlr:antlr4-runtime:4.6
     +--- commons-beanutils:commons-beanutils:1.9.3
     |    \--- commons-collections:commons-collections:3.2.2
     +--- commons-cli:commons-cli:1.3.1
     \--- com.google.guava:guava:19.0 -> 18.0

試しに BOM の順番を以下のように変更したところ、

dependencyManagement {
    imports {
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:Camden.RELEASE")
        mavenBom("io.spring.platform:platform-bom:Athens-SR4") {
            bomProperty 'commons-lang3.version', '3.5'
            bomProperty 'guava.version', '21.0'
        }
    }
}

checkstyle plugin の Guava のバージョンが 21.0 になりました。

checkstyle - The Checkstyle libraries to be used for this project.
\--- com.puppycrawl.tools:checkstyle:7.6
     +--- antlr:antlr:2.7.7
     +--- org.antlr:antlr4-runtime:4.6
     +--- commons-beanutils:commons-beanutils:1.9.3
     |    \--- commons-collections:commons-collections:3.2.2
     +--- commons-cli:commons-cli:1.3.1
     \--- com.google.guava:guava:19.0 -> 21.0

そもそも BOM を複数書いているサンプルを見たことがなく、BOM を書く順番ってあるのかな?とは思っていましたが、あるようです。

ソースコード

履歴

2017/03/20
初版発行。