EventBridgeからSnowflakeタスクへ移行: AWS Batchジョブの安定実行を実現

はじめに

REVISIOエンジニアの岩田です。本記事では、REVISIOのデータ基盤において、Amazon EventBridgeの重複実行によって発生していたAWS Batchジョブの重複起動問題を、Snowflakeタスクに移行することで解決した事例を紹介します。EventBridgeによるジョブ起動の不安定さを解消し、データ整合性、コスト効率、処理速度、運用管理の面で改善を実現しました。

課題

REVISIOのデータパイプラインは、Amazon EventBridgeを利用してAWS Batchジョブを起動し、Snowflakeに格納されたデータを変換する処理を行っていました。EventBridgeはAWSサービスの連携を容易にする便利なサービスですが、少なくとも1回以上のイベント配信を保証するという仕様上、同じイベントを複数回配信する可能性があります。これは、ネットワークの不安定さやターゲット側の処理遅延など、様々な要因を考慮して、EventBridgeがイベントを確実にターゲットに届けるために設計されているためと思われます。
今回、この仕様により、同じAWS Batchジョブが複数回起動され、Snowflake上のデータが重複して処理される事態が発生しました。幸い、大きな問題は発生しませんでしたが、データの整合性損失、AWS Batchの複数回実行によるコスト増加、処理時間の延長といった課題が浮き彫りになりました。

解決策

この問題を解決するため、REVISIOではEventBridgeからSnowflakeタスクへの移行を検討しました。Snowflakeタスクとは、Snowflake内でSQLステートメントやファンクションをスケジュール実行できる機能です。タスクの実行状態はSnowflake内で管理されるため、EventBridgeを利用する場合と異なり、重複実行のリスクを回避できます。

AWS & Snowflake リソース構築

以下、Snowflakeの公式ドキュメント(外部ネットワークアクセスの例 | Snowflake Documentation )に記載されているステップを参考に、以下の手順でリソースを構築しました。

1. IAM ユーザーの作成

  • ユーザー名: snowflake_aws_scheduler
  • ポリシー: submit_aws_batch
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "batch:SubmitJob",
                "batch:DescribeJobDefinitions"
            ],
            "Resource": [
                "arn:aws:batch:ap-northeast-1:{ACCOUNTID}:job-queue/{BATCH_QUEUE_NAME}",
                "arn:aws:batch:ap-northeast-1:{ACCOUNTID}:job-definition/{BATCH_DEFINITION_NAME}"
            ]
        }
    ]
}
  • アクセスキーとシークレットアクセスキーを控えておく

2. Snowflake 側の設定

  • ネットワークルール、シークレット、ネットワーク統合を作成
-- Lambda へのアクセスを許可するネットワークルール
CREATE NETWORK RULE network_aws_scheduler
  MODE = EGRESS
  TYPE = HOST_PORT
  VALUE_LIST = ('batch.ap-northeast-1.amazonaws.com');

-- Lambda アクセス用の認証情報
CREATE SECRET secret_aws_scheduler
  TYPE = PASSWORD
  USERNAME = '{YOUR_ACCESS_KEY_ID}'
  PASSWORD = '{YOUR_SECRET_ACCESS_KEY}';

-- 外部アクセス統合の作成
CREATE EXTERNAL ACCESS INTEGRATION eai_aws_scheduler
  ALLOWED_NETWORK_RULES = (network_aws_scheduler)
  ALLOWED_AUTHENTICATION_SECRETS = (secret_aws_scheduler)
  ENABLED = TRUE; 

-- AWS Batch ジョブを実行する関数の作成
CREATE FUNCTION submit_aws_batch_job(job_name string, job_queue string, job_definition string, container_overrides variant default {})
RETURNS STRING
LANGUAGE PYTHON
EXTERNAL_ACCESS_INTEGRATIONS = (eai_aws_scheduler)
SECRETS = ('cred' = secret_aws_scheduler)
RUNTIME_VERSION = 3.11
HANDLER = 'main'
PACKAGES = ('boto3')
AS $$
import boto3
import json
import _snowflake
from botocore.credentials import Credentials
def main(job_name, job_queue, job_definition, container_overrides):
    # Snowflakeから認証情報を取得
    username_password_object = _snowflake.get_username_password('cred')
    access_key = username_password_object.username
    secret_key = username_password_object.password
    session = boto3.Session(
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='ap-northeast-1'
    )
    client = session.client('batch')
    client.submit_job(
        jobName=job_name,
        jobQueue=job_queue,
        jobDefinition=job_definition,
        containerOverrides=container_overrides
    )
    return 'SUCCESS'
$$;

-- AWS Batch ジョブを実行するタスクの作成
CREATE TASK public.task_run_aws_batch
    WAREHOUSE={WAREHOUSE_NAME}
    SCHEDULE='USING CRON 0 0 * * * Asia/Tokyo'
AS
    SELECT submit_aws_batch_job(
        '{BATCH_JOB_NAME}',
        '{BATCH_QUEUE_NAME}',
        '{BATCH_DEFINITION_NAME}',
        -- 引数を渡す場合はこちらに記述する
        {'environment': [{'name':'{PARAM_NAME}','value':'{PARAM_VALUE}'}]}
    );

-- タスクの有効化
ALTER TASK task_run_aws_batch RESUME; 

補足

ストレージ統合のようなIAMロールを使用した認証は、Snowflakeサポートに確認したところ、対応していないとのことでした。そのため、ドキュメントに記載されているとおりにアクセスキーとシークレットアクセスキーを使用した認証方法を採用しています。

移行によるメリット

Snowflakeタスクへの移行により、以下のメリットが得られました。

  • データ整合性の確保: 重複実行が解消されたため、データの整合性が確実に保たれるようになりました。
  • コスト削減: 不要なAWS Batchジョブの実行がなくなったため、コストを削減できました。
  • 処理時間の短縮: 重複処理がなくなったため、全体の処理時間が短縮されました。

移行によるデメリット

Snowflakeタスクへの移行によって、いくつかのデメリットも発生しました。

  • 耐障害性の低下: AWSとSnowflakeの両方にまたがるシステムとなるため、全体としての耐障害性が低下する可能性があります。AWSからSnowflakeへデータを投入するような処理に適用するのが良いでしょう。
  • パフォーマンスの予測が難しくなる: SnowflakeタスクとAWS Batchジョブの間のレイテンシやパフォーマンスの予測が難しくなる可能性があります。特にSnowflake タスクは起動に若干の遅延が発生しますので、スケジュールがシビアな処理には適用しないほうが良さそうです。
  • セキュリティの考慮: Snowflakeの外部ネットワークアクセスには、適切な認証と認可が必要となります。設定が不適切だとセキュリティリスクが増加する可能性があります。いずれはストレージ統合と同じようにIAMロールを使用した認証への対応が行われると思われます。
  • モニタリングとトラブルシューティングの複雑さ: SnowflakeタスクはSnowflake内で管理されるため、CloudWatchで一貫した監視ができなくなります。その結果、タスクのモニタリングやトラブルシューティングが複雑化する可能性があります。

これらのデメリットを考慮した上で、Snowflakeタスクの導入を検討する必要があります。

まとめ

EventBridgeからSnowflakeタスクへの移行によって、AWS Batchジョブの重複実行問題を解決し、データ基盤の安定稼働を実現できました。Snowflakeタスクは、Snowflakeを利用する上で非常に強力な機能であり、データパイプラインの構築においても大きなメリットをもたらします。

もし、EventBridgeの重複実行やそれに伴う問題でお困りの場合は、Snowflakeタスクへの移行を検討してみてはいかがでしょうか?