はじめに
你好(ニーハウ)。
データ戦略室データエンジニアリンググループの楊 (よう) です。
レバレジーズではデータ活用環境をBigQueryに移行している段階です。移行に伴い、事業数字のモニタリングに使用しているTableauにおいて、参照するデータソースをBigQueryに変更する作業が発生している関係で、毎日のようにTableauを触っています。
今回は、その移行作業の一環で実装したTableau APIによるデータソースの自動更新を紹介します。
経緯について
Tableauのデータ更新は、もともとTableau上で更新スケジュールを設定し定期的に実行させていました。
9:00〜21:00の間に2時間に1回数十個のデータソースを同時実行するように設定されていたため、BigQueryの1秒あたり最大60MBしか読み取られない制限に引っかかり、更新失敗が頻発していました。
また、更新失敗後、再更新は自動的に行われないので、毎回手動で再更新するストレスが大きかったです。
そのため、以下の要件を満たす運用方法を再設計することになりました。
- BigQueryのスキャン制限に達する事による更新失敗が起きない。
- 各データソースを必ず2時間に1回更新する。
- 更新失敗しても自動的に成功するまで再実行する。
最初は、Tableau上で時間をずらして更新のスケジュール設定をすればいいと思いましたが、万が一失敗したらまた手動更新になるため、最善策ではなかったです。
また、Tableauのデータソースの基となるクエリの更新はdigdagのワークフローから実行するようになったため、クエリ更新完了後にTableauのデータソース更新に入るのがよりスムーズではないかという考えもあり、Tableau APIでデータソースを自動更新できるような仕組みを作っちゃいました。
想定していた処理の流れ
最初に考えていた設計は以下になります。
- 更新対象のデータソース一覧を取得する
- 同時実行で失敗しないように、順番にデータソースを更新させる。
- 実行状態を確認する。
- 失敗したものは成功するまで再実行する。
シンプルではありますが、最初は本気で「いけんじゃね」と思いました。しかし、人生は甘くないです。
思いのほかTableau APIはくせもの
なぜいけると思ったかというと、Tableau APIの公式ドキュメントを読むと、実現したいことがほぼ全部わかりやすく書いてあり、数十行程度で実現できそうだったからです。しかし、実際は要件をクリアするまで思っていたより苦戦しました。
APIの実行に成功しても「タスクの実行が成功」とは限らない
最初はデータソース一覧を取得し、データソースIDを用いて更新させたのですが、ソースコードもシンプルで、実行も成功しましたし、よっsh…
あれ、ちょっと待ってよ。
Tableau上のジョブ実行状態を見ると、「保留中」になっているジョブがあり、全然終わっていません。これは、まずいです。更新ジョブを実行させることはできましたが、実際にデータソースが正常に更新されたかどうかどうかが判断つかないです。このままでいくと、最終的に自分でTableau上で実行状態を確認して、失敗したものを再更新させることになるので、現状とあまり変わらないです。
そこで、私は考えました。データソースの更新ジョブ実行後に、ジョブ情報から実行ステータスを取得し、更新ジョブが成功したかを確認できるようにすればいいのではないか!
ジョブ情報から欲しいものを取得できない
まさに、「一難去ってまた一難」。
Tableauの画面上ではデータソース名や前回成功したときの実行時間など詳細情報が確認できますが、Tableau APIで取得しようとすると、欲しかったデータソース名やジョブのステータスなどは、直接取得できません。ステータスのようなものはfinish_codeから取得できますが、finish_codeそれぞれがなにを表しているのかはドキュメントに書かれておらず、一つ一つ試してようやくわかりました。
- finish_code = -1 : 保留中or進行中
- finish_code = 0 : 成功
- finish_code = 1 : 失敗
- finish_code = 2 : キャンセルされた
しかし、ジョブはステータスの確認かキャンセルしかできないので、他の方法を模索する必要がありました。
ジョブIDはデータソース情報やタスクの情報と紐づいていない
そこで、直接ジョブを再実行しなくても、データソースと紐づければ失敗したデータソースを再実行できるのではないかと思いました。前文を読んできた賢いあなたはきっとわかると思います。現実は残酷だぞ、と。
ジョブIDはデータソースIDと同じものではないし、データソース名も取得できない、ということは、ジョブとデータソースは直接な関係を持っていないです。では、「間接的に」攻めていくしかないです。
これまではデータソース情報を取得し、実行させたのですが、別の実行方法を発見しました。それは、タスクIDを用いて実行させることです。
Tableauでは、スケジュールされた抽出更新、サブスクリプション、フローなどの項目は「タスク」と呼ばれます。最終的にはタスクIDでデータソースを更新させました。その理由は2つあります:
- タスクに関するTableau APIを叩いて取得できる
task.target
はTarget#task_id, datasource_id
のような形になっており、タスクIDとデータソースIDは紐づいています。したがって、データソースIDでデータソース名を取得できます。ログにデータソース名が出力できると、どのデータソースが更新されている時に待ち時間が長いかなどがわかるので、改善やトラブルシューティングに使用できます。 - タスクに関するTableau APIを叩いて取得できる
task.consecutive_failed_count
は失敗回数を意味しており、実行前後のconsecutive_failed_count
を取得して比較し、もし実行後の失敗回数=実行前の失敗回数+1であれば、今回の実行が失敗していることを意味しているため、もう1回実行させればいいです。
実装した処理の流れ処理の流れ
いろいろ試行錯誤し、最終的に処理の流れは以下になりました:
- Tableauのタスク情報(タスクID、データソースID、失敗回数)を取得し、リストに格納する。
- 取得したタスク情報にそって順番にタスクを実行する。
- 「ジョブタイプ=抽出の更新/作成」と「ジョブの要求時間>処理実行時間」でフィルターし、Tableauのジョブ情報を取得する。
- ジョブ情報からfinish_codeを取得し、実行完了かどうかをfinish_codeの値で判断する。もし実行がまだ完了していない場合は、待機時間を入れ、実行完了まで待つ。(前文で触れましたが、実行が成功したか失敗したかを取得したいので、実行が完了していることを確認しなければなりません。)
- 実行完了後のタスクの失敗回数を取得し、今回の実行が失敗したかを判断する。
- 実行が失敗した場合は、再度実行し成功するまで再実行する。
このように実装したら、データソースの更新は失敗しなくなりました。 しかし、これは終わりではありません。もう1つ大きな壁が目の前にありました。
ようやく実装できたら運用上の懸念が出てきた
前文の方法でデータソースを更新することは可能ですが、実運用にあたってもうひと工夫必要でした。
Tableau上でスケジュールを設定して同時実行させた時の実行時間はトータルで40分でしたが、データソースを1つずつ順番に更新させていくと、2時間弱の時間がかかります。2時間に1回更新できないと、ダッシュボードの値が最新に更新されないといった影響が事業運営に出てしまう恐れがあります。そのため、最大実行時間をなんとしても2時間以内におさえる必要があります。そこでBigQueryの制限を超えない範囲で並列処理を入れることにしました。
並列処理はいろいろなやり方があり、Python側で実装するのもいいですが、データ活用基盤全体のタスクを管理しているワークフローエンジン (Digdag) 側で並列処理させることも可能です。通知は全てDigdagから送っているので、Digdag側の環境変数にデータソース名を格納できると、通知処理の実装にも利用できます。このような理由からDigdag側で並列処理を実装しました。
結果、最大実行時間は30分前後になり、最後の要件をクリアできました。
おわりに
紆余曲折ありましたが、戦っているうちにたくさん調べて、よりいい設計になるように何度も考え直したのはいい経験になりました。その上、表に見えないTableauの挙動についての理解もかなり深まりました。
これからは、Tableauで可視化できるのみならず、裏側も熟知しているデータエンジニアになれるように努力します。