はじめに
REVISIOエンジニアの岩田です。本記事では、REVISIOのデータ基盤において、Amazon EventBridgeの重複実行によって発生していたAWS Batchジョブの重複起動問題を、Snowflakeタスクに移行することで解決した事例を紹介します。EventBridgeによるジョブ起動の不安定さを解消し、データ整合性、コスト効率、処理速度、運用管理の面で改善を実現しました。
課題
REVISIOのデータパイプラインは、Amazon EventBridgeを利用してAWS Batchジョブを起動し、Snowflakeに格納されたデータを変換する処理を行っていました。EventBridgeはAWSサービスの連携を容易にする便利なサービスですが、少なくとも1回以上のイベント配信を保証するという仕様上、同じイベントを複数回配信する可能性があります。これは、ネットワークの不安定さやターゲット側の処理遅延など、様々な要因を考慮して、EventBridgeがイベントを確実にターゲットに届けるために設計されているためと思われます。
今回、この仕様により、同じAWS Batchジョブが複数回起動され、Snowflake上のデータが重複して処理される事態が発生しました。幸い、大きな問題は発生しませんでしたが、データの整合性損失、AWS Batchの複数回実行によるコスト増加、処理時間の延長といった課題が浮き彫りになりました。
解決策
この問題を解決するため、REVISIOではEventBridgeからSnowflakeタスクへの移行を検討しました。Snowflakeタスクとは、Snowflake内でSQLステートメントやファンクションをスケジュール実行できる機能です。タスクの実行状態はSnowflake内で管理されるため、EventBridgeを利用する場合と異なり、重複実行のリスクを回避できます。
AWS & Snowflake リソース構築
以下、Snowflakeの公式ドキュメント(外部ネットワークアクセスの例 | Snowflake Documentation )に記載されているステップを参考に、以下の手順でリソースを構築しました。
1. IAM ユーザーの作成
- ユーザー名:
snowflake_aws_scheduler
- ポリシー:
submit_aws_batch
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "batch:SubmitJob", "batch:DescribeJobDefinitions" ], "Resource": [ "arn:aws:batch:ap-northeast-1:{ACCOUNTID}:job-queue/{BATCH_QUEUE_NAME}", "arn:aws:batch:ap-northeast-1:{ACCOUNTID}:job-definition/{BATCH_DEFINITION_NAME}" ] } ] }
- アクセスキーとシークレットアクセスキーを控えておく
2. Snowflake 側の設定
- ネットワークルール、シークレット、ネットワーク統合を作成
-- Lambda へのアクセスを許可するネットワークルール CREATE NETWORK RULE network_aws_scheduler MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = ('batch.ap-northeast-1.amazonaws.com'); -- Lambda アクセス用の認証情報 CREATE SECRET secret_aws_scheduler TYPE = PASSWORD USERNAME = '{YOUR_ACCESS_KEY_ID}' PASSWORD = '{YOUR_SECRET_ACCESS_KEY}'; -- 外部アクセス統合の作成 CREATE EXTERNAL ACCESS INTEGRATION eai_aws_scheduler ALLOWED_NETWORK_RULES = (network_aws_scheduler) ALLOWED_AUTHENTICATION_SECRETS = (secret_aws_scheduler) ENABLED = TRUE; -- AWS Batch ジョブを実行する関数の作成 CREATE FUNCTION submit_aws_batch_job(job_name string, job_queue string, job_definition string, container_overrides variant default {}) RETURNS STRING LANGUAGE PYTHON EXTERNAL_ACCESS_INTEGRATIONS = (eai_aws_scheduler) SECRETS = ('cred' = secret_aws_scheduler) RUNTIME_VERSION = 3.11 HANDLER = 'main' PACKAGES = ('boto3') AS $$ import boto3 import json import _snowflake from botocore.credentials import Credentials def main(job_name, job_queue, job_definition, container_overrides): # Snowflakeから認証情報を取得 username_password_object = _snowflake.get_username_password('cred') access_key = username_password_object.username secret_key = username_password_object.password session = boto3.Session( aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='ap-northeast-1' ) client = session.client('batch') client.submit_job( jobName=job_name, jobQueue=job_queue, jobDefinition=job_definition, containerOverrides=container_overrides ) return 'SUCCESS' $$; -- AWS Batch ジョブを実行するタスクの作成 CREATE TASK public.task_run_aws_batch WAREHOUSE={WAREHOUSE_NAME} SCHEDULE='USING CRON 0 0 * * * Asia/Tokyo' AS SELECT submit_aws_batch_job( '{BATCH_JOB_NAME}', '{BATCH_QUEUE_NAME}', '{BATCH_DEFINITION_NAME}', -- 引数を渡す場合はこちらに記述する {'environment': [{'name':'{PARAM_NAME}','value':'{PARAM_VALUE}'}]} ); -- タスクの有効化 ALTER TASK task_run_aws_batch RESUME;
補足
ストレージ統合のようなIAMロールを使用した認証は、Snowflakeサポートに確認したところ、対応していないとのことでした。そのため、ドキュメントに記載されているとおりにアクセスキーとシークレットアクセスキーを使用した認証方法を採用しています。
移行によるメリット
Snowflakeタスクへの移行により、以下のメリットが得られました。
- データ整合性の確保: 重複実行が解消されたため、データの整合性が確実に保たれるようになりました。
- コスト削減: 不要なAWS Batchジョブの実行がなくなったため、コストを削減できました。
- 処理時間の短縮: 重複処理がなくなったため、全体の処理時間が短縮されました。
移行によるデメリット
Snowflakeタスクへの移行によって、いくつかのデメリットも発生しました。
- 耐障害性の低下: AWSとSnowflakeの両方にまたがるシステムとなるため、全体としての耐障害性が低下する可能性があります。AWSからSnowflakeへデータを投入するような処理に適用するのが良いでしょう。
- パフォーマンスの予測が難しくなる: SnowflakeタスクとAWS Batchジョブの間のレイテンシやパフォーマンスの予測が難しくなる可能性があります。特にSnowflake タスクは起動に若干の遅延が発生しますので、スケジュールがシビアな処理には適用しないほうが良さそうです。
- セキュリティの考慮: Snowflakeの外部ネットワークアクセスには、適切な認証と認可が必要となります。設定が不適切だとセキュリティリスクが増加する可能性があります。いずれはストレージ統合と同じようにIAMロールを使用した認証への対応が行われると思われます。
- モニタリングとトラブルシューティングの複雑さ: SnowflakeタスクはSnowflake内で管理されるため、CloudWatchで一貫した監視ができなくなります。その結果、タスクのモニタリングやトラブルシューティングが複雑化する可能性があります。
これらのデメリットを考慮した上で、Snowflakeタスクの導入を検討する必要があります。
まとめ
EventBridgeからSnowflakeタスクへの移行によって、AWS Batchジョブの重複実行問題を解決し、データ基盤の安定稼働を実現できました。Snowflakeタスクは、Snowflakeを利用する上で非常に強力な機能であり、データパイプラインの構築においても大きなメリットをもたらします。
もし、EventBridgeの重複実行やそれに伴う問題でお困りの場合は、Snowflakeタスクへの移行を検討してみてはいかがでしょうか?