今回も、Professional Cloud Data Engineer認定取得するために、私が勉強した内容をアウトプットしていきます。
今回は、Cloud Dataprepジョブのオーケストレーション、Cloud Dataflowでストリーミング分析、Dataflowにおけるストリーミング パイプラインについて説明します!
ぜひ、最後までご覧いただけると嬉しいです!
Cloud Dataprepジョブのオーケストレーション
データの準備とクレンジングは、データ分析プロジェクトにおいて非常に重要かつ時間のかかる作業です。Google Cloud の Cloud Dataprep は、このプロセスを直感的なUIで支援する強力なツールですが、定型的なデータ準備タスクや、より大きなデータパイプラインへの組み込みには、自動化とオーケストレーションが不可欠です。
そこで活躍するのが、Apache Airflow をベースとしたマネージドワークフローオーケストレーションサービスである Cloud Composer です。本記事では、Cloud Dataprep で作成したデータ準備ジョブを Cloud Composer を使って効率的に自動実行し、管理する方法について解説します。
Cloud Dataprep と Cloud Composer の概要
- Cloud Dataprep (by Trifacta)
インタラクティブなビジュアルインターフェースを通じて、構造化データや非構造化データを探索、クリーニング、変換するためのインテリジェントなデータサービスです。プログラミングの知識がなくても、複雑なデータ変換(レシピ)を容易に作成できます。 - Cloud Composer
Apache Airflow をGoogle Cloud上でフルマネージドで提供するサービスです。Python を用いてワークフロー(DAG : Directed Acyclic Graph)をプログラムとして定義し、タスクのスケジューリング、依存関係の管理、実行監視、通知などを自動化できます。
Dataprep ジョブを Composer でオーケストレーションするメリット
Dataprep ジョブを Composer でオーケストレーションすることには、以下のような大きなメリットがあります。
- 自動化とスケジューリング: 定期的なデータ準備タスク(日次、週次など)を自動実行できます。
- 複雑なワークフローの構築:Dataprep ジョブを、BigQuery へのデータロード、Cloud Functions の実行、機械学習モデルのトレーニングなど、他のGoogle Cloudサービスや外部システムのタスクと連携させた、より大きなデータパイプラインの一部として組み込めます。
- 依存関係の管理:あるタスクの完了を待って次のタスクを実行する、といった依存関係を明確に定義できます。
- 監視とアラートの一元化:Composer のUIを通じて、Dataprep ジョブを含むワークフロー全体の実行状況を監視し、エラー発生時にはアラートを受け取ることができます。
- 動的なパラメータ設定:実行時にDataprepジョブの入力ファイルパスや出力先、その他のパラメータを動的に変更できます。
- 再実行とエラー処理:失敗したタスクの再実行や、エラー処理ロジックを柔軟に組み込めます。
Dataprep ジョブを Composer でオーケストレーションする主なステップ
Dataprep ジョブを Composer のDAGに組み込む基本的な流れは以下のようになります。
- Cloud Dataprep でレシピを作成・準備
- まずは Dataprep でデータ変換のロジック(レシピ)を作成し、テストします。
- 必要に応じて、入力データや出力先などをパラメータ化できるようにレシピを設計しておくと、Composer からの実行時に柔軟性が増します。
- Dataprep ジョブ実行のための情報を取得
- Dataprep のUIやAPIを通じて、実行したいジョブ定義IDや、関連する情報を取得します。
- Cloud Composer DAG の作成
- Python でDAGファイルを記述します。
- Dataprep ジョブを実行するための Airflow Operator を利用します。Google Cloud が提供する
DataprepCreateJobOperator
や、より一般的には Dataprep API を呼び出すためのSimpleHttpOperator
、CloudFunctionsInvokeFunctionOperator
(Dataprep API をラップしたCloud Functionを呼び出す場合)、あるいはカスタムオペレータを作成する方法などが考えられます。
- Dataprep ジョブのトリガーとパラメータ設定
- DAG内で、Dataprepジョブを実行するタスクを定義します。
- 必要に応じて、実行時の日付や他のタスクからの出力結果などを用いて、Dataprepジョブの入力パラメータ(例:入力ファイルの日付パーティション、出力先のバケット名など)を動的に設定します。これは、Airflow のテンプレート機能(Jinja テンプレート)やXComsを利用して実現できます。
- 依存関係とスケジューリングの設定:
- Dataprep ジョブの前後にあるタスク(例:Cloud Storage へのファイルアップロード、BigQuery へのロード)との依存関係を設定します。
- DAGの実行スケジュール(例:毎日午前3時に実行)を定義します。
- 監視とロギング
- Composer のUIでDAGの実行状況や各タスクのログを確認します。
- エラー発生時の通知設定(例:メール通知)を行います。
具体的な実装例のポイント
実装例のポイントは以下になります。
- Dataprep API の活用:Dataprep のジョブ実行やステータス確認は REST API を通じて行えるため、Composer からこのAPIを呼び出す方法。
- サービスアカウントと認証:Composer から Dataprep API を安全に呼び出すためのサービスアカウントの設定と権限付与。
- ジョブのパラメータ化:Dataprep のレシピでパラメータ(変数)を定義し、Composer から実行時にこれらのパラメータに値を渡す方法。これにより、同じレシピを異なる入力データや出力設定で再利用できます。
- Cloud Functions との連携:Dataprep API の呼び出しロジックをCloud Functionsにカプセル化し、Composer からはそのCloud Functionを呼び出すアーキテクチャ。これにより、API呼び出しの詳細をDAGから分離できます。
- エラーハンドリングとリトライ:Dataprep ジョブが失敗した場合のAirflow側でのリトライ戦略や、エラー通知の仕組み。
Cloud Dataprepジョブのオーケストレーションのまとめ
Cloud Dataprep と Cloud Composer を連携させることで、データ準備プロセスの自動化、信頼性の向上、そしてより広範なデータパイプラインへのスムーズな統合が可能になります。手作業で行っていたDataprepジョブの実行をComposerに任せることで、データエンジニアはより価値の高い分析業務やパイプラインの最適化に集中できるようになります。
Cloud Dataflowでストリーミング分析
Cloud Dataflowはストリーミング分析をより簡単に、そして費用対効果を高くするための新機能を説明します。特に、Cloud Dataflow SQLのパブリックプレビューや、バッチ処理イベントの費用対効果を大幅に高めるCloud Dataflow Flexible Resource Scheduling (FlexRS) の一般提供が開始された点が注目されます。
Cloud Dataflow とは
Cloud Dataflowは、ETL(抽出、変換、読み込み)、バッチ計算、継続的なストリーミング分析など、さまざまなデータ処理パターンを開発・実行できる、フルマネージドのサーバーレスデータ処理サービスです。主な利点として、以下のような点が挙げられます。
- リアルタイムなデータ活用:リアルタイムデータを活用して、生成AIや機械学習(ML)のユースケースを強化し、パーソナライズされた顧客体験を提供します。
- 効率的なリソース最適化とコスト削減:需要の低い期間はコストを削減し、ピーク時にはスケールアップすることで、コンピューティングリソースを効率的に割り当てます。また、サーバーレスアーキテクチャにより、必要なリソースに対してのみ料金が発生するため、過剰なプロビジョニングの心配がありません。
- インテリジェントな診断とモニタリング:パフォーマンスのボトルネックを自動的に特定するストラングラー検出や、各パイプラインステップでデータを監視できるデータサンプリングなど、包括的な診断およびモニタリングツールを提供します。
- Google Cloudサービスとのシームレスな統合:BigQuery、Cloud Pub/Sub、Google Cloud Storageなどの他のGoogle Cloudサービスとネイティブに統合し、エンドツーエンドのデータパイプライン構築を簡素化します。
主なポイント
- シンプルさと費用対効果の向上:Cloud Dataflowは、ストリーミング分析プロジェクトをより利用しやすく、経済的にするための新機能を提供します。これには、JavaとPythonで利用可能な強力なAPIが含まれます。
- Cloud Dataflow SQL:SQLに慣れ親しんだユーザーにとって、ストリーミング分析の導入と活用を容易にします。
- Cloud Dataflow FlexRS:高度なスケジューリング技術と様々な種類の仮想マシン(プリエンプティブルVMインスタンスを含む)を組み合わせることで、バッチ処理コストを最大40%削減しつつ、通常のCloud Dataflowジョブと同等の完了保証を提供します。
- 統合されたバッチおよびストリーミング処理:Apache Beam SDKとCloud Dataflowマネージドサービスは、バッチとストリーミング分析のための統一されたAPIアプローチで知られており、最小限のコード変更で両方のモードを実行できます。
- 継続的な機能強化:Cloud Dataflow ShuffleとCloud Dataflow Streaming Engineが追加のリージョンに展開され、合計7つのリージョンで利用可能になったほか、顧客管理の暗号鍵でパイプラインの状態を保護する機能も開始されました。
Cloud Dataflowでストリーミング分析のまとめ
Cloud Dataflowは、リアルタイムデータ処理を支えるフルマネージドなサービスとして、ストリーミング分析の簡便性と費用対効果を大幅に向上させています。新たに提供されたCloud Dataflow SQLやFlexRSにより、SQLユーザーの利便性向上やバッチ処理コストの最適化が実現されました。BigQueryやCloud Pub/Subとの統合、インテリジェントなモニタリング機能も備え、エンドツーエンドのデータ分析基盤として注目されています。
Dataflowにおけるストリーミング パイプライン
リアルタイムでのデータ処理は、現代の多くのアプリケーションにおいて不可欠な要素となっています。Google Cloud Dataflow は、このようなストリーミングデータを効率的かつスケーラブルに処理するための強力なマネージドサービスです。本記事では、Dataflow のストリーミング パイプラインの基本的な概念と、その仕組みについて解説します。
ストリーミング処理
ストリーミング処理とは、連続的に生成されるデータ(イベントデータ)を、到着した順にほぼリアルタイムで処理する方式です。従来のバッチ処理が一定期間データを蓄積してからまとめて処理するのに対し、ストリーミング処理はデータの鮮度を重視し、迅速な意思決定やアクションを可能にします。
Google Cloud Dataflow は、Apache Beam SDK を用いてバッチ処理とストリーミング処理の両方を同じように記述・実行できる統合プラットフォームです。特にストリーミング処理においては、データの遅延や順序の乱れといった課題に対応するための高度な機能を提供します。
ストリーミング パイプラインの主要な概念
Dataflow のストリーミング パイプラインを理解する上で重要な概念がいくつかあります。
イベント時間と処理時間
- イベント時間:データが実際に発生した時刻です。例えば、センサーがデータを検知した時刻や、ユーザーがアクションを実行した時刻などが該当します。
- 処理時間:データが Dataflow パイプラインによって処理される時刻です。ネットワークの遅延などにより、イベント時間と処理時間にはずれが生じることがあります。 Dataflow では、イベント時間に基づいた処理を正確に行うための仕組みが提供されています。
ウィンドウ処理
ストリーミングデータは無限に続くため、処理を行う際にはデータを特定の「ウィンドウ」と呼ばれる有限の区間に区切る必要があります。Dataflow では、以下のようなウィンドウ戦略を利用できます。
- タンブリングウィンドウ (Tumbling windows):固定長の連続したウィンドウで、重複はありません。例えば、「1分ごと」の集計などに利用されます。
- ホッピングウィンドウ (Hopping windows):固定長で、ウィンドウ同士が重複することを許容します。例えば、「過去5分間のデータを1分ごとに集計」といった場合に利用されます。
- セッションウィンドウ (Session windows):アクティビティの期間に基づいてウィンドウを定義します。ユーザーのウェブサイト滞在時間ごとの分析などに適しています。
- グローバルウィンドウ (Global window):全てのデータを単一のウィンドウとして扱います。
ウォーターマーク
ストリーミング処理では、全てのデータがイベント時間通りに到着するとは限りません。遅延して到着するデータも存在します。ウォーターマークは、「特定のイベント時間までのデータは、おおむね到着したであろう」という進捗を示すヒューリスティックな概念です。 Dataflow はウォーターマークを利用して、遅延データを考慮しつつ、どのタイミングでウィンドウを確定し処理結果を出力するかを判断します。これにより、結果の完全性と処理の迅速性のバランスを取ることができます。
トリガー
トリガーは、ウィンドウ内のデータが集計され、結果が出力されるタイミングを決定する仕組みです。ウォーターマークがウィンドウの終了を判断する基準となるのに対し、トリガーはより柔軟な出力条件を設定できます。
- イベント時間トリガー:ウォーターマークがウィンドウの終端を通過したときに発火します。
- 処理時間トリガー:一定の処理時間が経過するたびに発火します。
- データドリブントリガー:ウィンドウ内の要素数など、データの条件に基づいて発火します。
- 複合トリガー:複数のトリガーを組み合わせて複雑な条件を設定できます。
アキュムレーション
トリガーによって複数回結果が出力される場合、以前の結果を破棄して新しい結果のみを出力するか(破棄モード)、以前の結果に新しいデータを追加して出力するか(蓄積モード)、あるいは以前の結果と新しい結果の両方を累積して出力するか(再試行モード)を指定できます。
Dataflow ストリーミング パイプラインの利点
- 正確なイベント時間処理:ウォーターマークとトリガーにより、遅延データや順序の乱れたデータに対しても、イベント時間に基づいた正確な処理を実現します。
- 柔軟なウィンドウ処理:様々なウィンドウタイプをサポートし、多様なユースケースに対応可能です。
- スケーラビリティと信頼性:Google Cloud のスケーラブルなインフラストラクチャ上で実行され、耐障害性も備えています。
- 統合されたプログラミングモデル:Apache Beam を利用することで、バッチ処理とストリーミング処理を同じコードで記述できます。
- サーバーレス運用:インフラの管理を Google Cloud に任せられるため、開発者はビジネスロジックの実装に集中できます。
Dataflowにおけるストリーミング パイプラインのまとめ
Google Cloud Dataflow のストリーミング パイプラインは、イベント時間処理、ウィンドウ処理、ウォーターマーク、トリガーといった高度な概念を駆使することで、リアルタイムデータの複雑な分析や処理を可能にします。これらの機能を理解し活用することで、データの鮮度を活かした洞察の獲得や、迅速なアクションの実行が期待できます。
まとめ
今回は、下記3点について説明しました。
- Cloud Dataprepジョブのオーケストレーション
- Cloud Dataflowでストリーミング分析
- Dataflowにおけるストリーミング パイプライン
Google CloudのCloud DataprepとCloud Dataflowは、データの準備からリアルタイム分析までを効率化・自動化する強力な基盤です。Cloud Composerとの連携によりDataprepジョブのオーケストレーションが容易になり、手作業の負担を軽減できます。Dataflowは、ストリーミング分析における高い柔軟性と費用対効果を提供し、SQLやFlexRSといった機能も充実しています。さらに、イベント時間処理やウィンドウ処理などの高度な機能を活用することで、リアルタイムな洞察と迅速な意思決定が可能になります。これからリアルタイム処理を導入する方にとって、Cloud Dataflowは非常に有力な選択肢と言えるでしょう。
これからも、Macのシステムエンジニアとして、日々、習得した知識や経験を発信していきますので、是非、ブックマーク登録してくれると嬉しいです!
それでは、次回のブログで!