かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

IntelliJ IDEA を 2020.1.2 → 2020.1.3 へバージョンアップ

IntelliJ IDEA を 2020.1.2 → 2020.1.3 へバージョンアップする

IntelliJ IDEA の 2020.1.3 がリリースされているのでバージョンアップします。

※ksbysample-webapp-lending プロジェクトを開いた状態でバージョンアップしています。

  1. IntelliJ IDEA のメインメニューから「Help」-「Check for Updates...」を選択します。

  2. IDE and Plugin Updates」ダイアログが表示されます。右下に「Update and Restart」ボタンが表示されていますので、「Update and Restart」ボタンをクリックします。

    f:id:ksby:20200711193651p:plain

  3. Plugin の update も表示されました。このまま「Update and Restart」ボタンをクリックします。

    f:id:ksby:20200711193738p:plain

  4. Patch がダウンロードされて IntelliJ IDEA が再起動します。

  5. IntelliJ IDEA が起動すると画面下部に「Indexing…」のメッセージが表示されますので、終了するまで待機します。

    f:id:ksby:20200711194319p:plain

  6. IntelliJ IDEA のメインメニューから「Help」-「About」を選択し、2020.1.3 へバージョンアップされていることを確認します。

  7. Gradle Tool Window の左上にある「Refresh all Gradle projects」ボタンをクリックして更新します。

  8. clean タスク実行 → Rebuild Project 実行 → build タスクを実行して、"BUILD SUCCESSFUL" のメッセージが出力されることを確認します。

    f:id:ksby:20200711195057p:plain

  9. Project Tool Window で src/test でコンテキストメニューを表示して「Run 'All Tests' with Coverage」を選択し、テストが全て成功することを確認します。

    f:id:ksby:20200711200006p:plain

boto3 のインスタンス生成をグローバルで行っても moto を利用したユニットテストを成功させるには?

概要

記事一覧はこちらです。

moto を利用したユニットテストが成功しなくなるので boto3 のインスタンス生成を Lambda のハンドラー関数内で行うようにしていましたが、aws-lambda-powertools を試してみる(Tracer&X-Ray 編その2) で boto3 のインスタンス生成をグローバルで行った方が処理にかかる時間が短くなることが判明したため、boto3 のインスタンス生成をグローバルで行っても moto を利用したユニットテストが成功する方法を調べることにします。

今回は resize-image-app-project プロジェクトで作成した AWS Lambda のユニットテストを作成する(local動作版)ユニットテストを行った resize-image-app-project プロジェクトを使用します。

参照したサイト・書籍

  1. What about those pesky imports?
    https://github.com/spulec/moto#what-about-those-pesky-imports

目次

  1. テスト対象のモジュールの import 文はテスト関数内に記述する

手順

テスト対象のモジュールの import 文はテスト関数内に記述する

moto の GitHubWhat about those pesky imports? に解決方法が書かれていました。import 文が関数の中に書けるとは思いませんでした。

resize_service/handler.py で s3_client = boto3.client('s3') を記述する位置をハンドラー(resize 関数)の外側に移動してから、

import logging
import os
import re
import uuid
from urllib.parse import unquote_plus

import boto3
from PIL import Image

thumbnail_size = 320, 180

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client('s3')


def resize_image(image_path, resized_path):
    with Image.open(image_path) as image:
        image.thumbnail(thumbnail_size)
        image.save(resized_path)


def resize(event, context):
    for record in event['Records']:
        ..........

テストクラスの外に書いている from resize_service import handler を、

import json
import unittest

import boto3
from moto import mock_s3

from resize_service import handler


@mock_s3
class TestResizeService(unittest.TestCase):
    ..........

    def test_resize(self):
        s3_client = boto3.client('s3')
        s3_client.upload_file('tests/sample.jpg', TestResizeService.UPLOAD_BUCKET, 'sample.jpg')

        with open('tests/s3_event.json', 'r') as f:
            event = json.load(f)

        handler.resize(event, None)

        ..........

テスト関数内に移動します。

import json
import unittest

import boto3
from moto import mock_s3


@mock_s3
class TestResizeService(unittest.TestCase):
    ..........

    def test_resize(self):
        from resize_service import handler

        s3_client = boto3.client('s3')
        s3_client.upload_file('tests/sample.jpg', TestResizeService.UPLOAD_BUCKET, 'sample.jpg')

        with open('tests/s3_event.json', 'r') as f:
            event = json.load(f)

        handler.resize(event, None)

        ..........

python -m unittest -v を実行するとテストは成功し、

f:id:ksby:20200707000454p:plain

IntelliJ IDEA 上でテストを実行しても問題なく成功します。

f:id:ksby:20200707000613p:plain

Docker 上で実行する方法も試してみましたが、問題なく成功しました。

履歴

2020/07/07
初版発行。

aws-lambda-powertools を試してみる(Tracer&X-Ray 編その2)

概要

記事一覧はこちらです。

aws-lambda-powertools を試してみる(Tracer&X-Ray 編その1) の続きです。

参照したサイト・書籍

  1. SQS Tracing with AWSTraceHeader
    https://github.com/aws/aws-xray-sdk-node/issues/208

    • SQS や SNSX-Ray はサポートされたが Service Map でつなげて表示するところまではまだ実現できていないとのこと。

目次

  1. API Gateway の Endpoint にメッセージを送信して X-Ray の表示内容を確認する
  2. SQS は X-Ray の円がつながらず SNS はつながる理由とは?
  3. logger.structure_logs(...)で AWSTraceHeader と trace_id を出力するよう設定して API Gateway → Lambda → SQS → Lambda の流れを CloudWatch Logs Insights で追えるようにする
  4. boto3 のインスタンスの生成はグローバルで実施した方が2回目以降は速い
  5. 最後に

手順

API Gateway の Endpoint にメッセージを送信して X-Ray の表示内容を確認する

Postman からメッセージを送信すると 200 OK が返ってきました。

f:id:ksby:20200704230203p:plain

AWS マネジメントコンソールの X-Ray の Service Map を見ると以下の画面が表示されました。

f:id:ksby:20200704231905p:plain

  • 赤丸:API Gateway → Lambda(recvMsg)
  • 青色:Lambda(recvMsg) → DynamoDB → DynamoDB Streams → Lambda(processSampleTableStream)
  • 緑色:Lambda(recvMsg) → SQS → Lambda(processSampleQueue)
  • 黄色:Lambda(recvMsg) → SNS → Lambda(processSampleTopic)
  • 紫色:Lambda(recvMsg) → Lambda(processSync)
  • オレンジ色:Lambda(recvMsg) → Lambda(processAsync)

気になったのは、

  • DynamoDB は X-Ray がまだサポートしていないので円がつながらないのは分かるのですが、SQS は AWS X-Ray が Amazon SQS をサポート の記事が出ているにもかかわらずこちらもつながって表示されません。
  • SNSAmazon SNS で AWS X-Ray のサポートを追加 の記事が出ていますが、こちらはつながって表示されています。
  • Lambda の呼び出しは同期呼び出しだと円が2つ、非同期呼び出しだと1つです。

Traces には以下の一覧が表示されており、

f:id:ksby:20200705184407p:plain

メソッドに POST と表示されている行の詳細を表示すると以下のデータが表示されました。

  • Cold Start 時は Initialization の時間がかかります。
  • 大した処理を記述していない Lambda でも同期呼び出しだと初回は Initialization 並みの時間がかかっています。まあ当然ですね。
  • @tracer.capture_method を付与した関数名が表示されており、関数毎の処理時間が分かります。

f:id:ksby:20200704232344p:plain f:id:ksby:20200704232447p:plain f:id:ksby:20200704232609p:plain f:id:ksby:20200704232716p:plain f:id:ksby:20200704232809p:plain

SQS は X-Ray の円がつながらず SNS はつながる理由とは?

Lambda(recvMsg)の CloudWatch のログを見ると decorate のログの message.headers.X-Amzn-Trace-Id に X-Ray のトレースID が出力されています。

f:id:ksby:20200704235805p:plain

Lambda(recvMsg)の logger.debug(event) が出力したログを見ると、AWSTraceHeader(message.headers.X-Amzn-Trace-Id の値が出力されています)と traceId に同じ値がセットされています。

f:id:ksby:20200705000241p:plain

Lambda(processSampleQueue)の CloudWatch の decorate のログを見ると message.Records[0].attributes.AWSTraceHeader に message.headers.X-Amzn-Trace-Id に出力されていたトレースID が含まれています(メッセージ送信時にセットしていないにもかかわらず)。

f:id:ksby:20200705000931p:plain

Lambda(processSampleQueue)の logger.debug(event) が出力したログを見ると、traceId には AWSTraceHeader にセットされていたものとは別のトレースID がセットされています。

f:id:ksby:20200705001624p:plain

Lambda(processSampleTopic)の CloudWatch の decorate のログを見ると message.headers.X-Amzn-Trace-Id に出力されていたトレースID はどこにもセットされていませんが、

f:id:ksby:20200705002429p:plain

Lambda(processSampleTopic)の logger.debug(event) が出力したログを見ると、traceId に message.headers.X-Amzn-Trace-Id に出力されていたトレースID がセットされています。

f:id:ksby:20200705002631p:plain

ということで、SQS 経由だとトレースID が引き継がれませんが SNS 経由だと引き継がれるからという理由のようです。SQS は X-Ray でサポートされていると発表されているので円がつながるようにできそうな気がしたのですが、今回調べた限りではその方法はさっぱり分かりませんでした。。。

logger.structure_logs(...)で AWSTraceHeader と trace_id を出力するよう設定して API Gateway → Lambda → SQS → Lambda の流れを CloudWatch Logs Insights で追えるようにする

SQS 経由の場合、メッセージの送信元の Lambda と受信先の Lambda でトレースID は異なりますが AWSTraceHeader で送信元のトレースID は伝わるようなので、logger.structure_logs(...) で AWSTraceHeader を出力するようにしておけば一連の流れをログで追えそうかなと思いました。ついでにトレースID も出力します。

API Gateway から呼び出される Lambda(recvMsg)では以下のように実装し、

    awsTraceHeader = event['headers']['X-Amzn-Trace-Id']
    logger.structure_logs(append=True,
                          AWSTraceHeader=awsTraceHeader,
                          traceId=xray_recorder.current_segment().trace_id)

SQS 経由で呼び出される Lambda(processSampleQueue)では以下のように実装します。AWSTraceHeader はメッセージの属性に付加されるので、最初は None にしておいてメッセージの処理をする時に record['attributes']['AWSTraceHeader'] で上書きします。

    logger.structure_logs(append=True,
                          AWSTraceHeader=None,
                          traceId=xray_recorder.current_segment().trace_id)
    ..........

    for record in event['Records']:
        logger.structure_logs(append=True,
                              AWSTraceHeader=record['attributes']['AWSTraceHeader'])
        ..........

AWS マネジメントコンソールで CloudWatch Logs のロググループを表示してから、Lambda(recvMsg)と Lambda(processSampleQueue)のログを選択して CloudWatch Logs Insight で表示します。

f:id:ksby:20200705161401p:plain

クエリを以下の内容に変更してから実行すると、

fields @timestamp, level, substr(AWSTraceHeader, 0, 40), function_name, message
| filter AWSTraceHeader =~ /1-5f008baf-76bb78b8aaa6a3e0546581c8/
| sort @timestamp desc
| limit 20
  • AWSTraceHeader だけ指定した場合 Lambda(processSampleQueue)で出力される文字列は Parent 等項目が追加されて少し長くなるので、確認したい文字列だけ出力されるよう substr(AWSTraceHeader, 0, 40) で指定します。
  • Lambda(recvMsg)のログなのか Lambda(processSampleQueue)のログなのかが分かるよう function_name を出力します。
  • 今回は処理内容がシンプルに分かればよいので @messagemessage に変更します。
  • API Gateway で発行されたトレースID 1-5f008baf-76bb78b8aaa6a3e0546581c8 が含まれているログを取得したいので filter AWSTraceHeader =~ /1-5f008baf-76bb78b8aaa6a3e0546581c8/ を追加します。

API Gateway → Lambda(recvMsg) → SQS → Lambda(processSampleQueue)の流れをログで追うことができました。SQS にメッセージを送信する時に orderedItem を1件ずつ分けずに event['body'] まるごと送信している失敗にも気づいてしまいましたが。。。

f:id:ksby:20200705161624p:plain f:id:ksby:20200705161816p:plain

また SQS にはメッセージを2回しか送信していないのに Lambda(processSampleQueue)のログがなぜ3回出ているの?と思ったのですが、decorate のログは1回目は以下の内容なのですが、

f:id:ksby:20200705164942p:plain

2回目は @logger.inject_lambda_contextlogger.structure_logs(...) で追加された項目が出ていたためでした。

f:id:ksby:20200705165056p:plain

boto3 のインスタンスの生成はグローバルで実施した方が2回目以降は速い

sample_service/apigw_handler.py で boto3 のインスタンスを生成するタイミングをハンドラーの前にしていますが、

..........

dynamodb_sample_table_tbl = boto3.resource('dynamodb').Table('sample-table')
sqs_client = boto3.client('sqs')
sns_client = boto3.client('sns')
lambda_client = boto3.client('lambda')


# @logger.inject_lambda_context を付けておくとログ出力時に context にセットされている
# function_name 等の情報がセットされる
# Capturing context Lambda info
# https://awslabs.github.io/aws-lambda-powertools-python/core/logger/#capturing-context-lambda-info
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def recv_msg(event, context):
    ..........

この方式で API Gateway にアクセスすると Lambda(recvMsg)の実行時間は cold start がなければ 321ms、278ms、252ms でした。

これをハンドラーが呼び出されてから都度生成されるように以下のように変更してみたところ、

@tracer.capture_method
def put_item_to_dynamodb(orderNumber, orderedItem):
    dynamodb_sample_table_tbl = boto3.resource('dynamodb').Table('sample-table')
    dynamodb_sample_table_tbl.put_item(
        Item={
            ..........

Lambda(recvMsg)の実行時間は cold start がなければ 627ms、515ms、554ms でした。約2倍です。

以前ユニットテストを書いた時にはエラーが出るのでハンドラー関数内で生成するようにしましたが、グローバルで1度だけ生成するようにした方がやっぱり速いんですね。当然といえば当然。。。 ユニットテストでエラーが出るのを解決する別の方法を調べることにします。

最後に

  • Logger は JSON で出力できたり、context の function_name 等の情報を自動でセットしてくれたりするのが便利です。
  • logger.structure_logs(...) で共通で出力しておきたい項目をセットしておけるのも使い勝手がいいなと思いました。
  • 始めて X-Ray を使ったのですが、Serverless Framework の設定と Tracer で本当に簡単に連携できるようになるので驚きでした。X-Ray はなんか面倒な印象があったのですが、こんなに簡単に使えるならば積極的に使っていきたいと思います。

AWS Black Belt Online Seminar AWS X-Ray 資料及び QA 公開 も見つけたので、後で見ておきましょう。

履歴

2020/07/05
初版発行。

aws-lambda-powertools を試してみる(Tracer&X-Ray 編その1)

概要

記事一覧はこちらです。

aws-lambda-powertools を試してみる(Logger 編) の続きで、今回は Tracer を試してみます。lambda-powertools-project プロジェクトを引き続き使用します。

API Gateway から呼び出された Lambda から以下の5パターンの処理を実行し、X-Ray にどう表示されるのか確認します。

  • DynamoDB にデータを追加 → DynamoDB Streams 経由で Lambda を呼び出す。
  • SQS にメッセージを送信して Lambda を呼び出す。
  • SNS にメッセージを送信して Lambda を呼び出す。
  • 別の Lambda を同期呼び出しする。
  • 別の Lambda を非同期呼び出しする。

f:id:ksby:20200704115315p:plain:w450

参照したサイト・書籍

  1. Introducing AWS Lambda Destinations
    https://aws.amazon.com/jp/blogs/compute/introducing-aws-lambda-destinations/

  2. AWS X-Ray と他の AWS のサービスの統合
    https://docs.aws.amazon.com/ja_jp/xray/latest/devguide/xray-services.html

  3. DynamoDB Streamsの改めて再検証してみた
    https://dev.classmethod.jp/articles/dynamodb-streams-scale-test/

  4. Amazon DynamoDB and Serverless - The Ultimate Guide
    https://www.serverless.com/dynamodb

  5. Event-driven processing with Serverless and DynamoDB streams
    https://www.serverless.com/blog/event-driven-architecture-dynamodb

  6. DynamoDB streams creation
    https://forum.serverless.com/t/dynamodb-streams-creation/792

  7. Welcome to the AWS X-Ray SDK for Python!
    https://docs.aws.amazon.com/xray-sdk-for-python/latest/reference/index.html

  8. aws / aws-xray-sdk-python
    https://github.com/aws/aws-xray-sdk-python

  9. Pseudo Parameters Reference
    https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/pseudo-parameter-reference.html

目次

  1. serverless.yml を変更する
  2. API Gateway から呼び出される apigw_handler.py を変更する
  3. DynamoDB から DynamoDB Streams 経由で呼び出される dynamodb_handler.py を作成する
  4. SQS 経由で呼び出される sqs_handler.py を作成する
  5. SNS 経由で呼び出される sns_handler.py を作成する
  6. 同期で呼び出す sync_handler.py を作成する
  7. 非同期で呼び出す async_handler.py を作成する
  8. deploy し直す

手順

serverless.yaml を変更する

sample_service/serverless.yaml を今回の環境に合わせて以下の内容にします。

service: sample-service

plugins:
  - serverless-python-requirements

custom:
  queueName: "SampleQueue"
  topicName: "SampleTopic"
  pythonRequirements:
    dockerizePip: true

provider:
  name: aws
  runtime: python3.8
  stage: dev
  region: ap-northeast-1
  environment:
    # aws-lambda-powertools 用環境変数
    LOG_LEVEL: DEBUG
    POWERTOOLS_LOGGER_LOG_EVENT: true
    POWERTOOLS_METRICS_NAMESPACE: lambda-powertools-project
    POWERTOOLS_SERVICE_NAME: sample-service
  tracing:
    apiGateway: true
    lambda: true

  iamRoleStatements:
    - Effect: Allow
      Action:
        - dynamodb:Query
        - dynamodb:Scan
        - dynamodb:GetItem
        - dynamodb:PutItem
        - dynamodb:UpdateItem
        - dynamodb:DeleteItem
      Resource:
        - Fn::GetAtt: [ SampleTable, Arn ]
    - Effect: Allow
      Action:
        - kms:GenerateDataKey
        - kms:Decrypt
        - sqs:*
      Resource:
        - Fn::GetAtt: [ SampleQueue, Arn ]
    - Effect: Allow
      Action:
        - kms:GenerateDataKey
        - kms:Decrypt
        - sns:Publish
        - sns:Subscribe
      Resource:
        - !Ref NotifySampleTopic
    - Effect: Allow
      Action:
        - lambda:InvokeFunction
        - lambda:InvokeAsync
      Resource:
        - Fn::Join:
          - ":"
          - - "arn:aws:lambda"
            - Ref: "AWS::Region"
            - Ref: "AWS::AccountId"
            - "function:*"

functions:
  recvMsg:
    handler: apigw_handler.recv_msg
    events:
      - http:
          path: recv-msg
          method: post
          cors: true
          request:
            schema:
              application/json: ${file(msg_schema.json)}
    environment:
      QUEUE_URL: !Ref SampleQueue
      TOPIC_ARN: !Ref NotifySampleTopic

  processSampleTableStream:
    handler: dynamodb_handler.process_sample_table_stream
    events:
      - stream:
          type: dynamodb
          batchSize: 1
          startingPosition: TRIM_HORIZON
          arn:
            Fn::GetAtt: [ SampleTable, StreamArn ]

  processSampleQueue:
    handler: sqs_handler.process_sample_queue
    events:
      - sqs:
          arn:
            Fn::GetAtt: [ SampleQueue, Arn ]

  processSampleTopic:
    handler: sns_handler.process_sample_topic
    events:
      - sns:
        arn: !Ref NotifySampleTopic
        topicName: "${self:custom.topicName}"

  processSync:
    handler: sync_handler.process_sync

  processAsync:
    handler: async_handler.process_async

resources:
  Resources:
    SampleTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: sample-table
        AttributeDefinitions:
          - AttributeName: orderNumber
            AttributeType: S
          - AttributeName: orderItemNumber
            AttributeType: S
        KeySchema:
          - AttributeName: orderNumber
            KeyType: HASH
          - AttributeName: orderItemNumber
            KeyType: RANGE
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
        StreamSpecification:
          StreamViewType: NEW_IMAGE

    SampleQueue:
      Type: AWS::SQS::Queue
      Properties:
        QueueName: "${self:custom.queueName}"

    NotifySampleTopic:
      Type: AWS::SNS::Topic
      Properties:
        TopicName: "${self:custom.topicName}"
    NotifySampleSubscription:
      Type: AWS::SNS::Subscription
      DependsOn:
        - ProcessSampleTopicLambdaFunction
      Properties:
        TopicArn: !Ref NotifySampleTopic
        Endpoint:
          Fn::GetAtt:
            - ProcessSampleTopicLambdaFunction
            - Arn
        Protocol: lambda
    NotifySampleLambdaResourcePolicy:
      Type: AWS::Lambda::Permission
      DependsOn:
        - ProcessSampleTopicLambdaFunction
      Properties:
        FunctionName: !Ref ProcessSampleTopicLambdaFunction
        Principal: sns.amazonaws.com
        Action: "lambda:InvokeFunction"
        SourceArn: !Ref NotifySampleTopic
  • aws-lambda-powertools 用環境変数は、
    • debug ログを出力したいので LOG_LEVEL: DEBUG にします。
    • decorate のログを出力したいので POWERTOOLS_LOGGER_LOG_EVENT: true にします。
  • provider.iamRoleStatements に必要な IAM Role を設定します。ただし Lambda の同期・非同期呼び出しのための lambda:InvokeFunction、lambda:InvokeAsync の Resource は - Fn::GetAtt: [ processSync, Arn ] と記述すると Error: The CloudFormation template is invalid: Template error: instance of Fn::GetAtt references undefined resource processSync というエラーが、- Fn::GetAtt: [ ProcessSyncLambdaFunction, Arn ] と記述すると Error: The CloudFormation template is invalid: Circular dependency between resources: [...] というエラーが出て設定方法が全然分からなかったので、arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:* の形式(Lambda を * で指定する)にしています。
  • resources.Resources に今回使用する DynamoDB のテーブル(SampleTable)、SQS のキュー(SampleQueue)、SNS のトピック(SampleTopic)を記述します。
  • DynamoDB のテーブル(SampleTable)は DynamoDB Streams を使用するので StreamSpecification に StreamViewType: NEW_IMAGE を記述します。

API Gateway から呼び出される apigw_handler.py を変更する

sample_service/apigw_handler.py を以下の内容にします。put_item_to_dynamodb(...) 以外の関数に orderedItem を渡さず event['body'] を渡しているのはミスです。。。(次の記事をほぼ書いた時に気づきました)

import json
import os

import boto3
from aws_lambda_powertools import Logger, Tracer
from aws_xray_sdk.core import xray_recorder

logger = Logger()
tracer = Tracer()

dynamodb_sample_table_tbl = boto3.resource('dynamodb').Table('sample-table')
sqs_client = boto3.client('sqs')
sns_client = boto3.client('sns')
lambda_client = boto3.client('lambda')


# @logger.inject_lambda_context を付けておくとログ出力時に context にセットされている
# function_name 等の情報がセットされる
# Capturing context Lambda info
# https://awslabs.github.io/aws-lambda-powertools-python/core/logger/#capturing-context-lambda-info
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def recv_msg(event, context):
    # API Gateway に発行された traceId は event.headers.X-Amzn-Trace-Id にセットされている
    # Lambda 用に発行された traceId は xray_recorder.current_segment().trace_id にセットされている
    awsTraceHeader = event['headers']['X-Amzn-Trace-Id']
    logger.structure_logs(append=True,
                          AWSTraceHeader=awsTraceHeader,
                          traceId=xray_recorder.current_segment().trace_id)
    logger.debug(event)

    request_body = json.loads(event['body'])
    # logger.info({
    #     "orderNumber": request_body['orderNumber'],
    #     "orderDate": request_body['orderDate']
    # })

    for orderedItem in request_body['orderedItem']:
        logger.structure_logs(append=True,
                              orderNumber=request_body['orderNumber'],
                              orderItemNumber=orderedItem['orderItemNumber']
                              )
        put_item_to_dynamodb(request_body['orderNumber'], orderedItem)
        send_message_to_sqs(os.environ['QUEUE_URL'], event['body'])
        send_message_to_sns(os.environ['TOPIC_ARN'], event['body'])
        call_process_sync(event['body'])
        call_process_async(event['body'])

    # logger.structure_logs(append=False) を呼ぶと @logger.inject_lambda_context
    # でセットされた項目まで消える
    logger.structure_logs(append=False)
    logger.info('テスト')

    body = {
        "message": "Go Serverless v1.0! Your function executed successfully!",
        "input": event
    }

    response = {
        "statusCode": 200,
        "body": json.dumps(body)
    }

    return response


@tracer.capture_method
def put_item_to_dynamodb(orderNumber, orderedItem):
    dynamodb_sample_table_tbl.put_item(
        Item={
            'orderNumber': orderNumber,
            'orderItemNumber': orderedItem['orderItemNumber'],
            'orderQuantity': orderedItem['orderQuantity'],
            'productID': orderedItem['productID'],
            'category': orderedItem['category'],
            'price': orderedItem['price']
        }
    )
    logger.info('sample-table に追加しました')


@tracer.capture_method
def send_message_to_sqs(queue_url, body):
    response = sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=body
    )
    logger.info('SampleQueue にメッセージを送信しました')


@tracer.capture_method
def send_message_to_sns(topic_arn, body):
    response = sns_client.publish(
        TopicArn=topic_arn,
        Message=body
    )
    logger.info('SampleTopic にメッセージを送信しました')


@tracer.capture_method
def call_process_sync(body):
    response = lambda_client.invoke(
        FunctionName='sample-service-dev-processSync',
        Payload=body
    )
    logger.info('processSync を同期で呼び出ししました')


@tracer.capture_method
def call_process_async(body):
    response = lambda_client.invoke_async(
        FunctionName='sample-service-dev-processAsync',
        InvokeArgs=body
    )
    logger.info('processAsync を非同期で呼び出ししました')

DynamoDB から DynamoDB Streams 経由で呼び出される dynamodb_handler.py を作成する

sample_service/dynamodb_handler.py を新規作成し、以下の内容を記述します。

以降の実装で logger.structure_logs(...) を利用して AWSTraceHeader と traceId がログに出力されるようにしていますが、理由は次回記述します。

from aws_lambda_powertools import Logger, Tracer
from aws_xray_sdk.core import xray_recorder

logger = Logger()
tracer = Tracer()


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def process_sample_table_stream(event, context):
    logger.structure_logs(append=True,
                          AWSTraceHeader=None,
                          traceId=xray_recorder.current_segment().trace_id)
    logger.debug(event)

SQS 経由で呼び出される sqs_handler.py を作成する

sample_service/dynamodb_handler.py を新規作成し、以下の内容を記述します。

from aws_lambda_powertools import Logger, Tracer
from aws_xray_sdk.core import xray_recorder

logger = Logger()
tracer = Tracer()


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def process_sample_queue(event, context):
    logger.structure_logs(append=True,
                          AWSTraceHeader=None,
                          traceId=xray_recorder.current_segment().trace_id)
    logger.debug(event)

    for record in event['Records']:
        logger.structure_logs(append=True,
                              AWSTraceHeader=record['attributes']['AWSTraceHeader'])
        logger.info(record)

SNS 経由で呼び出される sns_handler.py を作成する

sample_service/sns_handler.py を新規作成し、以下の内容を記述します。

from aws_lambda_powertools import Logger, Tracer
from aws_xray_sdk.core import xray_recorder

logger = Logger()
tracer = Tracer()


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def process_sample_topic(event, context):
    logger.structure_logs(append=True,
                          AWSTraceHeader=None,
                          traceId=xray_recorder.current_segment().trace_id)
    logger.debug(event)

同期で呼び出す sync_handler.py を作成する

sample_service/sync_handler.py を新規作成し、以下の内容を記述します。

from aws_lambda_powertools import Logger, Tracer
from aws_xray_sdk.core import xray_recorder

logger = Logger()
tracer = Tracer()


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def process_sync(event, context):
    logger.structure_logs(append=True,
                          AWSTraceHeader=None,
                          traceId=xray_recorder.current_segment().trace_id)
    logger.debug(event)

非同期で呼び出す async_handler.py を作成する

sample_service/async_handler.py を新規作成し、以下の内容を記述します。

from aws_lambda_powertools import Logger, Tracer
from aws_xray_sdk.core import xray_recorder

logger = Logger()
tracer = Tracer()


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def process_async(event, context):
    logger.structure_logs(append=True,
                          AWSTraceHeader=None,
                          traceId=xray_recorder.current_segment().trace_id)
    logger.debug(event)

deploy し直す

一度 npx sls remove してから npx sls deploy します。

f:id:ksby:20200704205406p:plain f:id:ksby:20200704205505p:plain f:id:ksby:20200704205615p:plain f:id:ksby:20200704205717p:plain

動作確認は次の回です。

履歴

2020/07/05
初版発行。

aws-lambda-powertools を試してみる(Logger 編)

概要

記事一覧はこちらです。

TwitterSimplifying serverless best practices with Lambda Powertools の記事を見かけました。Python で Lambda を作成する時に Logging、Tracing、Metrics の機能の実装を助けてくれるライブラリとのこと。

記事は SAM で実装されていたので Serverless Framework で実装して動作を確認してみます。Logger で1回(今回)、Tracer で2回の計3回に分けて書きます。Metrics は使い方(というか何が便利になるのか)が今ひとつ分からなかったので書きません。

参照したサイト・書籍

  1. Simplifying serverless best practices with Lambda Powertools
    https://aws.amazon.com/jp/blogs/opensource/simplifying-serverless-best-practices-with-lambda-powertools/

  2. AWS Lambda Powertools Python
    https://awslabs.github.io/aws-lambda-powertools-python/

  3. aws-samples / aws-serverless-shopping-cart
    https://github.com/aws-samples/aws-serverless-shopping-cart/tree/master/backend/shopping-cart-service

    • Python で書かれた Serverless のサンプル。今回の記事とは直接関係ありませんが、後で見たいので残しておきます。
  4. Serverless Framework v1.41 - X-Ray for API Gateway, Invoke Local with Docker Improvements & More
    https://www.serverless.com/blog/framework-release-v141/

  5. オブザーバビリティ(可観測性)がなぜ必要だと考えるのか
    https://ymotongpoo.hatenablog.com/entry/2019/03/25/084500

  6. CloudWatch Logs Insights を使用したログデータの分析
    https://docs.aws.amazon.com/ja_jp/AmazonCloudWatch/latest/logs/AnalyzingLogData.html

  7. 地味にイイネ!Amazon CloudWatch Logs Insightsで効率的に調査しよう!
    https://blog.ecbeing.tech/entry/2019/09/19/124352

  8. CloudWatch Logs Insights でApacheアクセスログを確認する
    https://dev.classmethod.jp/articles/cwinsights-apache/

目次

  1. lambda-powertools-project プロジェクトを作成する
  2. sample_service サブプロジェクトを作成する
  3. requirements.txt を作成する
  4. serverless.yml に POWERTOOLS 用の環境変数と X-Ray を有効にする設定を追加する
  5. Logging の機能を試してみる

手順

lambda-powertools-project プロジェクトを作成する

以下の手順で lambda-powertools-project プロジェクトを作成します。具体的な手順は IntelliJ IDEA+Node.js+npm+serverless framework+Python の組み合わせで開発環境を構築して AWS Lambda を作成してみる 参照。

  • lambda-powertools-project の Empty Project を作成する。
  • Python の仮想環境を作成する。
  • Serverless Framework をローカルインストールする。
  • .envrc を作成する。
  • aws-lambda-powertools は deploy 時にアップロードしないと使用できないので serverless-python-requirements をインストールします。
    • npm install --save-dev serverless-python-requirements
  • Tracing の機能を試すのに Lambda から別のサービスを呼び出すので boto3 をインストールします。aws-lambda-powertools もインストールします。
    • pip install boto3
    • pip install aws-lambda-powertools

sample_service サブプロジェクトを作成する

プロジェクトのルートディレクトリの下で npx sls create --template aws-python3 --path sample_service を実行して sample_service サブプロジェクトを作成します。

requirements.txt を作成する

deploy 時に aws-lambda-powertools もアップロードするために sample_service ディレクトリの下に requirements.txt を作成して以下の内容を記述します。

aws-lambda-powertools==1.0.0

serverless.yml に POWERTOOLS 用の環境変数X-Ray を有効にする設定を追加する

sample_service/serverless.yml を以下のように変更します。

service: sample-service

plugins:
  - serverless-python-requirements

custom:
  pythonRequirements:
    dockerizePip: true

provider:
  name: aws
  runtime: python3.8
  stage: dev
  region: ap-northeast-1
  environment:
    # aws-lambda-powertools 用環境変数
    LOG_LEVEL: INFO
    POWERTOOLS_LOGGER_LOG_EVENT: true
    POWERTOOLS_METRICS_NAMESPACE: lambda-powertools-project
    POWERTOOLS_SERVICE_NAME: sample-service
  tracing:
    apiGateway: true
    lambda: true

..........
  • グローバルな環境変数は provider.environment に記述すればよいので、ここに POWERTOOLS 用の環境変数のうち以下の4つを設定します。
    • LOG_LEVEL
    • POWERTOOLS_LOGGER_LOG_EVENT
    • POWERTOOLS_METRICS_NAMESPACE
    • POWERTOOLS_SERVICE_NAME
  • X-Ray の設定は provider.tracing に記述するので、apiGateway: truelambda: true を設定します。

Logging の機能を試してみる

今回は以下の JSON Schema のメッセージを送信します(sample_service ディレクトリの下に msg_schema.json というファイルを作成しその中に記述します)。

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "definitions": {
    "orderedItem": {
      "type": "object",
      "properties": {
        "orderItemNumber": {
          "type": "string",
          "pattern": "^[0-9]+$"
        },
        "orderQuantity": {
          "type": "integer",
          "minimum": 0,
          "maximum": 999
        },
        "productID": {
          "type": "string",
          "pattern": "^[0-9]+$"
        },
        "category": {
          "type": "string",
          "enum": [
            "book",
            "camera",
            "computer"
          ]
        },
        "price": {
          "type": "integer",
          "minimum": 1
        }
      },
      "required": [
        "orderItemNumber",
        "orderQuantity",
        "productID",
        "category",
        "price"
      ]
    }
  },
  "title": "Order",
  "type": "object",
  "properties": {
    "orderNumber": {
      "type": "string",
      "pattern": "^[0-9]+$"
    },
    "orderDate": {
      "type": "string",
      "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
    },
    "isGift": {
      "type": "boolean"
    },
    "orderedItem": {
      "type": "array",
      "minItems": 1,
      "maxItems": 3,
      "uniqueItems": true,
      "items": {
        "$ref": "#/definitions/orderedItem"
      }
    }
  },
  "required": [
    "orderNumber",
    "orderDate",
    "orderedItem"
  ]
}

メッセージのサンプルです。

{
  "orderNumber": "1",
  "orderDate": "2020-06-28",
  "isGift": false,
  "orderedItem": [
    {
      "orderItemNumber": "1",
      "orderQuantity": 3,
      "productID": "1001",
      "category": "book",
      "price": 3800
    },
    {
      "orderItemNumber": "2",
      "orderQuantity": 1,
      "productID": "3052",
      "category": "camera",
      "price": 150000
    }
  ]
}

sample_service の下の handler.py を apigw_handler.py にリネームし、以下の内容を記述します。

import json

from aws_lambda_powertools import Logger

logger = Logger()


# @logger.inject_lambda_context を付けておくとログ出力時に context にセットされている
# function_name 等の情報がセットされる
# Capturing context Lambda info
# https://awslabs.github.io/aws-lambda-powertools-python/core/logger/#capturing-context-lambda-info
@logger.inject_lambda_context
def recv_msg(event, context):
    logger.debug(event['body'])
    request_body = json.loads(event['body'])
    logger.info({
        "orderNumber": request_body['orderNumber'],
        "orderDate": request_body['orderDate']
    })

    body = {
        "message": "Go Serverless v1.0! Your function executed successfully!",
        "input": event
    }

    response = {
        "statusCode": 200,
        "body": json.dumps(body)
    }

    return response

sample_service/serverless.yml の functions の記述を以下の内容に変更します。

functions:
  recvMsg:
    handler: apigw_handler.recv_msg
    events:
      - http:
          path: recv-msg
          method: post
          cors: true
          request:
            schema:
              application/json: ${file(msg_schema.json)}

deploy します。

f:id:ksby:20200630004725p:plain f:id:ksby:20200630004839p:plain

Postman からサンプルメッセージを送信すると 200 OK が返ってきました。

f:id:ksby:20200630005120p:plain

試した結果としては、

まずログのメッセージフォーマットが JSON になり、logger のメソッドに渡した文字列は message に出力されます。JSON フォーマットになることで CloudWatch Logs Insights で検索しやすくなります。

Lambda に @logger.inject_lambda_context を付与しておくと、出力されるログに引数 context にセットされている function_name 等の情報が追加されます。

f:id:ksby:20200630005957p:plain

@logger.inject_lambda_context が付与されていて、かつ環境変数 POWERTOOLS_LOGGER_LOG_EVENT が true に設定されていると、Lambda が呼び出された時に decorate のログ(API Gateway から呼び出された Lambda だとアクセス時の HTTPヘッダ等)が出力されます。環境変数 POWERTOOLS_LOGGER_LOG_EVENT のデフォルト値が false なので DEBUG 用?

f:id:ksby:20200630010156p:plain f:id:ksby:20200630010340p:plain

環境変数 LOG_LEVEL で出力するログのレベルを調整可能です。今は LOG_LEVEL: INFO の設定なので INFO のログだけ出ていますが(環境変数 POWERTOOLS_LOGGER_LOG_EVENT は false にしています)、

f:id:ksby:20200630235835p:plain

sample_service/serverless.yml で functions.recvMsg.environment に LOG_LEVEL: DEBUG を設定した後、deploy し直してからメッセージを送信すると、

provider:
  name: aws
  runtime: python3.8
  stage: dev
  region: ap-northeast-1
  environment:
    # aws-lambda-powertools 用環境変数
    LOG_LEVEL: INFO
    POWERTOOLS_LOGGER_LOG_EVENT: false
    POWERTOOLS_METRICS_NAMESPACE: lambda-powertools-project
    POWERTOOLS_SERVICE_NAME: sample-service
  tracing:
    apiGateway: true
    lambda: true

functions:
  recvMsg:
    handler: apigw_handler.recv_msg
    events:
      - http:
          path: recv-msg
          method: post
          cors: true
          request:
            schema:
              application/json: ${file(msg_schema.json)}
    environment:
      LOG_LEVEL: DEBUG

DEBUG のログも出力されます。

f:id:ksby:20200701000010p:plain

履歴

2020/07/05
初版発行。

API Gateway で受信するデータを JSON Schema Validation でチェックしてから SQS へ送信する

概要

記事一覧はこちらです。

Using JSON Schema Validation with the AWS API Gateway という記事を見かけました。API Gateway で受信したメッセージを Lambda を呼び出す前に JSON Schema Validation で検証できるそうなので試してみます。Lambda を呼び出したら SQL へメッセージを送信し、別の Lambda でメッセージを受信します。

Serverless Framework の 1.8 から functions の events に sqs が記述できるようになるのですが、今回試した時のバージョンは 1.74 でしたので CloudWatch Events で 1分毎に Lambda を起動して SQS をチェックすることにします。

※(2020/07/05追記)何を見間違えてしまったのか SQS を events に設定できるのは 1.28 からでした。。。 今使用しているバージョンは 1.74 なので Using SQS with AWS Lambda and Serverless の記事通りに実装したらイベントベースで Lambda が呼び出されるようにできますね。

f:id:ksby:20200628115040p:plain:w450

SQS への送信は serverless-apigateway-service-proxy プラグインを使用したかったのですが、このプラグインを使用して API Gateway の Endpoint を定義した時に JSON Schema Validation をセットする方法がないようだったので諦めました。

参照したサイト・書籍

  1. Using JSON Schema Validation with the AWS API Gateway
    https://fernandomc.com/posts/schema-validation-serverless-framework/

  2. API Gateway でリクエストの検証を有効にする
    https://docs.aws.amazon.com/ja_jp/apigateway/latest/developerguide/api-gateway-method-request-validation.html

  3. JSON Schema
    https://json-schema.org/

  4. Getting Started Step-By-Step
    https://json-schema.org/learn/getting-started-step-by-step.html

  5. Understanding JSON Schema
    https://json-schema.org/understanding-json-schema/

  6. schema.org
    https://schema.org/

  7. Enabling JSON5
    https://www.jetbrains.com/help/idea/json.html#ws_json_choose_version

  8. 知らないうちにJSON5 in Babel
    https://qiita.com/jkr_2255/items/026e0fdb4570c88c4f51

  9. AWS::SQS::Queue
    https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-properties-sqs-queues.html

  10. AWS Lambda を Amazon SQS に使用する
    https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-sqs.html

  11. Sending and receiving messages in Amazon SQS
    https://boto3.amazonaws.com/v1/documentation/api/latest/guide/sqs-example-sending-receiving-msgs.html

  12. Using SQS with AWS Lambda and Serverless
    https://www.serverless.com/blog/aws-lambda-sqs-serverless-integration/

  13. Serverless updates - SQS events, private endpoints, Event Gateway open source
    https://www.serverless.com/blog/serverless-updates-framework-v128/

目次

  1. jsonschema-sqs-project プロジェクトを作成する
  2. order_service サブプロジェクトを作成する
  3. 受信するメッセージのサンプルを作成する
  4. IntelliJ IDEA の .json を関連付ける File Type を JSON → JSON5 に変更する。。。が、API Gateway で JSON5 がサポートされていなかったので元に戻す
  5. JSON Schema Validation のための schema.json を作成する
  6. API Gateway 経由でメッセージを受信して SQS へ送信する Lambda を実装する
  7. SQS からメッセージを取り出してログに出力する Lambda を実装する
  8. serverless.yml を変更する
  9. deploy する
  10. 動作確認
  11. Validation エラーの時にどうやって原因を調べればよいのか?

手順

jsonschema-sqs-project プロジェクトを作成する

以下の手順で jsonschema-sqs-project プロジェクトを作成します。具体的な手順は IntelliJ IDEA+Node.js+npm+serverless framework+Python の組み合わせで開発環境を構築して AWS Lambda を作成してみる 参照。

  • jsonschema-sqs-project の Empty Project を作成する。
  • Python の仮想環境を作成する。
  • Serverless Framework をローカルインストールする。
  • .envrc を作成する。
  • pip install boto3

order_service サブプロジェクトを作成する

プロジェクトのルートディレクトリの下で npx sls create --template aws-python3 --path order_service を実行して order_service サブプロジェクトを作成します。

受信するメッセージのサンプルを作成する

今回は以下のサンプルメッセージを受信します。細かい仕様は後で JSON Schema を作成する時に決めます。

{
  "orderNumber": "1",
  "orderDate": "2020-06-27",
  "isGift": false,
  "orderedItem": [
    {
      "orderItemNumber": "1001",
      "identifier": "book",
      "name": "サンプル書籍",
      "orderQuantity": 1,
      "price": 3800
    },
    {
      "orderItemNumber": "61059",
      "identifier": "furniture",
      "name": "テスト椅子",
      "orderQuantity": 2,
      "price": 12000
    }
  ]
}

IntelliJ IDEA の .json を関連付ける File Type を JSON → JSON5 に変更する。。。が、API Gateway で JSON5 がサポートされていなかったので元に戻す

IntelliJ IDEA で .json のファイルを編集すると "$schema": "http://json-schema.org/draft-04/schema#" の定義を参照して自動補完してくれます(VSCode でも補完してくれましたので補完して当然のようです)。

f:id:ksby:20200627134049p:plain

schema を定義しておくと json の編集が便利になるんだなと思い IDEA のマニュアルで JSON を見にいくと、Enabling JSON5 という文字が。JSON5 って何?

調べてみると 知らないうちにJSON5 in Babel の記事を見つけました。JSON なのにケツカンマ有効でコメントが付けらたりできるとのこと。かなり前からあるのに全然知りませんでした。。。

Extend the JSON5 syntax to all JSON files に従い、.json を関連付ける File Type を JSON → JSON5 に変更してみます。

「Settings」ダイアログを開いて「Editor」-「File Types」の設定を確認すると、確かに今は「JSON」の File Type に .json が登録されています。

f:id:ksby:20200627140800p:plain f:id:ksby:20200627140921p:plain

「JSON5」の「Registered patterns」で「+」ボタンをクリックして「Add Wildcard」ダイアログを表示してから *.json を入力して「OK」ボタンをクリックします。

f:id:ksby:20200627141031p:plain

「Reassigned wildcard」ボタンをクリックします。

f:id:ksby:20200627141147p:plain

「JSON5」の File Type に .json が登録されますので「OK」ボタンをクリックしてダイアログを閉じます。

f:id:ksby:20200627141331p:plain

File Type が「JSON」の時は以下の画像のように赤波線が表示されていましたが、

f:id:ksby:20200627142036p:plain

「JSON5」に変更すると全て消えました!

f:id:ksby:20200627142229p:plain

すごいなと思いつつ API GatewayJSON Schema Validation でも JSON5 がサポートされているのか別にサンプルプロジェクトを作って試してみたところ、JSON.parse(...) が使われていてダメでした。。。 残念ですが、元に戻すことにします。

f:id:ksby:20200627145644p:plain

JSON5 を使いたい時には拡張子を .json5 にすればよいので覚えておくことにします。

JSON Schema Validation のための schema.json を作成する

order_service サブプロジェクトの下に schema.json を作成して、以下の内容を記述します。JSON Schema の定義で入れられそうなものは出来るだけ入れてみました。

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "definitions": {
    "orderedItem": {
      "type": "object",
      "properties": {
        "orderItemNumber": {
          "type": "string",
          "pattern": "^[0-9]+$"
        },
        "identifier": {
          "type": "string",
          "enum": [
            "book",
            "furniture"
          ]
        },
        "name": {
          "type": "string",
          "minLength": 1,
          "maxLength": 10
        },
        "orderQuantity": {
          "type": "integer",
          "minimum": 0,
          "maximum": 999
        },
        "price": {
          "type": "integer",
          "minimum": 0
        }
      },
      "required": [
        "orderItemNumber",
        "identifier",
        "name",
        "orderQuantity",
        "price"
      ]
    }
  },
  "title": "Order",
  "type": "object",
  "properties": {
    "orderNumber": {
      "type": "string",
      "pattern": "^[0-9]+$"
    },
    "orderDate": {
      "type": "string",
      "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}$"
    },
    "isGift": {
      "type": "boolean"
    },
    "orderedItem": {
      "type": "array",
      "minItems": 1,
      "maxItems": 3,
      "uniqueItems": true,
      "items": {
        "$ref": "#/definitions/orderedItem"
      }
    }
  },
  "required": [
    "orderNumber",
    "orderDate",
    "orderedItem"
  ]
}

API Gateway 経由でメッセージを受信して SQS へ送信する Lambda を実装する

order_service/handler.py のファイル名を apigw_handler.py に変更し、以下の内容に変更します。

import json
import logging
import os

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def send_msg(event, context):
    sqs_client = boto3.client('sqs')

    logger.info(event['body'])
    response = sqs_client.send_message(
        QueueUrl=os.environ['QUEUE_URL'],
        MessageBody=event['body']
    )

    body = {
        "message": f"send message to sqs, MessageId = {response['MessageId']}",
        "input": event
    }

    response = {
        "statusCode": 200,
        "body": json.dumps(body)
    }

    return response

SQS からメッセージを取り出してログに出力する Lambda を実装する

order_service ディレクトリの下に sqs_handler.py を作成し、以下の内容を記述します。

import logging
import os

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def recv_msg(event, context):
    sqs_client = boto3.client('sqs')

    response = sqs_client.receive_message(QueueUrl=os.environ['QUEUE_URL'])
    if 'Messages' in response:
        logger.info(response)
        for msg in response['Messages']:
            sqs_client.delete_message(QueueUrl=os.environ['QUEUE_URL'], ReceiptHandle=msg['ReceiptHandle'])

serverless.yml を変更する

order_service/serverless.yml を以下の内容に変更します。

service: order-service

custom:
  queueName: "SampleQueue"

provider:
  name: aws
  runtime: python3.8

  stage: dev
  region: ap-northeast-1

  iamRoleStatements:
    - Effect: "Allow"
      Action:
        - "sqs:*"
      Resource:
        - Fn::GetAtt: [ SampleQueue, Arn ]

functions:
  sendMsg:
    handler: apigw_handler.send_msg
    environment:
      QUEUE_URL: !Ref SampleQueue
    events:
      - http:
          path: send-msg
          method: post
          cors: true
          request:
            schema:
              application/json: ${file(schema.json)}

  recvMsg:
    handler: sqs_handler.recv_msg
    environment:
      QUEUE_URL: !Ref SampleQueue
    events:
      - schedule: rate(1 minute)

resources:
  Resources:
    SampleQueue:
      Type: AWS::SQS::Queue
      Properties:
        QueueName: "${self:custom.queueName}"

deploy する

deploy します。

f:id:ksby:20200628121938p:plain f:id:ksby:20200628122040p:plain f:id:ksby:20200628122150p:plain

動作確認

Postman から endpoint へ上に書いたサンプルメッセージを送信すると 200 OK が返ってきました。

f:id:ksby:20200628122729p:plain

ログにも送受信されたメッセージが出力されています。

f:id:ksby:20200628122904p:plain f:id:ksby:20200628123000p:plain f:id:ksby:20200628123047p:plain

JSON Schema の定義に合わないメッセージ(orderedItem の要素数が minItems 未満)を送信すると 400 Bad Request が返ってきました。

f:id:ksby:20200628123239p:plain

Validation エラーの時にどうやって原因を調べればよいのか?

1つ目は送信しているメッセージを適当なファイルに保存してから、IntelliJ IDEA のエディタ上で JSON Schema を適用してエラー箇所を表示させる方法です。

ファイルに保存してから画面右下の「No JSON schema」をクリックします。

f:id:ksby:20200628140323p:plain

メニューが表示されるので、一番上の「New Schema Mapping...」を選択します。

f:id:ksby:20200628140443p:plain

JSON Schema Mappings」ダイアログが表示されるので、「Schema file or URL」に上で作成した schema.json を選択してから「OK」ボタンをクリックします。

f:id:ksby:20200628140836p:plain

ファイルに保存したメッセージを変更して JSON Schema に合わないようにするとその箇所が分かるように表示されます。

f:id:ksby:20200628141749p:plain

もっと詳細な内容を知りたい場合には、エディタ上でコンテキストメニューを表示させて「Analyze」-「Inspect Code...」を選択した後、

f:id:ksby:20200628142047p:plain

「Specify Inspection Code」ダイアログが表示されるので、何も変更せずに「OK」ボタンをクリックします。

f:id:ksby:20200628142202p:plain

そうすると画面下に Inspection Results Window が表示されて、そこに詳細なエラー内容が表示されます。

f:id:ksby:20200628142453p:plain

2つ目は AWS マネジメントコンソールの API Gateway の画面からテスト送信してログを表示させる方法です。

今回作成された API Gateway の「リソース」の画面に遷移してから、登録した endpoint の POST メソッドの画面の「テスト」リンクをクリックします。

f:id:ksby:20200628142944p:plain

メソッドテストの画面が表示されるので「リクエスト本文」に送信する JSON を記述して「テスト」ボタンをクリックします。

f:id:ksby:20200628143239p:plain f:id:ksby:20200628143435p:plain

そうすると画面の「ログ」にエラーの内容が表示されます。ただしこの方法だと最初のエラーしか表示されないようです。

f:id:ksby:20200628143644p:plain

最後に npx sls remove -v を実行して構築した環境を削除します。

履歴

2020/06/28
初版発行。
2020/07/05
概要に "※(2020/07/05追記)..." の記述を追加しました。

serverless-domain-manager プラグインを利用して独自ドメインで API Gateway にアクセスする

概要

記事一覧はこちらです。

API Gateway にアクセスする時には deploy 時に表示される ServiceEndpoint(https://~.execute-api.ap-northeast-1.amazonaws.com/dev/... の URL)を使用していましたが、独自ドメインでアクセスする方法を試してみます。

Lambda を deploy する時に独自ドメインを定義できる serverless-domain-manager プラグインというものがあるので、今回はこれを利用します。

構成は以下のようになるようです(ELB と API Gateway のどちらが先にアクセスされるのかが分からない。。。)。

f:id:ksby:20200624141707p:plain:w450

  • SSL証明書は東京リージョン(ap-northeast-1)の ACM で発行します。
  • 独自ドメインは rest.ksbyzero.com にします。Route 53 に登録します。
  • Lambda は Serverless Framework でプロジェクトを作成した時にできる hello をそのまま使い、https://rest.ksbyzero.com/hello でアクセスできるようにします。独自ドメインを使用する方式にすると stage 名を URL からなくせます。
  • 独自ドメインでアクセスできるようにする場合、ELB が作成されて ACM で発行した SSL証明書が ELB に関連付けられます。ACM で作成した SSL証明書を削除しようとすると ELB で使用されているので削除できないというメッセージが表示されて気づきました(API Gateway を削除してから 20~30分程度経過しないと削除できませんでした)。

参照したサイト・書籍

  1. REST API のカスタムドメイン名の設定
    https://docs.aws.amazon.com/ja_jp/apigateway/latest/developerguide/how-to-custom-domains.html

  2. How to set up a custom domain name for API Gateway in your Serverless app
    https://seed.run/blog/how-to-set-up-a-custom-domain-name-for-api-gateway-in-your-serverless-app.html

  3. amplify-education / serverless-domain-manager
    https://www.npmjs.com/package/serverless-domain-manager
    https://github.com/amplify-education/serverless-domain-manager

  4. アプリケーションロードバランサー(ALB)のターゲットにAWS Lambdaが選択可能になりました
    https://aws.amazon.com/jp/blogs/news/lambda-functions-as-targets-for-application-load-balancers/

    • API Gateway を経由しなくても ALB から直接 Lambda を実行できるとのこと。
    • 今回の記事とは関係ありませんが、見かけたのでメモ書きとして残しておきます。

目次

  1. custom-domain-api-project プロジェクトを作成する
  2. serverless-domain-manager プラグインをインストールする
  3. hello_service サブプロジェクト作成し serverless.yml を変更する
  4. Terraform を使用して ACM で SSL証明書を作成する
  5. sls create_domain を実行して独自ドメインを利用できるようにする
  6. deploy する
  7. 動作確認する
  8. ACM の SSL証明書は関連リソースがなくならないと削除できない

手順

custom-domain-api-project プロジェクトを作成する

以下の手順で custom-domain-api-project プロジェクトを作成します。具体的な手順は IntelliJ IDEA+Node.js+npm+serverless framework+Python の組み合わせで開発環境を構築して AWS Lambda を作成してみる 参照。

  • custom-domain-api-project の Empty Project を作成する。
  • Python の仮想環境を作成する。
  • Serverless Framework をローカルインストールする。
  • .envrc を作成する。

serverless-domain-manager プラグインをインストールする

npm install --save-dev serverless-domain-manager を実行してプラグインをインストールします。

f:id:ksby:20200624152732p:plain

hello_service サブプロジェクト作成し serverless.yml を変更する

プロジェクトのルートディレクトリの下で npx sls create --template aws-python3 --path hello_service を実行して hello_service サブプロジェクトを作成します。

f:id:ksby:20200624153418p:plain

hello_service/severless.yml を以下の内容に変更します。

service: hello-service

plugins:
  - serverless-domain-manager

custom:
  customDomain:
    domainName: rest.ksbyzero.com
    # stage を書かなければ provider.stage の設定が使用される
    # stage: dev
    # basePath を書くと https://<domainName>/<basePath>/hello がアクセスする URL になる
    # basePath: base
    certificateName: ksbyzero.com
    createRoute53Record: true
    # endpointType に edge を指定すると CloudFront ディストリビューションを設定する
    # 今回は東京リージョンで設定するので regional を指定する
    endpointType: regional
    securityPolicy: tls_1_2
    apiType: rest

provider:
  name: aws
  runtime: python3.8

  stage: dev
  region: ap-northeast-1

functions:
  hello:
    handler: handler.hello
    events:
      - http:
          path: hello
          method: get
          cors: true
  • plugins を追加して serverless-domain-manager を記述します。
  • custom.customDomain を追加し、serverless-domain-manager の設定を記述します。
  • provider.stage、provider.region を追加します。
  • functions.hello.events.http を追加し、API Gateway にリクエストが来たら Lambda が実行されるよう設定します。

Terraform を使用して ACMSSL証明書を作成する

Terraform は tfenv+aws-vault+direnv を組み合わせて Windows 上に Terraform の実行環境を構築する で構築した環境を使用します。バージョンは 0.12.26 を使用します。

f:id:ksby:20200624154913p:plain

プロジェクトのルートディレクトリ直下に .terraform-version というファイルを作成し、以下の内容を記述します。

0.12.26

プロジェクトのルートディレクトリ直下に terraform というディレクトリを作成し、その下に acm.tf というファイルを作成して以下の内容を記述します。

terraform {
  required_version = "0.12.26"
}

provider "aws" {
  region = "ap-northeast-1"
}

///////////////////////////////////////////////////////////////////////////////
// Route53 の Public Zone
// ※Route53 でドメインを取得したので作成済、resource ではなく data で定義する
//
data "aws_route53_zone" "dns_zone_apex" {
  name = "ksbyzero.com"
}

///////////////////////////////////////////////////////////////////////////////
// ACM で SSL証明書を作成する
//
resource "aws_acm_certificate" "dns_zone_apex" {
  domain_name               = data.aws_route53_zone.dns_zone_apex.name
  subject_alternative_names = ["*.${data.aws_route53_zone.dns_zone_apex.name}"]
  validation_method         = "DNS"

  tags = {
    Name = data.aws_route53_zone.dns_zone_apex.name
  }

  lifecycle {
    create_before_destroy = true
  }
}
resource "aws_route53_record" "cert_validation_0" {
  name    = aws_acm_certificate.dns_zone_apex.domain_validation_options.0.resource_record_name
  type    = aws_acm_certificate.dns_zone_apex.domain_validation_options.0.resource_record_type
  records = [aws_acm_certificate.dns_zone_apex.domain_validation_options.0.resource_record_value]
  zone_id = data.aws_route53_zone.dns_zone_apex.id
  ttl     = 60
}
resource "aws_acm_certificate_validation" "cert" {
  certificate_arn = aws_acm_certificate.dns_zone_apex.arn

  validation_record_fqdns = [
    aws_route53_record.cert_validation_0.fqdn,
  ]
}

以上で設定は完了です。ACMSSL証明書を作成します。

tf init を実行してから、

f:id:ksby:20200624160227p:plain

tf plantf apply を実行します(画面キャプチャは tf apply のみ)。

f:id:ksby:20200624160730p:plain f:id:ksby:20200624160957p:plain

マネジメントコンソールで ACM を見ると ksbyzero.com の証明書が追加されており、「状況」が「発行済み」になっています。

f:id:ksby:20200624171935p:plain

Route 53 には DNS検証用の CNAME のレコードが1件追加されています。

f:id:ksby:20200624172054p:plain

sls create_domain を実行して独自ドメインを利用できるようにする

deploy の前に sls create_domain を実行して serverless.yml の custom.customDomain.domainName に定義した rest.ksbyzero.com を設定します。

f:id:ksby:20200624163113p:plain

New domains may take up to 40 minutes to be initialized. というメッセージが表示されており、使用可能になるまで最大 40分かかるようです。ただし、今回試していた時は結構すぐに使えるようになりました。

マネジメントコンソールから Route 53 を見ると rest.ksbyzero.com の A、AAAA レコードの2件が追加されています。

f:id:ksby:20200624172156p:plain

deploy する

hello_service を deploy します。

f:id:ksby:20200624164408p:plain f:id:ksby:20200624164554p:plain

動作確認する

Postman で GET メソッドにしてから https://rest.ksbyzero.com/hello にアクセスすると 200 OK が返ってきます。

f:id:ksby:20200624164809p:plain

sls logs -f hello コマンドでログを確認すると1回しか実行していないのに7件ログが出力されていました。ELB のヘルスチェックでしょうか?

f:id:ksby:20200624165250p:plain

以下のコマンドを実行して作成したリソースを削除します。

  • npx sls remove -v
  • npx sls delete_domain
  • terraform ディレクトリに移動してから tf destroy

ACMSSL証明書は関連リソースがなくならないと削除できない

tf destroy コマンドで ACMSSL証明書を削除しようとしてもすぐには削除できません。API Gateway を削除してもしばらくの間 ELB がSSL証明書に関連付けられているためです。20~30分程度経つと関連リソースはなくなります。

f:id:ksby:20200624171653p:plain

履歴

2020/06/24
初版発行。