レバレジーズ データAIブログ

インハウスデータ組織のあたまのなか

初めてのCloud Composerのコード作成で躓いたこと

はじめに

こんにちは。レバレジーズデータ戦略室 データアーキテクトグループの辰野です。
以前投稿した、【Google Cloud活用事例】データマート更新~BIツールデータ更新までを自動化!という記事の最後に「Cloud Composerへの移管を検討しています。」と記載していたのですが、昨年末にこのプロジェクトが動き出しました!
プロジェクト自体は、他部署のデータエンジニアリンググループが主体となって取り組んでいるのですが、今回、経験のためにデータアーキテクトグループの私もいくつかの移管作業を担当させてもらうことになりました。
(希望すれば他グループの業務を何でも担当できるわけではないですが、このように携わることができる環境に感謝しています…!)

というわけで今回は、私が初めて触ることとなったCloud Composerについて、コードを作成するときに躓いた点をご紹介できればと思います。
使い始めたばかりの方や、同じように躓いている方の参考になれば幸いです。
※Cloud Composerに関する説明や、初期設定、設計方針などは記載いたしません

Cloud Composerに移管することになった背景

以前の記事でも記載しているのですが、元々ワークフローはdigdagで管理しており、その中でもCloud Workflows / Cloud Scheduler / Cloud Functionsの組み合わせで実装されているものがありました。
digdagはYAMLでワークフローを定義するため、シンプルである反面あまり柔軟なワークフロー定義が難しく、また運用が長くなるにつれ使用するクラウドサービスも増えていき、属人的な管理が増えていきました。加えて、digdagをGCE上に実装してしまったため、タスクに合わせたリソース管理を行うことができず無駄なコンピューティングリソースがかかっている状況でした。

これらの課題を解決するために、新しいワークフローエンジンへの移管やインフラ環境の刷新が決まり、Google CloudのマネージドサービスであるCloud Composerを採用することとなりました。
Cloud ComposerではPythonでワークフローを定義することができるため、より柔軟なタスク実行が可能になります。また、GKE上に実装されているため、スケーラブルな運用が可能になります。
移管に合わせて、トリガーやスケジューラーも集約することにしました。

今回担当した内容

レバレジーズでは基本的にデータウェアハウスとしてBigQueryを利用していることから、テーブル更新はDataformで行っています。
このDataformのワークフローは、事業部によって実装時期が異なることやクエリの実行回数が異なることもあり、公式ドキュメントにもあるワークフロー構成*1 や、Cloud WorkflowsとCloud Schedulerを使う*2 など、実装方法が異なっていました。
今回、トリガーと実装の集約のため、こちらもCloud Composerへ移管することになりました。

*1:参考|Google Cloud「ワークフロー構成で実行をスケジュールする」2025/02/18
*2:参考Google Cloud「Workflows と Cloud Scheduler で実行をスケジュールする」2025/02/18

基本のワークフロー - すべてのファイルを実行する

Cloud ComposerでDataformのワークフローを実行する方法ですが、Google CloudやAirflowの公式ドキュメントが豊富で、これらを参考にすることで実装が可能です。
Dataformの指定のリポジトリ内のファイルをすべて実行するワークフローは、下図のように設計しました。

基本のワークフロー

応用のワークフロー - 時間帯で実行するファイルを絞り込む

レバレジーズの主要事業では、データウェアハウスに取り込むデータソースを2時間おきに更新しています。これらのデータを使ってデータマートを作成していますが、頻繁にデータ分析に使用するテーブルと、そうでないテーブルが存在します。2時間おきにすべてのテーブルを更新していると、BigQueryのスキャン量も増え、課金も膨らんでしまいます。
そこで、Dataformのファイルに「2時間おき」と「日次」の更新頻度を明記したタグを設定し、このタグを使って、実行する時間帯によって実行するファイルを絞り込むことにしました。

しかし、進める中でいくつか躓いた点がありました。

躓いた点

まず、ワークフローは下図のように設計しました。

応用のワークフロー - 失敗した設計

ここでXComs*3 を利用したのですが、使い方で躓いてしまいました。
XComsを利用することで、タスク間での値の受け渡しが可能となります。「Dataformで実行するファイルのタグをtask1でpushし、task3のワークフローの呼び出し作成のOperatorで使用するときにpullして分岐すればいいのでは?」と考えたのですが、エラーが発生しました。
具体的には、ワークフロー呼び出し作成のOperatorで、included_tags:の箇所に対象のタグを明記することで実行対象ファイルの絞り込みが可能*4 となりますが、この箇所に下記のように記載していました。

# ワークフロー呼び出し作成のタスク
task3 = DataformCreateWorkflowInvocationOperator(
    task_id="task3",
    gcp_conn_id="google_cloud_default",
    project_id="gcp-project-name",
    region="us-centrall",
    repository_id="dataform-repository-name",
    asynchronous=True,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
        "invocation_config": {
            # ↓ここでtask1でpushしたタグ名を直接指定しようとしていた
            "included_tags": ["{{ task_instance.xcom_pull(task_ids='task1', key='dataform_tags') }}"]
            "transitive_dependencies_included": True
        },
    },
)

こちらで実行すると、400 At least one action must be selected for execution. のエラーが発生しました。
調べていくと、Jinjaテンプレートを使って{{ task_instance.xcom_pull(...) }}ように取得しようとすると、文字列に変換されてしまい、うまくリストのタグ名として認識されないことがわかりました。

*3:参考|Apache Software「XComs」2025/02/18
*4:参考|Google Cloud「Cloud Composer で実行をスケジュールする」2025/02/18

改善内容と最終コード

これらを踏まえ、再度考えたワークフローの設計はこちらです。

応用のワークフロー- 成功した設計

変更した点は以下です。

  • 変更した点1:pushする値の変更
    • task1では単純に「日次更新時間かどうかのtrue / falseの値」だけを格納するPythonOperatorにしました。(今回は記載しませんが、このあとBIツールに抽出しているデータソースの更新処理も組み込む予定で、その分岐にも使用しやすいように単純な値にしています。)
  • 変更した点2:ワークフロー呼び出しOperatorをPythonOperatorでラップ
    • task1でpushした値から、タグの値を動的に指定してワークフロー呼び出しOperatorを構成するため、PythonOperatorの中に組み込みました。こうすることで、定義した変数をincluded_tags:に使用できるようになりました。

最終的に作成したコードは下記のようになりました。

# 日次更新かどうかを判定し、Dataformのワークフロー呼び出しを作成するタスクを構成し、実行する
def task3_callable(**context):
    ti = context["ti"]
    is_updated_2h = ti.xcom_pull(task_ids='task1', key='job_hour_judge')

    included_tags = ["update:daily"] if job_hour_judge == "false" else ["update:every_2_hours"]

    task3 = DataformCreateWorkflowInvocationOperator(
        task_id="task3",
        gcp_conn_id="google_cloud_default",
        project_id="gcp-project-name",
        region="us-centrall",
        repository_id="dataform-repository-name",
        asynchronous=True,
        workflow_invocation={
            "compilation_result": ti.xcom_pull('create_compilation_result')['name'],
            "invocation_config": {
                "included_tags": included_tags,
                "transitive_dependencies_included": True
            },
        },
    )
    # 定義したDataformのワークフロー呼び出しタスクを実行する
    return task3.execute(context=context)

~~~~~~~~~~~~~~~~省略~~~~~~~~~~~~~~~~

    # 実行時間に合わせてDataformのワークフロー呼び出しを作成するタスク
    task3 = PythonOperator(
        task_id="task3",
        python_callable=task3_callable,
        provide_context=True,
    )

これからやること

Dataformで更新したテーブルは各BIツールのデータソースにもなっています。BIツールは主にTableauを使用していますが、ライブ接続だと操作のたびにクエリが実行されてしまうため、基本的にデータ抽出を行っています。しかし、データ抽出も更新をスケジュールしないと最新のデータで分析することができません。
そのため、今回ご紹介したDataformで各テーブルを更新した処理のあとに、そのままTabelauのデータ抽出更新を行う処理を組み込むことが必要となりますが、まだ実装中です…。

こちらについても工夫する点がいくつかありそうなので、また実装できたら一連のワークフローについてもご紹介できればと思います。