SnowflakeでDAGを活用するための3つのTips

 最近Snowflakeの記事を投稿していますDBManiaです。
 今回はSnowflakeで有向非巡回グラフ(Directed Acyclic Graph、以下DAGと表記)と、DAGを組む際にハマりそうなポイントについて説明いたします。

SnowflakeのDAGとは

 SnowflakeのDAGはTASKの集合体の形で表現されます。
 TASKは、特定の条件(手動あるいは時間)によって、単一の処理を実行できる機能です。
 ただし、ここで書く単一の処理というのは、単一の命令とイコールではありません。
 処理として単一であればいいため、BEGIN-ENDブロックで複数の命令を記述することが可能です。

 これらのTASKに対して、先行TASKを指定することで、複雑な処理フローを記述するのがDAGとなります。
 DAGに関しては最近機能拡張があり、SnowsightのデーターデータベースメニューからDAGのフロー図の確認や、実行ができるようになりました。
 そのため、今回はフロー図を使いながら説明いたします。

前準備

 今回、DAGの説明をするにあたって、わかりやすいように以下のTABLEとPROCEDUREを作成して、それをCALLする形で説明します。

CREATE TABLE WORK.TEST_LOG(
  VAL TEXT,[f:id:dbmania:20230502110311p:plain][f:id:dbmania:20230502110311p:plain]
  LOG_TIME TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

CREATE PROCEDURE WORK.LOGGING(VAL VARCHAR )
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
BEGIN
  INSERT INTO WORK.TEST_LOG(VAL) VALUES(:VAL);
END;
$$

 これで

CALL WORK.LOGGING('AAA');

を実行すれば、'AAA'と実行時間がWORK.TEST_LOGテーブルに格納されます。

SELECT * FROM WORK.TEST_LOG;


 では、TASK_ROOTが実行されるとTASK_AFTERが実行されるという、単純なDAGを組んでみましょう。
 前述したとおり、Snowflakeでは先行TASKを指定することでTASKとTASKを結びつけます。
 そのキーワードがAFTERです。

CREATE TASK WORK.TASK_ROOT
AS
  CALL WORK.LOGGING('ROOT');

CREATE TASK WORK.TASK_AFTER
AFTER WORK.TASK_ROOT
AS
  CALL WORK.LOGGING('AFTER');

 TASK_AFTERに記述されたAFTER WORK.TASK_ROOTによって、TASK_AFTERはTASK_ROOTが先行TASKとなる、つまりTASK_Aが実行されたらTASK_Bが実行されるというフローを記述したことになります。

 これをSnowsightで見ます。

 ご覧のようにTASK_ROOTが実行されたらTASK_AFTERが実行されるDAGが組まれていることがわかります。
 それでは本題に入りたいと思います。

Tips1:TASKはRESUMEしないと動作しないが、動作する場合もある

 先ほど作成したDAGを実行してみます。

EXECUTE TASK WORK.TASK_ROOT;


 何も問題なく実行されたように見えます。
 しかし、

SELECT * FROM WORK.TEST_LOG;

を実行してみると、

と、TASK_ROOTの部分しか記録されておらず、TASK_AFTERが記録されていません!
 なぜこのようなことになったかというと、TASK_AFTERが「停止済み(SUSPEND)」だからです。
ドキュメントにも記述されていますが、TASKは生成時はSUSPEND(一時停止)状態となり、「基本的には」実行されません

タスクを作成した後、タスク定義で指定されたパラメーターに基づいてタスクが実行される前に、 ALTER TASK ... RESUME を実行する 必要 があります。

 ここであえて「基本的には」と書いたのは、例外があるからです。
 それはDAGのROOTタスクです。(注:DAGを組まない場合のTASKも1つしかTASKのないDAGと考えられますので、同様です。)
 そのため、やっかいなことに、この状態でもEXECUTE TASKを使えば、DAGのルートTASKだけは実行できてしまいます
 この状態を解決するためには、

ALTER TASK WORK.TASK_AFTER RESUME;

を実行して、一時停止済みを解除する必要があります。
 この状態で

EXECUTE TASK WORK.TASK_ROOT;
SELECT * FROM WORK.TEST_LOG;

を行うと


とTASK_AFTERが実行されたことがわかります。

さらに混迷深まる

 スケジュール化されたルートTASKはRESUMEしないとスケジュール実行されず、逆にスケジュール化されていないルートTASKはRESUMEするとエラーになります。

さらに、スケジュール化されたルートTASKをRESUMEしてしまうと、ルートでないTASKはRESUMEできなくなります。

create TASK WORK.SCEDULE_DAG_ROOT
schedule='USING CRON 0 0 1 1 1 Asia/Tokyo'
as
  CALL WORK.LOGGING('SCHEDULE_ROOT');

create TASK WORK.SCEDULE_DAG_AFTER
AFTER WORK.SCEDULE_DAG_ROOT
as
  CALL WORK.LOGGING('SCHEDULE_AFTER');

-- 先にルートのDAGをRESUMEしてしまうと
ALTER TASK WORK.SCEDULE_DAG_ROOT RESUME;

-- 後続のDAGをRESUMEでエラーになる。
ALTER TASK WORK.SCEDULE_DAG_AFTER RESUME;
-- "ルートタスクが一時停止されていないため、ルートTASK DEV.WORK.SCEDULE_DAG_ROOT によるグラフの更新ができません。"

-- この場合、ルートTASKをSUSPENDして、後続のTASKをRESUMEする必要がある。
ALTER TASK WORK.SCEDULE_DAG_ROOT SUSPEND; -- ルートをサスペンド
ALTER TASK WORK.SCEDULE_DAG_AFTER RESUME; -- 後続を有効化
ALTER TASK WORK.SCEDULE_DAG_ROOT RESUME; -- ルートを有効化
RESUME問題のまとめ

 DAGを組みときのRESUMEについて、一目でわかる図にしてみました。
 以下は個人的な考察です。読み飛ばし推奨です。
 このような混乱した状況になった理由は、恐らくTASKを勝手に走らせないためだと考えられます。たとえば1分ごとに実行されるスケジュールDAGを組むことを考えてみます。ルートTASKを書いて、その後に後続DAGを作っている間に1分経ってしまったらどうなるでしょうか。中途半端な状態でDAGが実行されてしまいますよね。これを避けるためにスケジュールTASKは全てSUSPEND状態がデフォルトとなります。スケジュールでないDAGに関しても同様で、裏でバッチ処理などでTASKがキックされた場合、変更中にTASKが中途半端に実行されてしまう可能性があります。そのため、後続TASKをデフォルトSUSPENDにする意味はあります。問題はルートTASKがSUSPEND状態でも実行されてしまうこと。これは恐らくDAG実装前の互換性によるもので、単体TASKとして生成したものはすぐに実行可能であったのを引きずっているものと考えられます。

Tips2:DAGではルートTASKは1つしか設定できない。

 スケジュールTASKであろうとそうでなかろうと、DAGを組む際のルートTASKは1つしか設定できません
 ………と言いつつ、実は設定自体は可能ですし、特に警告も出ません。

CREATE TASK WORK.TASK_ROOT_A
AS
  CALL WORK.LOGGING('ROOT_A');

CREATE TASK WORK.TASK_ROOT_B
AS
  CALL WORK.LOGGING('ROOT_B');

CREATE TASK WORK.TASK_AFTER_AB
AFTER WORK.TASK_ROOT_A,WORK.TASK_ROOT_B
AS
  CALL WORK.LOGGING('AFTER_AB');

 このTASKをSnowsightで見てみると

という警告が出ており、この警告が出ているTASKは実行しようとすると 「複数のルートを持つTASKグラフは実行できません」と表示され、タスクグラフを実行ボタンそのものが押せなくなります。

 しかし、現実として、2つのTASKが終了したら1つのTASKを起動したい、というケースはあるでしょう。
 この場合、ダミーなルートTASKを作れば、実行可能なDAGが作成できます。
 具体的には

CREATE TASK WORK.TASK_BEFORE_ROOT
AS
  SELECT 1;

ALTER TASK WORK.TASK_ROOT_A ADD AFTER WORK.TASK_BEFORE_ROOT;
ALTER TASK WORK.TASK_ROOT_B ADD AFTER WORK.TASK_BEFORE_ROOT;

とすれば、DAGとして実行可能になります。
 ただし、前述のとおりルート以外のTASKはSUSPEND状態では実行されないというルールがあります。TASK_ROOT_A,TASK_ROOT_B,TASK_AFTER_ABをRESUMEしないとTASK_BEFORE_ROOTだけ実行して結果が返ってこないということになりますので、ご注意ください。

Tips3:まったく処理を行わないTASKを作成する裏技

 TASKは1つの処理しか実行できないだけでなく、必ず1つの処理を実行する必要があります
 処理のないTASKを作成しようとするとエラーとなります。

CREATE TASK WORK.TASK_TEST;


 前節で、ダミーのTASKとして"SELECT 1"を実行するTASKを作成しています。
 これは、まったく処理を行わないTASKに近しいですが、実際には"SELECT 1"という処理を行っていますし、この"SELECT 1"自体に意味がないため、よっぽど上手にコメントを入れておくなど手を打っておかないと、後で誰かがDAGを見たときに「なんだこのSELECT 1は?いらない処理だよね、消しちゃえ!」のような事故を引き起こしかねません。
 本当にまったく処理を行わないTASKは書けないものでしょうか?
 実はできます。
 そのコードを記述します。

CREATE OR REPLACE TASK TEST_NOTHING AS
            EXECUTE IMMEDIATE
$$
BEGIN
END
$$;

 これはBEGIN-ENDブロックを記述し、その中で何の処理も記述されていないTASKですが、ステートメントが記述されているので、TASKとして成立します。
 実際にこのようなダミーのTASKを記述する場合はBEGINとENDの間にコメントを入れておくとよいでしょう。

最後に

 DAGを実際に作ってみてハマったポイントを中心にTipsを作成してみました。
 これを見るとDAG面倒くさいと思うかも知れませんが、ハマりどころさえ知っていれば難しくありませんし、機能としては非常に優秀です。
 下手にバッチ処理を外装するくらいならDAGで実装したほうがスマートですし、PROCEDUREと組み合わせることで複雑な処理も実装できます。
 弊社でも便利に使い倒しまくっています。
 ぜひ一度使ってみてください。