aws-lambda-powertools を試してみる(Tracer&X-Ray 編その2)
概要
記事一覧はこちらです。
aws-lambda-powertools を試してみる(Tracer&X-Ray 編その1) の続きです。
参照したサイト・書籍
SQS Tracing with AWSTraceHeader
https://github.com/aws/aws-xray-sdk-node/issues/208
目次
- API Gateway の Endpoint にメッセージを送信して X-Ray の表示内容を確認する
- SQS は X-Ray の円がつながらず SNS はつながる理由とは?
logger.structure_logs(...)
で AWSTraceHeader と trace_id を出力するよう設定して API Gateway → Lambda → SQS → Lambda の流れを CloudWatch Logs Insights で追えるようにする- boto3 のインスタンスの生成はグローバルで実施した方が2回目以降は速い
- 最後に
手順
API Gateway の Endpoint にメッセージを送信して X-Ray の表示内容を確認する
Postman からメッセージを送信すると 200 OK が返ってきました。
AWS マネジメントコンソールの X-Ray の Service Map を見ると以下の画面が表示されました。
- 赤丸:API Gateway → Lambda(recvMsg)
- 青色:Lambda(recvMsg) → DynamoDB → DynamoDB Streams → Lambda(processSampleTableStream)
- 緑色:Lambda(recvMsg) → SQS → Lambda(processSampleQueue)
- 黄色:Lambda(recvMsg) → SNS → Lambda(processSampleTopic)
- 紫色:Lambda(recvMsg) → Lambda(processSync)
- オレンジ色:Lambda(recvMsg) → Lambda(processAsync)
気になったのは、
- DynamoDB は X-Ray がまだサポートしていないので円がつながらないのは分かるのですが、SQS は AWS X-Ray が Amazon SQS をサポート の記事が出ているにもかかわらずこちらもつながって表示されません。
- SNS も Amazon SNS で AWS X-Ray のサポートを追加 の記事が出ていますが、こちらはつながって表示されています。
- Lambda の呼び出しは同期呼び出しだと円が2つ、非同期呼び出しだと1つです。
Traces には以下の一覧が表示されており、
メソッドに POST と表示されている行の詳細を表示すると以下のデータが表示されました。
- Cold Start 時は Initialization の時間がかかります。
- 大した処理を記述していない Lambda でも同期呼び出しだと初回は Initialization 並みの時間がかかっています。まあ当然ですね。
@tracer.capture_method
を付与した関数名が表示されており、関数毎の処理時間が分かります。
SQS は X-Ray の円がつながらず SNS はつながる理由とは?
Lambda(recvMsg)の CloudWatch のログを見ると decorate のログの message.headers.X-Amzn-Trace-Id に X-Ray のトレースID が出力されています。
Lambda(recvMsg)の logger.debug(event)
が出力したログを見ると、AWSTraceHeader(message.headers.X-Amzn-Trace-Id の値が出力されています)と traceId に同じ値がセットされています。
Lambda(processSampleQueue)の CloudWatch の decorate のログを見ると message.Records[0].attributes.AWSTraceHeader に message.headers.X-Amzn-Trace-Id に出力されていたトレースID が含まれています(メッセージ送信時にセットしていないにもかかわらず)。
Lambda(processSampleQueue)の logger.debug(event)
が出力したログを見ると、traceId には AWSTraceHeader にセットされていたものとは別のトレースID がセットされています。
Lambda(processSampleTopic)の CloudWatch の decorate のログを見ると message.headers.X-Amzn-Trace-Id に出力されていたトレースID はどこにもセットされていませんが、
Lambda(processSampleTopic)の logger.debug(event)
が出力したログを見ると、traceId に message.headers.X-Amzn-Trace-Id に出力されていたトレースID がセットされています。
ということで、SQS 経由だとトレースID が引き継がれませんが SNS 経由だと引き継がれるからという理由のようです。SQS は X-Ray でサポートされていると発表されているので円がつながるようにできそうな気がしたのですが、今回調べた限りではその方法はさっぱり分かりませんでした。。。
logger.structure_logs(...)
で AWSTraceHeader と trace_id を出力するよう設定して API Gateway → Lambda → SQS → Lambda の流れを CloudWatch Logs Insights で追えるようにする
SQS 経由の場合、メッセージの送信元の Lambda と受信先の Lambda でトレースID は異なりますが AWSTraceHeader で送信元のトレースID は伝わるようなので、logger.structure_logs(...)
で AWSTraceHeader を出力するようにしておけば一連の流れをログで追えそうかなと思いました。ついでにトレースID も出力します。
API Gateway から呼び出される Lambda(recvMsg)では以下のように実装し、
awsTraceHeader = event['headers']['X-Amzn-Trace-Id'] logger.structure_logs(append=True, AWSTraceHeader=awsTraceHeader, traceId=xray_recorder.current_segment().trace_id)
SQS 経由で呼び出される Lambda(processSampleQueue)では以下のように実装します。AWSTraceHeader はメッセージの属性に付加されるので、最初は None にしておいてメッセージの処理をする時に record['attributes']['AWSTraceHeader']
で上書きします。
logger.structure_logs(append=True, AWSTraceHeader=None, traceId=xray_recorder.current_segment().trace_id) .......... for record in event['Records']: logger.structure_logs(append=True, AWSTraceHeader=record['attributes']['AWSTraceHeader']) ..........
AWS マネジメントコンソールで CloudWatch Logs のロググループを表示してから、Lambda(recvMsg)と Lambda(processSampleQueue)のログを選択して CloudWatch Logs Insight で表示します。
クエリを以下の内容に変更してから実行すると、
fields @timestamp, level, substr(AWSTraceHeader, 0, 40), function_name, message | filter AWSTraceHeader =~ /1-5f008baf-76bb78b8aaa6a3e0546581c8/ | sort @timestamp desc | limit 20
- AWSTraceHeader だけ指定した場合 Lambda(processSampleQueue)で出力される文字列は Parent 等項目が追加されて少し長くなるので、確認したい文字列だけ出力されるよう
substr(AWSTraceHeader, 0, 40)
で指定します。 - Lambda(recvMsg)のログなのか Lambda(processSampleQueue)のログなのかが分かるよう function_name を出力します。
- 今回は処理内容がシンプルに分かればよいので
@message
→message
に変更します。 - API Gateway で発行されたトレースID
1-5f008baf-76bb78b8aaa6a3e0546581c8
が含まれているログを取得したいのでfilter AWSTraceHeader =~ /1-5f008baf-76bb78b8aaa6a3e0546581c8/
を追加します。
API Gateway → Lambda(recvMsg) → SQS → Lambda(processSampleQueue)の流れをログで追うことができました。SQS にメッセージを送信する時に orderedItem を1件ずつ分けずに event['body']
まるごと送信している失敗にも気づいてしまいましたが。。。
また SQS にはメッセージを2回しか送信していないのに Lambda(processSampleQueue)のログがなぜ3回出ているの?と思ったのですが、decorate のログは1回目は以下の内容なのですが、
2回目は @logger.inject_lambda_context
と logger.structure_logs(...)
で追加された項目が出ていたためでした。
boto3 のインスタンスの生成はグローバルで実施した方が2回目以降は速い
sample_service/apigw_handler.py で boto3 のインスタンスを生成するタイミングをハンドラーの前にしていますが、
.......... dynamodb_sample_table_tbl = boto3.resource('dynamodb').Table('sample-table') sqs_client = boto3.client('sqs') sns_client = boto3.client('sns') lambda_client = boto3.client('lambda') # @logger.inject_lambda_context を付けておくとログ出力時に context にセットされている # function_name 等の情報がセットされる # Capturing context Lambda info # https://awslabs.github.io/aws-lambda-powertools-python/core/logger/#capturing-context-lambda-info @logger.inject_lambda_context @tracer.capture_lambda_handler def recv_msg(event, context): ..........
この方式で API Gateway にアクセスすると Lambda(recvMsg)の実行時間は cold start がなければ 321ms、278ms、252ms でした。
これをハンドラーが呼び出されてから都度生成されるように以下のように変更してみたところ、
@tracer.capture_method def put_item_to_dynamodb(orderNumber, orderedItem): dynamodb_sample_table_tbl = boto3.resource('dynamodb').Table('sample-table') dynamodb_sample_table_tbl.put_item( Item={ ..........
Lambda(recvMsg)の実行時間は cold start がなければ 627ms、515ms、554ms でした。約2倍です。
以前ユニットテストを書いた時にはエラーが出るのでハンドラー関数内で生成するようにしましたが、グローバルで1度だけ生成するようにした方がやっぱり速いんですね。当然といえば当然。。。 ユニットテストでエラーが出るのを解決する別の方法を調べることにします。
最後に
- Logger は JSON で出力できたり、context の function_name 等の情報を自動でセットしてくれたりするのが便利です。
logger.structure_logs(...)
で共通で出力しておきたい項目をセットしておけるのも使い勝手がいいなと思いました。- 始めて X-Ray を使ったのですが、Serverless Framework の設定と Tracer で本当に簡単に連携できるようになるので驚きでした。X-Ray はなんか面倒な印象があったのですが、こんなに簡単に使えるならば積極的に使っていきたいと思います。
AWS Black Belt Online Seminar AWS X-Ray 資料及び QA 公開 も見つけたので、後で見ておきましょう。
履歴
2020/07/05
初版発行。
aws-lambda-powertools を試してみる(Tracer&X-Ray 編その1)
概要
記事一覧はこちらです。
aws-lambda-powertools を試してみる(Logger 編) の続きで、今回は Tracer を試してみます。lambda-powertools-project プロジェクトを引き続き使用します。
API Gateway から呼び出された Lambda から以下の5パターンの処理を実行し、X-Ray にどう表示されるのか確認します。
- DynamoDB にデータを追加 → DynamoDB Streams 経由で Lambda を呼び出す。
- SQS にメッセージを送信して Lambda を呼び出す。
- SNS にメッセージを送信して Lambda を呼び出す。
- 別の Lambda を同期呼び出しする。
- 別の Lambda を非同期呼び出しする。
参照したサイト・書籍
Introducing AWS Lambda Destinations
https://aws.amazon.com/jp/blogs/compute/introducing-aws-lambda-destinations/AWS X-Ray と他の AWS のサービスの統合
https://docs.aws.amazon.com/ja_jp/xray/latest/devguide/xray-services.htmlDynamoDB Streamsの改めて再検証してみた
https://dev.classmethod.jp/articles/dynamodb-streams-scale-test/Amazon DynamoDB and Serverless - The Ultimate Guide
https://www.serverless.com/dynamodbEvent-driven processing with Serverless and DynamoDB streams
https://www.serverless.com/blog/event-driven-architecture-dynamodbDynamoDB streams creation
https://forum.serverless.com/t/dynamodb-streams-creation/792Welcome to the AWS X-Ray SDK for Python!
https://docs.aws.amazon.com/xray-sdk-for-python/latest/reference/index.htmlaws / aws-xray-sdk-python
https://github.com/aws/aws-xray-sdk-pythonPseudo Parameters Reference
https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/pseudo-parameter-reference.html
目次
- serverless.yml を変更する
- API Gateway から呼び出される apigw_handler.py を変更する
- DynamoDB から DynamoDB Streams 経由で呼び出される dynamodb_handler.py を作成する
- SQS 経由で呼び出される sqs_handler.py を作成する
- SNS 経由で呼び出される sns_handler.py を作成する
- 同期で呼び出す sync_handler.py を作成する
- 非同期で呼び出す async_handler.py を作成する
- deploy し直す
手順
serverless.yaml を変更する
sample_service/serverless.yaml を今回の環境に合わせて以下の内容にします。
service: sample-service plugins: - serverless-python-requirements custom: queueName: "SampleQueue" topicName: "SampleTopic" pythonRequirements: dockerizePip: true provider: name: aws runtime: python3.8 stage: dev region: ap-northeast-1 environment: # aws-lambda-powertools 用環境変数 LOG_LEVEL: DEBUG POWERTOOLS_LOGGER_LOG_EVENT: true POWERTOOLS_METRICS_NAMESPACE: lambda-powertools-project POWERTOOLS_SERVICE_NAME: sample-service tracing: apiGateway: true lambda: true iamRoleStatements: - Effect: Allow Action: - dynamodb:Query - dynamodb:Scan - dynamodb:GetItem - dynamodb:PutItem - dynamodb:UpdateItem - dynamodb:DeleteItem Resource: - Fn::GetAtt: [ SampleTable, Arn ] - Effect: Allow Action: - kms:GenerateDataKey - kms:Decrypt - sqs:* Resource: - Fn::GetAtt: [ SampleQueue, Arn ] - Effect: Allow Action: - kms:GenerateDataKey - kms:Decrypt - sns:Publish - sns:Subscribe Resource: - !Ref NotifySampleTopic - Effect: Allow Action: - lambda:InvokeFunction - lambda:InvokeAsync Resource: - Fn::Join: - ":" - - "arn:aws:lambda" - Ref: "AWS::Region" - Ref: "AWS::AccountId" - "function:*" functions: recvMsg: handler: apigw_handler.recv_msg events: - http: path: recv-msg method: post cors: true request: schema: application/json: ${file(msg_schema.json)} environment: QUEUE_URL: !Ref SampleQueue TOPIC_ARN: !Ref NotifySampleTopic processSampleTableStream: handler: dynamodb_handler.process_sample_table_stream events: - stream: type: dynamodb batchSize: 1 startingPosition: TRIM_HORIZON arn: Fn::GetAtt: [ SampleTable, StreamArn ] processSampleQueue: handler: sqs_handler.process_sample_queue events: - sqs: arn: Fn::GetAtt: [ SampleQueue, Arn ] processSampleTopic: handler: sns_handler.process_sample_topic events: - sns: arn: !Ref NotifySampleTopic topicName: "${self:custom.topicName}" processSync: handler: sync_handler.process_sync processAsync: handler: async_handler.process_async resources: Resources: SampleTable: Type: AWS::DynamoDB::Table Properties: TableName: sample-table AttributeDefinitions: - AttributeName: orderNumber AttributeType: S - AttributeName: orderItemNumber AttributeType: S KeySchema: - AttributeName: orderNumber KeyType: HASH - AttributeName: orderItemNumber KeyType: RANGE ProvisionedThroughput: ReadCapacityUnits: 1 WriteCapacityUnits: 1 StreamSpecification: StreamViewType: NEW_IMAGE SampleQueue: Type: AWS::SQS::Queue Properties: QueueName: "${self:custom.queueName}" NotifySampleTopic: Type: AWS::SNS::Topic Properties: TopicName: "${self:custom.topicName}" NotifySampleSubscription: Type: AWS::SNS::Subscription DependsOn: - ProcessSampleTopicLambdaFunction Properties: TopicArn: !Ref NotifySampleTopic Endpoint: Fn::GetAtt: - ProcessSampleTopicLambdaFunction - Arn Protocol: lambda NotifySampleLambdaResourcePolicy: Type: AWS::Lambda::Permission DependsOn: - ProcessSampleTopicLambdaFunction Properties: FunctionName: !Ref ProcessSampleTopicLambdaFunction Principal: sns.amazonaws.com Action: "lambda:InvokeFunction" SourceArn: !Ref NotifySampleTopic
- aws-lambda-powertools 用環境変数は、
- debug ログを出力したいので
LOG_LEVEL: DEBUG
にします。 - decorate のログを出力したいので
POWERTOOLS_LOGGER_LOG_EVENT: true
にします。
- debug ログを出力したいので
- provider.iamRoleStatements に必要な IAM Role を設定します。ただし Lambda の同期・非同期呼び出しのための lambda:InvokeFunction、lambda:InvokeAsync の Resource は
- Fn::GetAtt: [ processSync, Arn ]
と記述するとError: The CloudFormation template is invalid: Template error: instance of Fn::GetAtt references undefined resource processSync
というエラーが、- Fn::GetAtt: [ ProcessSyncLambdaFunction, Arn ]
と記述するとError: The CloudFormation template is invalid: Circular dependency between resources: [...]
というエラーが出て設定方法が全然分からなかったので、arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:*
の形式(Lambda を*
で指定する)にしています。 - resources.Resources に今回使用する DynamoDB のテーブル(SampleTable)、SQS のキュー(SampleQueue)、SNS のトピック(SampleTopic)を記述します。
- DynamoDB のテーブル(SampleTable)は DynamoDB Streams を使用するので StreamSpecification に
StreamViewType: NEW_IMAGE
を記述します。
API Gateway から呼び出される apigw_handler.py を変更する
sample_service/apigw_handler.py を以下の内容にします。put_item_to_dynamodb(...)
以外の関数に orderedItem を渡さず event['body']
を渡しているのはミスです。。。(次の記事をほぼ書いた時に気づきました)
import json import os import boto3 from aws_lambda_powertools import Logger, Tracer from aws_xray_sdk.core import xray_recorder logger = Logger() tracer = Tracer() dynamodb_sample_table_tbl = boto3.resource('dynamodb').Table('sample-table') sqs_client = boto3.client('sqs') sns_client = boto3.client('sns') lambda_client = boto3.client('lambda') # @logger.inject_lambda_context を付けておくとログ出力時に context にセットされている # function_name 等の情報がセットされる # Capturing context Lambda info # https://awslabs.github.io/aws-lambda-powertools-python/core/logger/#capturing-context-lambda-info @logger.inject_lambda_context @tracer.capture_lambda_handler def recv_msg(event, context): # API Gateway に発行された traceId は event.headers.X-Amzn-Trace-Id にセットされている # Lambda 用に発行された traceId は xray_recorder.current_segment().trace_id にセットされている awsTraceHeader = event['headers']['X-Amzn-Trace-Id'] logger.structure_logs(append=True, AWSTraceHeader=awsTraceHeader, traceId=xray_recorder.current_segment().trace_id) logger.debug(event) request_body = json.loads(event['body']) # logger.info({ # "orderNumber": request_body['orderNumber'], # "orderDate": request_body['orderDate'] # }) for orderedItem in request_body['orderedItem']: logger.structure_logs(append=True, orderNumber=request_body['orderNumber'], orderItemNumber=orderedItem['orderItemNumber'] ) put_item_to_dynamodb(request_body['orderNumber'], orderedItem) send_message_to_sqs(os.environ['QUEUE_URL'], event['body']) send_message_to_sns(os.environ['TOPIC_ARN'], event['body']) call_process_sync(event['body']) call_process_async(event['body']) # logger.structure_logs(append=False) を呼ぶと @logger.inject_lambda_context # でセットされた項目まで消える logger.structure_logs(append=False) logger.info('テスト') body = { "message": "Go Serverless v1.0! Your function executed successfully!", "input": event } response = { "statusCode": 200, "body": json.dumps(body) } return response @tracer.capture_method def put_item_to_dynamodb(orderNumber, orderedItem): dynamodb_sample_table_tbl.put_item( Item={ 'orderNumber': orderNumber, 'orderItemNumber': orderedItem['orderItemNumber'], 'orderQuantity': orderedItem['orderQuantity'], 'productID': orderedItem['productID'], 'category': orderedItem['category'], 'price': orderedItem['price'] } ) logger.info('sample-table に追加しました') @tracer.capture_method def send_message_to_sqs(queue_url, body): response = sqs_client.send_message( QueueUrl=queue_url, MessageBody=body ) logger.info('SampleQueue にメッセージを送信しました') @tracer.capture_method def send_message_to_sns(topic_arn, body): response = sns_client.publish( TopicArn=topic_arn, Message=body ) logger.info('SampleTopic にメッセージを送信しました') @tracer.capture_method def call_process_sync(body): response = lambda_client.invoke( FunctionName='sample-service-dev-processSync', Payload=body ) logger.info('processSync を同期で呼び出ししました') @tracer.capture_method def call_process_async(body): response = lambda_client.invoke_async( FunctionName='sample-service-dev-processAsync', InvokeArgs=body ) logger.info('processAsync を非同期で呼び出ししました')
DynamoDB から DynamoDB Streams 経由で呼び出される dynamodb_handler.py を作成する
sample_service/dynamodb_handler.py を新規作成し、以下の内容を記述します。
以降の実装で logger.structure_logs(...)
を利用して AWSTraceHeader と traceId がログに出力されるようにしていますが、理由は次回記述します。
from aws_lambda_powertools import Logger, Tracer from aws_xray_sdk.core import xray_recorder logger = Logger() tracer = Tracer() @logger.inject_lambda_context @tracer.capture_lambda_handler def process_sample_table_stream(event, context): logger.structure_logs(append=True, AWSTraceHeader=None, traceId=xray_recorder.current_segment().trace_id) logger.debug(event)
SQS 経由で呼び出される sqs_handler.py を作成する
sample_service/dynamodb_handler.py を新規作成し、以下の内容を記述します。
from aws_lambda_powertools import Logger, Tracer from aws_xray_sdk.core import xray_recorder logger = Logger() tracer = Tracer() @logger.inject_lambda_context @tracer.capture_lambda_handler def process_sample_queue(event, context): logger.structure_logs(append=True, AWSTraceHeader=None, traceId=xray_recorder.current_segment().trace_id) logger.debug(event) for record in event['Records']: logger.structure_logs(append=True, AWSTraceHeader=record['attributes']['AWSTraceHeader']) logger.info(record)
SNS 経由で呼び出される sns_handler.py を作成する
sample_service/sns_handler.py を新規作成し、以下の内容を記述します。
from aws_lambda_powertools import Logger, Tracer from aws_xray_sdk.core import xray_recorder logger = Logger() tracer = Tracer() @logger.inject_lambda_context @tracer.capture_lambda_handler def process_sample_topic(event, context): logger.structure_logs(append=True, AWSTraceHeader=None, traceId=xray_recorder.current_segment().trace_id) logger.debug(event)
同期で呼び出す sync_handler.py を作成する
sample_service/sync_handler.py を新規作成し、以下の内容を記述します。
from aws_lambda_powertools import Logger, Tracer from aws_xray_sdk.core import xray_recorder logger = Logger() tracer = Tracer() @logger.inject_lambda_context @tracer.capture_lambda_handler def process_sync(event, context): logger.structure_logs(append=True, AWSTraceHeader=None, traceId=xray_recorder.current_segment().trace_id) logger.debug(event)
非同期で呼び出す async_handler.py を作成する
sample_service/async_handler.py を新規作成し、以下の内容を記述します。
from aws_lambda_powertools import Logger, Tracer from aws_xray_sdk.core import xray_recorder logger = Logger() tracer = Tracer() @logger.inject_lambda_context @tracer.capture_lambda_handler def process_async(event, context): logger.structure_logs(append=True, AWSTraceHeader=None, traceId=xray_recorder.current_segment().trace_id) logger.debug(event)
deploy し直す
一度 npx sls remove
してから npx sls deploy
します。
動作確認は次の回です。
履歴
2020/07/05
初版発行。
aws-lambda-powertools を試してみる(Logger 編)
概要
記事一覧はこちらです。
Twitter で Simplifying serverless best practices with Lambda Powertools の記事を見かけました。Python で Lambda を作成する時に Logging、Tracing、Metrics の機能の実装を助けてくれるライブラリとのこと。
記事は SAM で実装されていたので Serverless Framework で実装して動作を確認してみます。Logger で1回(今回)、Tracer で2回の計3回に分けて書きます。Metrics は使い方(というか何が便利になるのか)が今ひとつ分からなかったので書きません。
参照したサイト・書籍
Simplifying serverless best practices with Lambda Powertools
https://aws.amazon.com/jp/blogs/opensource/simplifying-serverless-best-practices-with-lambda-powertools/AWS Lambda Powertools Python
https://awslabs.github.io/aws-lambda-powertools-python/aws-samples / aws-serverless-shopping-cart
https://github.com/aws-samples/aws-serverless-shopping-cart/tree/master/backend/shopping-cart-service- Python で書かれた Serverless のサンプル。今回の記事とは直接関係ありませんが、後で見たいので残しておきます。
Serverless Framework v1.41 - X-Ray for API Gateway, Invoke Local with Docker Improvements & More
https://www.serverless.com/blog/framework-release-v141/オブザーバビリティ(可観測性)がなぜ必要だと考えるのか
https://ymotongpoo.hatenablog.com/entry/2019/03/25/084500CloudWatch Logs Insights を使用したログデータの分析
https://docs.aws.amazon.com/ja_jp/AmazonCloudWatch/latest/logs/AnalyzingLogData.html地味にイイネ!Amazon CloudWatch Logs Insightsで効率的に調査しよう!
https://blog.ecbeing.tech/entry/2019/09/19/124352CloudWatch Logs Insights でApacheのアクセスログを確認する
https://dev.classmethod.jp/articles/cwinsights-apache/
目次
- lambda-powertools-project プロジェクトを作成する
- sample_service サブプロジェクトを作成する
- requirements.txt を作成する
- serverless.yml に POWERTOOLS 用の環境変数と X-Ray を有効にする設定を追加する
- Logging の機能を試してみる
手順
lambda-powertools-project プロジェクトを作成する
以下の手順で lambda-powertools-project プロジェクトを作成します。具体的な手順は IntelliJ IDEA+Node.js+npm+serverless framework+Python の組み合わせで開発環境を構築して AWS Lambda を作成してみる 参照。
- lambda-powertools-project の Empty Project を作成する。
- Python の仮想環境を作成する。
- Serverless Framework をローカルインストールする。
- .envrc を作成する。
- aws-lambda-powertools は deploy 時にアップロードしないと使用できないので serverless-python-requirements をインストールします。
npm install --save-dev serverless-python-requirements
- Tracing の機能を試すのに Lambda から別のサービスを呼び出すので boto3 をインストールします。aws-lambda-powertools もインストールします。
pip install boto3
pip install aws-lambda-powertools
sample_service サブプロジェクトを作成する
プロジェクトのルートディレクトリの下で npx sls create --template aws-python3 --path sample_service
を実行して sample_service サブプロジェクトを作成します。
requirements.txt を作成する
deploy 時に aws-lambda-powertools もアップロードするために sample_service ディレクトリの下に requirements.txt を作成して以下の内容を記述します。
aws-lambda-powertools==1.0.0
serverless.yml に POWERTOOLS 用の環境変数と X-Ray を有効にする設定を追加する
sample_service/serverless.yml を以下のように変更します。
service: sample-service plugins: - serverless-python-requirements custom: pythonRequirements: dockerizePip: true provider: name: aws runtime: python3.8 stage: dev region: ap-northeast-1 environment: # aws-lambda-powertools 用環境変数 LOG_LEVEL: INFO POWERTOOLS_LOGGER_LOG_EVENT: true POWERTOOLS_METRICS_NAMESPACE: lambda-powertools-project POWERTOOLS_SERVICE_NAME: sample-service tracing: apiGateway: true lambda: true ..........
- グローバルな環境変数は provider.environment に記述すればよいので、ここに POWERTOOLS 用の環境変数のうち以下の4つを設定します。
- LOG_LEVEL
- POWERTOOLS_LOGGER_LOG_EVENT
- POWERTOOLS_METRICS_NAMESPACE
- POWERTOOLS_SERVICE_NAME
- X-Ray の設定は provider.tracing に記述するので、
apiGateway: true
、lambda: true
を設定します。
Logging の機能を試してみる
今回は以下の JSON Schema のメッセージを送信します(sample_service ディレクトリの下に msg_schema.json というファイルを作成しその中に記述します)。
{ "$schema": "http://json-schema.org/draft-04/schema#", "definitions": { "orderedItem": { "type": "object", "properties": { "orderItemNumber": { "type": "string", "pattern": "^[0-9]+$" }, "orderQuantity": { "type": "integer", "minimum": 0, "maximum": 999 }, "productID": { "type": "string", "pattern": "^[0-9]+$" }, "category": { "type": "string", "enum": [ "book", "camera", "computer" ] }, "price": { "type": "integer", "minimum": 1 } }, "required": [ "orderItemNumber", "orderQuantity", "productID", "category", "price" ] } }, "title": "Order", "type": "object", "properties": { "orderNumber": { "type": "string", "pattern": "^[0-9]+$" }, "orderDate": { "type": "string", "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}$" }, "isGift": { "type": "boolean" }, "orderedItem": { "type": "array", "minItems": 1, "maxItems": 3, "uniqueItems": true, "items": { "$ref": "#/definitions/orderedItem" } } }, "required": [ "orderNumber", "orderDate", "orderedItem" ] }
メッセージのサンプルです。
{ "orderNumber": "1", "orderDate": "2020-06-28", "isGift": false, "orderedItem": [ { "orderItemNumber": "1", "orderQuantity": 3, "productID": "1001", "category": "book", "price": 3800 }, { "orderItemNumber": "2", "orderQuantity": 1, "productID": "3052", "category": "camera", "price": 150000 } ] }
sample_service の下の handler.py を apigw_handler.py にリネームし、以下の内容を記述します。
import json from aws_lambda_powertools import Logger logger = Logger() # @logger.inject_lambda_context を付けておくとログ出力時に context にセットされている # function_name 等の情報がセットされる # Capturing context Lambda info # https://awslabs.github.io/aws-lambda-powertools-python/core/logger/#capturing-context-lambda-info @logger.inject_lambda_context def recv_msg(event, context): logger.debug(event['body']) request_body = json.loads(event['body']) logger.info({ "orderNumber": request_body['orderNumber'], "orderDate": request_body['orderDate'] }) body = { "message": "Go Serverless v1.0! Your function executed successfully!", "input": event } response = { "statusCode": 200, "body": json.dumps(body) } return response
sample_service/serverless.yml の functions の記述を以下の内容に変更します。
functions: recvMsg: handler: apigw_handler.recv_msg events: - http: path: recv-msg method: post cors: true request: schema: application/json: ${file(msg_schema.json)}
deploy します。
Postman からサンプルメッセージを送信すると 200 OK が返ってきました。
試した結果としては、
まずログのメッセージフォーマットが JSON になり、logger のメソッドに渡した文字列は message に出力されます。JSON フォーマットになることで CloudWatch Logs Insights で検索しやすくなります。
Lambda に @logger.inject_lambda_context
を付与しておくと、出力されるログに引数 context にセットされている function_name 等の情報が追加されます。
@logger.inject_lambda_context
が付与されていて、かつ環境変数 POWERTOOLS_LOGGER_LOG_EVENT が true に設定されていると、Lambda が呼び出された時に decorate のログ(API Gateway から呼び出された Lambda だとアクセス時の HTTPヘッダ等)が出力されます。環境変数 POWERTOOLS_LOGGER_LOG_EVENT のデフォルト値が false なので DEBUG 用?
環境変数 LOG_LEVEL で出力するログのレベルを調整可能です。今は LOG_LEVEL: INFO
の設定なので INFO のログだけ出ていますが(環境変数 POWERTOOLS_LOGGER_LOG_EVENT は false にしています)、
sample_service/serverless.yml で functions.recvMsg.environment に LOG_LEVEL: DEBUG
を設定した後、deploy し直してからメッセージを送信すると、
provider: name: aws runtime: python3.8 stage: dev region: ap-northeast-1 environment: # aws-lambda-powertools 用環境変数 LOG_LEVEL: INFO POWERTOOLS_LOGGER_LOG_EVENT: false POWERTOOLS_METRICS_NAMESPACE: lambda-powertools-project POWERTOOLS_SERVICE_NAME: sample-service tracing: apiGateway: true lambda: true functions: recvMsg: handler: apigw_handler.recv_msg events: - http: path: recv-msg method: post cors: true request: schema: application/json: ${file(msg_schema.json)} environment: LOG_LEVEL: DEBUG
DEBUG のログも出力されます。
履歴
2020/07/05
初版発行。
API Gateway で受信するデータを JSON Schema Validation でチェックしてから SQS へ送信する
概要
記事一覧はこちらです。
Using JSON Schema Validation with the AWS API Gateway という記事を見かけました。API Gateway で受信したメッセージを Lambda を呼び出す前に JSON Schema Validation で検証できるそうなので試してみます。Lambda を呼び出したら SQL へメッセージを送信し、別の Lambda でメッセージを受信します。
Serverless Framework の 1.8 から functions の events に sqs が記述できるようになるのですが、今回試した時のバージョンは 1.74 でしたので CloudWatch Events で 1分毎に Lambda を起動して SQS をチェックすることにします。
※(2020/07/05追記)何を見間違えてしまったのか SQS を events に設定できるのは 1.28 からでした。。。 今使用しているバージョンは 1.74 なので Using SQS with AWS Lambda and Serverless の記事通りに実装したらイベントベースで Lambda が呼び出されるようにできますね。
SQS への送信は serverless-apigateway-service-proxy プラグインを使用したかったのですが、このプラグインを使用して API Gateway の Endpoint を定義した時に JSON Schema Validation をセットする方法がないようだったので諦めました。
参照したサイト・書籍
Using JSON Schema Validation with the AWS API Gateway
https://fernandomc.com/posts/schema-validation-serverless-framework/API Gateway でリクエストの検証を有効にする
https://docs.aws.amazon.com/ja_jp/apigateway/latest/developerguide/api-gateway-method-request-validation.htmlJSON Schema
https://json-schema.org/Getting Started Step-By-Step
https://json-schema.org/learn/getting-started-step-by-step.htmlUnderstanding JSON Schema
https://json-schema.org/understanding-json-schema/Enabling JSON5
https://www.jetbrains.com/help/idea/json.html#ws_json_choose_version知らないうちにJSON5 in Babel
https://qiita.com/jkr_2255/items/026e0fdb4570c88c4f51AWS::SQS::Queue
https://docs.aws.amazon.com/ja_jp/AWSCloudFormation/latest/UserGuide/aws-properties-sqs-queues.htmlAWS Lambda を Amazon SQS に使用する
https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/with-sqs.htmlSending and receiving messages in Amazon SQS
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/sqs-example-sending-receiving-msgs.htmlUsing SQS with AWS Lambda and Serverless
https://www.serverless.com/blog/aws-lambda-sqs-serverless-integration/Serverless updates - SQS events, private endpoints, Event Gateway open source
https://www.serverless.com/blog/serverless-updates-framework-v128/
目次
- jsonschema-sqs-project プロジェクトを作成する
- order_service サブプロジェクトを作成する
- 受信するメッセージのサンプルを作成する
- IntelliJ IDEA の .json を関連付ける File Type を JSON → JSON5 に変更する。。。が、API Gateway で JSON5 がサポートされていなかったので元に戻す
- JSON Schema Validation のための schema.json を作成する
- API Gateway 経由でメッセージを受信して SQS へ送信する Lambda を実装する
- SQS からメッセージを取り出してログに出力する Lambda を実装する
- serverless.yml を変更する
- deploy する
- 動作確認
- Validation エラーの時にどうやって原因を調べればよいのか?
手順
jsonschema-sqs-project プロジェクトを作成する
以下の手順で jsonschema-sqs-project プロジェクトを作成します。具体的な手順は IntelliJ IDEA+Node.js+npm+serverless framework+Python の組み合わせで開発環境を構築して AWS Lambda を作成してみる 参照。
- jsonschema-sqs-project の Empty Project を作成する。
- Python の仮想環境を作成する。
- Serverless Framework をローカルインストールする。
- .envrc を作成する。
pip install boto3
order_service サブプロジェクトを作成する
プロジェクトのルートディレクトリの下で npx sls create --template aws-python3 --path order_service
を実行して order_service サブプロジェクトを作成します。
受信するメッセージのサンプルを作成する
今回は以下のサンプルメッセージを受信します。細かい仕様は後で JSON Schema を作成する時に決めます。
{ "orderNumber": "1", "orderDate": "2020-06-27", "isGift": false, "orderedItem": [ { "orderItemNumber": "1001", "identifier": "book", "name": "サンプル書籍", "orderQuantity": 1, "price": 3800 }, { "orderItemNumber": "61059", "identifier": "furniture", "name": "テスト椅子", "orderQuantity": 2, "price": 12000 } ] }
IntelliJ IDEA の .json を関連付ける File Type を JSON → JSON5 に変更する。。。が、API Gateway で JSON5 がサポートされていなかったので元に戻す
IntelliJ IDEA で .json のファイルを編集すると "$schema": "http://json-schema.org/draft-04/schema#"
の定義を参照して自動補完してくれます(VSCode でも補完してくれましたので補完して当然のようです)。
schema を定義しておくと json の編集が便利になるんだなと思い IDEA のマニュアルで JSON を見にいくと、Enabling JSON5 という文字が。JSON5 って何?
調べてみると 知らないうちにJSON5 in Babel の記事を見つけました。JSON なのにケツカンマ有効でコメントが付けらたりできるとのこと。かなり前からあるのに全然知りませんでした。。。
Extend the JSON5 syntax to all JSON files に従い、.json を関連付ける File Type を JSON → JSON5 に変更してみます。
「Settings」ダイアログを開いて「Editor」-「File Types」の設定を確認すると、確かに今は「JSON」の File Type に .json が登録されています。
「JSON5」の「Registered patterns」で「+」ボタンをクリックして「Add Wildcard」ダイアログを表示してから *.json
を入力して「OK」ボタンをクリックします。
「Reassigned wildcard」ボタンをクリックします。
「JSON5」の File Type に .json が登録されますので「OK」ボタンをクリックしてダイアログを閉じます。
File Type が「JSON」の時は以下の画像のように赤波線が表示されていましたが、
「JSON5」に変更すると全て消えました!
すごいなと思いつつ API Gateway の JSON Schema Validation でも JSON5 がサポートされているのか別にサンプルプロジェクトを作って試してみたところ、JSON.parse(...)
が使われていてダメでした。。。 残念ですが、元に戻すことにします。
JSON5 を使いたい時には拡張子を .json5 にすればよいので覚えておくことにします。
JSON Schema Validation のための schema.json を作成する
order_service サブプロジェクトの下に schema.json を作成して、以下の内容を記述します。JSON Schema の定義で入れられそうなものは出来るだけ入れてみました。
{ "$schema": "http://json-schema.org/draft-04/schema#", "definitions": { "orderedItem": { "type": "object", "properties": { "orderItemNumber": { "type": "string", "pattern": "^[0-9]+$" }, "identifier": { "type": "string", "enum": [ "book", "furniture" ] }, "name": { "type": "string", "minLength": 1, "maxLength": 10 }, "orderQuantity": { "type": "integer", "minimum": 0, "maximum": 999 }, "price": { "type": "integer", "minimum": 0 } }, "required": [ "orderItemNumber", "identifier", "name", "orderQuantity", "price" ] } }, "title": "Order", "type": "object", "properties": { "orderNumber": { "type": "string", "pattern": "^[0-9]+$" }, "orderDate": { "type": "string", "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}$" }, "isGift": { "type": "boolean" }, "orderedItem": { "type": "array", "minItems": 1, "maxItems": 3, "uniqueItems": true, "items": { "$ref": "#/definitions/orderedItem" } } }, "required": [ "orderNumber", "orderDate", "orderedItem" ] }
API Gateway 経由でメッセージを受信して SQS へ送信する Lambda を実装する
order_service/handler.py のファイル名を apigw_handler.py に変更し、以下の内容に変更します。
import json import logging import os import boto3 logger = logging.getLogger() logger.setLevel(logging.INFO) def send_msg(event, context): sqs_client = boto3.client('sqs') logger.info(event['body']) response = sqs_client.send_message( QueueUrl=os.environ['QUEUE_URL'], MessageBody=event['body'] ) body = { "message": f"send message to sqs, MessageId = {response['MessageId']}", "input": event } response = { "statusCode": 200, "body": json.dumps(body) } return response
SQS からメッセージを取り出してログに出力する Lambda を実装する
order_service ディレクトリの下に sqs_handler.py を作成し、以下の内容を記述します。
import logging import os import boto3 logger = logging.getLogger() logger.setLevel(logging.INFO) def recv_msg(event, context): sqs_client = boto3.client('sqs') response = sqs_client.receive_message(QueueUrl=os.environ['QUEUE_URL']) if 'Messages' in response: logger.info(response) for msg in response['Messages']: sqs_client.delete_message(QueueUrl=os.environ['QUEUE_URL'], ReceiptHandle=msg['ReceiptHandle'])
serverless.yml を変更する
order_service/serverless.yml を以下の内容に変更します。
service: order-service custom: queueName: "SampleQueue" provider: name: aws runtime: python3.8 stage: dev region: ap-northeast-1 iamRoleStatements: - Effect: "Allow" Action: - "sqs:*" Resource: - Fn::GetAtt: [ SampleQueue, Arn ] functions: sendMsg: handler: apigw_handler.send_msg environment: QUEUE_URL: !Ref SampleQueue events: - http: path: send-msg method: post cors: true request: schema: application/json: ${file(schema.json)} recvMsg: handler: sqs_handler.recv_msg environment: QUEUE_URL: !Ref SampleQueue events: - schedule: rate(1 minute) resources: Resources: SampleQueue: Type: AWS::SQS::Queue Properties: QueueName: "${self:custom.queueName}"
deploy する
deploy します。
動作確認
Postman から endpoint へ上に書いたサンプルメッセージを送信すると 200 OK が返ってきました。
ログにも送受信されたメッセージが出力されています。
JSON Schema の定義に合わないメッセージ(orderedItem の要素数が minItems 未満)を送信すると 400 Bad Request が返ってきました。
Validation エラーの時にどうやって原因を調べればよいのか?
1つ目は送信しているメッセージを適当なファイルに保存してから、IntelliJ IDEA のエディタ上で JSON Schema を適用してエラー箇所を表示させる方法です。
ファイルに保存してから画面右下の「No JSON schema」をクリックします。
メニューが表示されるので、一番上の「New Schema Mapping...」を選択します。
「JSON Schema Mappings」ダイアログが表示されるので、「Schema file or URL」に上で作成した schema.json を選択してから「OK」ボタンをクリックします。
ファイルに保存したメッセージを変更して JSON Schema に合わないようにするとその箇所が分かるように表示されます。
もっと詳細な内容を知りたい場合には、エディタ上でコンテキストメニューを表示させて「Analyze」-「Inspect Code...」を選択した後、
「Specify Inspection Code」ダイアログが表示されるので、何も変更せずに「OK」ボタンをクリックします。
そうすると画面下に Inspection Results Window が表示されて、そこに詳細なエラー内容が表示されます。
2つ目は AWS マネジメントコンソールの API Gateway の画面からテスト送信してログを表示させる方法です。
今回作成された API Gateway の「リソース」の画面に遷移してから、登録した endpoint の POST メソッドの画面の「テスト」リンクをクリックします。
メソッドテストの画面が表示されるので「リクエスト本文」に送信する JSON を記述して「テスト」ボタンをクリックします。
そうすると画面の「ログ」にエラーの内容が表示されます。ただしこの方法だと最初のエラーしか表示されないようです。
最後に npx sls remove -v
を実行して構築した環境を削除します。
履歴
2020/06/28
初版発行。
2020/07/05
概要に "※(2020/07/05追記)..." の記述を追加しました。
serverless-domain-manager プラグインを利用して独自ドメインで API Gateway にアクセスする
概要
記事一覧はこちらです。
API Gateway にアクセスする時には deploy 時に表示される ServiceEndpoint(https://~.execute-api.ap-northeast-1.amazonaws.com/dev/... の URL)を使用していましたが、独自ドメインでアクセスする方法を試してみます。
Lambda を deploy する時に独自ドメインを定義できる serverless-domain-manager プラグインというものがあるので、今回はこれを利用します。
構成は以下のようになるようです(ELB と API Gateway のどちらが先にアクセスされるのかが分からない。。。)。
- SSL証明書は東京リージョン(ap-northeast-1)の ACM で発行します。
- 独自ドメインは rest.ksbyzero.com にします。Route 53 に登録します。
- Lambda は Serverless Framework でプロジェクトを作成した時にできる hello をそのまま使い、https://rest.ksbyzero.com/hello でアクセスできるようにします。独自ドメインを使用する方式にすると stage 名を URL からなくせます。
- 独自ドメインでアクセスできるようにする場合、ELB が作成されて ACM で発行した SSL証明書が ELB に関連付けられます。ACM で作成した SSL証明書を削除しようとすると ELB で使用されているので削除できないというメッセージが表示されて気づきました(API Gateway を削除してから 20~30分程度経過しないと削除できませんでした)。
参照したサイト・書籍
REST API のカスタムドメイン名の設定
https://docs.aws.amazon.com/ja_jp/apigateway/latest/developerguide/how-to-custom-domains.htmlHow to set up a custom domain name for API Gateway in your Serverless app
https://seed.run/blog/how-to-set-up-a-custom-domain-name-for-api-gateway-in-your-serverless-app.htmlamplify-education / serverless-domain-manager
https://www.npmjs.com/package/serverless-domain-manager
https://github.com/amplify-education/serverless-domain-managerアプリケーションロードバランサー(ALB)のターゲットにAWS Lambdaが選択可能になりました
https://aws.amazon.com/jp/blogs/news/lambda-functions-as-targets-for-application-load-balancers/
目次
- custom-domain-api-project プロジェクトを作成する
- serverless-domain-manager プラグインをインストールする
- hello_service サブプロジェクト作成し serverless.yml を変更する
- Terraform を使用して ACM で SSL証明書を作成する
sls create_domain
を実行して独自ドメインを利用できるようにする- deploy する
- 動作確認する
- ACM の SSL証明書は関連リソースがなくならないと削除できない
手順
custom-domain-api-project プロジェクトを作成する
以下の手順で custom-domain-api-project プロジェクトを作成します。具体的な手順は IntelliJ IDEA+Node.js+npm+serverless framework+Python の組み合わせで開発環境を構築して AWS Lambda を作成してみる 参照。
- custom-domain-api-project の Empty Project を作成する。
- Python の仮想環境を作成する。
- Serverless Framework をローカルインストールする。
- .envrc を作成する。
serverless-domain-manager プラグインをインストールする
npm install --save-dev serverless-domain-manager
を実行してプラグインをインストールします。
hello_service サブプロジェクト作成し serverless.yml を変更する
プロジェクトのルートディレクトリの下で npx sls create --template aws-python3 --path hello_service
を実行して hello_service サブプロジェクトを作成します。
hello_service/severless.yml を以下の内容に変更します。
service: hello-service plugins: - serverless-domain-manager custom: customDomain: domainName: rest.ksbyzero.com # stage を書かなければ provider.stage の設定が使用される # stage: dev # basePath を書くと https://<domainName>/<basePath>/hello がアクセスする URL になる # basePath: base certificateName: ksbyzero.com createRoute53Record: true # endpointType に edge を指定すると CloudFront ディストリビューションを設定する # 今回は東京リージョンで設定するので regional を指定する endpointType: regional securityPolicy: tls_1_2 apiType: rest provider: name: aws runtime: python3.8 stage: dev region: ap-northeast-1 functions: hello: handler: handler.hello events: - http: path: hello method: get cors: true
- plugins を追加して serverless-domain-manager を記述します。
- custom.customDomain を追加し、serverless-domain-manager の設定を記述します。
- provider.stage、provider.region を追加します。
- functions.hello.events.http を追加し、API Gateway にリクエストが来たら Lambda が実行されるよう設定します。
Terraform を使用して ACM で SSL証明書を作成する
Terraform は tfenv+aws-vault+direnv を組み合わせて Windows 上に Terraform の実行環境を構築する で構築した環境を使用します。バージョンは 0.12.26 を使用します。
プロジェクトのルートディレクトリ直下に .terraform-version というファイルを作成し、以下の内容を記述します。
0.12.26
プロジェクトのルートディレクトリ直下に terraform というディレクトリを作成し、その下に acm.tf というファイルを作成して以下の内容を記述します。
terraform { required_version = "0.12.26" } provider "aws" { region = "ap-northeast-1" } /////////////////////////////////////////////////////////////////////////////// // Route53 の Public Zone // ※Route53 でドメインを取得したので作成済、resource ではなく data で定義する // data "aws_route53_zone" "dns_zone_apex" { name = "ksbyzero.com" } /////////////////////////////////////////////////////////////////////////////// // ACM で SSL証明書を作成する // resource "aws_acm_certificate" "dns_zone_apex" { domain_name = data.aws_route53_zone.dns_zone_apex.name subject_alternative_names = ["*.${data.aws_route53_zone.dns_zone_apex.name}"] validation_method = "DNS" tags = { Name = data.aws_route53_zone.dns_zone_apex.name } lifecycle { create_before_destroy = true } } resource "aws_route53_record" "cert_validation_0" { name = aws_acm_certificate.dns_zone_apex.domain_validation_options.0.resource_record_name type = aws_acm_certificate.dns_zone_apex.domain_validation_options.0.resource_record_type records = [aws_acm_certificate.dns_zone_apex.domain_validation_options.0.resource_record_value] zone_id = data.aws_route53_zone.dns_zone_apex.id ttl = 60 } resource "aws_acm_certificate_validation" "cert" { certificate_arn = aws_acm_certificate.dns_zone_apex.arn validation_record_fqdns = [ aws_route53_record.cert_validation_0.fqdn, ] }
以上で設定は完了です。ACM で SSL証明書を作成します。
tf init
を実行してから、
tf plan
、tf apply
を実行します(画面キャプチャは tf apply
のみ)。
マネジメントコンソールで ACM を見ると ksbyzero.com の証明書が追加されており、「状況」が「発行済み」になっています。
Route 53 には DNS検証用の CNAME のレコードが1件追加されています。
sls create_domain
を実行して独自ドメインを利用できるようにする
deploy の前に sls create_domain
を実行して serverless.yml の custom.customDomain.domainName に定義した rest.ksbyzero.com を設定します。
New domains may take up to 40 minutes to be initialized.
というメッセージが表示されており、使用可能になるまで最大 40分かかるようです。ただし、今回試していた時は結構すぐに使えるようになりました。
マネジメントコンソールから Route 53 を見ると rest.ksbyzero.com の A、AAAA レコードの2件が追加されています。
deploy する
hello_service を deploy します。
動作確認する
Postman で GET メソッドにしてから https://rest.ksbyzero.com/hello にアクセスすると 200 OK が返ってきます。
sls logs -f hello
コマンドでログを確認すると1回しか実行していないのに7件ログが出力されていました。ELB のヘルスチェックでしょうか?
以下のコマンドを実行して作成したリソースを削除します。
npx sls remove -v
npx sls delete_domain
- terraform ディレクトリに移動してから
tf destroy
ACM の SSL証明書は関連リソースがなくならないと削除できない
tf destroy
コマンドで ACM の SSL証明書を削除しようとしてもすぐには削除できません。API Gateway を削除してもしばらくの間 ELB がSSL証明書に関連付けられているためです。20~30分程度経つと関連リソースはなくなります。
履歴
2020/06/24
初版発行。
API Gateway で受信したメッセージを SNS 経由で Slack へ通知する
概要
記事一覧はこちらです。
API Gateway で受信したメッセージを Lambda で SNS へ転送し、SNS から Lambda で Slack へメッセージを送信してみます。
API Gateway → SNS 連携は AWS Service Proxy という機能を使えば直接送信できるそうですが、今回は Lambda 経由で送信します。
以下の仕様で実装します。
- API Gateway にアクセスする時の URL は
https://<deploy 時に表示されるドメイン名>/dev/send-msg
。 - request body には message の項目だけが記述された JSON をセットします。
- message にセットされた文字列を Slack の
#general
に送信します。
参照したサイト・書籍
SNS
https://www.serverless.com/framework/docs/providers/aws/events/sns/AWS - Resources
https://www.serverless.com/framework/docs/providers/aws/guide/resources/- 以下の内容を参考にしました。
- resources に定義する AWS リソースに DependsOn が定義できる。
- serverless.xml 内で定義した lambda function を他の AWS リソースから参照する時に、
- 末尾に
LambdaFunction
を追加した文字列で参照できる。 - Override AWS CloudFormation Resource に function 名は normalizedFunctionName にする(先頭は大文字に変える、
-
と_
は文字列に変える)。 - 例えば notifySlack という function ならば、先頭を大文字に変えて末尾に
LambdaFunction
を追加した NotifySlackLambdaFunction というリソース名になる。
- 末尾に
- 以下の内容を参考にしました。
AWS CloudFormation でプッシュベースのイベントソースに AWS Lambda 関数をサブスクライブするにはどうすればよいですか?
https://aws.amazon.com/jp/premiumsupport/knowledge-center/lambda-subscribe-push-cloudformation/AWS: Publish SNS message for Lambda function via boto3 (Python2)
https://stackoverflow.com/questions/34029251/aws-publish-sns-message-for-lambda-function-via-boto3-python2Requests: HTTP for Humans
https://requests.readthedocs.io/en/master/- 今回から urllib3 ではなく requests を使ってみます。
- urllib3 は boto3 の依存関係にあるので追加インストールの必要がありませんが、requests は個別にインストールする必要があります。
Amazon SNS (from AWS) - The Ultimate Guide
https://www.serverless.com/amazon-sns/Sending messages using Incoming Webhooks
https://api.slack.com/messaging/webhooksウェブフックを使用して Amazon SNS メッセージを Amazon Chime、Slack や Microsoft Teams に発行する方法を教えてください。
https://aws.amazon.com/jp/premiumsupport/knowledge-center/sns-lambda-webhooks-chime-slack-teams/Requests: HTTP for Humans
https://requests.readthedocs.io/en/master/Variables
https://www.serverless.com/framework/docs/providers/aws/guide/variables/Encrypting messages published to Amazon SNS with AWS KMS
https://aws.amazon.com/jp/blogs/compute/encrypting-messages-published-to-amazon-sns-with-aws-kms/moto(boto3のmockモジュール)の使い方:SQS/SNS編
https://qiita.com/ck_fm0211/items/08a7bc5a0c98de112cb7django setting environment variables in unittest tests
https://stackoverflow.com/questions/31195183/django-setting-environment-variables-in-unittest-testsAn Introduction to Mocking in Python
https://www.toptal.com/python/an-introduction-to-mocking-in-pythonnotify-slack
https://registry.terraform.io/modules/terraform-aws-modules/notify-slack/aws/3.3.0- 書き終わるころに見つけました。メッセージを Slack に送信する手段が欲しいだけならば Terraform の module にしておいて簡単に作成できるようにしておくのもありですね。
目次
- apigw-sns-slack-project プロジェクトを作成する
- Serverless Framework で message_service サブプロジェクトを作成し serverless.yml を変更する
- requirements.txt を作成する
- API Gateway から呼び出す Lambda を実装する
- Slack App を作成し Webhook URL を生成する
- SNS から呼び出す Lambda を実装する
- ユニットテストを作成する
- deploy する
- 動作確認する
手順
apigw-sns-slack-project プロジェクトを作成する
以下の手順で apigw-sns-slack-project プロジェクトを作成します。具体的な手順は IntelliJ IDEA+Node.js+npm+serverless framework+Python の組み合わせで開発環境を構築して AWS Lambda を作成してみる 参照。
- apigw-sns-slack-project の Empty Project を作成する。
- Python の仮想環境を作成する。
- Serverless Framework をローカルインストールする。
- .envrc を作成する。
npm install --save-dev serverless-python-requirements
- IDEA で Terminal を起動して boto3、requests、moto をインストールする。
pip install boto3
pip install requests
pip install moto
Serverless Framework で message_service サブプロジェクトを作成し serverless.yml を変更する
プロジェクトのルートディレクトリの下で npx sls create --template aws-python3 --path message_service
を実行して message_service サブプロジェクトを作成します。
生成された serverless.yml を以下の内容に変更します。
service: message-service plugins: - serverless-python-requirements custom: pythonRequirements: dockerizePip: true provider: name: aws runtime: python3.8 stage: dev region: ap-northeast-1 iamRoleStatements: - Effect: "Allow" Action: - "kms:GenerateDataKey" - "kms:Decrypt" - "sns:Publish" - "sns:Subscribe" Resource: - !Ref NotifySlackTopic functions: sendMsg: handler: apigw_handler.send_msg environment: TOPIC_ARN: !Ref NotifySlackTopic events: - http: path: send-msg method: post cors: true notifySlack: handler: sns_handler.notify_slack environment: SLACK_WEBHOOK_URL: ${env:SLACK_WEBHOOK_URL} events: # - sns: notify-slack-topic という書き方ではなく resources で定義した Topic に関連付けている # ただしこの書き方だと Subscription を作成してくれないし、Topic にメッセージが publish されても Lambda 関数が実行されない # ので resources で Subscription を作成して必要な権限を付与している - sns: arn: !Ref NotifySlackTopic topicName: notify-slack-topic resources: Resources: NotifySlackTopic: Type: AWS::SNS::Topic Properties: TopicName: notify-slack-topic # 以下のリソースは notifySlack 関数が作成された後に生成する # 既に作成されている Topic だと自動で Subscription を作成してくれないので、以下の定義で作成する NotifySlackSubscription: Type: AWS::SNS::Subscription DependsOn: # AWS - Resources # https://www.serverless.com/framework/docs/providers/aws/guide/resources/ # notifySlack function ならば、先頭を大文字に変えて末尾に LambdaFunction を付けた # NotifySlackLambdaFunction という名称で参照できる - NotifySlackLambdaFunction Properties: TopicArn: !Ref NotifySlackTopic Endpoint: Fn::GetAtt: - NotifySlackLambdaFunction - Arn Protocol: lambda # Subscription を作成するだけでは Lambda を実行できないので、以下の定義で実行できるようにする NotifySlackLambdaResourcePolicy: Type: AWS::Lambda::Permission DependsOn: - NotifySlackLambdaFunction Properties: FunctionName: !Ref NotifySlackLambdaFunction Principal: sns.amazonaws.com Action: "lambda:InvokeFunction" SourceArn: !Ref NotifySlackTopic
- Serverless Framework では SNS に関連付けたい場合 functions の events に
- sns: notify-slack-topic
と記述すればよいのですが、Topic へメッセージを publish する Lambda 関数も同じ serverless.xml に定義したい場合どうやって参照したらよいのかが分からなかったので、resources で NotifySlackSubscription を定義して、sendMsg と notifySlack の Lambda 関数から!Ref NotifySlackTopic
で参照できるようにしました。 - sendMsg ではメッセージを publish する時に NotifySlackTopic の arn が必要になるので、environment に
TOPIC_ARN: !Ref NotifySlackTopic
を定義し、環境変数で arn を渡しています。 - 通常 resources に定義された AWS リソースが全て作成されてから Lambda 関数が作成されますが、今回は NotifySlackTopic → sendMsg と notifySlack の Lambda 関数 → NotifySlackSubscription と NotifySlackLambdaResourcePolicy の順に作成する必要があるため、NotifySlackSubscription と NotifySlackLambdaResourcePolicy に DependsOn を定義して Lambda 関数が作成された後に作成されるようにしています。
- IAM Role は Lamda 関数毎に分けた方が良さそうですが、簡略化したかったので
sns:Publish
とsns:Subscribe
を1つの Role に入れました。
requirements.txt を作成する
message_service サブプロジェクトの下に requirements.txt を作成し、以下の内容を記述します。
requests==2.24.0
API Gateway から呼び出す Lambda を実装する
message_service サブプロジェクトを作成した時に作られている handler.py のファイル名を agigw_handler.py に変更した後、以下の内容を記述します。
import json import logging import os import boto3 logger = logging.getLogger() logger.setLevel(logging.INFO) def send_msg(event, context): # body に { "message": "..." } のフォーマットで Slack へ送信したいメッセージを格納する logger.info(event) body = json.loads(event['body']) logger.info(f"message={body['message']}") sns_client = boto3.client('sns') response = sns_client.publish( TopicArn=os.environ['TOPIC_ARN'], Message=body['message'] ) response_body = { "message": f"published message to SNS (MessageId={response['MessageId']})", "input": event } response = { "statusCode": 200, "body": json.dumps(response_body) } return response
Slack App を作成し Webhook URL を生成する
https://api.slack.com/ にアクセスした後、画面右上の「Your Apps」をクリックします。
以下の画像の画面が表示されるので「Create an App」ボタンをクリックします。
「Create a Slack App」ダイアログが表示されます。「App Name」に「SnsToSlack」を入力し、「Development Slack Workspace」を選択した後「Create App」ボタンをクリックします。
「SnsToSlack」App が作成されて「Basic Information」画面が表示されます。「Incoming Webhooks」をクリックします。
「Incoming Webhooks」画面が表示されます。右上のトグルを On に変えます。
画面の下に「Webhook URLs for Your Workspace」が表示されます。「Add New Webhook to Workspace」ボタンをクリックします。
以下の画面が表示されます。「#general」を選択してから「許可する」ボタンをクリックします。
「Webhook URLs for Your Workspace」に発行された Webhook URL が表示されるのでコピーします。
SNS から呼び出す Lambda を実装する
Webhook URL は直接 Lambda 内には記述せず環境変数 SLACK_WEBHOOK_URL で渡すことにします。
.envrc に export SLACK_WEBHOOK_URL=...
を追加します(AWS_PROFILE、SLACK_WEBHOOK_URL の ..... の部分には実際の値を記述します)。
export AWS_PROFILE=..... # Windows 上の Python で UTF-8 をデフォルトにする # https://qiita.com/methane/items/9a19ddf615089b071e71 export PYTHONUTF8=1 export SLACK_WEBHOOK_URL=https://hooks.slack.com/services/.....
serverless.yml の notifySlack 関数に環境変数 SLACK_WEBHOOK_URL の値を渡すよう記述を追加します。
notifySlack: handler: sns_handler.notify_slack environment: SLACK_WEBHOOK_URL: ${env:SLACK_WEBHOOK_URL} events: # - sns: notify-slack-topic という書き方ではなく resources で定義した Topic に関連付けている # ただしこの書き方だと Subscription を作成してくれないし、Topic にメッセージが publish されても Lambda 関数が実行されない # ので resources で Subscription を作成して必要な権限を付与している - sns: arn: !Ref NotifySlackTopic topicName: notify-slack-topic
message_service サブプロジェクトの下に sns_handler.py を新規作成し、以下の内容を記述します。
import json import logging import os import requests logger = logging.getLogger() logger.setLevel(logging.INFO) def notify_slack(event, context): msg = { 'text': event['Records'][0]['Sns']['Message'] } encoded_msg = json.dumps(msg).encode('utf-8') res = requests.post(os.environ['SLACK_WEBHOOK_URL'], data=encoded_msg) logger.info(res.status_code) logger.info(res.headers) logger.info(res.text)
ユニットテストを作成する
プロジェクトのルートディレクトリ直下に tests ディレクトリを作成し、中身が空の __init__.py
ファイルを作成します。
まずは sendMsg 関数のテストから。tests ディレクトリの下に test_apigw_handler.py を作成し、以下の内容を記述します。
import json import unittest from unittest.mock import patch import boto3 from moto import mock_sns from message_service import apigw_handler TOPIC_NAME = 'notify-slack-topic' @mock_sns class TestApigwHandler(unittest.TestCase): def setUp(self): sns_client = boto3.client('sns') response = sns_client.create_topic( Name=TOPIC_NAME ) self._topic_arn = response['TopicArn'] self.env = patch.dict('os.environ', { 'TOPIC_ARN': response['TopicArn'], }) def tearDown(self): sns_client = boto3.client('sns') sns_client.delete_topic( TopicArn=self._topic_arn ) def test_send_msg(self): with self.env: with open('tests/apigw_event.json', encoding='utf-8', mode='r') as f: apigw_event = json.load(f) response = apigw_handler.send_msg(apigw_event, None) self.assertEqual(response['statusCode'], 200)
apigw_event.json も作成し、以下の内容を記述します。
{ "body": "eyJ0ZXN0IjoiYm9keSJ9", "resource": "/{proxy+}", "path": "/path/to/resource", "httpMethod": "POST", "isBase64Encoded": true, "queryStringParameters": { "foo": "bar" }, "multiValueQueryStringParameters": { "foo": [ "bar" ] }, "pathParameters": { "proxy": "/path/to/resource" }, "stageVariables": { "baz": "qux" }, "headers": { "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Encoding": "gzip, deflate, sdch", "Accept-Language": "en-US,en;q=0.8", "Cache-Control": "max-age=0", "CloudFront-Forwarded-Proto": "https", "CloudFront-Is-Desktop-Viewer": "true", "CloudFront-Is-Mobile-Viewer": "false", "CloudFront-Is-SmartTV-Viewer": "false", "CloudFront-Is-Tablet-Viewer": "false", "CloudFront-Viewer-Country": "US", "Host": "1234567890.execute-api.ap-northeast-1.amazonaws.com", "Upgrade-Insecure-Requests": "1", "User-Agent": "Custom User Agent String", "Via": "1.1 08f323deadbeefa7af34d5feb414ce27.cloudfront.net (CloudFront)", "X-Amz-Cf-Id": "cDehVQoZnx43VYQb9j2-nvCh-9z396Uhbp027Y2JvkCPNLmGJHqlaA==", "X-Forwarded-For": "127.0.0.1, 127.0.0.2", "X-Forwarded-Port": "443", "X-Forwarded-Proto": "https" }, "multiValueHeaders": { "Accept": [ "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8" ], "Accept-Encoding": [ "gzip, deflate, sdch" ], "Accept-Language": [ "en-US,en;q=0.8" ], "Cache-Control": [ "max-age=0" ], "CloudFront-Forwarded-Proto": [ "https" ], "CloudFront-Is-Desktop-Viewer": [ "true" ], "CloudFront-Is-Mobile-Viewer": [ "false" ], "CloudFront-Is-SmartTV-Viewer": [ "false" ], "CloudFront-Is-Tablet-Viewer": [ "false" ], "CloudFront-Viewer-Country": [ "US" ], "Host": [ "0123456789.execute-api.ap-northeast-1.amazonaws.com" ], "Upgrade-Insecure-Requests": [ "1" ], "User-Agent": [ "Custom User Agent String" ], "Via": [ "1.1 08f323deadbeefa7af34d5feb414ce27.cloudfront.net (CloudFront)" ], "X-Amz-Cf-Id": [ "cDehVQoZnx43VYQb9j2-nvCh-9z396Uhbp027Y2JvkCPNLmGJHqlaA==" ], "X-Forwarded-For": [ "127.0.0.1, 127.0.0.2" ], "X-Forwarded-Port": [ "443" ], "X-Forwarded-Proto": [ "https" ] }, "requestContext": { "accountId": "123456789012", "resourceId": "123456", "stage": "prod", "requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef", "requestTime": "09/Apr/2015:12:34:56 +0000", "requestTimeEpoch": 1428582896000, "identity": { "cognitoIdentityPoolId": null, "accountId": null, "cognitoIdentityId": null, "caller": null, "accessKey": null, "sourceIp": "127.0.0.1", "cognitoAuthenticationType": null, "cognitoAuthenticationProvider": null, "userArn": null, "userAgent": "Custom User Agent String", "user": null }, "path": "/prod/path/to/resource", "resourcePath": "/{proxy+}", "httpMethod": "POST", "apiId": "1234567890", "protocol": "HTTP/1.1" }, "body": "{\r\n \"message\": \"これはテストです \"\r\n}", "isBase64Encoded": "False" }
IDEA からテストを実行して成功することを確認します。
次は notifySlack 関数のテストです。tests ディレクトリの下に test_sns_handler.py を作成し、以下の内容を記述します。
import json import os import unittest from unittest.mock import patch from message_service import sns_handler class TestApigwHandler(unittest.TestCase): def setUp(self): self.env = patch.dict('os.environ', { 'SLACK_WEBHOOK_URL': 'https:/localhost/service/test', }) def tearDown(self): None @patch('requests.post') def test_notify_slack(self, mock_requests): with self.env: with open('tests/sns_event.json', encoding='utf-8', mode='r') as f: sns_event = json.load(f) sns_handler.notify_slack(sns_event, None) msg = { 'text': 'これはテストです' } encoded_msg = json.dumps(msg).encode('utf-8') mock_requests.assert_called_with(os.environ['SLACK_WEBHOOK_URL'], data=encoded_msg)
sns_event.json も作成し、以下の内容を記述します。
{ "Records": [ { "EventSource": "aws:sns", "EventVersion": "1.0", "EventSubscriptionArn": "arn:aws:sns:ap-northeast-1:446693287859:notify-slack-topic:2e6206ff-f201-400c-86f8-5ad2a168d1d0", "Sns": { "Type": "Notification", "MessageId": "accf8eb7-8119-54a8-a2e4-d9db0cb190fb", "TopicArn": "arn:aws:sns:ap-northeast-1:446693287859:notify-slack-topic", "Subject": "None", "Message": "これはテストです", "Timestamp": "2020-06-23T23:08:24.463Z", "SignatureVersion": "1", "Signature": "INeRjQ+zIe3qolznKBSlM5p9c1IguSBa9CBkd3AXwmwJm3SEpqKD+g3+Xmg/KT5v2wg8MVpOMpu1UO6zYdQ4lSnU90DP6Q1e6Bngr9uvy5ypgpE7Hy5s5L24vUT5bdqQCpY8Ig+Pt4Fx1PMFaRw9WnASl+26JRlGR53hMPux7GZlugQIYrlAhPJ/ZUlD0gqP5+hTg86FLdQ4GYblruvV1TqI7nEzd7ou88lviXj/RC4YYUUK0fonaR5U9tLTJmHwyr/cV3oII2kw6FzsxASpEHi3GhalKRuy1bW4Lx+Y4a0ITavK+KvvCs5iK9jZ+W07E8HyWd/vjqtRjMjwa83kYQ==", "SigningCertUrl": "https://sns.ap-northeast-1.amazonaws.com/SimpleNotificationService-a86cb10b4e1f29c941702d737128f7b6.pem", "UnsubscribeUrl": "https://sns.ap-northeast-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:ap-northeast-1:446693287859:notify-slack-topic:2e6206ff-f201-400c-86f8-5ad2a168d1d0", "MessageAttributes": {} } } ] }
IDEA からテストを実行して成功することを確認します。
最後にコマンドラインの venv 環境下で python -m unittest -v
を実行してテストが成功することを確認します。
deploy する
message_service を deploy します。
動作確認する
Postman を使用して API Gateway にメッセージを送信します。赤枠の部分が設定した箇所です。
Slack の #general にメッセージが届きました。
npx sls logs -f sendMsg
、npx sls logs -f notifySlack --startTime 1h
(ログが出てこなかったため過去1時間を表示するようオプションを追加しています) を実行してログを確認しても特に問題はなさそうです。
npx sls remove -v
を実行して削除します。
履歴
2020/06/24
初版発行。
Spring Boot + npm + Geb で入力フォームを作ってテストする ( その92 )( http-proxy-middleware の createProxyMiddleware 関数の引数 context には Proxy させない URI を後に書く )
概要
記事一覧はこちらです。
- 今回の手順で確認できるのは以下の内容です。
Spring Boot + npm + Geb で入力フォームを作ってテストする ( その85 )( Node.js を 10.15.3 → 12.16.3 へ、npm を 6.9.0 → 6.14.5 へバージョンアップする ) で http-proxy-middleware を 0.19.1 → 1.0.3 にバージョンアップして createProxyMiddleware 関数を使うように変更したのですが、createProxyMiddleware 関数の引数 context に記述する URI の順番が今のままではダメなことに今更ながら気づきました。
Proxy させる URI を先に書いて Proxy させない URI をその後に書かないと、Proxy させないはずの URI も Proxy されていました。
期待した動作になるよう修正します。
参照したサイト・書籍
目次
- 今の記述だと Proxy させない URI も Proxy されてしまう
- createProxyMiddleware 関数の引数 context に Proxy させない URI を後に記述するよう修正する
手順
今の記述だと Proxy させない URI も Proxy されてしまう
bs-springboot-config.js の createProxyMiddleware 関数の記述は現在以下のように Proxy させない URI を先に、Proxy させる URI を後に記述していますが、
const {createProxyMiddleware} = require("http-proxy-middleware"); const proxy = createProxyMiddleware( [ // /css, /js, /vendor と *.html は Tomcat に転送しない "!/css/**/*", "!/js/**/*", "!/vendor/**/*", "!/**/*.html", "/**/*" ], {target: "http://localhost:8080"} ); ..........
npm run browser-sync:springboot
コマンドを実行して Browsersync だけ起動した後(Tomcat は起動しません)、
http://localhost:9080/css/common.css にアクセスすると "!/css/**/*"
の設定により Proxy されず /css/common.css が取得できると思っていましたが、Error occured while trying to proxy to: localhost:9080/css/common.css
のエラーメッセージが表示されます。
Browsersync から Tomcat に Proxy しようとして Tomcat にアクセスできないのでエラーになっているようです。
createProxyMiddleware 関数の引数 context に Proxy させない URI を後に記述するよう修正する
この問題は "/**/*"
の記述を一番最初にすることで解決できました。
bs-springboot-config.js を以下のように修正し、
const {createProxyMiddleware} = require("http-proxy-middleware"); const proxy = createProxyMiddleware( [ // /css, /js, /vendor と *.html は Tomcat に転送しない "/**/*", "!/css/**/*", "!/js/**/*", "!/vendor/**/*", "!/**/*.html" ], {target: "http://localhost:8080"} );
Browsersync を再起動してから http://localhost:9080/css/common.css にアクセスすると、今度は /css/common.css の内容が返ってきました。
Tomcat を起動して http://localhost:8080/inquiry/input/01 にアクセスすると入力画面1も表示されたので、問題ないようです。
履歴
2020/06/21
初版発行。