aws-lambda-powertools を試してみる(Tracer&X-Ray 編その1)
概要
記事一覧はこちらです。
aws-lambda-powertools を試してみる(Logger 編) の続きで、今回は Tracer を試してみます。lambda-powertools-project プロジェクトを引き続き使用します。
API Gateway から呼び出された Lambda から以下の5パターンの処理を実行し、X-Ray にどう表示されるのか確認します。
- DynamoDB にデータを追加 → DynamoDB Streams 経由で Lambda を呼び出す。
- SQS にメッセージを送信して Lambda を呼び出す。
- SNS にメッセージを送信して Lambda を呼び出す。
- 別の Lambda を同期呼び出しする。
- 別の Lambda を非同期呼び出しする。
参照したサイト・書籍
Introducing AWS Lambda Destinations
https://aws.amazon.com/jp/blogs/compute/introducing-aws-lambda-destinations/AWS X-Ray と他の AWS のサービスの統合
https://docs.aws.amazon.com/ja_jp/xray/latest/devguide/xray-services.htmlDynamoDB Streamsの改めて再検証してみた
https://dev.classmethod.jp/articles/dynamodb-streams-scale-test/Amazon DynamoDB and Serverless - The Ultimate Guide
https://www.serverless.com/dynamodbEvent-driven processing with Serverless and DynamoDB streams
https://www.serverless.com/blog/event-driven-architecture-dynamodbDynamoDB streams creation
https://forum.serverless.com/t/dynamodb-streams-creation/792Welcome to the AWS X-Ray SDK for Python!
https://docs.aws.amazon.com/xray-sdk-for-python/latest/reference/index.htmlaws / aws-xray-sdk-python
https://github.com/aws/aws-xray-sdk-pythonPseudo Parameters Reference
https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/pseudo-parameter-reference.html
目次
- serverless.yml を変更する
- API Gateway から呼び出される apigw_handler.py を変更する
- DynamoDB から DynamoDB Streams 経由で呼び出される dynamodb_handler.py を作成する
- SQS 経由で呼び出される sqs_handler.py を作成する
- SNS 経由で呼び出される sns_handler.py を作成する
- 同期で呼び出す sync_handler.py を作成する
- 非同期で呼び出す async_handler.py を作成する
- deploy し直す
手順
serverless.yaml を変更する
sample_service/serverless.yaml を今回の環境に合わせて以下の内容にします。
service: sample-service plugins: - serverless-python-requirements custom: queueName: "SampleQueue" topicName: "SampleTopic" pythonRequirements: dockerizePip: true provider: name: aws runtime: python3.8 stage: dev region: ap-northeast-1 environment: # aws-lambda-powertools 用環境変数 LOG_LEVEL: DEBUG POWERTOOLS_LOGGER_LOG_EVENT: true POWERTOOLS_METRICS_NAMESPACE: lambda-powertools-project POWERTOOLS_SERVICE_NAME: sample-service tracing: apiGateway: true lambda: true iamRoleStatements: - Effect: Allow Action: - dynamodb:Query - dynamodb:Scan - dynamodb:GetItem - dynamodb:PutItem - dynamodb:UpdateItem - dynamodb:DeleteItem Resource: - Fn::GetAtt: [ SampleTable, Arn ] - Effect: Allow Action: - kms:GenerateDataKey - kms:Decrypt - sqs:* Resource: - Fn::GetAtt: [ SampleQueue, Arn ] - Effect: Allow Action: - kms:GenerateDataKey - kms:Decrypt - sns:Publish - sns:Subscribe Resource: - !Ref NotifySampleTopic - Effect: Allow Action: - lambda:InvokeFunction - lambda:InvokeAsync Resource: - Fn::Join: - ":" - - "arn:aws:lambda" - Ref: "AWS::Region" - Ref: "AWS::AccountId" - "function:*" functions: recvMsg: handler: apigw_handler.recv_msg events: - http: path: recv-msg method: post cors: true request: schema: application/json: ${file(msg_schema.json)} environment: QUEUE_URL: !Ref SampleQueue TOPIC_ARN: !Ref NotifySampleTopic processSampleTableStream: handler: dynamodb_handler.process_sample_table_stream events: - stream: type: dynamodb batchSize: 1 startingPosition: TRIM_HORIZON arn: Fn::GetAtt: [ SampleTable, StreamArn ] processSampleQueue: handler: sqs_handler.process_sample_queue events: - sqs: arn: Fn::GetAtt: [ SampleQueue, Arn ] processSampleTopic: handler: sns_handler.process_sample_topic events: - sns: arn: !Ref NotifySampleTopic topicName: "${self:custom.topicName}" processSync: handler: sync_handler.process_sync processAsync: handler: async_handler.process_async resources: Resources: SampleTable: Type: AWS::DynamoDB::Table Properties: TableName: sample-table AttributeDefinitions: - AttributeName: orderNumber AttributeType: S - AttributeName: orderItemNumber AttributeType: S KeySchema: - AttributeName: orderNumber KeyType: HASH - AttributeName: orderItemNumber KeyType: RANGE ProvisionedThroughput: ReadCapacityUnits: 1 WriteCapacityUnits: 1 StreamSpecification: StreamViewType: NEW_IMAGE SampleQueue: Type: AWS::SQS::Queue Properties: QueueName: "${self:custom.queueName}" NotifySampleTopic: Type: AWS::SNS::Topic Properties: TopicName: "${self:custom.topicName}" NotifySampleSubscription: Type: AWS::SNS::Subscription DependsOn: - ProcessSampleTopicLambdaFunction Properties: TopicArn: !Ref NotifySampleTopic Endpoint: Fn::GetAtt: - ProcessSampleTopicLambdaFunction - Arn Protocol: lambda NotifySampleLambdaResourcePolicy: Type: AWS::Lambda::Permission DependsOn: - ProcessSampleTopicLambdaFunction Properties: FunctionName: !Ref ProcessSampleTopicLambdaFunction Principal: sns.amazonaws.com Action: "lambda:InvokeFunction" SourceArn: !Ref NotifySampleTopic
- aws-lambda-powertools 用環境変数は、
- debug ログを出力したいので
LOG_LEVEL: DEBUG
にします。 - decorate のログを出力したいので
POWERTOOLS_LOGGER_LOG_EVENT: true
にします。
- debug ログを出力したいので
- provider.iamRoleStatements に必要な IAM Role を設定します。ただし Lambda の同期・非同期呼び出しのための lambda:InvokeFunction、lambda:InvokeAsync の Resource は
- Fn::GetAtt: [ processSync, Arn ]
と記述するとError: The CloudFormation template is invalid: Template error: instance of Fn::GetAtt references undefined resource processSync
というエラーが、- Fn::GetAtt: [ ProcessSyncLambdaFunction, Arn ]
と記述するとError: The CloudFormation template is invalid: Circular dependency between resources: [...]
というエラーが出て設定方法が全然分からなかったので、arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:*
の形式(Lambda を*
で指定する)にしています。 - resources.Resources に今回使用する DynamoDB のテーブル(SampleTable)、SQS のキュー(SampleQueue)、SNS のトピック(SampleTopic)を記述します。
- DynamoDB のテーブル(SampleTable)は DynamoDB Streams を使用するので StreamSpecification に
StreamViewType: NEW_IMAGE
を記述します。
API Gateway から呼び出される apigw_handler.py を変更する
sample_service/apigw_handler.py を以下の内容にします。put_item_to_dynamodb(...)
以外の関数に orderedItem を渡さず event['body']
を渡しているのはミスです。。。(次の記事をほぼ書いた時に気づきました)
import json import os import boto3 from aws_lambda_powertools import Logger, Tracer from aws_xray_sdk.core import xray_recorder logger = Logger() tracer = Tracer() dynamodb_sample_table_tbl = boto3.resource('dynamodb').Table('sample-table') sqs_client = boto3.client('sqs') sns_client = boto3.client('sns') lambda_client = boto3.client('lambda') # @logger.inject_lambda_context を付けておくとログ出力時に context にセットされている # function_name 等の情報がセットされる # Capturing context Lambda info # https://awslabs.github.io/aws-lambda-powertools-python/core/logger/#capturing-context-lambda-info @logger.inject_lambda_context @tracer.capture_lambda_handler def recv_msg(event, context): # API Gateway に発行された traceId は event.headers.X-Amzn-Trace-Id にセットされている # Lambda 用に発行された traceId は xray_recorder.current_segment().trace_id にセットされている awsTraceHeader = event['headers']['X-Amzn-Trace-Id'] logger.structure_logs(append=True, AWSTraceHeader=awsTraceHeader, traceId=xray_recorder.current_segment().trace_id) logger.debug(event) request_body = json.loads(event['body']) # logger.info({ # "orderNumber": request_body['orderNumber'], # "orderDate": request_body['orderDate'] # }) for orderedItem in request_body['orderedItem']: logger.structure_logs(append=True, orderNumber=request_body['orderNumber'], orderItemNumber=orderedItem['orderItemNumber'] ) put_item_to_dynamodb(request_body['orderNumber'], orderedItem) send_message_to_sqs(os.environ['QUEUE_URL'], event['body']) send_message_to_sns(os.environ['TOPIC_ARN'], event['body']) call_process_sync(event['body']) call_process_async(event['body']) # logger.structure_logs(append=False) を呼ぶと @logger.inject_lambda_context # でセットされた項目まで消える logger.structure_logs(append=False) logger.info('テスト') body = { "message": "Go Serverless v1.0! Your function executed successfully!", "input": event } response = { "statusCode": 200, "body": json.dumps(body) } return response @tracer.capture_method def put_item_to_dynamodb(orderNumber, orderedItem): dynamodb_sample_table_tbl.put_item( Item={ 'orderNumber': orderNumber, 'orderItemNumber': orderedItem['orderItemNumber'], 'orderQuantity': orderedItem['orderQuantity'], 'productID': orderedItem['productID'], 'category': orderedItem['category'], 'price': orderedItem['price'] } ) logger.info('sample-table に追加しました') @tracer.capture_method def send_message_to_sqs(queue_url, body): response = sqs_client.send_message( QueueUrl=queue_url, MessageBody=body ) logger.info('SampleQueue にメッセージを送信しました') @tracer.capture_method def send_message_to_sns(topic_arn, body): response = sns_client.publish( TopicArn=topic_arn, Message=body ) logger.info('SampleTopic にメッセージを送信しました') @tracer.capture_method def call_process_sync(body): response = lambda_client.invoke( FunctionName='sample-service-dev-processSync', Payload=body ) logger.info('processSync を同期で呼び出ししました') @tracer.capture_method def call_process_async(body): response = lambda_client.invoke_async( FunctionName='sample-service-dev-processAsync', InvokeArgs=body ) logger.info('processAsync を非同期で呼び出ししました')
DynamoDB から DynamoDB Streams 経由で呼び出される dynamodb_handler.py を作成する
sample_service/dynamodb_handler.py を新規作成し、以下の内容を記述します。
以降の実装で logger.structure_logs(...)
を利用して AWSTraceHeader と traceId がログに出力されるようにしていますが、理由は次回記述します。
from aws_lambda_powertools import Logger, Tracer from aws_xray_sdk.core import xray_recorder logger = Logger() tracer = Tracer() @logger.inject_lambda_context @tracer.capture_lambda_handler def process_sample_table_stream(event, context): logger.structure_logs(append=True, AWSTraceHeader=None, traceId=xray_recorder.current_segment().trace_id) logger.debug(event)
SQS 経由で呼び出される sqs_handler.py を作成する
sample_service/dynamodb_handler.py を新規作成し、以下の内容を記述します。
from aws_lambda_powertools import Logger, Tracer from aws_xray_sdk.core import xray_recorder logger = Logger() tracer = Tracer() @logger.inject_lambda_context @tracer.capture_lambda_handler def process_sample_queue(event, context): logger.structure_logs(append=True, AWSTraceHeader=None, traceId=xray_recorder.current_segment().trace_id) logger.debug(event) for record in event['Records']: logger.structure_logs(append=True, AWSTraceHeader=record['attributes']['AWSTraceHeader']) logger.info(record)
SNS 経由で呼び出される sns_handler.py を作成する
sample_service/sns_handler.py を新規作成し、以下の内容を記述します。
from aws_lambda_powertools import Logger, Tracer from aws_xray_sdk.core import xray_recorder logger = Logger() tracer = Tracer() @logger.inject_lambda_context @tracer.capture_lambda_handler def process_sample_topic(event, context): logger.structure_logs(append=True, AWSTraceHeader=None, traceId=xray_recorder.current_segment().trace_id) logger.debug(event)
同期で呼び出す sync_handler.py を作成する
sample_service/sync_handler.py を新規作成し、以下の内容を記述します。
from aws_lambda_powertools import Logger, Tracer from aws_xray_sdk.core import xray_recorder logger = Logger() tracer = Tracer() @logger.inject_lambda_context @tracer.capture_lambda_handler def process_sync(event, context): logger.structure_logs(append=True, AWSTraceHeader=None, traceId=xray_recorder.current_segment().trace_id) logger.debug(event)
非同期で呼び出す async_handler.py を作成する
sample_service/async_handler.py を新規作成し、以下の内容を記述します。
from aws_lambda_powertools import Logger, Tracer from aws_xray_sdk.core import xray_recorder logger = Logger() tracer = Tracer() @logger.inject_lambda_context @tracer.capture_lambda_handler def process_async(event, context): logger.structure_logs(append=True, AWSTraceHeader=None, traceId=xray_recorder.current_segment().trace_id) logger.debug(event)
deploy し直す
一度 npx sls remove
してから npx sls deploy
します。
動作確認は次の回です。
履歴
2020/07/05
初版発行。