かんがるーさんの日記

最近自分が興味をもったものを調べた時の手順等を書いています。今は Spring Boot をいじっています。

aws-lambda-powertools を試してみる(Tracer&X-Ray 編その1)

概要

記事一覧はこちらです。

aws-lambda-powertools を試してみる(Logger 編) の続きで、今回は Tracer を試してみます。lambda-powertools-project プロジェクトを引き続き使用します。

API Gateway から呼び出された Lambda から以下の5パターンの処理を実行し、X-Ray にどう表示されるのか確認します。

  • DynamoDB にデータを追加 → DynamoDB Streams 経由で Lambda を呼び出す。
  • SQS にメッセージを送信して Lambda を呼び出す。
  • SNS にメッセージを送信して Lambda を呼び出す。
  • 別の Lambda を同期呼び出しする。
  • 別の Lambda を非同期呼び出しする。

f:id:ksby:20200704115315p:plain:w450

参照したサイト・書籍

  1. Introducing AWS Lambda Destinations
    https://aws.amazon.com/jp/blogs/compute/introducing-aws-lambda-destinations/

  2. AWS X-Ray と他の AWS のサービスの統合
    https://docs.aws.amazon.com/ja_jp/xray/latest/devguide/xray-services.html

  3. DynamoDB Streamsの改めて再検証してみた
    https://dev.classmethod.jp/articles/dynamodb-streams-scale-test/

  4. Amazon DynamoDB and Serverless - The Ultimate Guide
    https://www.serverless.com/dynamodb

  5. Event-driven processing with Serverless and DynamoDB streams
    https://www.serverless.com/blog/event-driven-architecture-dynamodb

  6. DynamoDB streams creation
    https://forum.serverless.com/t/dynamodb-streams-creation/792

  7. Welcome to the AWS X-Ray SDK for Python!
    https://docs.aws.amazon.com/xray-sdk-for-python/latest/reference/index.html

  8. aws / aws-xray-sdk-python
    https://github.com/aws/aws-xray-sdk-python

  9. Pseudo Parameters Reference
    https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/pseudo-parameter-reference.html

目次

  1. serverless.yml を変更する
  2. API Gateway から呼び出される apigw_handler.py を変更する
  3. DynamoDB から DynamoDB Streams 経由で呼び出される dynamodb_handler.py を作成する
  4. SQS 経由で呼び出される sqs_handler.py を作成する
  5. SNS 経由で呼び出される sns_handler.py を作成する
  6. 同期で呼び出す sync_handler.py を作成する
  7. 非同期で呼び出す async_handler.py を作成する
  8. deploy し直す

手順

serverless.yaml を変更する

sample_service/serverless.yaml を今回の環境に合わせて以下の内容にします。

service: sample-service

plugins:
  - serverless-python-requirements

custom:
  queueName: "SampleQueue"
  topicName: "SampleTopic"
  pythonRequirements:
    dockerizePip: true

provider:
  name: aws
  runtime: python3.8
  stage: dev
  region: ap-northeast-1
  environment:
    # aws-lambda-powertools 用環境変数
    LOG_LEVEL: DEBUG
    POWERTOOLS_LOGGER_LOG_EVENT: true
    POWERTOOLS_METRICS_NAMESPACE: lambda-powertools-project
    POWERTOOLS_SERVICE_NAME: sample-service
  tracing:
    apiGateway: true
    lambda: true

  iamRoleStatements:
    - Effect: Allow
      Action:
        - dynamodb:Query
        - dynamodb:Scan
        - dynamodb:GetItem
        - dynamodb:PutItem
        - dynamodb:UpdateItem
        - dynamodb:DeleteItem
      Resource:
        - Fn::GetAtt: [ SampleTable, Arn ]
    - Effect: Allow
      Action:
        - kms:GenerateDataKey
        - kms:Decrypt
        - sqs:*
      Resource:
        - Fn::GetAtt: [ SampleQueue, Arn ]
    - Effect: Allow
      Action:
        - kms:GenerateDataKey
        - kms:Decrypt
        - sns:Publish
        - sns:Subscribe
      Resource:
        - !Ref NotifySampleTopic
    - Effect: Allow
      Action:
        - lambda:InvokeFunction
        - lambda:InvokeAsync
      Resource:
        - Fn::Join:
          - ":"
          - - "arn:aws:lambda"
            - Ref: "AWS::Region"
            - Ref: "AWS::AccountId"
            - "function:*"

functions:
  recvMsg:
    handler: apigw_handler.recv_msg
    events:
      - http:
          path: recv-msg
          method: post
          cors: true
          request:
            schema:
              application/json: ${file(msg_schema.json)}
    environment:
      QUEUE_URL: !Ref SampleQueue
      TOPIC_ARN: !Ref NotifySampleTopic

  processSampleTableStream:
    handler: dynamodb_handler.process_sample_table_stream
    events:
      - stream:
          type: dynamodb
          batchSize: 1
          startingPosition: TRIM_HORIZON
          arn:
            Fn::GetAtt: [ SampleTable, StreamArn ]

  processSampleQueue:
    handler: sqs_handler.process_sample_queue
    events:
      - sqs:
          arn:
            Fn::GetAtt: [ SampleQueue, Arn ]

  processSampleTopic:
    handler: sns_handler.process_sample_topic
    events:
      - sns:
        arn: !Ref NotifySampleTopic
        topicName: "${self:custom.topicName}"

  processSync:
    handler: sync_handler.process_sync

  processAsync:
    handler: async_handler.process_async

resources:
  Resources:
    SampleTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: sample-table
        AttributeDefinitions:
          - AttributeName: orderNumber
            AttributeType: S
          - AttributeName: orderItemNumber
            AttributeType: S
        KeySchema:
          - AttributeName: orderNumber
            KeyType: HASH
          - AttributeName: orderItemNumber
            KeyType: RANGE
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
        StreamSpecification:
          StreamViewType: NEW_IMAGE

    SampleQueue:
      Type: AWS::SQS::Queue
      Properties:
        QueueName: "${self:custom.queueName}"

    NotifySampleTopic:
      Type: AWS::SNS::Topic
      Properties:
        TopicName: "${self:custom.topicName}"
    NotifySampleSubscription:
      Type: AWS::SNS::Subscription
      DependsOn:
        - ProcessSampleTopicLambdaFunction
      Properties:
        TopicArn: !Ref NotifySampleTopic
        Endpoint:
          Fn::GetAtt:
            - ProcessSampleTopicLambdaFunction
            - Arn
        Protocol: lambda
    NotifySampleLambdaResourcePolicy:
      Type: AWS::Lambda::Permission
      DependsOn:
        - ProcessSampleTopicLambdaFunction
      Properties:
        FunctionName: !Ref ProcessSampleTopicLambdaFunction
        Principal: sns.amazonaws.com
        Action: "lambda:InvokeFunction"
        SourceArn: !Ref NotifySampleTopic
  • aws-lambda-powertools 用環境変数は、
    • debug ログを出力したいので LOG_LEVEL: DEBUG にします。
    • decorate のログを出力したいので POWERTOOLS_LOGGER_LOG_EVENT: true にします。
  • provider.iamRoleStatements に必要な IAM Role を設定します。ただし Lambda の同期・非同期呼び出しのための lambda:InvokeFunction、lambda:InvokeAsync の Resource は - Fn::GetAtt: [ processSync, Arn ] と記述すると Error: The CloudFormation template is invalid: Template error: instance of Fn::GetAtt references undefined resource processSync というエラーが、- Fn::GetAtt: [ ProcessSyncLambdaFunction, Arn ] と記述すると Error: The CloudFormation template is invalid: Circular dependency between resources: [...] というエラーが出て設定方法が全然分からなかったので、arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:* の形式(Lambda を * で指定する)にしています。
  • resources.Resources に今回使用する DynamoDB のテーブル(SampleTable)、SQS のキュー(SampleQueue)、SNS のトピック(SampleTopic)を記述します。
  • DynamoDB のテーブル(SampleTable)は DynamoDB Streams を使用するので StreamSpecification に StreamViewType: NEW_IMAGE を記述します。

API Gateway から呼び出される apigw_handler.py を変更する

sample_service/apigw_handler.py を以下の内容にします。put_item_to_dynamodb(...) 以外の関数に orderedItem を渡さず event['body'] を渡しているのはミスです。。。(次の記事をほぼ書いた時に気づきました)

import json
import os

import boto3
from aws_lambda_powertools import Logger, Tracer
from aws_xray_sdk.core import xray_recorder

logger = Logger()
tracer = Tracer()

dynamodb_sample_table_tbl = boto3.resource('dynamodb').Table('sample-table')
sqs_client = boto3.client('sqs')
sns_client = boto3.client('sns')
lambda_client = boto3.client('lambda')


# @logger.inject_lambda_context を付けておくとログ出力時に context にセットされている
# function_name 等の情報がセットされる
# Capturing context Lambda info
# https://awslabs.github.io/aws-lambda-powertools-python/core/logger/#capturing-context-lambda-info
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def recv_msg(event, context):
    # API Gateway に発行された traceId は event.headers.X-Amzn-Trace-Id にセットされている
    # Lambda 用に発行された traceId は xray_recorder.current_segment().trace_id にセットされている
    awsTraceHeader = event['headers']['X-Amzn-Trace-Id']
    logger.structure_logs(append=True,
                          AWSTraceHeader=awsTraceHeader,
                          traceId=xray_recorder.current_segment().trace_id)
    logger.debug(event)

    request_body = json.loads(event['body'])
    # logger.info({
    #     "orderNumber": request_body['orderNumber'],
    #     "orderDate": request_body['orderDate']
    # })

    for orderedItem in request_body['orderedItem']:
        logger.structure_logs(append=True,
                              orderNumber=request_body['orderNumber'],
                              orderItemNumber=orderedItem['orderItemNumber']
                              )
        put_item_to_dynamodb(request_body['orderNumber'], orderedItem)
        send_message_to_sqs(os.environ['QUEUE_URL'], event['body'])
        send_message_to_sns(os.environ['TOPIC_ARN'], event['body'])
        call_process_sync(event['body'])
        call_process_async(event['body'])

    # logger.structure_logs(append=False) を呼ぶと @logger.inject_lambda_context
    # でセットされた項目まで消える
    logger.structure_logs(append=False)
    logger.info('テスト')

    body = {
        "message": "Go Serverless v1.0! Your function executed successfully!",
        "input": event
    }

    response = {
        "statusCode": 200,
        "body": json.dumps(body)
    }

    return response


@tracer.capture_method
def put_item_to_dynamodb(orderNumber, orderedItem):
    dynamodb_sample_table_tbl.put_item(
        Item={
            'orderNumber': orderNumber,
            'orderItemNumber': orderedItem['orderItemNumber'],
            'orderQuantity': orderedItem['orderQuantity'],
            'productID': orderedItem['productID'],
            'category': orderedItem['category'],
            'price': orderedItem['price']
        }
    )
    logger.info('sample-table に追加しました')


@tracer.capture_method
def send_message_to_sqs(queue_url, body):
    response = sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=body
    )
    logger.info('SampleQueue にメッセージを送信しました')


@tracer.capture_method
def send_message_to_sns(topic_arn, body):
    response = sns_client.publish(
        TopicArn=topic_arn,
        Message=body
    )
    logger.info('SampleTopic にメッセージを送信しました')


@tracer.capture_method
def call_process_sync(body):
    response = lambda_client.invoke(
        FunctionName='sample-service-dev-processSync',
        Payload=body
    )
    logger.info('processSync を同期で呼び出ししました')


@tracer.capture_method
def call_process_async(body):
    response = lambda_client.invoke_async(
        FunctionName='sample-service-dev-processAsync',
        InvokeArgs=body
    )
    logger.info('processAsync を非同期で呼び出ししました')

DynamoDB から DynamoDB Streams 経由で呼び出される dynamodb_handler.py を作成する

sample_service/dynamodb_handler.py を新規作成し、以下の内容を記述します。

以降の実装で logger.structure_logs(...) を利用して AWSTraceHeader と traceId がログに出力されるようにしていますが、理由は次回記述します。

from aws_lambda_powertools import Logger, Tracer
from aws_xray_sdk.core import xray_recorder

logger = Logger()
tracer = Tracer()


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def process_sample_table_stream(event, context):
    logger.structure_logs(append=True,
                          AWSTraceHeader=None,
                          traceId=xray_recorder.current_segment().trace_id)
    logger.debug(event)

SQS 経由で呼び出される sqs_handler.py を作成する

sample_service/dynamodb_handler.py を新規作成し、以下の内容を記述します。

from aws_lambda_powertools import Logger, Tracer
from aws_xray_sdk.core import xray_recorder

logger = Logger()
tracer = Tracer()


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def process_sample_queue(event, context):
    logger.structure_logs(append=True,
                          AWSTraceHeader=None,
                          traceId=xray_recorder.current_segment().trace_id)
    logger.debug(event)

    for record in event['Records']:
        logger.structure_logs(append=True,
                              AWSTraceHeader=record['attributes']['AWSTraceHeader'])
        logger.info(record)

SNS 経由で呼び出される sns_handler.py を作成する

sample_service/sns_handler.py を新規作成し、以下の内容を記述します。

from aws_lambda_powertools import Logger, Tracer
from aws_xray_sdk.core import xray_recorder

logger = Logger()
tracer = Tracer()


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def process_sample_topic(event, context):
    logger.structure_logs(append=True,
                          AWSTraceHeader=None,
                          traceId=xray_recorder.current_segment().trace_id)
    logger.debug(event)

同期で呼び出す sync_handler.py を作成する

sample_service/sync_handler.py を新規作成し、以下の内容を記述します。

from aws_lambda_powertools import Logger, Tracer
from aws_xray_sdk.core import xray_recorder

logger = Logger()
tracer = Tracer()


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def process_sync(event, context):
    logger.structure_logs(append=True,
                          AWSTraceHeader=None,
                          traceId=xray_recorder.current_segment().trace_id)
    logger.debug(event)

非同期で呼び出す async_handler.py を作成する

sample_service/async_handler.py を新規作成し、以下の内容を記述します。

from aws_lambda_powertools import Logger, Tracer
from aws_xray_sdk.core import xray_recorder

logger = Logger()
tracer = Tracer()


@logger.inject_lambda_context
@tracer.capture_lambda_handler
def process_async(event, context):
    logger.structure_logs(append=True,
                          AWSTraceHeader=None,
                          traceId=xray_recorder.current_segment().trace_id)
    logger.debug(event)

deploy し直す

一度 npx sls remove してから npx sls deploy します。

f:id:ksby:20200704205406p:plain f:id:ksby:20200704205505p:plain f:id:ksby:20200704205615p:plain f:id:ksby:20200704205717p:plain

動作確認は次の回です。

履歴

2020/07/05
初版発行。