はじめに
こんにちは!テクノロジー戦略室データエンジニアのゲンシュンです!
レバレジーズではCloud Composerを使ったデータのETLを行っています。事業数や扱うデータの種類が非常に多いので、大規模なデータを安定的に捌くことが求められます。オーケストレーションはCloud Composerで、具体的な処理はCloudRun JobsやCloud Batchなどにまかせるような設計になっています。
さて今回は、対象処理の数が事前に決まらない要件が新たに生まれたので、動的にも並列処理で捌けるようにしたお話をします。
従来の並列処理
TaskGroupを使って静的並列処理を実現しています。以下、SalesForceのデータをBigQueryに連携するETL処理を例にします。
必要なSalesForceのオブジェクトをBigQueryに連携したいのですが、1つ1つのオブジェクトが巨大なので、1オブジェクトにつきCloudRunJobsを走らせて並列処理させるようにしてます。
# 連携対象のオブジェクト一覧 OBJECT_LIST = ["Aobject", "Bobject", "Cobject", ...] # CloudRunJobsに投げる def etl(**kwargs): try: CloudRunExecuteJobOperator( task_id=f"salesforce_jobs_{object_name}", dag=dag, deferrable=False, timeout_seconds=7200, ).execute(kwargs) except Exception as e: print(f"Cloudrun jobs Failed. Error Message: {e}") raise e # 並列処理 with TaskGroup("etl_tasks") as etl_tasks: for object_name in OBJECT_LIST: etl_task = PythonOperator( task_id=f"bulk_{object_name}", python_callable=etl, ... ) start_task >> etl_tasks >> [end_success, end_failure]
連携したいオブジェクトが10個だと、10個のCloudrun Jobsが並列に実行されます。DAG図だとこんな感じ。

ここでのポイントは OBJECT_LIST がハードコードされている点です。CloudComposerがDAGファイルをパースした時点で「タスクは10個だな」と並列数が確定します。対象が固定で数も変わらないケースではこれで十分ですし、実際ほとんどのDAGはこのパターンで問題なく運用できてました。
また、このDAGの実行時間中に他のDAGも動いており、CloudComposerの負荷の高さが気になる場合は、同時実行task数の上限を指定することが出来ます。以下の実装だと同時実行数が4なので、先に4つのCloudRun Jobsが起動し、残り6個のtaskは待機状態となります。
# ワークフローの設定 args = { "owner": "airflow", "retries": 1, "concurrency": 4, # 同時実行タスク数の上限 } with DAG( dag_id=DAG_NAME, default_args=args, ... ) as dag:
動的に並列処理をする必要性
今回新たな要件として「直近数時間以内に作成された動画ファイルをPythonで処理する」というものが出てきました。同じくTaskGroupで対応しようと思ったのですが、ファイル数が時間帯や曜日によってバラバラであること、1つの動画処理が非常に重く時間がかかってしまうこと、以上2点から難しいのではないか?となりました。
例えば並列処理ラインを5つで設計すると、動画ファイル数が8つだと [2,2,2,1,1] と振り分ければ良さそうですが、100個だと [20,20,20,20,20] となり時間内に捌けなくなります。時間内に捌くことを優先するならば並列ラインを増やせば良いんですが、ファイル数少ない日だと無駄にリソース確保することになっちゃうんですよね。
SalesForceでやってた、OBJECT_LIST のようにハードコードできないんですよね〜困った困った。
Dynamic Task Mappingとは
Airflow 2.3の機能で、DAGの実行時にタスク数を動的に決定できる仕組みです。TaskGroupではDAGの定義時にタスクの数が確定するので、動的という点で違います。実行時点でファイル数がわからないので今回のケースにピッタリ!!
こんな感じで実装します。
@task def get_target_files(...): # 対象ファイルを抽出 return target_files @task def create_batches(...): # N個ずつのバッチに分割 @task def job_task(...): # CloudRun Jobsへ CloudRunExecuteJobOperator( task_id="xxxx", ).execute(context) # --- DAG定義 --- with DAG(...) as dag: # 1.対象ファイル取得 target_files = get_target_files) # 2. バッチ分割 file_batches = create_batches(target_files) # 3. Dynamic Task Mapping: バッチ数に応じて動的にタスク生成 etl_tasks = job_task.expand( batch_files=file_batches, secret_name=[secret_name], ) # 4. 成功・失敗時の通知 etl_tasks >> [end_success, end_failure]
job_taskのexpandに、渡したいジョブの配列を渡すだけです。引数は配列を受け取る仕様になっているので、固定値も配列で渡す必要があったのがハマりポイントでした。
DAGの図をみると、[] という表記になってる!なるほど〜

意識したポイント
冪等性の担保
動的とはいえ、CloudComposerの処理の途中で失敗したりCloudRunJobs側でエラーが発生した場合によるリトライ処理で、同じ動画ファイルが処理されることがあるので、どのタイミングでリトライされてもこのファイルは処理されているのか否かをチェックするようにしました。上限数の制御
実際そんなケースはないとは思いますが、仮に1時間で100個動画ファイルが作られてしまった場合、動的にやったとしても捌ききれない可能性があります。「1ジョブあたりの動画数」と「1回のDAG実行での最大ジョブ数」を一応設けておりまして、それ以上のファイルは次回実行で処理するようにしました。
FILES_PER_JOB = 5 # 1ジョブあたりのファイル数 MAX_JOBS_LIMIT = 5 # 1回のDAG実行での最大ジョブ数 @task def create_batches(target_files): # N件ずつバッチに分割 batches = [] for i in range(0, len(target_files), FILES_PER_JOB): batch = target_files[i : i + FILES_PER_JOB] batches.append(batch) # 最大jobs数を超えた分は次回DAG実行に回す if len(batches) > MAX_JOBS_LIMIT: print(f"バッチ数 {len(batches)} が上限 {MAX_JOBS_LIMIT} を超えています。") batches = batches[:MAX_JOBS_LIMIT] return batches
動画ファイルが13個だと5個ずつに分割されるので、batches = [5,5,3] となり、3つのCloudRun Jobsが並列で発射されます。動画ファイルが27個だと batched = [5, 5, 5, 5, 5] となり、5つのCloudRun Jobsが発射、残り2つが次回持ち越しになりますね!
これで動的にtask数が変わるETL処理も無事捌けるようになりました!今のところ、量が溢れて捌ききれなかったり、重複処理とかも起きてないです、安心!
We are hiring!
50以上のサービスを持つレバレジーズで、大量データを扱ったデータエンジニアリングに興味がある方はぜひご応募ください!